Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/09/18 02:12:51 UTC

[zeppelin] branch master updated: [ZEPPELIN-4273] Support Flink 1.9 for Flink Interpreter

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new c876a37  [ZEPPELIN-4273] Support Flink 1.9 for Flink Interpreter
c876a37 is described below

commit c876a373c4b0f58515d96b0f7da769d9309edae6
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu May 23 10:44:14 2019 +0800

    [ZEPPELIN-4273] Support Flink 1.9 for Flink Interpreter
    
    ### What is this PR for?
    This PR adds support for the latest Flink release, 1.9.0. Main features:
    * Support for the Table API
    * Support for batch SQL
    * Support for streaming SQL
    * Support for both the Flink planner and the Blink planner
    * Support for PyFlink
    
    ### What type of PR is it?
    [ Improvement | Feature | Documentation ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://jira.apache.org/jira/browse/ZEPPELIN-4273
    
    ### How should this be tested?
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3422 from zjffdu/ZEPPELIN-4273 and squashes the following commits:
    
    b0c94cfd6 [Jeff Zhang] [ZEPPELIN-4273] Support Flink 1.9 for Flink Interpreter
---
 .travis.yml                                        |   9 +-
 bin/interpreter.sh                                 |   5 +-
 docs/interpreter/flink.md                          | 174 ++++--
 flink/pom.xml                                      | 619 +++++++++++++++++++--
 .../zeppelin/flink/FlinkBatchSqlInterpreter.java   |  95 ++++
 .../apache/zeppelin/flink/FlinkInterpreter.java    |  61 +-
 .../apache/zeppelin/flink/FlinkSQLInterpreter.java |  72 ---
 .../apache/zeppelin/flink/FlinkSqlInterrpeter.java | 300 ++++++++++
 .../zeppelin/flink/FlinkStreamSqlInterpreter.java  | 113 ++++
 .../apache/zeppelin/flink/IPyFlinkInterpreter.java |  97 ++++
 .../java/org/apache/zeppelin/flink/JobManager.java | 173 ++++++
 .../apache/zeppelin/flink/PyFlinkInterpreter.java  | 157 ++++++
 .../zeppelin/flink/sql/AbstractStreamSqlJob.java   | 244 ++++++++
 .../zeppelin/flink/sql/CollectStreamTableSink.java |  96 ++++
 .../zeppelin/flink/sql/RetractStreamSqlJob.java    | 108 ++++
 .../zeppelin/flink/sql/SingleRowStreamSqlJob.java  |  83 +++
 .../zeppelin/flink/sql/SqlCommandParser.java       | 213 +++++++
 .../org/apache/zeppelin/flink/sql/SqlInfo.java     |  53 ++
 .../org/apache/zeppelin/flink/sql/SqlLists.java    | 204 +++++++
 .../zeppelin/flink/sql/TimeSeriesStreamSqlJob.java | 129 +++++
 flink/src/main/resources/interpreter-setting.json  | 165 +++++-
 .../src/main/resources/python/zeppelin_ipyflink.py |  64 +++
 .../src/main/resources/python/zeppelin_pyflink.py  |  57 ++
 .../org/apache/zeppelin/flink/FlinkExprTyper.scala |  75 +++
 .../zeppelin/flink/FlinkILoopInterpreter.scala     | 240 ++++++++
 .../zeppelin/flink/FlinkSQLScalaInterpreter.scala  |   5 +-
 .../zeppelin/flink/FlinkScalaInterpreter.scala     | 369 +++++++++---
 .../zeppelin/flink/FlinkZeppelinContext.scala      |  65 ++-
 .../org/apache/zeppelin/flink/SqlJobRunner.scala   |  40 ++
 .../scala/org/apache/zeppelin/flink/types.scala    |  26 +
 .../zeppelin/flink/util/DependencyUtils.scala      | 381 +++++++++++++
 .../org/apache/zeppelin/flink/util/TableUtil.scala | 135 +++++
 .../zeppelin/flink/BatchSqlInterpreterTest.java    | 118 ++++
 .../flink/BlinkBatchSqlInterpreterTest.java        |  28 +
 .../flink/FlinkBatchSqlInterpreterTest.java        |  27 +
 .../zeppelin/flink/FlinkInterpreterTest.java       | 100 +++-
 .../zeppelin/flink/FlinkSQLInterpreterTest.java    | 110 ----
 .../zeppelin/flink/FlinkSqlInterpreterTest.java    | 368 ++++++++++++
 .../flink/FlinkStreamSqlInterpreterTest.java       |  92 +++
 .../zeppelin/flink/IPyFlinkInterpreterTest.java    | 178 ++++++
 .../zeppelin/flink/PyFlinkInterpreterTest.java     | 103 ++++
 flink/src/test/resources/flink-conf.yaml           |  16 +-
 flink/src/test/resources/init_stream.scala         |  45 ++
 flink/src/test/resources/log4j.properties          |   1 -
 flink/src/test/resources/log4j2.properties         |  64 +++
 .../apache/zeppelin/python/IPythonInterpreter.java |   1 +
 .../apache/zeppelin/python/PythonInterpreter.java  |   2 +-
 .../zeppelin/integration/FlinkIntegrationTest.java |  18 +-
 .../zeppelin/integration/MiniHadoopCluster.java    |  13 +-
 .../launcher/StandardInterpreterLauncher.java      |   1 +
 .../interpreter/integration/DownloadUtils.java     |  44 +-
 51 files changed, 5516 insertions(+), 440 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 2838e1b..b0ee366 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -51,7 +51,7 @@ services:
 env:
   global:
     # Interpreters does not required by zeppelin-server integration tests
-    - INTERPRETERS='!beam,!hbase,!pig,!jdbc,!file,!ignite,!kylin,!lens,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy,!sap,!java,!geode,!neo4j,!hazelcastjet,!submarine'
+    - INTERPRETERS='!beam,!hbase,!pig,!jdbc,!file,!flink,!ignite,!kylin,!lens,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy,!sap,!java,!geode,!neo4j,!hazelcastjet,!submarine'
 
 matrix:
   include:
@@ -88,6 +88,13 @@ matrix:
       dist: xenial
       env: PYTHON="3" SPARKR="true" PROFILE="-Pspark-2.2 -Phelium-dev -Pexamples -Pspark-scala-2.11" BUILD_FLAG="install -Pbuild-distr -DskipRat -DskipTests" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_PROJECTS="-Dtests.to.exclude=**/JdbcIntegrationTest.java,**/SparkIntegrationTest.java,**/ZeppelinSparkClusterTest.java,**/org/apache/zeppelin/spark/*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false"
 
+    # Test flink module
+    - sudo: required
+      jdk: "openjdk8"
+      dist: xenial
+      env: PYTHON="3" SPARKR="true" PROFILE="-Pscala-2.11" BUILD_FLAG="clean install -Pbuild-distr -DskipRat -am" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl flink" TEST_PROJECTS="-DfailIfNoTests=false"
+
+
     # Test selenium with spark module for spark 2.3
     - jdk: "openjdk8"
       dist: xenial
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index 0c09c55..76d5435 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -203,8 +203,11 @@ elif [[ "${INTERPRETER_ID}" == "pig" ]]; then
     echo "TEZ_CONF_DIR is not set, configuration might not be loaded"
   fi
 elif [[ "${INTERPRETER_ID}" == "flink" ]]; then
+  addJarInDirForIntp "${FLINK_HOME}/lib"
+  addJarInDirForIntp "${FLINK_HOME}/opt"
+
   if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
-    ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
+    ZEPPELIN_INTP_CLASSPATH+=`hadoop classpath`
     export HADOOP_CONF_DIR=${HADOOP_CONF_DIR}
   else
     # autodetect HADOOP_CONF_HOME by heuristic
diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index d3f2223..ad71487 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -26,51 +26,155 @@ limitations under the License.
 ## Overview
 [Apache Flink](https://flink.apache.org) is an open source platform for distributed stream and batch data processing. Flink's core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink also builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization.
 
-## How to start local Flink cluster, to test the interpreter
-Zeppelin comes with pre-configured flink-local interpreter, which starts Flink in a local mode on your machine, so you do not need to install anything.
+Apache Flink is supported in Zeppelin through the Flink interpreter group, which consists of the five interpreters listed below.
 
-## How to configure interpreter to point to Flink cluster
-At the "Interpreters" menu, you have to create a new Flink interpreter and provide next properties:
+<table class="table-configuration">
+  <tr>
+    <th>Name</th>
+    <th>Class</th>
+    <th>Description</th>
+  </tr>
+  <tr>
+    <td>%flink</td>
+    <td>FlinkInterpreter</td>
+    <td>Creates ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironment and provides a Scala environment</td>
+  </tr>
+  <tr>
+    <td>%flink.pyflink</td>
+    <td>PyFlinkInterpreter</td>
+    <td>Provides a Python environment</td>
+  </tr>
+  <tr>
+    <td>%flink.ipyflink</td>
+    <td>IPyFlinkInterpreter</td>
+    <td>Provides an IPython environment</td>
+  </tr>
+  <tr>
+    <td>%flink.ssql</td>
+    <td>FlinkStreamSqlInterpreter</td>
+    <td>Provides a streaming SQL environment</td>
+  </tr>
+  <tr>
+    <td>%flink.bsql</td>
+    <td>FlinkBatchSqlInterpreter</td>
+    <td>Provides a batch SQL environment</td>
+  </tr>
+</table>
 
+## Configuration
+The Flink interpreter can be configured with the properties listed below; an illustrative example follows the table.
+You can also set other Flink properties that are not listed in the table. For the additional properties, refer to [Flink Available Properties](https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html).
 <table class="table-configuration">
   <tr>
-    <th>property</th>
-    <th>value</th>
+    <th>Property</th>
+    <th>Default</th>
     <th>Description</th>
   </tr>
   <tr>
-    <td>host</td>
+    <td>FLINK_HOME</td>
+    <td></td>
+    <td>Location of the Flink installation. It must be specified; otherwise you cannot use Flink in Zeppelin</td>
+  </tr>
+  <tr>
+    <td>flink.execution.mode</td>
     <td>local</td>
-    <td>host name of running JobManager. 'local' runs flink in local mode (default)</td>
+    <td>Execution mode of Flink, e.g. local, yarn, or remote</td>
+  </tr>
+  <tr>
+    <td>flink.execution.remote.host</td>
+    <td></td>
+    <td>Hostname of the JobManager when the execution mode is remote</td>
+  </tr>
+  <tr>
+    <td>flink.execution.remote.port</td>
+    <td></td>
+    <td>Port of the JobManager when the execution mode is remote</td>
+  </tr>
+  <tr>
+    <td>flink.jm.memory</td>
+    <td>1024</td>
+    <td>Total memory (in MB) of the JobManager</td>
+  </tr>
+  <tr>
+    <td>flink.tm.memory</td>
+    <td>1024</td>
+    <td>Total memory (in MB) of the TaskManager</td>
+  </tr>
+  <tr>
+    <td>flink.tm.num</td>
+    <td>2</td>
+    <td>Number of TaskManagers</td>
   </tr>
   <tr>
-    <td>port</td>
-    <td>6123</td>
-    <td>port of running JobManager</td>
+    <td>flink.tm.slot</td>
+    <td>1</td>
+    <td>Number of slots per TaskManager</td>
+  </tr>
+  <tr>
+    <td>flink.yarn.appName</td>
+    <td>Zeppelin Flink Session</td>
+    <td>Yarn app name</td>
+  </tr>
+  <tr>
+    <td>flink.yarn.queue</td>
+    <td></td>
+    <td>queue name of yarn app</td>
+  </tr>
+  <tr>
+    <td>flink.yarn.jars</td>
+    <td></td>
+    <td>additional user jars (comma separated)</td>
+  </tr>
+  <tr>
+    <td>zeppelin.flink.scala.color</td>
+    <td>true</td>
+    <td>Whether to display Scala shell output in colored format</td>
+  </tr>
+  <tr>
+    <td>zeppelin.flink.enableHive</td>
+    <td>false</td>
+    <td>Whether to enable Hive</td>
+  </tr>
+  <tr>
+    <td>zeppelin.flink.printREPLOutput</td>
+    <td>true</td>
+    <td>Print REPL output</td>
+  </tr>
+  <tr>
+    <td>zeppelin.flink.maxResult</td>
+    <td>1000</td>
+    <td>Maximum number of rows returned by the SQL interpreters</td>
+  </tr>
+  <tr>
+    <td>zeppelin.flink.planner</td>
+    <td>blink</td>
+    <td>Planner to use for the Flink Table API, either blink or flink</td>
+  </tr>
+  <tr>
+    <td>zeppelin.pyflink.python</td>
+    <td>python</td>
+    <td>python executable for pyflink</td>
   </tr>
 </table>
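+
+For example, to run Flink on YARN you might set something like the following in the interpreter setting (the values below are only illustrative, not recommended defaults):
+
+* `FLINK_HOME`: `/opt/flink-1.9.0`
+* `flink.execution.mode`: `yarn`
+* `flink.jm.memory`: `2048`
+* `flink.tm.memory`: `4096`
+* `flink.tm.num`: `4`
+* `flink.yarn.queue`: `default`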
 
-For more information about Flink configuration, you can find it [here](https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html).
-
-## How to test it's working
-You can find an example of Flink usage in the Zeppelin Tutorial folder or try the following word count example, by using the [Zeppelin notebook](https://www.zeppelinhub.com/viewer/notebooks/aHR0cHM6Ly9yYXcuZ2l0aHVidXNlcmNvbnRlbnQuY29tL05GTGFicy96ZXBwZWxpbi1ub3RlYm9va3MvbWFzdGVyL25vdGVib29rcy8yQVFFREs1UEMvbm90ZS5qc29u) from Till Rohrmann's presentation [Interactive data analysis with Apache Flink](http://www.slideshare.net/tillrohrmann/data-analysis-49806564) for Apache Flink Meetup.
-
-```
-%sh
-rm 10.txt.utf-8
-wget http://www.gutenberg.org/ebooks/10.txt.utf-8
-```
-{% highlight scala %}
-%flink
-case class WordCount(word: String, frequency: Int)
-val bible:DataSet[String] = benv.readTextFile("10.txt.utf-8")
-val partialCounts: DataSet[WordCount] = bible.flatMap{
-    line =>
-        """\b\w+\b""".r.findAllIn(line).map(word => WordCount(word, 1))
-//        line.split(" ").map(word => WordCount(word, 1))
-}
-val wordCounts = partialCounts.groupBy("word").reduce{
-    (left, right) => WordCount(left.word, left.frequency + right.frequency)
-}
-val result10 = wordCounts.first(10).collect()
-{% endhighlight %}
+
+## StreamExecutionEnvironment, ExecutionEnvironment, StreamTableEnvironment, BatchTableEnvironment
+
+Zeppelin creates four variables to represent Flink's entry points (see the sketch below):
+* `senv`    (StreamExecutionEnvironment)
+* `env`     (ExecutionEnvironment)
+* `stenv`   (StreamTableEnvironment)
+* `btenv`   (BatchTableEnvironment)
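+
+For example, a minimal word count in a `%flink` paragraph, using the pre-created `env` (a sketch that assumes the default local execution mode; the input strings are made up):
+
+```scala
+%flink
+// classic word count over a small in-memory DataSet, built from the ExecutionEnvironment `env`
+val lines = env.fromElements("hello flink", "hello zeppelin")
+val counts = lines
+  .flatMap { _.toLowerCase.split("\\W+").filter(_.nonEmpty) }
+  .map { (_, 1) }
+  .groupBy(0)
+  .sum(1)
+// print() triggers execution and writes the result to the paragraph output
+counts.print()
+```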
+
+
+## ZeppelinContext
+Zeppelin automatically injects `ZeppelinContext` as variable `z` in your Scala/Python environment. `ZeppelinContext` provides some additional functions and utilities.
+See [Zeppelin-Context](../usage/other_features/zeppelin_context.html) for more details.
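+
+For example, a dynamic form can be created and read from a `%flink` paragraph (a small sketch; the form name and default value are made up):
+
+```scala
+%flink
+// z.input renders a text input form in the paragraph and returns its current value
+val topN = z.input("topN", "10").toString.toInt
+println("showing top " + topN + " rows")
+```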
+
+## IPython support
+
+By default, Zeppelin uses IPython in `pyflink` when IPython is available; otherwise it falls back to the original PyFlink implementation.
+If you don't want to use IPython, you can set `zeppelin.pyflink.useIPython` to `false` in the interpreter setting. For the IPython features, refer to the
+[Python Interpreter](python.html) documentation.
+
+
diff --git a/flink/pom.xml b/flink/pom.xml
index b6cacac..4a9ec8a 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -37,11 +37,14 @@
   <properties>
     <!--library versions-->
     <interpreter.name>flink</interpreter.name>
-    <flink.version>1.7.1</flink.version>
-    <flink.akka.version>2.3.7</flink.akka.version>
+    <flink.version>1.9.0</flink.version>
+    <hadoop.version>2.6.5</hadoop.version>
+    <hive.version>2.3.4</hive.version>
+    <hiverunner.version>4.0.0</hiverunner.version>
+
     <scala.macros.version>2.0.1</scala.macros.version>
     <scala.binary.version>2.11</scala.binary.version>
-    <scala.version>2.11.8</scala.version>
+    <scala.version>2.11.12</scala.version>
 
     <!--plugin versions-->
     <plugin.scalamaven.version>3.2.2</plugin.scalamaven.version>
@@ -49,48 +52,116 @@
     <plugin.buildhelper.version>1.7</plugin.buildhelper.version>
     <plugin.scalastyle.version>0.5.0</plugin.scalastyle.version>
 
+    <flink.bin.download.url>https://archive.apache.org/dist/flink/flink-${flink.version}/flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.bin.download.url>
   </properties>
 
   <dependencies>
+
     <dependency>
-      <groupId>com.google.code.gson</groupId>
-      <artifactId>gson</artifactId>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>zeppelin-python</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.zeppelin</groupId>
+          <artifactId>zeppelin-interpreter</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>zeppelin-interpreter</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.zeppelin</groupId>
+          <artifactId>zeppelin-interpreter</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>zeppelin-python</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-python_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-core</artifactId>
       <version>${flink.version}</version>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-clients_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-runtime_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-yarn_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-shaded-hadoop2</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table_${scala.binary.version}</artifactId>
+      <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-scala_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
@@ -100,9 +171,66 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <version>20.0</version>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_2.11</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-scala_2.11</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-java</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-hive_2.11</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-hive_2.11</artifactId>
+      <version>${flink.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- hadoop compatibility dependency -->
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.ivy</groupId>
+      <artifactId>ivy</artifactId>
+      <version>2.4.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>oro</groupId>
+      <!-- oro is needed by ivy, but only listed as an optional dependency, so we include it. -->
+      <artifactId>oro</artifactId>
+      <version>2.0.8</version>
+    </dependency>
+
+
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
     </dependency>
 
     <dependency>
@@ -123,19 +251,307 @@
       <version>${scala.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>com.mashape.unirest</groupId>
+      <artifactId>unirest-java</artifactId>
+      <version>1.4.9</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-metastore</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-exec</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-exec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-metastore</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-planner_2.11</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-planner-blink_2.11</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.reflections</groupId>
+          <artifactId>reflections</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-mockito</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <!--<dependency>-->
+      <!--<groupId>com.klarna</groupId>-->
+      <!--<artifactId>hiverunner</artifactId>-->
+      <!--<version>${hiverunner.version}</version>-->
+      <!--<scope>test</scope>-->
+      <!--<exclusions>-->
+        <!--<exclusion>-->
+          <!--<groupId>org.apache.hive</groupId>-->
+          <!--<artifactId>hive-serde</artifactId>-->
+        <!--</exclusion>-->
+        <!--<exclusion>-->
+          <!--<groupId>org.apache.hive</groupId>-->
+          <!--<artifactId>hive-jdbc</artifactId>-->
+        <!--</exclusion>-->
+        <!--<exclusion>-->
+          <!--<groupId>org.apache.hive.hcatalog</groupId>-->
+          <!--<artifactId>hive-webhcat-java-client</artifactId>-->
+        <!--</exclusion>-->
+        <!--<exclusion>-->
+          <!--<groupId>org.apache.hive</groupId>-->
+          <!--<artifactId>hive-service</artifactId>-->
+        <!--</exclusion>-->
+        <!--<exclusion>-->
+          <!--<groupId>org.apache.hive</groupId>-->
+          <!--<artifactId>hive-contrib</artifactId>-->
+        <!--</exclusion>-->
+        <!--&lt;!&ndash;<exclusion>&ndash;&gt;-->
+          <!--&lt;!&ndash;<groupId>com.google.guava</groupId>&ndash;&gt;-->
+          <!--&lt;!&ndash;<artifactId>guava</artifactId>&ndash;&gt;-->
+        <!--&lt;!&ndash;</exclusion>&ndash;&gt;-->
+        <!--<exclusion>-->
+          <!--<groupId>io.netty</groupId>-->
+          <!--<artifactId>netty</artifactId>-->
+        <!--</exclusion>-->
+      <!--</exclusions>-->
+    <!--</dependency>-->
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <version>${hive.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>hadoop-auth</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${hive.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.calcite</groupId>
+          <artifactId>calcite-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.calcite</groupId>
+          <artifactId>calcite-druid</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.calcite.avatica</groupId>
+          <artifactId>avatica</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-codec</groupId>
+          <artifactId>commons-codec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-httpclient</groupId>
+          <artifactId>commons-httpclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-io</groupId>
+          <artifactId>commons-io</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-1.2-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-slf4j-impl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.curator</groupId>
+          <artifactId>curator-framework</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.curator</groupId>
+          <artifactId>apache-curator</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.code.gson</groupId>
+          <artifactId>gson</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>jline</groupId>
+          <artifactId>jline</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-webhcat-java-client</artifactId>
+      <version>${hive.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.calcite</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-contrib</artifactId>
+      <version>${hive.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-core</artifactId>
+      <version>${hive.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+      <exclusions>
+        <exclusion>
+          <groupId>jline</groupId>
+          <artifactId>jline</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>net.jodah</groupId>
+      <artifactId>concurrentunit</artifactId>
+      <version>0.4.4</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
     <plugins>
-
-      <!-- Scala Compiler -->
       <plugin>
         <groupId>net.alchim31.maven</groupId>
         <artifactId>scala-maven-plugin</artifactId>
-        <version>${plugin.scalamaven.version}</version>
+        <version>3.2.2</version>
         <executions>
-          <!-- Run scala compiler in the process-resources phase, so that dependencies on
-               scala classes can be resolved later in the (Java) compile phase -->
+          <execution>
+            <id>eclipse-add-source</id>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+          </execution>
           <execution>
             <id>scala-compile-first</id>
             <phase>process-resources</phase>
@@ -143,11 +559,8 @@
               <goal>compile</goal>
             </goals>
           </execution>
-
-          <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
-               scala classes can be resolved later in the (Java) test-compile phase -->
           <execution>
-            <id>scala-test-compile</id>
+            <id>scala-test-compile-first</id>
             <phase>process-test-resources</phase>
             <goals>
               <goal>testCompile</goal>
@@ -155,20 +568,65 @@
           </execution>
         </executions>
         <configuration>
+          <scalaVersion>${scala.version}</scalaVersion>
+          <!--<recompileMode>incremental</recompileMode>-->
+          <!--<useZincServer>true</useZincServer>-->
+          <args>
+            <arg>-unchecked</arg>
+            <arg>-deprecation</arg>
+            <arg>-feature</arg>
+            <arg>-target:jvm-1.8</arg>
+          </args>
           <jvmArgs>
-            <jvmArg>-Xms128m</jvmArg>
-            <jvmArg>-Xmx512m</jvmArg>
+            <jvmArg>-Xms1024m</jvmArg>
+            <jvmArg>-Xmx1024m</jvmArg>
+            <jvmArg>-XX:PermSize=${PermGen}</jvmArg>
+            <jvmArg>-XX:MaxPermSize=${MaxPermGen}</jvmArg>
           </jvmArgs>
-          <compilerPlugins combine.children="append">
-            <compilerPlugin>
-              <groupId>org.scalamacros</groupId>
-              <artifactId>paradise_${scala.version}</artifactId>
-              <version>${scala.macros.version}</version>
-            </compilerPlugin>
-          </compilerPlugins>
+          <javacArgs>
+            <javacArg>-source</javacArg>
+            <javacArg>${java.version}</javacArg>
+            <javacArg>-target</javacArg>
+            <javacArg>${java.version}</javacArg>
+            <javacArg>-Xlint:all,-serial,-path,-options</javacArg>
+          </javacArgs>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>com.googlecode.maven-download-plugin</groupId>
+        <artifactId>download-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>download-pyflink-files</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>wget</goal>
+            </goals>
+            <configuration>
+              <readTimeOut>60000</readTimeOut>
+              <retries>5</retries>
+              <unpack>true</unpack>
+              <url>${flink.bin.download.url}</url>
+              <outputDirectory>${project.build.directory}</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkCount>1</forkCount>
+          <reuseForks>false</reuseForks>
+          <environmentVariables>
+            <PYTHONPATH>${project.build.directory}/flink-${flink.version}/opt/python/py4j-0.10.8.1-src.zip:${project.build.directory}/flink-${flink.version}/opt/python/pyflink.zip</PYTHONPATH>
+          </environmentVariables>
         </configuration>
       </plugin>
 
+
       <!-- Eclipse Integration -->
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
@@ -234,30 +692,6 @@
       </plugin>
 
       <plugin>
-        <groupId>org.scalastyle</groupId>
-        <artifactId>scalastyle-maven-plugin</artifactId>
-        <version>${plugin.scalastyle.version}</version>
-        <executions>
-          <execution>
-            <goals>
-              <goal>check</goal>
-            </goals>
-          </execution>
-        </executions>
-        <configuration>
-          <verbose>false</verbose>
-          <failOnViolation>true</failOnViolation>
-          <includeTestSourceDirectory>true</includeTestSourceDirectory>
-          <failOnWarning>false</failOnWarning>
-          <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
-          <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
-          <configLocation>${project.basedir}/../_tools/scalastyle.xml</configLocation>
-          <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
-          <outputEncoding>UTF-8</outputEncoding>
-        </configuration>
-      </plugin>
-
-      <plugin>
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
           <forkMode>always</forkMode>
@@ -277,8 +711,59 @@
         <artifactId>maven-resources-plugin</artifactId>
       </plugin>
       <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
+        <version>${plugin.shade.version}</version>
+        <configuration>
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>org/datanucleus/**</exclude>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+              </excludes>
+            </filter>
+          </filters>
+
+          <artifactSet>
+            <excludes>
+              <exclude>org.scala-lang:scala-library</exclude>
+              <exclude>org.scala-lang:scala-compiler</exclude>
+              <exclude>org.scala-lang:scala-reflect</exclude>
+              <exclude>org.apache.flink:*</exclude>
+            </excludes>
+          </artifactSet>
+
+          <transformers>
+            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+              <resource>reference.conf</resource>
+            </transformer>
+          </transformers>
+          <relocations>
+            <relocation>
+              <pattern>io.netty</pattern>
+              <shadedPattern>org.apache.zeppelin.shaded.io.netty</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>com.google</pattern>
+              <shadedPattern>org.apache.zeppelin.shaded.com.google</shadedPattern>
+            </relocation>
+          </relocations>
+          <outputFile>${project.basedir}/../interpreter/${interpreter.name}/${project.artifactId}-${project.version}.jar</outputFile>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+          </execution>
+        </executions>
       </plugin>
+
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
@@ -289,4 +774,36 @@
 
     </plugins>
   </build>
+
+  <profiles>
+
+    <profile>
+      <id>hive2</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <properties>
+        <hive.version>2.3.4</hive.version>
+        <hiverunner.version>4.0.0</hiverunner.version>
+      </properties>
+    </profile>
+
+    <profile>
+      <id>hive1</id>
+      <properties>
+        <hive.version>1.2.1</hive.version>
+        <hiverunner.version>3.2.1</hiverunner.version>
+      </properties>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <version>2.7.5</version>
+          <scope>provided</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+
+  </profiles>
+
 </project>
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
new file mode 100644
index 0000000..a35b464
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+
+import com.google.common.collect.Lists;
+import org.apache.flink.table.api.Table;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class FlinkBatchSqlInterpreter extends FlinkSqlInterrpeter {
+
+  private FlinkZeppelinContext z;
+
+  public FlinkBatchSqlInterpreter(Properties properties) {
+    super(properties);
+  }
+
+
+  @Override
+  public void open() throws InterpreterException {
+    super.open();
+    this.tbenv = flinkInterpreter.getBatchTableEnvironment();
+    this.z = flinkInterpreter.getZeppelinContext();
+  }
+
+  @Override
+  public void close() throws InterpreterException {
+
+  }
+
+  @Override
+  public void callSelect(String sql, InterpreterContext context) throws IOException {
+    Table table = this.tbenv.sqlQuery(sql);
+    z.setCurrentSql(sql);
+    String result = z.showData(table);
+    context.out.write(result);
+  }
+
+  protected void checkLocalProperties(Map<String, String> localProperties)
+          throws InterpreterException {
+    List<String> validLocalProperties = Lists.newArrayList("parallelism");
+    for (String key : localProperties.keySet()) {
+      if (!validLocalProperties.contains(key)) {
+        throw new InterpreterException("Invalid property: " + key + ", Only the following " +
+                "properties are valid: " + validLocalProperties);
+      }
+    }
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) throws InterpreterException {
+    flinkInterpreter.getJobManager().cancelJob(context);
+  }
+
+  @Override
+  public FormType getFormType() throws InterpreterException {
+    return FormType.SIMPLE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) throws InterpreterException {
+    return 0;
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    int maxConcurrency = Integer.parseInt(
+            getProperty("zeppelin.flink.concurrentBatchSql.max", "10"));
+    return SchedulerFactory.singleton().createOrGetParallelScheduler(
+            FlinkBatchSqlInterpreter.class.getName() + this.hashCode(), maxConcurrency);
+  }
+}
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index c14407d..59772b6 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -18,18 +18,25 @@
 package org.apache.zeppelin.flink;
 
 import org.apache.flink.api.scala.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.scala.StreamTableEnvironment;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
 public class FlinkInterpreter extends Interpreter {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(FlinkInterpreter.class);
+
   private FlinkScalaInterpreter innerIntp;
   private FlinkZeppelinContext z;
 
@@ -41,14 +48,7 @@ public class FlinkInterpreter extends Interpreter {
   @Override
   public void open() throws InterpreterException {
     this.innerIntp.open();
-
-    // bind ZeppelinContext
-    int maxRow = Integer.parseInt(getProperty("zeppelin.flink.maxResult", "1000"));
-    this.z = new FlinkZeppelinContext(innerIntp.getBatchTableEnviroment(),
-        getInterpreterGroup().getInterpreterHookRegistry(), maxRow);
-    List<String> modifiers = new ArrayList<>();
-    modifiers.add("@transient");
-    this.innerIntp.bind("z", z.getClass().getCanonicalName(), z, modifiers);
+    this.z = this.innerIntp.getZeppelinContext();
   }
 
   @Override
@@ -59,6 +59,7 @@ public class FlinkInterpreter extends Interpreter {
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context)
       throws InterpreterException {
+    LOGGER.debug("Interpret code: " + st);
     this.z.setInterpreterContext(context);
     this.z.setGui(context.getGui());
     this.z.setNoteGui(context.getNoteGui());
@@ -67,17 +68,17 @@ public class FlinkInterpreter extends Interpreter {
 
   @Override
   public void cancel(InterpreterContext context) throws InterpreterException {
-
+    this.innerIntp.cancel(context);
   }
 
   @Override
   public FormType getFormType() throws InterpreterException {
-    return FormType.NATIVE;
+    return FormType.SIMPLE;
   }
 
   @Override
   public int getProgress(InterpreterContext context) throws InterpreterException {
-    return 0;
+    return this.innerIntp.getProgress(context);
   }
 
   @Override
@@ -88,16 +89,44 @@ public class FlinkInterpreter extends Interpreter {
     return innerIntp.completion(buf, cursor, interpreterContext);
   }
 
-  FlinkScalaInterpreter getInnerScalaInterpreter() {
-    return this.innerIntp;
+  ExecutionEnvironment getExecutionEnvironment() {
+    return this.innerIntp.getExecutionEnvironment();
+  }
+
+  StreamExecutionEnvironment getStreamExecutionEnvironment() {
+    return this.innerIntp.getStreamExecutionEnvironment();
+  }
+
+  StreamTableEnvironment getStreamTableEnvironment() {
+    return this.innerIntp.getStreamTableEnvionment();
+  }
+
+  TableEnvironment getBatchTableEnvironment() {
+    return this.innerIntp.getBatchTableEnvironment();
   }
 
-  ExecutionEnvironment getExecutionEnviroment() {
-    return this.innerIntp.getExecutionEnviroment();
+  JobManager getJobManager() {
+    return this.innerIntp.getJobManager();
+  }
+
+  int getDefaultParallelism() {
+    return this.innerIntp.getDefaultParallelism();
+  }
+
+  public ClassLoader getFlinkScalaShellLoader() {
+    return innerIntp.getFlinkScalaShellLoader();
   }
 
   FlinkZeppelinContext getZeppelinContext() {
     return this.z;
   }
 
+  Configuration getFlinkConfiguration() {
+    return this.innerIntp.getConfiguration();
+  }
+
+  public FlinkScalaInterpreter getInnerIntp() {
+    return this.innerIntp;
+  }
+
 }
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java
deleted file mode 100644
index 1ac3547..0000000
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink;
-
-
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-
-import java.util.Properties;
-
-public class FlinkSQLInterpreter extends Interpreter {
-
-  private FlinkSQLScalaInterpreter sqlScalaInterpreter;
-
-  public FlinkSQLInterpreter(Properties properties) {
-    super(properties);
-  }
-
-
-  @Override
-  public void open() throws InterpreterException {
-    FlinkInterpreter flinkInterpreter =
-        getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
-    FlinkZeppelinContext z = flinkInterpreter.getZeppelinContext();
-    int maxRow = Integer.parseInt(getProperty("zeppelin.flink.maxResult", "1000"));
-    this.sqlScalaInterpreter = new FlinkSQLScalaInterpreter(
-        flinkInterpreter.getInnerScalaInterpreter(), z, maxRow);
-  }
-
-  @Override
-  public void close() throws InterpreterException {
-
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context)
-      throws InterpreterException {
-    return sqlScalaInterpreter.interpret(st, context);
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) throws InterpreterException {
-
-  }
-
-  @Override
-  public FormType getFormType() throws InterpreterException {
-    return FormType.SIMPLE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) throws InterpreterException {
-    return 0;
-  }
-}
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
new file mode 100644
index 0000000..704b3d0
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.scala.StreamTableEnvironment;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.factories.ComponentFactoryService;
+import org.apache.flink.table.planner.delegation.ExecutorBase;
+import org.apache.zeppelin.flink.sql.SqlCommandParser;
+import org.apache.zeppelin.flink.sql.SqlInfo;
+import org.apache.zeppelin.flink.sql.SqlLists;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.regex.Matcher;
+
+public abstract class FlinkSqlInterrpeter extends Interpreter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FlinkSqlInterrpeter.class);
+
+  protected FlinkInterpreter flinkInterpreter;
+  protected TableEnvironment tbenv;
+
+  public FlinkSqlInterrpeter(Properties properties) {
+    super(properties);
+  }
+
+  @Override
+  public void open() throws InterpreterException {
+    flinkInterpreter =
+            getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
+  }
+
+  @Override
+  public InterpreterResult interpret(String st,
+                                     InterpreterContext context) throws InterpreterException {
+    LOGGER.debug("Interpret code: " + st);
+    flinkInterpreter.getZeppelinContext().setInterpreterContext(context);
+    flinkInterpreter.getZeppelinContext().setNoteGui(context.getNoteGui());
+    flinkInterpreter.getZeppelinContext().setGui(context.getGui());
+
+    checkLocalProperties(context.getLocalProperties());
+
+    // set ClassLoader of current Thread to be the ClassLoader of Flink scala-shell,
+    // otherwise codegen will fail to find classes defined in scala-shell
+    ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(flinkInterpreter.getFlinkScalaShellLoader());
+      return runSqlList(st, context);
+    } finally {
+      Thread.currentThread().setContextClassLoader(originClassLoader);
+    }
+  }
+
+
+  protected abstract void checkLocalProperties(Map<String, String> localProperties)
+          throws InterpreterException;
+
+  private Optional<SqlCommandParser.SqlCommandCall> parse(String stmt) {
+    // normalize
+    stmt = stmt.trim();
+    // remove ';' at the end
+    if (stmt.endsWith(";")) {
+      stmt = stmt.substring(0, stmt.length() - 1).trim();
+    }
+
+    // parse
+    for (SqlCommandParser.SqlCommand cmd : SqlCommandParser.SqlCommand.values()) {
+      final Matcher matcher = cmd.pattern.matcher(stmt);
+      if (matcher.matches()) {
+        final String[] groups = new String[matcher.groupCount()];
+        for (int i = 0; i < groups.length; i++) {
+          groups[i] = matcher.group(i + 1);
+        }
+        return cmd.operandConverter.apply(groups)
+                .map((operands) -> new SqlCommandParser.SqlCommandCall(cmd, operands));
+      }
+    }
+    return Optional.empty();
+  }
+
+  private InterpreterResult runSqlList(String sql, InterpreterContext context) {
+    List<SqlInfo> sqlLists = SqlLists.getSQLList(sql);
+    List<SqlCommandParser.SqlCommandCall> sqlCommands = new ArrayList<>();
+    for (SqlInfo sqlInfo : sqlLists) {
+      Optional<SqlCommandParser.SqlCommandCall> sqlCommand = parse(sqlInfo.getSqlContent());
+      if (!sqlCommand.isPresent()) {
+        return new InterpreterResult(InterpreterResult.Code.ERROR, "Invalid Sql statement: "
+                + sqlInfo.getSqlContent());
+      }
+      sqlCommands.add(sqlCommand.get());
+    }
+    for (SqlCommandParser.SqlCommandCall sqlCommand : sqlCommands) {
+      try {
+        callCommand(sqlCommand, context);
+        context.out.flush();
+      }  catch (Throwable e) {
+        LOGGER.error("Fail to run sql:" + sqlCommand.operands[0], e);
+        return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to run sql command: " +
+                sqlCommand.operands[0] + "\n" + ExceptionUtils.getStackTrace(e));
+      }
+    }
+    return new InterpreterResult(InterpreterResult.Code.SUCCESS);
+  }
+
+  private void callCommand(SqlCommandParser.SqlCommandCall cmdCall,
+                                        InterpreterContext context) throws Exception {
+    switch (cmdCall.command) {
+      case SHOW_CATALOGS:
+        callShowCatalogs(context);
+        break;
+      case SHOW_DATABASES:
+        callShowDatabases(context);
+        break;
+      case SHOW_TABLES:
+        callShowTables(context);
+        break;
+      case SHOW_FUNCTIONS:
+        callShowFunctions(context);
+        break;
+      case USE_DATABASE:
+        callUseDatabase(cmdCall.operands[0], context);
+        break;
+      case DESCRIBE:
+        callDescribe(cmdCall.operands[0], context);
+        break;
+      case EXPLAIN:
+        callExplain(cmdCall.operands[0], context);
+        break;
+      case SELECT:
+        callSelect(cmdCall.operands[0], context);
+        break;
+      case INSERT_INTO:
+        callInsertInto(cmdCall.operands[0], context);
+        break;
+      default:
+        throw new Exception("Unsupported command: " + cmdCall.command);
+    }
+  }
+
+  private void callShowCatalogs(InterpreterContext context) throws IOException {
+    String[] catalogs = this.tbenv.listCatalogs();
+    context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + "\n");
+  }
+
+  private void callShowDatabases(InterpreterContext context) throws IOException {
+    String[] databases = this.tbenv.listDatabases();
+    context.out.write(
+            "%table database\n" + StringUtils.join(databases, "\n") + "\n");
+  }
+
+  private void callShowTables(InterpreterContext context) throws IOException {
+    String[] tables = this.tbenv.listTables();
+    context.out.write(
+            "%table table\n" + StringUtils.join(tables, "\n") + "\n");
+  }
+
+  private void callShowFunctions(InterpreterContext context) throws IOException {
+    String[] functions = this.tbenv.listUserDefinedFunctions();
+    context.out.write(
+            "%table function\n" + StringUtils.join(functions, "\n") + "\n");
+  }
+
+  private void callUseDatabase(String databaseName,
+                               InterpreterContext context) throws IOException {
+    tbenv.useDatabase(databaseName);
+  }
+
+  private void callDescribe(String name, InterpreterContext context) throws IOException {
+    TableSchema schema = tbenv.scan(name).getSchema();
+    StringBuilder builder = new StringBuilder();
+    builder.append("Column\tType\n");
+    for (int i = 0; i < schema.getFieldCount(); ++i) {
+      builder.append(schema.getFieldName(i) + "\t" + schema.getFieldDataType(i) + "\n");
+    }
+    context.out.write(builder.toString());
+  }
+
+  private void callExplain(String sql, InterpreterContext context) throws IOException {
+    Table table = this.tbenv.sqlQuery(sql);
+    context.out.write(this.tbenv.explain(table) + "\n");
+  }
+
+  public abstract void callSelect(String sql, InterpreterContext context) throws IOException;
+
+  private void callInsertInto(String sql,
+                              InterpreterContext context) throws IOException {
+
+    this.tbenv.sqlUpdate(sql);
+
+    JobGraph jobGraph = createJobGraph(sql);
+    jobGraph.addJar(new Path(flinkInterpreter.getInnerIntp().getFlinkILoop()
+            .writeFilesToDisk().getAbsoluteFile().toURI()));
+    SqlJobRunner jobRunner =
+            new SqlJobRunner(flinkInterpreter.getInnerIntp().getCluster(), jobGraph, sql,
+                    flinkInterpreter.getFlinkScalaShellLoader());
+    jobRunner.run();
+    context.out.write("Insert Succeeded.\n");
+  }
+
+  private FlinkPlan createPlan(String name, Configuration flinkConfig) {
+    if (this.tbenv instanceof StreamTableEnvironment) {
+      if ("blink".equals(flinkInterpreter.getInnerIntp().getPlanner())) {
+        Executor executor = lookupExecutor(
+                flinkInterpreter.getInnerIntp().getStEnvSetting().toExecutorProperties(),
+                flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv());
+        // special case for Blink planner to apply batch optimizations
+        // note: it also modifies the ExecutionConfig!
+        if (executor instanceof ExecutorBase) {
+          return ((ExecutorBase) executor).generateStreamGraph(name);
+        }
+      }
+      return flinkInterpreter.getStreamExecutionEnvironment().getStreamGraph();
+    } else {
+      final int parallelism = flinkInterpreter.getExecutionEnvironment().getParallelism();
+      final Plan unoptimizedPlan =
+              flinkInterpreter.getExecutionEnvironment().createProgramPlan(name);
+      unoptimizedPlan.setJobName(name);
+      final Optimizer compiler =
+              new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig);
+      return ClusterClient.getOptimizedPlan(compiler, unoptimizedPlan, parallelism);
+    }
+  }
+
+  public JobGraph createJobGraph(String name) {
+    final FlinkPlan plan = createPlan(name, flinkInterpreter.getFlinkConfiguration());
+    return ClusterClient.getJobGraph(
+            flinkInterpreter.getFlinkConfiguration(),
+            plan,
+            new ArrayList<>(),
+            new ArrayList<>(),
+            SavepointRestoreSettings.none());
+  }
+
+  private static Executor lookupExecutor(
+          Map<String, String> executorProperties,
+          StreamExecutionEnvironment executionEnvironment) {
+    try {
+      ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class,
+              executorProperties);
+      Method createMethod = executorFactory.getClass()
+              .getMethod("create", Map.class, StreamExecutionEnvironment.class);
+
+      return (Executor) createMethod.invoke(
+              executorFactory,
+              executorProperties,
+              executionEnvironment);
+    } catch (Exception e) {
+      throw new TableException(
+              "Could not instantiate the executor. Make sure a planner module is on the classpath",
+              e);
+    }
+  }
+
+}
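
The class above, FlinkSqlInterrpeter, is the shared base of the batch and stream SQL interpreters: runSqlList() splits the paragraph text into individual statements via SqlLists, classifies each statement with the regex patterns of SqlCommandParser, and dispatches to the corresponding call* method; callSelect() and checkLocalProperties() are left abstract for the two subclasses. Assuming the two variants are registered as %flink.bsql and %flink.ssql (the binding names come from interpreter-setting.json, not shown in this excerpt), a note paragraph can hold several ';'-separated statements against a hypothetical table my_table:

    %flink.bsql
    SHOW TABLES;
    DESCRIBE my_table;
    SELECT word, COUNT(1) AS cnt FROM my_table GROUP BY word;

The statements run in order, and the first failing one aborts the paragraph with the stack trace of that statement.
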
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
new file mode 100644
index 0000000..b68d025
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.zeppelin.flink.sql.RetractStreamSqlJob;
+import org.apache.zeppelin.flink.sql.SingleRowStreamSqlJob;
+import org.apache.zeppelin.flink.sql.TimeSeriesStreamSqlJob;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+public class FlinkStreamSqlInterpreter extends FlinkSqlInterrpeter {
+
+  public FlinkStreamSqlInterpreter(Properties properties) {
+    super(properties);
+  }
+
+
+  @Override
+  public void open() throws InterpreterException {
+    this.flinkInterpreter =
+            getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
+    this.tbenv = flinkInterpreter.getStreamTableEnvironment();
+  }
+
+  @Override
+  public void close() throws InterpreterException {
+
+  }
+
+  @Override
+  protected void checkLocalProperties(Map<String, String> localProperties)
+          throws InterpreterException {
+
+  }
+
+  @Override
+  public void callSelect(String sql, InterpreterContext context) throws IOException {
+    String streamType = context.getLocalProperties().get("type");
+    if (streamType == null) {
+      throw new IOException("type must be specified for stream sql");
+    }
+    if (streamType.equalsIgnoreCase("single")) {
+      SingleRowStreamSqlJob streamJob = new SingleRowStreamSqlJob(
+              flinkInterpreter.getStreamExecutionEnvironment(),
+              flinkInterpreter.getStreamTableEnvironment(), context,
+              flinkInterpreter.getDefaultParallelism());
+      streamJob.run(sql);
+    } else if (streamType.equalsIgnoreCase("ts")) {
+      TimeSeriesStreamSqlJob streamJob = new TimeSeriesStreamSqlJob(
+              flinkInterpreter.getStreamExecutionEnvironment(),
+              flinkInterpreter.getStreamTableEnvironment(), context,
+              flinkInterpreter.getDefaultParallelism());
+      streamJob.run(sql);
+    } else if (streamType.equalsIgnoreCase("retract")) {
+      RetractStreamSqlJob streamJob = new RetractStreamSqlJob(
+              flinkInterpreter.getStreamExecutionEnvironment(),
+              flinkInterpreter.getStreamTableEnvironment(), context,
+              flinkInterpreter.getDefaultParallelism());
+      streamJob.run(sql);
+    } else {
+      throw new IOException("Unrecognized stream type: " + streamType);
+    }
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) throws InterpreterException {
+    this.flinkInterpreter.getZeppelinContext().setInterpreterContext(context);
+    this.flinkInterpreter.getZeppelinContext().setNoteGui(context.getNoteGui());
+    this.flinkInterpreter.getZeppelinContext().setGui(context.getGui());
+    this.flinkInterpreter.getJobManager().cancelJob(context);
+  }
+
+  @Override
+  public Interpreter.FormType getFormType() throws InterpreterException {
+    return Interpreter.FormType.SIMPLE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) throws InterpreterException {
+    return 0;
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    int maxConcurrency = Integer.parseInt(
+            getProperty("zeppelin.flink.concurrentStreamSql.max", "10"));
+    return SchedulerFactory.singleton().createOrGetParallelScheduler(
+            FlinkStreamSqlInterpreter.class.getName() + this.hashCode(), maxConcurrency);
+  }
+}
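
For streaming SELECTs the output mode is chosen purely through paragraph local properties: callSelect() requires a "type" property and instantiates one of the three AbstractStreamSqlJob implementations ("single", "ts" for time series, "retract"); "parallelism" and "refreshInterval" are forwarded to the job. A usage sketch, assuming the stream SQL interpreter is bound as %flink.ssql and a source table named log has been registered beforehand:

    %flink.ssql(type=retract, parallelism=2, refreshInterval=3000)
    SELECT status, COUNT(1) AS cnt FROM log GROUP BY status

The paragraph keeps re-rendering a %table result roughly every 3 seconds until the paragraph is cancelled, which ends up in cancel() above and thus in JobManager.cancelJob().
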
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
new file mode 100644
index 0000000..dee6328
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.zeppelin.interpreter.BaseZeppelinContext;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.python.IPythonInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * IPyFlinkInterpreter which uses IPython as its underlying execution engine.
+ */
+public class IPyFlinkInterpreter extends IPythonInterpreter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IPyFlinkInterpreter.class);
+
+  private FlinkInterpreter flinkInterpreter;
+
+  public IPyFlinkInterpreter(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() throws InterpreterException {
+    FlinkInterpreter pyFlinkInterpreter =
+        getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class, false);
+    setProperty("zeppelin.python",
+            pyFlinkInterpreter.getProperty("zeppelin.pyflink.python", "python"));
+    flinkInterpreter = getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
+    setAdditionalPythonInitFile("python/zeppelin_ipyflink.py");
+    super.open();
+  }
+
+  @Override
+  public BaseZeppelinContext buildZeppelinContext() {
+    return flinkInterpreter.getZeppelinContext();
+  }
+
+  @Override
+  protected Map<String, String> setupIPythonEnv() throws IOException {
+    Map<String, String> envs = super.setupIPythonEnv();
+    String pythonPath = envs.getOrDefault("PYTHONPATH", "");
+    String pyflinkPythonPath = PyFlinkInterpreter.getPyFlinkPythonPath(properties);
+    envs.put("PYTHONPATH", pythonPath + ":" + pyflinkPythonPath);
+    return envs;
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) throws InterpreterException {
+    super.cancel(context);
+    flinkInterpreter.cancel(context);
+  }
+
+  @Override
+  public void close() throws InterpreterException {
+    LOGGER.info("Close IPyFlinkInterpreter");
+    super.close();
+    if (flinkInterpreter != null) {
+      flinkInterpreter.close();
+    }
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) throws InterpreterException {
+    return flinkInterpreter.getProgress(context);
+  }
+
+  public org.apache.flink.api.java.ExecutionEnvironment getJavaExecutionEnvironment() {
+    return flinkInterpreter.getExecutionEnvironment().getJavaEnv();
+  }
+
+  public org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+      getJavaStreamExecutionEnvironment() {
+    return flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv();
+  }
+}
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java
new file mode 100644
index 0000000..33975c0
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import com.mashape.unirest.http.JsonNode;
+import com.mashape.unirest.http.Unirest;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.scala.ExecutionEnvironment;
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class JobManager {
+
+  private static Logger LOGGER = LoggerFactory.getLogger(JobManager.class);
+
+  private Map<String, JobID> jobs = new HashMap<>();
+  private Map<String, String> savePointMap = new HashMap<>();
+  private ConcurrentHashMap<JobID, FlinkJobProgressPoller> jobProgressPollerMap =
+          new ConcurrentHashMap<>();
+  private ExecutionEnvironment env;
+  private StreamExecutionEnvironment senv;
+  private FlinkZeppelinContext z;
+  private String flinkWebUI;
+
+  public JobManager(ExecutionEnvironment env,
+                    StreamExecutionEnvironment senv,
+                    FlinkZeppelinContext z,
+                    String flinkWebUI) {
+    this.env = env;
+    this.senv = senv;
+    this.z = z;
+    this.flinkWebUI = flinkWebUI;
+  }
+
+  public void addJob(String paragraphId, JobID jobId) {
+    JobID previousJobId = this.jobs.put(paragraphId, jobId);
+    FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUI, jobId);
+    thread.start();
+    this.jobProgressPollerMap.put(jobId, thread);
+    if (previousJobId != null) {
+      LOGGER.warn("There's another Job {} that is associated with paragraph {}",
+              jobId, paragraphId);
+    }
+  }
+
+  public void removeJob(String paragraphId) {
+    JobID jobID = this.jobs.remove(paragraphId);
+    if (jobID == null) {
+      LOGGER.warn("Unable to remove job, because no job is associated with paragraph: "
+              + paragraphId);
+      return;
+    }
+    FlinkJobProgressPoller jobProgressPoller = this.jobProgressPollerMap.remove(jobID);
+    jobProgressPoller.cancel();
+  }
+
+  public int getJobProgress(String paragraphId) {
+    JobID jobId = this.jobs.get(paragraphId);
+    if (jobId == null) {
+      LOGGER.warn("Unable to get job progress for paragraph: " + paragraphId +
+              ", because no job is associated with this paragraph");
+      return 0;
+    }
+    FlinkJobProgressPoller jobProgressPoller = this.jobProgressPollerMap.get(jobId);
+    if (jobProgressPoller == null) {
+      LOGGER.warn("Unable to get job progress for paragraph: " + paragraphId +
+              ", because no job progress is associated with this jobId: " + jobId);
+      return 0;
+    }
+    return jobProgressPoller.getProgress();
+  }
+
+  public void cancelJob(InterpreterContext context) throws InterpreterException {
+    JobID jobId = this.jobs.remove(context.getParagraphId());
+    if (jobId == null) {
+      LOGGER.warn("Unable to remove Job from paragraph {}", context.getParagraphId());
+      return;
+    }
+
+    try {
+      //this.env.cancel(jobId);
+    } catch (Exception e) {
+      String errorMessage = String.format("Fail to cancel job %s that is associated " +
+              "with paragraph %s", jobId, context.getParagraphId());
+      LOGGER.warn(errorMessage, e);
+      throw new InterpreterException(errorMessage, e);
+    }
+
+    FlinkJobProgressPoller jobProgressPoller = jobProgressPollerMap.remove(jobId);
+    jobProgressPoller.interrupt();
+  }
+
+  class FlinkJobProgressPoller extends Thread {
+
+    private String flinkWebUI;
+    private JobID jobId;
+    private int progress;
+    private AtomicBoolean running = new AtomicBoolean(true);
+
+    FlinkJobProgressPoller(String flinkWebUI, JobID jobId) {
+      this.flinkWebUI = flinkWebUI;
+      this.jobId = jobId;
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (!Thread.currentThread().isInterrupted() && running.get()) {
+          JsonNode rootNode = Unirest.get(flinkWebUI + "/jobs/" + jobId.toString())
+                  .asJson().getBody();
+          JSONArray vertices = rootNode.getObject().getJSONArray("vertices");
+          int totalTasks = 0;
+          int finishedTasks = 0;
+          for (int i = 0; i < vertices.length(); ++i) {
+            JSONObject vertex = vertices.getJSONObject(i);
+            totalTasks += vertex.getInt("parallelism");
+            finishedTasks += vertex.getJSONObject("tasks").getInt("FINISHED");
+          }
+          LOGGER.debug("Total tasks:" + totalTasks);
+          LOGGER.debug("Finished tasks:" + finishedTasks);
+          if (finishedTasks != 0) {
+            this.progress = finishedTasks * 100 / totalTasks;
+          }
+          String jobState = rootNode.getObject().getString("state");
+          if (jobState.equalsIgnoreCase("finished")) {
+            break;
+          }
+          synchronized (running) {
+            running.wait(1000);
+          }
+        }
+      } catch (Exception e) {
+        LOGGER.error("Fail to poll flink job progress via rest api", e);
+      }
+    }
+
+    public void cancel() {
+      this.running.set(false);
+      synchronized (running) {
+        running.notify();
+      }
+    }
+
+    public int getProgress() {
+      return progress;
+    }
+  }
+}
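
JobManager keeps one Flink JobID per paragraph and polls the Flink REST endpoint /jobs/<jobId> about once per second to turn vertex-level task counts into a paragraph progress percentage. Only three fields of the response are used; abbreviated, the payload looks roughly like {"state": "RUNNING", "vertices": [{"parallelism": 2, "tasks": {"FINISHED": 1}}, ...]} (shape inferred from the fields read above, the full REST response contains much more). Progress is the ratio of FINISHED tasks to the sum of vertex parallelisms, and the polling thread stops once the job state becomes FINISHED or the poller is cancelled or interrupted.
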
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
new file mode 100644
index 0000000..1e1ce62
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.zeppelin.interpreter.BaseZeppelinContext;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.python.IPythonInterpreter;
+import org.apache.zeppelin.python.PythonInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class PyFlinkInterpreter extends PythonInterpreter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PyFlinkInterpreter.class);
+
+  private FlinkInterpreter flinkInterpreter;
+
+  public PyFlinkInterpreter(Properties properties) {
+    super(properties);
+  }
+
+  @Override
+  public void open() throws InterpreterException {
+
+    setProperty("zeppelin.python.useIPython", getProperty("zeppelin.pyflink.useIPython", "true"));
+    URL[] urls = new URL[0];
+    List<URL> urlList = new LinkedList<>();
+    String localRepo = getProperty("zeppelin.interpreter.localRepo");
+    if (localRepo != null) {
+      File localRepoDir = new File(localRepo);
+      if (localRepoDir.exists()) {
+        File[] files = localRepoDir.listFiles();
+        if (files != null) {
+          for (File f : files) {
+            try {
+              urlList.add(f.toURI().toURL());
+            } catch (MalformedURLException e) {
+              LOGGER.error("Error", e);
+            }
+          }
+        }
+      }
+    }
+
+    urls = urlList.toArray(urls);
+    ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
+    try {
+      URLClassLoader newCl = new URLClassLoader(urls, oldCl);
+      Thread.currentThread().setContextClassLoader(newCl);
+      // must create flink interpreter after ClassLoader is set, otherwise the additional jars
+      // can not be loaded by flink repl.
+      this.flinkInterpreter = getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
+      // create Python Process and JVM gateway
+      super.open();
+    } finally {
+      Thread.currentThread().setContextClassLoader(oldCl);
+    }
+
+    if (!useIPython()) {
+      // Initialize Flink in Python Process
+      try {
+        bootstrapInterpreter("python/zeppelin_pyflink.py");
+      } catch (IOException e) {
+        throw new InterpreterException("Fail to bootstrap pyflink", e);
+      }
+    }
+  }
+
+  @Override
+  protected Map<String, String> setupPythonEnv() throws IOException {
+    Map<String, String> envs = super.setupPythonEnv();
+    String pythonPath = envs.getOrDefault("PYTHONPATH", "");
+    String pyflinkPythonPath = getPyFlinkPythonPath(properties);
+    envs.put("PYTHONPATH", pythonPath + ":" + pyflinkPythonPath);
+    return envs;
+  }
+
+  public static String getPyFlinkPythonPath(Properties properties) throws IOException {
+    String flinkHome = System.getenv("FLINK_HOME");
+    boolean isTest = Boolean.parseBoolean(properties.getProperty("zeppelin.flink.test", "false"));
+    if (isTest) {
+      return "";
+    }
+    if (flinkHome != null) {
+      File pythonFolder = new File(flinkHome + "/opt/python");
+      File[] pythonDeps = pythonFolder.listFiles();
+      if (pythonDeps == null) {
+        throw new IOException("No pyflink python libraries found under "
+                + pythonFolder.getAbsolutePath());
+      }
+      StringBuilder builder = new StringBuilder();
+      for (File file : pythonDeps) {
+        if (file.getName().endsWith(".zip")) {
+          builder.append(file.getAbsolutePath() + ":");
+        }
+      }
+      return builder.toString();
+    } else {
+      throw new IOException("No FLINK_HOME is specified");
+    }
+  }
+
+  @Override
+  protected IPythonInterpreter getIPythonInterpreter() throws InterpreterException {
+    return getInterpreterInTheSameSessionByClassName(IPyFlinkInterpreter.class, false);
+  }
+
+  @Override
+  public void close() throws InterpreterException {
+    super.close();
+    if (flinkInterpreter != null) {
+      flinkInterpreter.close();
+    }
+  }
+
+  @Override
+  public BaseZeppelinContext getZeppelinContext() {
+    return flinkInterpreter.getZeppelinContext();
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) throws InterpreterException {
+    return 0;
+  }
+
+  public org.apache.flink.api.java.ExecutionEnvironment getJavaExecutionEnvironment() {
+    return flinkInterpreter.getExecutionEnvironment().getJavaEnv();
+  }
+
+  public org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+      getJavaStreamExecutionEnvironment() {
+    return flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv();
+  }
+
+}
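
PyFlinkInterpreter only differs from the generic PythonInterpreter in how the environment is prepared: when zeppelin.pyflink.useIPython is enabled (the default) it delegates to IPyFlinkInterpreter, otherwise it bootstraps the plain python process with zeppelin_pyflink.py. In both cases getPyFlinkPythonPath() appends every *.zip under $FLINK_HOME/opt/python to PYTHONPATH, so with FLINK_HOME=/opt/flink the added part would look roughly like /opt/flink/opt/python/pyflink.zip:/opt/flink/opt/python/py4j-<version>-src.zip: (the exact archive names depend on the Flink distribution).
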
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
new file mode 100644
index 0000000..8ebbfb5
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.sql;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
+import org.apache.flink.streaming.experimental.SocketStreamIterator;
+import org.apache.flink.table.api.StreamQueryConfig;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.scala.StreamTableEnvironment;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.types.Row;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+/**
+ * Abstract class for all kinds of stream sql jobs.
+ */
+public abstract class AbstractStreamSqlJob {
+  private static Logger LOGGER = LoggerFactory.getLogger(AbstractStreamSqlJob.class);
+
+  protected StreamExecutionEnvironment senv;
+  protected StreamTableEnvironment stenv;
+  protected InterpreterContext context;
+  protected TableSchema schema;
+  protected SocketStreamIterator<Tuple2<Boolean, Row>> iterator;
+  protected Object resultLock = new Object();
+  protected int defaultParallelism;
+
+  public AbstractStreamSqlJob(StreamExecutionEnvironment senv,
+                              StreamTableEnvironment stenv,
+                              InterpreterContext context,
+                              int defaultParallelism) {
+    this.senv = senv;
+    this.stenv = stenv;
+    this.context = context;
+    this.defaultParallelism = defaultParallelism;
+  }
+
+  private static TableSchema removeTimeAttributes(TableSchema schema) {
+    final TableSchema.Builder builder = TableSchema.builder();
+    for (int i = 0; i < schema.getFieldCount(); i++) {
+      final TypeInformation<?> type = schema.getFieldTypes()[i];
+      final TypeInformation<?> convertedType;
+      if (FlinkTypeFactory.isTimeIndicatorType(type)) {
+        convertedType = Types.SQL_TIMESTAMP;
+      } else {
+        convertedType = type;
+      }
+      builder.field(schema.getFieldNames()[i], convertedType);
+    }
+    return builder.build();
+  }
+
+  protected abstract String getType();
+
+  public InterpreterResult run(String st) throws IOException {
+    try {
+      checkLocalProperties(context.getLocalProperties());
+
+      int parallelism = Integer.parseInt(context.getLocalProperties()
+              .getOrDefault("parallelism", defaultParallelism + ""));
+
+      Table table = stenv.sqlQuery(st);
+      this.schema = removeTimeAttributes(table.getSchema());
+      checkTableSchema(schema);
+
+      LOGGER.info("ResultTable Schema: " + this.schema);
+      final RowTypeInfo outputType = new RowTypeInfo(schema.getFieldTypes(),
+              schema.getFieldNames());
+
+      // create socket stream iterator
+      TypeInformation<Tuple2<Boolean, Row>> socketType = Types.TUPLE(Types.BOOLEAN, outputType);
+      TypeSerializer<Tuple2<Boolean, Row>> serializer =
+              socketType.createSerializer(senv.getConfig());
+
+      // pass gateway port and address such that iterator knows where to bind to
+      iterator = new SocketStreamIterator<>(0,
+              InetAddress.getByName(RemoteInterpreterUtils.findAvailableHostAddress()),
+              serializer);
+      // create table sink
+      // pass binding address and port such that sink knows where to send to
+      LOGGER.debug("Collecting data at address: " + iterator.getBindAddress() +
+              ":" + iterator.getPort());
+      CollectStreamTableSink collectTableSink =
+              new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer);
+      collectTableSink = collectTableSink.configure(
+              outputType.getFieldNames(), outputType.getFieldTypes());
+
+      // workaround, otherwise it won't find the sink properly
+      String originalCatalog = stenv.getCurrentCatalog();
+      String originalDatabase = stenv.getCurrentDatabase();
+      try {
+        stenv.useCatalog("default_catalog");
+        stenv.useDatabase("default_database");
+        stenv.registerTableSink(st, collectTableSink);
+        table.insertInto(new StreamQueryConfig(), st);
+      } finally {
+        stenv.useCatalog(originalCatalog);
+        stenv.useDatabase(originalDatabase);
+      }
+
+      ScheduledExecutorService refreshScheduler = Executors.newScheduledThreadPool(1);
+      long delay = 1000L;
+      long period = Long.parseLong(
+              context.getLocalProperties().getOrDefault("refreshInterval", "3000"));
+      refreshScheduler.scheduleAtFixedRate(new RefreshTask(context), delay, period, MILLISECONDS);
+
+      ResultRetrievalThread retrievalThread = new ResultRetrievalThread(refreshScheduler);
+      retrievalThread.start();
+
+      LOGGER.info("Run job without savePointPath, " + ", parallelism: " + parallelism);
+      JobExecutionResult jobExecutionResult = stenv.execute(st);
+      LOGGER.info("Flink Job is finished");
+      return new InterpreterResult(InterpreterResult.Code.SUCCESS);
+    } catch (Exception e) {
+      LOGGER.error("Fail to run stream sql job", e);
+      throw new IOException("Fail to run stream sql job", e);
+    }
+  }
+
+  protected void checkTableSchema(TableSchema schema) throws Exception {
+  }
+
+  protected void checkLocalProperties(Map<String, String> localProperties) throws Exception {
+    List<String> validLocalProperties = getValidLocalProperties();
+    for (String key : localProperties.keySet()) {
+      if (!validLocalProperties.contains(key)) {
+        throw new Exception("Invalid property: " + key + ", Only the following properties " +
+                "are valid for stream type '" + getType() + "': " + validLocalProperties);
+      }
+    }
+  }
+
+  protected abstract List<String> getValidLocalProperties();
+
+  protected void processRecord(Tuple2<Boolean, Row> change) {
+    synchronized (resultLock) {
+      // insert
+      if (change.f0) {
+        processInsert(change.f1);
+      }
+      // delete
+      else {
+        processDelete(change.f1);
+      }
+    }
+  }
+
+  protected abstract void processInsert(Row row);
+
+  protected abstract void processDelete(Row row);
+
+  private class ResultRetrievalThread extends Thread {
+
+    private ScheduledExecutorService refreshExecutorService;
+    volatile boolean isRunning = true;
+
+    ResultRetrievalThread(ScheduledExecutorService refreshExecutorService) {
+      this.refreshExecutorService = refreshExecutorService;
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (isRunning && iterator.hasNext()) {
+          final Tuple2<Boolean, Row> change = iterator.next();
+          processRecord(change);
+        }
+      } catch (Exception e) {
+        // ignore socket exceptions
+        LOGGER.error("Fail to process record", e);
+      }
+
+      // no result anymore
+      // either the job is done or an error occurred
+      isRunning = false;
+      LOGGER.info("ResultRetrieval Thread is done");
+      refreshExecutorService.shutdown();
+    }
+
+    public void cancel() {
+      isRunning = false;
+    }
+  }
+
+  protected abstract void refresh(InterpreterContext context) throws Exception;
+
+
+  private class RefreshTask implements Runnable {
+
+    private InterpreterContext context;
+
+    RefreshTask(InterpreterContext context) {
+      this.context = context;
+    }
+
+    @Override
+    public void run() {
+      try {
+        synchronized (resultLock) {
+          refresh(context);
+        }
+      } catch (Exception e) {
+        LOGGER.error("Fail to refresh task", e);
+      }
+    }
+  }
+}
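
AbstractStreamSqlJob wires the query result back into the notebook through a socket: the SELECT is inserted into a CollectStreamTableSink, a SocketStreamIterator receives the (add/retract flag, Row) tuples, ResultRetrievalThread feeds them into processInsert()/processDelete(), and the scheduled RefreshTask periodically calls refresh() under resultLock to re-render the paragraph output. A concrete job therefore only implements those hooks plus getType() and getValidLocalProperties(). The following append-only variant is a hypothetical sketch (not part of this commit) that illustrates the contract:

    package org.apache.zeppelin.flink.sql;

    import com.google.common.collect.Lists;
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
    import org.apache.flink.table.api.scala.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.zeppelin.interpreter.InterpreterContext;

    import java.util.ArrayList;
    import java.util.List;

    /** Hypothetical append-only stream sql job, for illustration only. */
    public class AppendStreamSqlJob extends AbstractStreamSqlJob {

      private List<Row> rows = new ArrayList<>();

      public AppendStreamSqlJob(StreamExecutionEnvironment senv,
                                StreamTableEnvironment stenv,
                                InterpreterContext context,
                                int defaultParallelism) {
        super(senv, stenv, context, defaultParallelism);
      }

      @Override
      protected String getType() {
        return "append";
      }

      @Override
      protected List<String> getValidLocalProperties() {
        return Lists.newArrayList("type", "parallelism", "refreshInterval");
      }

      @Override
      protected void processInsert(Row row) {
        // called from ResultRetrievalThread while holding resultLock
        rows.add(row);
      }

      @Override
      protected void processDelete(Row row) {
        // an append-only result never retracts rows
      }

      @Override
      protected void refresh(InterpreterContext context) throws Exception {
        // called by RefreshTask; rewrite the whole paragraph output each time
        context.out().clear();
        context.out.write("%text received " + rows.size() + " rows so far\n");
        context.out.flush();
      }
    }
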
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/CollectStreamTableSink.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/CollectStreamTableSink.java
new file mode 100644
index 0000000..843f3db
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/CollectStreamTableSink.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.sql;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.experimental.CollectSink;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
+import org.apache.flink.types.Row;
+
+import java.net.InetAddress;
+
+/**
+ * Table sink for collecting the results locally using sockets.
+ */
+public class CollectStreamTableSink implements RetractStreamTableSink<Row> {
+
+  private final InetAddress targetAddress;
+  private final int targetPort;
+  private final TypeSerializer<Tuple2<Boolean, Row>> serializer;
+
+  private String[] fieldNames;
+  private TypeInformation<?>[] fieldTypes;
+
+  public CollectStreamTableSink(InetAddress targetAddress,
+                                int targetPort,
+                                TypeSerializer<Tuple2<Boolean, Row>> serializer) {
+    this.targetAddress = targetAddress;
+    this.targetPort = targetPort;
+    this.serializer = serializer;
+  }
+
+  @Override
+  public String[] getFieldNames() {
+    return fieldNames;
+  }
+
+  @Override
+  public TypeInformation<?>[] getFieldTypes() {
+    return fieldTypes;
+  }
+
+  @Override
+  public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+    final CollectStreamTableSink copy =
+            new CollectStreamTableSink(targetAddress, targetPort, serializer);
+    copy.fieldNames = fieldNames;
+    copy.fieldTypes = fieldTypes;
+    return copy;
+  }
+
+  @Override
+  public TypeInformation<Row> getRecordType() {
+    return Types.ROW_NAMED(fieldNames, fieldTypes);
+  }
+
+  @Override
+  public void emitDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
+    consumeDataStream(stream);
+  }
+
+  @Override
+  public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
+    // add sink
+    return stream
+            .addSink(new CollectSink<>(targetAddress, targetPort, serializer))
+            .name("Zeppelin Flink Sql Stream Collect Sink")
+            .setParallelism(1);
+  }
+
+  @Override
+  public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
+    return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
+  }
+}
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/RetractStreamSqlJob.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/RetractStreamSqlJob.java
new file mode 100644
index 0000000..44ee37a
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/RetractStreamSqlJob.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.sql;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
+import org.apache.flink.table.api.scala.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RetractStreamSqlJob extends AbstractStreamSqlJob {
+
+  private static Logger LOGGER = LoggerFactory.getLogger(RetractStreamSqlJob.class);
+
+  private List<Row> materializedTable = new ArrayList<>();
+
+
+  public RetractStreamSqlJob(StreamExecutionEnvironment senv,
+                             StreamTableEnvironment stEnv,
+                             InterpreterContext context,
+                             int defaultParallelism) {
+    super(senv, stEnv, context, defaultParallelism);
+  }
+
+  @Override
+  protected String getType() {
+    return "retract";
+  }
+
+  @Override
+  protected List<String> getValidLocalProperties() {
+    return Lists.newArrayList("type", "parallelism",
+            "refreshInterval", "enableSavePoint", "runWithSavePoint");
+  }
+
+  protected void processInsert(Row row) {
+    LOGGER.debug("processInsert: " + row.toString());
+    materializedTable.add(row);
+  }
+
+  protected void processDelete(Row row) {
+    LOGGER.debug("processDelete: " + row.toString());
+    for (int i = 0; i < materializedTable.size(); i++) {
+      if (materializedTable.get(i).equals(row)) {
+        materializedTable.remove(i);
+        break;
+      }
+    }
+  }
+
+  @Override
+  protected void refresh(InterpreterContext context) {
+    context.out().clear();
+    try {
+      context.out.write("%table\n");
+      for (int i = 0; i < schema.getFieldCount(); ++i) {
+        String field = schema.getFieldName(i).get();
+        context.out.write(field);
+        if (i != (schema.getFieldCount() - 1)) {
+          context.out.write("\t");
+        }
+      }
+      context.out.write("\n");
+      LOGGER.debug("*****************Row size: " + materializedTable.size());
+      // sort it by the first column
+      materializedTable.sort((r1, r2) -> {
+        String f1 = r1.getField(0).toString();
+        String f2 = r2.getField(0).toString();
+        return f1.compareTo(f2);
+      });
+      for (Row row : materializedTable) {
+        for (int i = 0; i < row.getArity(); ++i) {
+          Object field = row.getField(i);
+          context.out.write(field.toString());
+          if (i != (row.getArity() - 1)) {
+            context.out.write("\t");
+          }
+        }
+        context.out.write("\n");
+      }
+      context.out.flush();
+    } catch (IOException e) {
+      LOGGER.error("Fail to refresh data", e);
+    }
+  }
+}
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
new file mode 100644
index 0000000..a1abea6
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.sql;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
+import org.apache.flink.table.api.scala.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class SingleRowStreamSqlJob extends AbstractStreamSqlJob {
+
+  private static Logger LOGGER = LoggerFactory.getLogger(SingleRowStreamSqlJob.class);
+
+  private Row latestRow;
+  private String template;
+
+  public SingleRowStreamSqlJob(StreamExecutionEnvironment senv,
+                               StreamTableEnvironment stenv,
+                               InterpreterContext context,
+                               int defaultParallelism) {
+    super(senv, stenv, context, defaultParallelism);
+    this.template = context.getLocalProperties().getOrDefault("template", "{0}");
+  }
+
+  @Override
+  protected String getType() {
+    return "single";
+  }
+
+  @Override
+  protected List<String> getValidLocalProperties() {
+    return Lists.newArrayList("type", "parallelism",
+            "refreshInterval", "template", "enableSavePoint", "runWithSavePoint");
+  }
+
+  protected void processInsert(Row row) {
+    LOGGER.debug("processInsert: " + row.toString());
+    latestRow = row;
+  }
+
+  @Override
+  protected void processDelete(Row row) {
+    LOGGER.debug("Ignore delete");
+  }
+
+  @Override
+  protected void refresh(InterpreterContext context) throws Exception {
+    if (latestRow == null) {
+      LOGGER.warn("Skip RefreshTask as no data available");
+      return;
+    }
+    context.out().clear();
+    context.out.write("%html\n");
+    String outputText = template;
+    for (int i = 0; i < latestRow.getArity(); ++i) {
+      outputText = outputText.replace("{" + i + "}", latestRow.getField(i).toString());
+    }
+    LOGGER.debug("SingleRow Output: " + outputText);
+    context.out.write(outputText);
+    context.out.flush();
+  }
+}
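
SingleRowStreamSqlJob renders only the latest row as %html and substitutes the row's fields into the "template" local property, replacing {0}, {1}, ... with the corresponding columns. A usage sketch, again assuming the %flink.ssql binding and a hypothetical source table log:

    %flink.ssql(type=single, refreshInterval=2000, template=<h1>{0}</h1> events so far)
    SELECT COUNT(1) FROM log

Every time RefreshTask fires, {0} is replaced by the current count; deletes are ignored because the single aggregated row is only ever updated, never removed from the display.
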
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java
new file mode 100644
index 0000000..c17633a
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.sql;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Simple parser for determining the type of command and its parameters.
+ */
+public final class SqlCommandParser {
+
+  private SqlCommandParser() {
+    // private
+  }
+
+  public static Optional<SqlCommandCall> parse(String stmt) {
+    // normalize
+    stmt = stmt.trim();
+    // remove ';' at the end
+    if (stmt.endsWith(";")) {
+      stmt = stmt.substring(0, stmt.length() - 1).trim();
+    }
+
+    // parse
+    for (SqlCommand cmd : SqlCommand.values()) {
+      final Matcher matcher = cmd.pattern.matcher(stmt);
+      if (matcher.matches()) {
+        final String[] groups = new String[matcher.groupCount()];
+        for (int i = 0; i < groups.length; i++) {
+          groups[i] = matcher.group(i + 1);
+        }
+        return cmd.operandConverter.apply(groups)
+                .map((operands) -> new SqlCommandCall(cmd, operands));
+      }
+    }
+    return Optional.empty();
+  }
+
+  // --------------------------------------------------------------------------------------------
+
+  private static final Function<String[], Optional<String[]>> NO_OPERANDS =
+      (operands) -> Optional.of(new String[0]);
+
+  private static final Function<String[], Optional<String[]>> SINGLE_OPERAND =
+      (operands) -> Optional.of(new String[]{operands[0]});
+
+  private static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL;
+
+  /**
+   * Supported SQL commands.
+   */
+  public enum SqlCommand {
+    QUIT(
+            "(QUIT|EXIT)",
+            NO_OPERANDS),
+
+    CLEAR(
+            "CLEAR",
+            NO_OPERANDS),
+
+    HELP(
+            "HELP",
+            NO_OPERANDS),
+
+    SHOW_CATALOGS(
+            "SHOW\\s+CATALOGS",
+            NO_OPERANDS),
+
+    SHOW_DATABASES(
+            "SHOW\\s+DATABASES",
+            NO_OPERANDS),
+
+    SHOW_TABLES(
+            "SHOW\\s+TABLES",
+            NO_OPERANDS),
+
+    SHOW_FUNCTIONS(
+            "SHOW\\s+FUNCTIONS",
+            NO_OPERANDS),
+
+    USE_CATALOG(
+            "USE\\s+CATALOG\\s+(.*)",
+            SINGLE_OPERAND),
+
+    USE_DATABASE(
+            "USE\\s+DATABASE\\s+(.*)",
+            SINGLE_OPERAND),
+
+    DESCRIBE(
+            "DESCRIBE\\s+(.*)",
+            SINGLE_OPERAND),
+
+    EXPLAIN(
+            "EXPLAIN\\s+(.*)",
+            SINGLE_OPERAND),
+
+    SELECT(
+            "(SELECT.*)",
+            SINGLE_OPERAND),
+
+    INSERT_INTO(
+            "(INSERT\\s+INTO.*)",
+            SINGLE_OPERAND),
+
+    CREATE_VIEW("CREATE\\s+VIEW\\s+(\\S+)\\s+AS\\s+(.*)",
+        (operands) -> {
+          if (operands.length < 2) {
+            return Optional.empty();
+          }
+          return Optional.of(new String[]{operands[0], operands[1]});
+        }),
+
+    DROP_VIEW("DROP\\s+VIEW\\s+(.*)",
+            SINGLE_OPERAND),
+
+    SET("SET(\\s+(\\S+)\\s*=(.*))?", // whitespace is only ignored on the left side of '='
+        (operands) -> {
+          if (operands.length < 3) {
+            return Optional.empty();
+          } else if (operands[0] == null) {
+            return Optional.of(new String[0]);
+          }
+          return Optional.of(new String[]{operands[1], operands[2]});
+        }),
+
+    RESET(
+            "RESET",
+            NO_OPERANDS),
+
+    SOURCE(
+            "SOURCE\\s+(.*)",
+            SINGLE_OPERAND);
+
+    public final Pattern pattern;
+    public final Function<String[], Optional<String[]>> operandConverter;
+
+    SqlCommand(String matchingRegex, Function<String[], Optional<String[]>> operandConverter) {
+      this.pattern = Pattern.compile(matchingRegex, DEFAULT_PATTERN_FLAGS);
+      this.operandConverter = operandConverter;
+    }
+
+    @Override
+    public String toString() {
+      return super.toString().replace('_', ' ');
+    }
+
+    public boolean hasOperands() {
+      return operandConverter != NO_OPERANDS;
+    }
+  }
+
+  /**
+   * Call of SQL command with operands and command type.
+   */
+  public static class SqlCommandCall {
+    public final SqlCommand command;
+    public final String[] operands;
+
+    public SqlCommandCall(SqlCommand command, String[] operands) {
+      this.command = command;
+      this.operands = operands;
+    }
+
+    public SqlCommandCall(SqlCommand command) {
+      this(command, new String[0]);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      SqlCommandCall that = (SqlCommandCall) o;
+      return command == that.command && Arrays.equals(operands, that.operands);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = Objects.hash(command);
+      result = 31 * result + Arrays.hashCode(operands);
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return command + "(" + Arrays.toString(operands) + ")";
+    }
+  }
+}
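
SqlCommandParser mirrors the statement classifier of Flink's SQL client: every SqlCommand is a case-insensitive, DOTALL regex plus a converter that turns the capture groups into operands, and statements that match no pattern come back as Optional.empty(), which FlinkSqlInterrpeter reports as an invalid SQL statement. A small illustration of the returned values (the class and table names below are made up for the example):

    import org.apache.zeppelin.flink.sql.SqlCommandParser;
    import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommandCall;

    import java.util.Optional;

    public class SqlCommandParserExample {
      public static void main(String[] args) {
        // trailing ';' is stripped, DESCRIBE captures a single operand
        Optional<SqlCommandCall> call = SqlCommandParser.parse("DESCRIBE my_table;");
        System.out.println(call.get());                      // DESCRIBE([my_table])

        // unsupported statements do not match any pattern
        System.out.println(SqlCommandParser.parse("UPDATE t SET a = 1").isPresent());  // false
      }
    }
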
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/SqlInfo.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/SqlInfo.java
new file mode 100644
index 0000000..0363aa2
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/SqlInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.sql;
+
+/**
+ * A wrapper of a sql statement and its line info.
+ */
+public class SqlInfo {
+
+  private String sqlContent;
+
+  private int line;
+
+  private int firstLineIndex;
+
+  public String getSqlContent() {
+    return sqlContent;
+  }
+
+  public void setSqlContent(String sqlContent) {
+    this.sqlContent = sqlContent;
+  }
+
+  public void setLine(int line) {
+    this.line = line;
+  }
+
+  public void setFirstLineIndex(int firstLineIndex) {
+    this.firstLineIndex = firstLineIndex;
+  }
+
+  @Override
+  public String toString() {
+    return "Sqlcontent => " + sqlContent + "\nSql start line num => "
+            + line + "\n First line index =>" + firstLineIndex;
+  }
+}
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/SqlLists.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/SqlLists.java
new file mode 100644
index 0000000..5b4fafd
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/SqlLists.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.sql;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Utility for splitting the text of a paragraph into individual sql statements,
+ * recording the line number and first-line offset of each statement.
+ */
+public class SqlLists {
+
+  private static final Pattern PATTERN_STATEMENT = Pattern.compile("[^\\\\];");
+  private static final Pattern PATTERN_STRING = Pattern.compile("(\"|')([^\"^']*)(\"|')");
+  private static final Pattern PATTERN_SINGLE_LINE = Pattern.compile("--.*");
+  private static final Pattern PATTERN_MULTI_LINE = Pattern.compile("/\\*.*?\\*/", Pattern.DOTALL);
+
+  public static List<SqlInfo> getSQLList(String context) {
+    Map<Integer, Integer> enterMap = new TreeMap<Integer, Integer>();
+    int enterCount = 1;
+    for (int i = 0; i < context.length(); i++) {
+      if (context.charAt(i) == '\n') {
+        enterMap.put(i, enterCount++);
+      }
+    }
+    enterMap.put(context.length(), enterCount++);
+    List<SqlInfo> list = new ArrayList<SqlInfo>();
+
+    Matcher match = PATTERN_STATEMENT.matcher(context);
+    int index = 0;
+    while (match.find()) {
+
+      if (isInComment(context, match.start() + 1)
+              || !isMatch(context.substring(index, match.start() + 1), '\'')
+              || !isMatch(context.substring(index, match.start() + 1), '\"')) {
+        continue;
+      }
+
+      String str = context.substring(index, match.start() + 1)
+              .replaceAll("\\\\;", ";");
+      str = str.replaceAll("^;", "");
+
+      if (!"".equals(str) && !isCommentClause(str)) {
+        int maxEnters = 0;
+        int lastEnter = 0;
+        int firstLineIndex = 0;
+        int loc = index - 1;
+        for (Integer i : enterMap.keySet()) {
+          if (loc > i) {
+            maxEnters = enterMap.get(i);
+            lastEnter = i;
+          }
+          if (loc <= i) {
+            if (loc == i) {
+              firstLineIndex = 0;
+            } else {
+              firstLineIndex = loc - lastEnter;
+            }
+            break;
+          }
+        }
+        SqlInfo sqlInfo = new SqlInfo();
+        sqlInfo.setSqlContent(str);
+        sqlInfo.setLine(maxEnters + 1);
+        sqlInfo.setFirstLineIndex(firstLineIndex);
+        list.add(sqlInfo);
+      }
+      index = match.start() + 2;
+    }
+    if (context.substring(index) != null
+            && context.substring(index).trim().length() != 0) {
+      String str = context.substring(index).replaceAll("\\\\;", ";");
+      str = str.replaceAll("^;", "").replaceAll(";$", "");
+      if (!"".equals(str) && !isCommentClause(str)) {
+        int loc = index - 1;
+        int maxEnters = 0;
+        int lastEnter = 0;
+        int firstLineIndex = 0;
+        for (Integer i : enterMap.keySet()) {
+          if (index > i) {
+            maxEnters = enterMap.get(i);
+            lastEnter = i;
+          }
+          if (index <= i) {
+            if (index == i) {
+              firstLineIndex = 0;
+            } else {
+              firstLineIndex = index - lastEnter;
+            }
+            break;
+          }
+        }
+        SqlInfo sqlInfo = new SqlInfo();
+        sqlInfo.setSqlContent(str);
+        sqlInfo.setLine(maxEnters + 1);
+        sqlInfo.setFirstLineIndex(firstLineIndex);
+        list.add(sqlInfo);
+      }
+    }
+
+    return list;
+  }
+
+  public static String toLowCase(String str) {
+    Matcher m = PATTERN_STRING.matcher(str);
+    StringBuffer sb = new StringBuffer();
+    int index = 0;
+    while (m.find()) {
+      sb.append(str.substring(index, m.start()).toLowerCase());
+      sb.append(str.substring(m.start(), m.end()));
+      index = m.end();
+    }
+    if (index != str.length()) {
+      sb.append(str.substring(index, str.length()).toLowerCase());
+    }
+    return sb.toString();
+  }
+
+  private static boolean isCommentClause(String str) {
+    String trimStr = str.trim();
+    if (trimStr.startsWith("/*") && trimStr.endsWith("*/")) {
+      return true;
+    }
+
+    boolean res = true;
+    String[] lines = StringUtils.split(str, "\n");
+    for (String line : lines) {
+      String val = line.trim();
+      if (StringUtils.isEmpty(val) || val.startsWith("--")) {
+        res = true;
+      } else {
+        return false;
+      }
+    }
+    return res;
+  }
+
+  private static boolean isMatch(String source, char pattern) {
+    int count = 0;
+    for (int i = 0; i < source.length(); i++) {
+
+      if (source.charAt(i) == pattern) {
+        count++;
+      }
+      if (source.charAt(i) == '\\' && i < source.length() - 1
+              && source.charAt(i + 1) == pattern) {
+        i++;
+      }
+    }
+    return count % 2 == 0;
+  }
+
+  private static boolean isInComment(String context, int index) {
+    Matcher singleMatch = PATTERN_SINGLE_LINE.matcher(context);
+
+    while (singleMatch.find()) {
+      int start = singleMatch.start();
+      int end = singleMatch.end() - 1;
+
+      if (index > start && index <= end) {
+        return true;
+      }
+    }
+
+    Matcher multiMatch = PATTERN_MULTI_LINE.matcher(context);
+
+    while (multiMatch.find()) {
+      int start = multiMatch.start();
+      int end = multiMatch.end() - 1;
+
+      if (index > start && index < end) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  private static boolean isComment(String context) {
+    return true;
+  }
+}
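
A quick way to see what getSQLList produces is to feed it a small multi-statement
paragraph and print the resulting SqlInfo objects. The snippet below is an
illustrative sketch only, not part of the commit; the sample SQL text is made up.

    import org.apache.zeppelin.flink.sql.{SqlInfo, SqlLists}
    import scala.collection.JavaConverters._

    object SqlListsExample {
      def main(args: Array[String]): Unit = {
        // statements are split on unescaped ';' that are not inside quotes or comments
        val paragraph = "select 1;\n-- a comment\nselect 2"
        val infos: Seq[SqlInfo] = SqlLists.getSQLList(paragraph).asScala
        // SqlInfo.toString prints the statement text, its start line and first line index
        infos.foreach(println)
      }
    }
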
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/TimeSeriesStreamSqlJob.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/TimeSeriesStreamSqlJob.java
new file mode 100644
index 0000000..e50a5c5
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/TimeSeriesStreamSqlJob.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.sql;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.scala.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class TimeSeriesStreamSqlJob extends AbstractStreamSqlJob {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesStreamSqlJob.class);
+
+  private List<Row> materializedTable = new ArrayList<>();
+  private long tsWindowThreshold;
+  private boolean firstRefresh = true;
+
+  public TimeSeriesStreamSqlJob(StreamExecutionEnvironment senv,
+                                StreamTableEnvironment stEnv,
+                                InterpreterContext context,
+                                int defaultParallelism) {
+    super(senv, stEnv, context, defaultParallelism);
+    this.tsWindowThreshold = Long.parseLong(context.getLocalProperties()
+            .getOrDefault("threshold", 1000 * 60 * 60 + ""));
+  }
+
+  @Override
+  protected String getType() {
+    return "ts";
+  }
+
+  @Override
+  protected List<String> getValidLocalProperties() {
+    return Lists.newArrayList("type", "parallelism",
+            "refreshInterval", "enableSavePoint", "runWithSavePoint", "threshold");
+  }
+
+  @Override
+  protected void checkTableSchema(TableSchema schema) throws Exception {
+    //    if (!(schema.getFieldDataType(0).get() instanceof TimestampType)) {
+    //      throw new Exception("The first column must be TimestampType, but is " +
+    //              schema.getFieldDataType(0));
+    //    }
+  }
+
+  @Override
+  protected void processInsert(Row row) {
+    LOGGER.debug("processInsert: " + row.toString());
+    materializedTable.add(row);
+  }
+
+  @Override
+  protected void processDelete(Row row) {
+    throw new RuntimeException("Delete operation is not expected");
+  }
+
+  @Override
+  protected void refresh(InterpreterContext context) {
+    context.out().clear();
+    try {
+      context.out.write("%table\n");
+      for (int i = 0; i < schema.getFieldCount(); ++i) {
+        String field = schema.getFieldNames()[i];
+        context.out.write(field);
+        if (i != (schema.getFieldCount() - 1)) {
+          context.out.write("\t");
+        }
+      }
+      context.out.write("\n");
+
+      // sort it by the first column
+      materializedTable.sort((r1, r2) -> {
+        String f1 = r1.getField(0).toString();
+        String f2 = r2.getField(0).toString();
+        return f1.compareTo(f2);
+      });
+
+      if (materializedTable.size() != 0) {
+        long maxTimestamp =
+                ((java.sql.Timestamp) materializedTable.get(materializedTable.size() - 1)
+                .getField(0)).getTime();
+
+        materializedTable = materializedTable.stream()
+                .filter(row -> ((java.sql.Timestamp) row.getField(0)).getTime() >
+                        maxTimestamp - tsWindowThreshold)
+                .collect(Collectors.toList());
+
+        for (Row row : materializedTable) {
+          for (int i = 0; i < row.getArity(); ++i) {
+            Object field = row.getField(i);
+            context.out.write(field.toString());
+            if (i != (row.getArity() - 1)) {
+              context.out.write("\t");
+            }
+          }
+          context.out.write("\n");
+        }
+      }
+      context.out.flush();
+    } catch (IOException e) {
+      LOGGER.error("Fail to refresh data", e);
+    }
+  }
+}
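
The refresh method above effectively keeps a sliding time window over the
materialized rows: only rows whose first-column timestamp lies within
"threshold" milliseconds (a paragraph local property, one hour by default) of
the newest timestamp are kept and rendered as a %table. A minimal stand-alone
sketch of that retention rule, using made-up epoch timestamps:

    object ThresholdRetentionSketch {
      def main(args: Array[String]): Unit = {
        val thresholdMs = 60 * 60 * 1000L          // same default as the interpreter: one hour
        val timestamps = Seq(1000L, 5000L, 4000000L, 4200000L)
        val latest = timestamps.max
        // keep only timestamps inside (latest - threshold, latest]
        val visible = timestamps.filter(_ > latest - thresholdMs)
        println(visible)                           // List(4000000, 4200000)
      }
    }
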
diff --git a/flink/src/main/resources/interpreter-setting.json b/flink/src/main/resources/interpreter-setting.json
index 1463e3d..74a3686 100644
--- a/flink/src/main/resources/interpreter-setting.json
+++ b/flink/src/main/resources/interpreter-setting.json
@@ -4,43 +4,176 @@
     "name": "flink",
     "className": "org.apache.zeppelin.flink.FlinkInterpreter",
     "properties": {
-      "host": {
-        "envName": "host",
+      "flink.execution.mode": {
+        "envName": null,
         "propertyName": null,
         "defaultValue": "local",
-        "description": "host name of running JobManager. 'local' runs flink in local mode.",
+        "description": "execution mode, it could be local/remote/yarn/k8s",
         "type": "string"
       },
-      "port": {
-        "envName": "port",
+      "flink.execution.remote.host": {
+        "envName": null,
         "propertyName": null,
-        "defaultValue": "6123",
-        "description": "port of running JobManager.",
+        "defaultValue": "",
+        "description": "host name of running JobManager. Only used for remote mode",
+        "type": "string"
+      },
+      "flink.execution.remote.port": {
+        "envName": null,
+        "propertyName": null,
+        "defaultValue": "",
+        "description": "port of running JobManager. Only used for remote mode",
+        "type": "number"
+      },
+      "flink.jm.memory": {
+        "envName": null,
+        "propertyName": null,
+        "defaultValue": "1024",
+        "description": "Memory for JobManager (mb)",
+        "type": "number"
+      },
+      "flink.tm.memory": {
+        "envName": null,
+        "propertyName": null,
+        "defaultValue": "1024",
+        "description": "Memory for TaskManager (mb)",
+        "type": "number"
+      },
+      "flink.tm.num": {
+        "envName": null,
+        "propertyName": null,
+        "defaultValue": "2",
+        "description": "Number of TaskManager",
+        "type": "number"
+      },
+      "flink.tm.slot": {
+        "envName": null,
+        "propertyName": null,
+        "defaultValue": "1",
+        "description": "Number of slot per TaskManager",
         "type": "number"
+      },
+      "flink.yarn.appName": {
+        "envName": null,
+        "propertyName": null,
+        "defaultValue": "Zeppelin Flink Session",
+        "description": "Yarn app name",
+        "type": "string"
+      },
+      "flink.yarn.queue": {
+        "envName": null,
+        "propertyName": null,
+        "defaultValue": "default",
+        "description": "yarn queue name",
+        "type": "string"
+      },
+      "flink.yarn.jars": {
+        "envName": null,
+        "propertyName": null,
+        "defaultValue": "",
+        "description": "additional user jars (comma separated)",
+        "type": "string"
+      },
+      "zeppelin.flink.scala.color": {
+        "envName": null,
+        "propertyName": null,
+        "defaultValue": "true",
+        "description": "whether display scala shell output in colorful format",
+        "type": "checkbox"
+      },
+      "zeppelin.flink.enableHive": {
+        "envName": null,
+        "propertyName": null,
+        "defaultValue": "false",
+        "description": "whether enable hive",
+        "type": "checkbox"
+      },
+      "zeppelin.flink.printREPLOutput": {
+        "envName": null,
+        "propertyName": "zeppelin.flink.printREPLOutput",
+        "defaultValue": true,
+        "description": "Print REPL output",
+        "type": "checkbox"
+      },
+      "zeppelin.flink.maxResult": {
+        "envName": "zeppelin.flink.maxResult",
+        "propertyName": "zeppelin.flink.maxResult",
+        "defaultValue": "1000",
+        "description": "max number of row returned by sql interpreter.",
+        "type": "number"
+      },
+      "zeppelin.flink.planner": {
+        "envName": "zeppelin.flink.planner",
+        "propertyName": "zeppelin.flink.planner",
+        "defaultValue": "blink",
+        "description": "planner or flink table api, blink or flink",
+        "type": "number"
+      },
+      "zeppelin.pyflink.python": {
+        "envName": "zeppelin.pyflink.python",
+        "propertyName": "zeppelin.pyflink.python",
+        "defaultValue": "python",
+        "description": "python executable for pyflink",
+        "type": "string"
       }
     },
     "editor": {
       "language": "scala",
+      "editOnDblClick": false,
+      "completionKey": "TAB",
+      "completionSupport": true
+    }
+  },
+
+  {
+    "group": "flink",
+    "name": "bsql",
+    "className": "org.apache.zeppelin.flink.FlinkBatchSqlInterpreter",
+    "properties": {
+
+    },
+    "editor": {
+      "language": "sql",
       "editOnDblClick": false
     }
   },
 
   {
     "group": "flink",
-    "name": "sql",
-    "className": "org.apache.zeppelin.flink.FlinkSQLInterpreter",
+    "name": "ssql",
+    "className": "org.apache.zeppelin.flink.FlinkStreamSqlInterpreter",
     "properties": {
-      "zeppelin.flink.maxResult": {
-        "envName": "zeppelin.flink.maxResult",
-        "propertyName": "zeppelin.flink.maxResult",
-        "defaultValue": "1000",
-        "description": "max number of row returned by sql interpreter.",
-        "type": "number"
-      }
+
     },
     "editor": {
       "language": "sql",
       "editOnDblClick": false
     }
+  },
+
+  {
+    "group": "flink",
+    "name": "pyflink",
+    "className": "org.apache.zeppelin.flink.PyFlinkInterpreter",
+    "properties": {
+
+    },
+    "editor": {
+      "language": "python",
+      "editOnDblClick": false
+    }
+  },
+
+  {
+    "group": "flink",
+    "name": "ipyflink",
+    "className": "org.apache.zeppelin.flink.IPyFlinkInterpreter",
+    "properties": {
+
+    },
+    "editor": {
+      "language": "python",
+      "editOnDblClick": false
+    }
   }
 ]
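
With this settings file the flink group now exposes five paragraph bindings
instead of two: %flink (scala), %flink.bsql (batch sql), %flink.ssql (streaming
sql), %flink.pyflink and %flink.ipyflink (python). As a rough usage sketch (not
part of the commit), a scala paragraph can use the benv/senv/btenv/stenv
variables bound by the interpreter, e.g. a small batch word count assuming the
default local execution mode:

    %flink
    // org.apache.flink.api.scala._ is imported by the interpreter on startup
    val words = benv.fromElements("hello world", "hello flink")
    words.flatMap(_.split("\\s+"))
         .map((_, 1))
         .groupBy(0)
         .sum(1)
         .print()
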
diff --git a/flink/src/main/resources/python/zeppelin_ipyflink.py b/flink/src/main/resources/python/zeppelin_ipyflink.py
new file mode 100644
index 0000000..c6dfa47
--- /dev/null
+++ b/flink/src/main/resources/python/zeppelin_ipyflink.py
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import os
+
+from py4j.java_gateway import java_import, JavaGateway, GatewayClient
+
+from pyflink.common import *
+from pyflink.dataset import *
+from pyflink.datastream import *
+from pyflink.table import *
+from pyflink.table.catalog import *
+from pyflink.table.descriptors import *
+from pyflink.table.window import *
+
+import pyflink
+
+# start JVM gateway
+if "PY4J_GATEWAY_SECRET" in os.environ:
+    from py4j.java_gateway import GatewayParameters
+    gateway_secret = os.environ["PY4J_GATEWAY_SECRET"]
+    gateway = JavaGateway(gateway_parameters=GatewayParameters(address="${JVM_GATEWAY_ADDRESS}",
+        port=${JVM_GATEWAY_PORT}, auth_token=gateway_secret, auto_convert=True))
+else:
+    gateway = JavaGateway(GatewayClient(address="${JVM_GATEWAY_ADDRESS}", port=${JVM_GATEWAY_PORT}), auto_convert=True)
+
+
+intp = gateway.entry_point
+
+pyflink.java_gateway._gateway = gateway
+pyflink.java_gateway.import_flink_view(gateway)
+pyflink.java_gateway.install_exception_handler()
+
+b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
+bt_env = BatchTableEnvironment.create(b_env)
+s_env = StreamExecutionEnvironment(intp.getJavaStreamExecutionEnvironment())
+st_env = StreamTableEnvironment.create(s_env)
+
+class IPyFlinkZeppelinContext(PyZeppelinContext):
+
+    def __init__(self, z, gateway):
+        super(IPyFlinkZeppelinContext, self).__init__(z, gateway)
+
+    def show(self, obj):
+        from pyflink.table import Table
+        if isinstance(obj, Table):
+            print(self.z.showData(obj._j_table))
+        else:
+            super(IPyFlinkZeppelinContext, self).show(obj)
+
+z = __zeppelin__ = IPyFlinkZeppelinContext(intp.getZeppelinContext(), gateway)
diff --git a/flink/src/main/resources/python/zeppelin_pyflink.py b/flink/src/main/resources/python/zeppelin_pyflink.py
new file mode 100644
index 0000000..86e1a50
--- /dev/null
+++ b/flink/src/main/resources/python/zeppelin_pyflink.py
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyflink.common import *
+from pyflink.dataset import *
+from pyflink.datastream import *
+from pyflink.table import *
+from pyflink.table.catalog import *
+from pyflink.table.descriptors import *
+from pyflink.table.window import *
+
+import pyflink
+
+from py4j.java_gateway import java_import
+
+intp = gateway.entry_point
+
+pyflink.java_gateway._gateway = gateway
+pyflink.java_gateway.import_flink_view(gateway)
+pyflink.java_gateway.install_exception_handler()
+
+b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
+bt_env = BatchTableEnvironment.create(b_env)
+s_env = StreamExecutionEnvironment(intp.getJavaStreamExecutionEnvironment())
+st_env = StreamTableEnvironment.create(s_env)
+
+from zeppelin_context import PyZeppelinContext
+
+#TODO(zjffdu) merge it with IPyFlinkZeppelinContext
+class PyFlinkZeppelinContext(PyZeppelinContext):
+
+  def __init__(self, z, gateway):
+    super(PyFlinkZeppelinContext, self).__init__(z, gateway)
+
+  def show(self, obj):
+    from pyflink.table import Table
+    if isinstance(obj, Table):
+      print(self.z.showData(obj._j_table))
+    else:
+      super(PyFlinkZeppelinContext, self).show(obj)
+
+z = __zeppelin__ = PyFlinkZeppelinContext(intp.getZeppelinContext(), gateway)
+__zeppelin__._setup_matplotlib()
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala
new file mode 100644
index 0000000..d61bcbc
--- /dev/null
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink
+
+import scala.tools.nsc.interpreter.{ExprTyper, IR}
+
+trait FlinkExprTyper extends ExprTyper {
+
+  import repl._
+  import global.{Import => _, reporter => _, _}
+  import naming.freshInternalVarName
+
+  def doInterpret(code: String): IR.Result = {
+    // interpret/interpretSynthetic may change the phase,
+    // which would have unintended effects on types.
+    val savedPhase = phase
+    try interpretSynthetic(code) finally phase = savedPhase
+  }
+
+  override def symbolOfLine(code: String): Symbol = {
+    def asExpr(): Symbol = {
+      val name = freshInternalVarName()
+      // Typing it with a lazy val would give us the right type, but runs
+      // into compiler bugs with things like existentials, so we compile it
+      // behind a def and strip the NullaryMethodType which wraps the expr.
+      val line = "def " + name + " = " + code
+
+      doInterpret(line) match {
+        case IR.Success =>
+          val sym0 = symbolOfTerm(name)
+          // drop NullaryMethodType
+          sym0.cloneSymbol setInfo exitingTyper(sym0.tpe_*.finalResultType)
+        case _ => NoSymbol
+      }
+    }
+
+    def asDefn(): Symbol = {
+      val old = repl.definedSymbolList.toSet
+
+      doInterpret(code) match {
+        case IR.Success =>
+          repl.definedSymbolList filterNot old match {
+            case Nil => NoSymbol
+            case sym :: Nil => sym
+            case syms => NoSymbol.newOverloaded(NoPrefix, syms)
+          }
+        case _ => NoSymbol
+      }
+    }
+
+    def asError(): Symbol = {
+      doInterpret(code)
+      NoSymbol
+    }
+
+    beSilentDuring(asExpr()) orElse beSilentDuring(asDefn()) orElse asError()
+  }
+
+}
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala
new file mode 100644
index 0000000..08cb0c0
--- /dev/null
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink
+
+import scala.collection.mutable
+import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter._
+
+class FlinkILoopInterpreter(settings: Settings, out: JPrintWriter) extends IMain(settings, out) {
+  self =>
+
+  override lazy val memberHandlers = new {
+    val intp: self.type = self
+  } with MemberHandlers {
+    import intp.global._
+
+    override def chooseHandler(member: intp.global.Tree): MemberHandler = member match {
+      case member: Import => new FlinkImportHandler(member)
+      case _ => super.chooseHandler(member)
+    }
+
+    class FlinkImportHandler(imp: Import) extends ImportHandler(imp: Import) {
+
+      override def targetType: Type = intp.global.rootMirror.getModuleIfDefined("" + expr) match {
+        case NoSymbol => intp.typeOfExpression("" + expr)
+        case sym => sym.tpe
+      }
+
+      private def safeIndexOf(name: Name, s: String): Int = fixIndexOf(name, pos(name, s))
+      private def fixIndexOf(name: Name, idx: Int): Int = if (idx == name.length) -1 else idx
+      private def pos(name: Name, s: String): Int = {
+        var i = name.pos(s.charAt(0), 0)
+        val sLen = s.length()
+        if (sLen == 1) return i
+        while (i + sLen <= name.length) {
+          var j = 1
+          while (s.charAt(j) == name.charAt(i + j)) {
+            j += 1
+            if (j == sLen) return i
+          }
+          i = name.pos(s.charAt(0), i + 1)
+        }
+        name.length
+      }
+
+      private def isFlattenedSymbol(sym: Symbol): Boolean =
+        sym.owner.isPackageClass &&
+          sym.name.containsName(nme.NAME_JOIN_STRING) &&
+          sym.owner.info.member(sym.name.take(
+            safeIndexOf(sym.name, nme.NAME_JOIN_STRING))) != NoSymbol
+
+      private def importableTargetMembers =
+        importableMembers(exitingTyper(targetType)).filterNot(isFlattenedSymbol).toList
+
+      def isIndividualImport(s: ImportSelector): Boolean =
+        s.name != nme.WILDCARD && s.rename != nme.WILDCARD
+      def isWildcardImport(s: ImportSelector): Boolean =
+        s.name == nme.WILDCARD
+
+      // non-wildcard imports
+      private def individualSelectors = selectors filter isIndividualImport
+
+      override val importsWildcard: Boolean = selectors exists isWildcardImport
+
+      lazy val importableSymbolsWithRenames: List[(Symbol, Name)] = {
+        val selectorRenameMap =
+          individualSelectors.flatMap(x => x.name.bothNames zip x.rename.bothNames).toMap
+        importableTargetMembers flatMap (m => selectorRenameMap.get(m.name) map (m -> _))
+      }
+
+      override lazy val individualSymbols: List[Symbol] = importableSymbolsWithRenames map (_._1)
+      override lazy val wildcardSymbols: List[Symbol] =
+        if (importsWildcard) importableTargetMembers else Nil
+
+    }
+
+  }
+
+  object expressionTyper extends {
+    val repl: FlinkILoopInterpreter.this.type = self
+  } with FlinkExprTyper { }
+
+  override def symbolOfLine(code: String): global.Symbol =
+    expressionTyper.symbolOfLine(code)
+
+  override def typeOfExpression(expr: String, silent: Boolean): global.Type =
+    expressionTyper.typeOfExpression(expr, silent)
+
+
+  import global.Name
+  override def importsCode(wanted: Set[Name], wrapper: Request#Wrapper,
+                           definesClass: Boolean, generousImports: Boolean): ComputedImports = {
+
+    import global._
+    import definitions.PredefModule
+    import memberHandlers._
+
+    val header, code, trailingBraces, accessPath = new StringBuilder
+    val currentImps = mutable.HashSet[Name]()
+    // only emit predef import header if name not resolved in history, loosely
+    var predefEscapes = false
+
+    /**
+     * Narrow down the list of requests from which imports
+     * should be taken.  Removes requests which cannot contribute
+     * useful imports for the specified set of wanted names.
+     */
+    case class ReqAndHandler(req: Request, handler: MemberHandler)
+
+    def reqsToUse: List[ReqAndHandler] = {
+      /**
+       * Loop through a list of MemberHandlers and select which ones to keep.
+       * 'wanted' is the set of names that need to be imported.
+       */
+      def select(reqs: List[ReqAndHandler], wanted: Set[Name]): List[ReqAndHandler] = {
+        // Single symbol imports might be implicits! See bug #1752.  Rather than
+        // try to finesse this, we will mimic all imports for now.
+        def keepHandler(handler: MemberHandler) = handler match {
+          // While defining classes in class based mode - implicits are not needed.
+          case h: ImportHandler if isClassBased && definesClass =>
+            h.importedNames.exists(x => wanted.contains(x))
+          case _: ImportHandler => true
+          case x if generousImports => x.definesImplicit ||
+            (x.definedNames exists (d => wanted.exists(w => d.startsWith(w))))
+          case x => x.definesImplicit ||
+            (x.definedNames exists wanted)
+        }
+
+        reqs match {
+          case Nil =>
+            predefEscapes = wanted contains PredefModule.name ; Nil
+          case rh :: rest if !keepHandler(rh.handler) => select(rest, wanted)
+          case rh :: rest =>
+            import rh.handler._
+            val augment = rh match {
+              case ReqAndHandler(_, _: ImportHandler) => referencedNames
+              case _ => Nil
+            }
+            val newWanted = wanted ++ augment -- definedNames -- importedNames
+            rh :: select(rest, newWanted)
+        }
+      }
+
+      /** Flatten the handlers out and pair each with the original request */
+      select(allReqAndHandlers reverseMap { case (r, h) => ReqAndHandler(r, h) }, wanted).reverse
+    }
+
+    // add code for a new object to hold some imports
+    def addWrapper() {
+      import nme.{INTERPRETER_IMPORT_WRAPPER => iw}
+      code append (wrapper.prewrap format iw)
+      trailingBraces append wrapper.postwrap
+      accessPath append s".$iw"
+      currentImps.clear()
+    }
+
+    def maybeWrap(names: Name*) = if (names exists currentImps) addWrapper()
+
+    def wrapBeforeAndAfter[T](op: => T): T = {
+      addWrapper()
+      try op finally addWrapper()
+    }
+
+    // imports from Predef are relocated to the template header to allow hiding.
+    def checkHeader(h: ImportHandler) = h.referencedNames contains PredefModule.name
+
+    // loop through previous requests, adding imports for each one
+    wrapBeforeAndAfter {
+      // Reusing a single temporary value when import from a line with multiple definitions.
+      val tempValLines = mutable.Set[Int]()
+      for (ReqAndHandler(req, handler) <- reqsToUse) {
+        val objName = req.lineRep.readPathInstance
+        handler match {
+          case h: ImportHandler if checkHeader(h) =>
+            header.clear()
+            header append f"${h.member}%n"
+          // If the user entered an import, then just use it; add an import wrapping
+          // level if the import might conflict with some other import
+          case x: ImportHandler if x.importsWildcard =>
+            wrapBeforeAndAfter(code append (x.member + "\n"))
+          case x: ImportHandler =>
+            maybeWrap(x.importedNames: _*)
+            code append (x.member + "\n")
+            currentImps ++= x.importedNames
+
+          case x if isClassBased =>
+            for (sym <- x.definedSymbols) {
+              maybeWrap(sym.name)
+              x match {
+                case _: ClassHandler =>
+                  code.append(s"import ${objName}${req.accessPath}.`${sym.name}`\n")
+                case _ =>
+                  val valName = s"${req.lineRep.packageName}${req.lineRep.readName}"
+                  if (!tempValLines.contains(req.lineRep.lineId)) {
+                    code.append(s"val $valName: ${objName}.type = $objName\n")
+                    tempValLines += req.lineRep.lineId
+                  }
+                  code.append(s"import ${valName}${req.accessPath}.`${sym.name}`\n")
+              }
+              currentImps += sym.name
+            }
+          // For other requests, import each defined name.
+          // import them explicitly instead of with _, so that
+          // ambiguity errors will not be generated. Also, quote
+          // the name of the variable, so that we don't need to
+          // handle quoting keywords separately.
+          case x =>
+            for (sym <- x.definedSymbols) {
+              maybeWrap(sym.name)
+              code append s"import ${x.path}\n"
+              currentImps += sym.name
+            }
+        }
+      }
+    }
+
+    val computedHeader = if (predefEscapes) header.toString else ""
+    ComputedImports(computedHeader, code.toString, trailingBraces.toString, accessPath.toString)
+  }
+
+  private def allReqAndHandlers =
+    prevRequestList flatMap (req => req.handlers map (req -> _))
+
+}
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala
index b2d8d16..bb991c3 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala
@@ -18,15 +18,14 @@
 
 package org.apache.zeppelin.flink
 
-import org.apache.flink.table.api.Table
-import org.apache.flink.table.api.scala.BatchTableEnvironment
+import org.apache.flink.table.api.{Table, TableEnvironment}
 import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult}
 
 class FlinkSQLScalaInterpreter(scalaInterpreter: FlinkScalaInterpreter,
                                z: FlinkZeppelinContext,
                                maxRow: Int) {
 
-  private var btenv: BatchTableEnvironment = scalaInterpreter.getBatchTableEnviroment()
+  private var btenv: TableEnvironment = scalaInterpreter.getBatchTableEnvironment()
 
   def interpret(code: String, context: InterpreterContext): InterpreterResult = {
     try {
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 1d8b27e..8c3946e 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -18,21 +18,26 @@
 
 package org.apache.zeppelin.flink
 
-import java.io.BufferedReader
+import java.io.{BufferedReader, File}
+import java.net.URLClassLoader
 import java.nio.file.Files
 import java.util.Properties
 
-import org.apache.flink.api.scala.FlinkShell._
+import org.apache.flink.api.java.ScalaShellRemoteEnvironment
+import org.apache.flink.api.scala.FlinkShell.{ExecutionMode, _}
 import org.apache.flink.api.scala.{ExecutionEnvironment, FlinkILoop}
-import org.apache.flink.client.program.ClusterClient
-import org.apache.flink.configuration.GlobalConfiguration
-import org.apache.flink.runtime.minicluster.MiniCluster
+import org.apache.flink.configuration._
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableEnvironment}
+import org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl
 import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
+import org.apache.flink.table.catalog.hive.HiveCatalog
+import org.apache.zeppelin.flink.util.DependencyUtils
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
-import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult}
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterException, InterpreterHookRegistry, InterpreterResult}
 import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.JavaConverters._
@@ -45,35 +50,123 @@ class FlinkScalaInterpreter(val properties: Properties) {
   lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
 
   private var flinkILoop: FlinkILoop = _
-  private var cluster: Option[Either[MiniCluster, ClusterClient[_]]] = _
+  private var cluster: types.ClusterType = _
+
   private var scalaCompleter: ScalaCompleter = _
   private val interpreterOutput = new InterpreterOutputStream(LOGGER)
+  private var configuration: Configuration = _
 
+  private var mode: org.apache.flink.api.scala.FlinkShell.ExecutionMode.Value = _
   private var benv: ExecutionEnvironment = _
   private var senv: StreamExecutionEnvironment = _
-  private var btenv: BatchTableEnvironment = _
+  private var btenv: TableEnvironment = _
   private var stenv: StreamTableEnvironment = _
+  private var btEnvSetting: EnvironmentSettings = _
+  private var stEnvSetting: EnvironmentSettings = _
   private var z: FlinkZeppelinContext = _
+  private var jmWebUrl: String = _
+  private var jobManager: JobManager = _
+  private var defaultParallelism = 1;
+
 
   def open(): Unit = {
-    var config = Config(executionMode = ExecutionMode.withName(
-      properties.getProperty("flink.execution.mode", "LOCAL").toUpperCase))
-    val containerNum = Integer.parseInt(properties.getProperty("flink.yarn.num_container", "1"))
-    config = config.copy(yarnConfig =
-      Some(ensureYarnConfig(config).copy(containers = Some(containerNum))))
-    val configuration = GlobalConfiguration.loadConfiguration(System.getenv("FLINK_CONF_DIR"))
-    val replOut = new JPrintWriter(interpreterOutput, true)
+    mode = ExecutionMode.withName(
+      properties.getProperty("flink.execution.mode", "LOCAL").toUpperCase)
+    var config = Config(executionMode = mode)
+
+    if (mode == ExecutionMode.YARN) {
+      val jmMemory = properties.getProperty("flink.jm.memory", "1024")
+      config = config.copy(yarnConfig =
+        Some(ensureYarnConfig(config)
+          .copy(jobManagerMemory = Some(jmMemory))))
+
+      val tmMemory = properties.getProperty("flink.tm.memory", "1024")
+      config = config.copy(yarnConfig =
+        Some(ensureYarnConfig(config)
+          .copy(taskManagerMemory = Some(tmMemory))))
+
+      val tmNum = Integer.parseInt(properties.getProperty("flink.tm.num", "2"))
+      config = config.copy(yarnConfig =
+        Some(ensureYarnConfig(config)
+          .copy(containers = Some(tmNum))))
+
+      val appName = properties.getProperty("flink.yarn.appName", "Flink Yarn App Name")
+      config = config.copy(yarnConfig =
+        Some(ensureYarnConfig(config)
+          .copy(name = Some(appName))))
+
+      val slotNum = Integer.parseInt(properties.getProperty("flink.tm.slot", "1"))
+      config = config.copy(yarnConfig =
+        Some(ensureYarnConfig(config)
+          .copy(slots = Some(slotNum))))
+
+      val queue = (properties.getProperty("flink.yarn.queue", "default"))
+      config = config.copy(yarnConfig =
+        Some(ensureYarnConfig(config)
+          .copy(queue = Some(queue))))
+    }
+
+    this.configuration = GlobalConfiguration.loadConfiguration(System.getenv("FLINK_CONF_DIR"))
+    val userJars = getUserJars
+    config = config.copy(externalJars = Some(userJars.toArray))
+    LOGGER.info("Config: " + config)
+    configuration.setString("flink.yarn.jars", userJars.mkString(":"))
+
+    // load other configuration from interpreter properties
+    properties.asScala.foreach(entry => configuration.setString(entry._1, entry._2))
+    this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM)
+
+    // set scala.color
+    if (properties.getProperty("zeppelin.flink.scala.color", "true").toBoolean) {
+      System.setProperty("scala.color", "true")
+    }
+
+    if (config.executionMode == ExecutionMode.REMOTE) {
+      val host = properties.getProperty("flink.execution.remote.host")
+      val port = properties.getProperty("flink.execution.remote.port")
+      if (host == null) {
+        throw new InterpreterException("flink.execution.remote.host is not " +
+          "specified when using REMOTE mode")
+      }
+      if (port == null) {
+        throw new InterpreterException("flink.execution.remote.port is not " +
+          "specified when using REMOTE mode")
+      }
+      config = config.copy(host = Some(host))
+        .copy(port = Some(Integer.parseInt(port)))
+    }
+
+    val printReplOutput = properties.getProperty("zeppelin.flink.printREPLOutput", "true").toBoolean
+    val replOut = if (printReplOutput) {
+      new JPrintWriter(interpreterOutput, true)
+    } else {
+      new JPrintWriter(Console.out, true)
+    }
 
     val (iLoop, cluster) = try {
       val (host, port, cluster) = fetchConnectionInfo(configuration, config)
       val conf = cluster match {
-        case Some(Left(miniCluster)) => configuration
-        case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
-        case None => configuration
+        case Some(Left(_)) =>
+          LOGGER.info("Starting MiniCluster in legacy mode")
+          this.jmWebUrl = "http://localhost:" + port
+          configuration
+        case Some(Right(yarnCluster)) =>
+          yarnCluster.setDetached(false)
+          // yarn mode
+          LOGGER.info("Starting FlinkCluster in yarn mode")
+          this.jmWebUrl = yarnCluster.getWebInterfaceURL
+          yarnCluster.getFlinkConfiguration
+        case None =>
+          // remote mode
+          LOGGER.info("Starting FlinkCluster in remote mode")
+          this.jmWebUrl = "http://" + host + ":" + port
+          configuration
       }
+
       LOGGER.info(s"\nConnecting to Flink cluster (host: $host, port: $port).\n")
+      LOGGER.info("externalJars: " +
+        config.externalJars.getOrElse(Array.empty[String]).mkString(":"))
       val repl = new FlinkILoop(host, port, conf, config.externalJars, None, replOut)
-
       (repl, cluster)
     } catch {
       case e: IllegalArgumentException =>
@@ -81,11 +174,14 @@ class FlinkScalaInterpreter(val properties: Properties) {
         sys.exit()
     }
 
+    LOGGER.info("JobManager address: " + this.jmWebUrl)
+
     this.flinkILoop = iLoop
     this.cluster = cluster
     val settings = new Settings()
     settings.usejavacp.value = true
     settings.Yreplsync.value = true
+    settings.classpath.value = getUserJars.mkString(File.pathSeparator)
 
     val outputDir = Files.createTempDirectory("flink-repl");
     val interpArguments = List(
@@ -95,7 +191,14 @@ class FlinkScalaInterpreter(val properties: Properties) {
     settings.processArguments(interpArguments, true)
 
     flinkILoop.settings = settings
-    flinkILoop.createInterpreter()
+    flinkILoop.intp = new FlinkILoopInterpreter(settings, replOut)
+    flinkILoop.intp.beQuietDuring {
+      // set execution environment
+      flinkILoop.intp.bind("benv", flinkILoop.scalaBenv)
+      flinkILoop.intp.bind("senv", flinkILoop.scalaSenv)
+      flinkILoop.intp.bind("btenv", flinkILoop.scalaBTEnv)
+      flinkILoop.intp.bind("stenv", flinkILoop.scalaSTEnv)
+    }
 
     val in0 = getField(flinkILoop, "scala$tools$nsc$interpreter$ILoop$$in0")
       .asInstanceOf[Option[BufferedReader]]
@@ -104,21 +207,93 @@ class FlinkScalaInterpreter(val properties: Properties) {
 
     flinkILoop.in = reader
     flinkILoop.initializeSynchronous()
-    callMethod(flinkILoop, "scala$tools$nsc$interpreter$ILoop$$loopPostInit")
+    flinkILoop.intp.setContextClassLoader()
+    reader.postInit()
     this.scalaCompleter = reader.completion.completer()
 
     this.benv = flinkILoop.scalaBenv
     this.senv = flinkILoop.scalaSenv
-    this.btenv = TableEnvironment.getTableEnvironment(this.benv)
-    this.stenv = TableEnvironment.getTableEnvironment(this.senv)
-    bind("btenv", btenv.getClass.getCanonicalName, btenv, List("@transient"))
-    bind("stenv", stenv.getClass.getCanonicalName, stenv, List("@transient"))
+    LOGGER.info("Default Parallelism for flink: " +
+      configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM))
+    this.benv.setParallelism(configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM))
+    this.senv.setParallelism(configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM))
+
+    ScalaShellRemoteEnvironment.resetContextEnvironments()
+    setAsContext()
+    if (getPlanner == "flink") {
+      // flink planner
+      this.btenv = flinkILoop.scalaBTEnv
+      this.stenv = flinkILoop.scalaSTEnv
+    } else {
+      // blink planner
+      this.btEnvSetting = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build()
+      this.btenv = TableEnvironment.create(this.btEnvSetting)
+      flinkILoop.intp.bind("btenv", this.btenv)
+
+      this.stEnvSetting =
+        EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
+      this.stenv = StreamTableEnvironmentImpl.create(this.senv, this.stEnvSetting, new TableConfig)
+      flinkILoop.intp.bind("stenv", this.stenv)
+    }
 
     if (java.lang.Boolean.parseBoolean(
       properties.getProperty("zeppelin.flink.disableSysoutLogging", "true"))) {
       this.benv.getConfig.disableSysoutLogging()
       this.senv.getConfig.disableSysoutLogging()
     }
+
+    flinkILoop.interpret("import org.apache.flink.api.scala._")
+    flinkILoop.interpret("import org.apache.flink.table.api.scala._")
+    flinkILoop.interpret("import org.apache.flink.types.Row")
+    flinkILoop.interpret("import org.apache.flink.table.functions.ScalarFunction")
+    flinkILoop.interpret("import org.apache.flink.table.functions.AggregateFunction")
+    flinkILoop.interpret("import org.apache.flink.table.functions.TableFunction")
+
+    this.z = new FlinkZeppelinContext(this.btenv, new InterpreterHookRegistry(),
+      Integer.parseInt(properties.getProperty("zeppelin.flink.maxResult", "1000")))
+    val modifiers = new java.util.ArrayList[String]()
+    modifiers.add("@transient");
+    this.bind("z", z.getClass().getCanonicalName(), z, modifiers);
+
+    this.jobManager = new JobManager(this.benv, this.senv, this.z, jmWebUrl)
+
+     //register hive catalog
+    if (properties.getProperty("zeppelin.flink.enableHive", "false").toBoolean) {
+      LOGGER.info("Hive is enabled, registering hive catalog.")
+      var hiveConfDir = System.getenv("HIVE_CONF_DIR")
+      if (hiveConfDir == null) {
+        hiveConfDir = properties.getProperty("HIVE_CONF_DIR")
+      }
+      if (hiveConfDir == null) {
+        throw new InterpreterException("HIVE_CONF_DIR is not specified");
+      }
+      val database = properties.getProperty("zeppelin.flink.hive.database", "default")
+      if (database == null) {
+        throw new InterpreterException("default database is not specified, " +
+          "please set zeppelin.flink.hive.database")
+      }
+      val hiveVersion = properties.getProperty("zeppelin.flink.hive.version", "2.3.4")
+      val hiveCatalog = new HiveCatalog("hive", database, hiveConfDir, hiveVersion)
+      this.btenv.registerCatalog("hive", hiveCatalog)
+      this.stenv.registerCatalog("hive", hiveCatalog)
+      this.btenv.useCatalog("hive")
+      this.stenv.useCatalog("hive")
+      this.btenv.useDatabase(database)
+      this.stenv.useDatabase(database)
+    } else {
+      LOGGER.info("Hive is disabled.")
+    }
+  }
+
+  def setAsContext(): Unit = {
+    val factory = new StreamExecutionEnvironmentFactory() {
+      override def createExecutionEnvironment = senv.getJavaEnv
+    }
+    //StreamExecutionEnvironment
+    val method = classOf[JStreamExecutionEnvironment].getDeclaredMethod("initializeContextEnvironment",
+      classOf[StreamExecutionEnvironmentFactory])
+    method.setAccessible(true)
+    method.invoke(null, factory);
   }
 
   // for use in java side
@@ -143,7 +318,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
   protected def completion(buf: String,
                            cursor: Int,
                            context: InterpreterContext): java.util.List[InterpreterCompletion] = {
-    val completions = scalaCompleter.complete(buf, cursor).candidates
+    val completions = scalaCompleter.complete(buf.substring(0, cursor), cursor).candidates
       .map(e => new InterpreterCompletion(e, e, null))
     scala.collection.JavaConversions.seqAsJavaList(completions)
   }
@@ -168,52 +343,67 @@ class FlinkScalaInterpreter(val properties: Properties) {
   }
 
   def interpret(code: String, context: InterpreterContext): InterpreterResult = {
-
     val originalOut = System.out
 
-    def _interpret(code: String): scala.tools.nsc.interpreter.Results.Result = {
-      Console.withOut(interpreterOutput) {
-        System.setOut(Console.out)
-        interpreterOutput.setInterpreterOutput(context.out)
-        interpreterOutput.ignoreLeadingNewLinesFromScalaReporter()
-        context.out.clear()
-
-        val status = flinkILoop.interpret(code) match {
-          case scala.tools.nsc.interpreter.IR.Success =>
-            scala.tools.nsc.interpreter.IR.Success
-          case scala.tools.nsc.interpreter.IR.Error =>
-            scala.tools.nsc.interpreter.IR.Error
-          case scala.tools.nsc.interpreter.IR.Incomplete =>
-            // add print("") at the end in case the last line is comment which lead to INCOMPLETE
-            flinkILoop.interpret(code + "\nprint(\"\")")
+    if (context != null) {
+      interpreterOutput.setInterpreterOutput(context.out)
+      context.out.clear()
+    }
+
+    Console.withOut(if (context != null) context.out else Console.out) {
+      System.setOut(Console.out)
+      interpreterOutput.ignoreLeadingNewLinesFromScalaReporter()
+      // add print("") at the end in case the last line is a comment, which would lead to INCOMPLETE
+      val lines = code.split("\\n") ++ List("print(\"\")")
+      var incompleteCode = ""
+      var lastStatus: InterpreterResult.Code = null
+
+      for ((line, i) <- lines.zipWithIndex if !line.trim.isEmpty) {
+        val nextLine = if (incompleteCode != "") {
+          incompleteCode + "\n" + line
+        } else {
+          line
+        }
+        if (i < (lines.length - 1) && lines(i + 1).trim.startsWith(".")) {
+          incompleteCode = nextLine
+        } else {
+          flinkILoop.interpret(nextLine) match {
+            case scala.tools.nsc.interpreter.IR.Success =>
+              // continue the next line
+              incompleteCode = ""
+              lastStatus = InterpreterResult.Code.SUCCESS
+            case error@scala.tools.nsc.interpreter.IR.Error =>
+              return new InterpreterResult(InterpreterResult.Code.ERROR)
+            case scala.tools.nsc.interpreter.IR.Incomplete =>
+              // put this line into inCompleteCode for the next execution.
+              incompleteCode = incompleteCode + "\n" + line
+              lastStatus = InterpreterResult.Code.INCOMPLETE
+          }
         }
-        context.out.flush()
-        status
       }
+      // flush all output before returning result to frontend
+      Console.flush()
+      interpreterOutput.setInterpreterOutput(null)
+      // reset the java stdout
+      System.setOut(originalOut)
+      return new InterpreterResult(lastStatus)
     }
-    // reset the java stdout
-    System.setOut(originalOut)
+  }
 
-    val lastStatus = _interpret(code) match {
-      case scala.tools.nsc.interpreter.IR.Success =>
-        InterpreterResult.Code.SUCCESS
-      case scala.tools.nsc.interpreter.IR.Error =>
-        InterpreterResult.Code.ERROR
-      case scala.tools.nsc.interpreter.IR.Incomplete =>
-        InterpreterResult.Code.INCOMPLETE
-    }
-    new InterpreterResult(lastStatus)
+  def cancel(context: InterpreterContext): Unit = {
+    jobManager.cancelJob(context)
+  }
+
+  def getProgress(context: InterpreterContext): Int = {
+    jobManager.getJobProgress(context.getParagraphId)
   }
 
   def close(): Unit = {
-    if (flinkILoop != null) {
-      flinkILoop.close()
-    }
     if (cluster != null) {
       cluster match {
-        case Some(Left(newMiniCluster)) =>
-          LOGGER.info("Shutdown NewMiniCluster")
-          newMiniCluster.close()
+        case Some(Left(miniCluster)) =>
+          LOGGER.info("Shutdown LegacyMiniCluster")
+          miniCluster.close()
         case Some(Right(yarnCluster)) =>
           LOGGER.info("Shutdown YarnCluster")
           yarnCluster.shutDownCluster()
@@ -222,12 +412,61 @@ class FlinkScalaInterpreter(val properties: Properties) {
           LOGGER.error("Unrecognized cluster type: " + e.getClass.getSimpleName)
       }
     }
+
+    if (flinkILoop != null) {
+      flinkILoop.closeInterpreter()
+      flinkILoop = null
+    }
   }
 
-  def getExecutionEnviroment(): ExecutionEnvironment = this.benv
+  def getExecutionEnvironment(): ExecutionEnvironment = this.benv
+
+  def getStreamExecutionEnvironment(): StreamExecutionEnvironment = this.senv
 
-  def getStreamingExecutionEnviroment(): StreamExecutionEnvironment = this.senv
+  def getBatchTableEnvironment(): TableEnvironment = this.btenv
 
-  def getBatchTableEnviroment(): BatchTableEnvironment = this.btenv
+  def getStreamTableEnvionment(): StreamTableEnvironment = this.stenv
+
+  def getDefaultParallelism = this.defaultParallelism
+
+  def getUserJars: Seq[String] = {
+    val flinkJars =
+      if (properties.containsKey("flink.execution.jars")) {
+        properties.getProperty("flink.execution.jars").split(":").toSeq
+      } else {
+        Seq.empty[String]
+      }
 
+    val flinkPackageJars =
+      if (properties.containsKey("flink.execution.packages")) {
+        val packages = properties.getProperty("flink.execution.packages")
+        DependencyUtils.resolveMavenDependencies(null, packages, null, null, None).split(":").toSeq
+      } else {
+        Seq.empty[String]
+      }
+
+    flinkJars ++ flinkPackageJars
+  }
+
+  def getJobManager = this.jobManager
+
+  def getFlinkScalaShellLoader: ClassLoader = {
+    val userCodeJarFile = this.flinkILoop.writeFilesToDisk();
+    new URLClassLoader(Array(userCodeJarFile.toURL))
+  }
+
+  def getZeppelinContext = this.z
+
+  def getConfiguration = this.configuration
+
+  def getCluster = cluster
+
+  def getFlinkILoop = flinkILoop
+
+  // use blink planner by default
+  def getPlanner = properties.getProperty("zeppelin.flink.planner", "blink")
+
+  def getStEnvSetting = stEnvSetting
 }
+
+
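
The open() method above now wires up either the legacy flink planner (reusing
the environments created by the FlinkILoop) or the blink planner, depending on
zeppelin.flink.planner. Condensed into a stand-alone sketch, the blink branch
amounts to the following (same Flink 1.9 API calls as in the diff):

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableEnvironment}
    import org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl

    object BlinkPlannerSketch {
      def main(args: Array[String]): Unit = {
        // batch: a planner-agnostic TableEnvironment backed by the blink planner
        val batchSettings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build()
        val btenv: TableEnvironment = TableEnvironment.create(batchSettings)

        // streaming: a StreamTableEnvironment bound to an existing StreamExecutionEnvironment
        val senv = StreamExecutionEnvironment.getExecutionEnvironment
        val streamSettings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
        val stenv = StreamTableEnvironmentImpl.create(senv, streamSettings, new TableConfig)

        println(btenv.getClass.getName + " / " + stenv.getClass.getName)
      }
    }
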
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
index 4102adf..052456b 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
@@ -18,18 +18,17 @@
 
 package org.apache.zeppelin.flink
 
-import java.util
-
 import org.apache.flink.api.scala.DataSet
 import org.apache.flink.streaming.api.scala._
-import org.apache.flink.table.api.Table
+import org.apache.flink.table.api.internal.TableImpl
+import org.apache.flink.table.api.{Table, TableEnvironment}
 import org.apache.flink.table.api.scala.BatchTableEnvironment
 import org.apache.flink.types.Row
 import org.apache.zeppelin.annotation.ZeppelinApi
 import org.apache.zeppelin.display.AngularObjectWatcher
 import org.apache.zeppelin.display.ui.OptionInput.ParamOption
-import org.apache.zeppelin.interpreter.{BaseZeppelinContext, InterpreterContext,
-  InterpreterHookRegistry}
+import org.apache.zeppelin.flink.util.TableUtil
+import org.apache.zeppelin.interpreter.{BaseZeppelinContext, InterpreterContext, InterpreterHookRegistry, ResultMessages}
 
 import scala.collection.{JavaConversions, Seq}
 
@@ -37,32 +36,44 @@ import scala.collection.{JavaConversions, Seq}
 /**
   * ZeppelinContext for Flink
   */
-class FlinkZeppelinContext(val btenv: BatchTableEnvironment,
+class FlinkZeppelinContext(val btenv: TableEnvironment,
                            val hooks2: InterpreterHookRegistry,
                            val maxResult2: Int) extends BaseZeppelinContext(hooks2, maxResult2) {
 
+  private var currentSql: String = _
+
   private val interpreterClassMap = Map(
     "flink" -> "org.apache.zeppelin.flink.FlinkInterpreter",
-    "sql" -> "org.apache.zeppelin.flink.FlinkSqlInterpreter"
+    "bsql" -> "org.apache.zeppelin.flink.FlinkBatchSqlInterpreter",
+    "ssql" -> "org.apache.zeppelin.flink.FlinkStreamSqlInterpreter",
+    "pyflink" -> "org.apache.zeppelin.flink.PyFlinkInterpreter",
+    "ipyflink" -> "org.apache.zeppelin.flink.IPyFlinkInterpreter"
   )
 
   private val supportedClasses = Seq(classOf[DataSet[_]])
 
-  override def getSupportedClasses: util.List[Class[_]] =
+  def setCurrentSql(sql: String): Unit = {
+    this.currentSql = sql
+  }
+
+  override def getSupportedClasses: _root_.java.util.List[Class[_]] =
     JavaConversions.seqAsJavaList(supportedClasses)
 
-  override def getInterpreterClassMap: util.Map[String, String] =
+  override def getInterpreterClassMap: _root_.java.util.Map[String, String] =
     JavaConversions.mapAsJavaMap(interpreterClassMap)
 
   override def showData(obj: Any, maxResult: Int): String = {
-    def showTable(table: Table): String = {
-      val columnNames: Array[String] = table.getSchema.getColumnNames
-      val dsRow: DataSet[Row] = btenv.toDataSet[Row](table)
+    def showTable(columnNames: Array[String], rows: Seq[Row]): String = {
       val builder: StringBuilder = new StringBuilder("%table ")
       builder.append(columnNames.mkString("\t"))
       builder.append("\n")
-      val rows = dsRow.first(maxResult).collect()
-      for (row <- rows) {
+      val isLargerThanMaxResult = rows.size > maxResult
+      var displayRows = rows
+      if (isLargerThanMaxResult) {
+        displayRows = rows.take(maxResult)
+      }
+      for (row <- displayRows) {
         var i = 0;
         while (i < row.getArity) {
           builder.append(row.getField(i))
@@ -73,6 +84,11 @@ class FlinkZeppelinContext(val btenv: BatchTableEnvironment,
         }
         builder.append("\n")
       }
+
+      if (isLargerThanMaxResult) {
+        builder.append("\n")
+        builder.append(ResultMessages.getExceedsLimitRowsMessage(maxResult, "zeppelin.flink.maxResult"))
+      }
       // append %text at the end, otherwise the following output will be put in table as well.
       builder.append("\n%text ")
       builder.toString()
@@ -80,16 +96,27 @@ class FlinkZeppelinContext(val btenv: BatchTableEnvironment,
 
     if (obj.isInstanceOf[DataSet[_]]) {
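+      // DataSet: convert it to a Table and fetch maxResult + 1 rows so that truncation can be detected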
       val ds = obj.asInstanceOf[DataSet[_]]
-      val table = btenv.fromDataSet(ds)
-      showTable(table)
+      val env = btenv.asInstanceOf[BatchTableEnvironment]
+      val table = env.fromDataSet(ds)
+      val columnNames: Array[String] = table.getSchema.getFieldNames
+      val dsRows: DataSet[Row] = env.toDataSet[Row](table)
+      showTable(columnNames, dsRows.first(maxResult + 1).collect())
     } else if (obj.isInstanceOf[Table]) {
-      showTable(obj.asInstanceOf[Table])
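+      // a BatchTableEnvironment table is collected via toDataSet, otherwise (e.g. blink planner)
+      // the rows are collected through TableUtil.collect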
+      if (btenv.isInstanceOf[BatchTableEnvironment]) {
+        val table = obj.asInstanceOf[Table]
+        val columnNames: Array[String] = table.getSchema.getFieldNames
+        val dsRows: DataSet[Row] = btenv.asInstanceOf[BatchTableEnvironment].toDataSet[Row](table)
+        showTable(columnNames, dsRows.first(maxResult + 1).collect())
+      } else {
+        var rows = TableUtil.collect(obj.asInstanceOf[TableImpl], currentSql)
+        val columnNames = obj.asInstanceOf[Table].getSchema.getFieldNames
+        showTable(columnNames, rows)
+      }
     } else {
       obj.toString
     }
   }
 
-
   @ZeppelinApi
   def select(name: String, options: Seq[(Any, String)]): Any = select(name, null, options)
 
@@ -174,4 +201,4 @@ class FlinkZeppelinContext(val btenv: BatchTableEnvironment,
     }
     angularWatch(name, noteId, w)
   }
-}
+}
\ No newline at end of file
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/SqlJobRunner.scala b/flink/src/main/scala/org/apache/zeppelin/flink/SqlJobRunner.scala
new file mode 100644
index 0000000..b8f2219
--- /dev/null
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/SqlJobRunner.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.flink
+
+import org.apache.flink.runtime.jobgraph.JobGraph
+import org.slf4j.{Logger, LoggerFactory}
+
+class SqlJobRunner(cluster: types.ClusterType,
+                   jobGraph: JobGraph,
+                   jobName: String,
+                   classLoader: ClassLoader) {
+
+  lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
+
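+  // cluster is Some(Left(MiniCluster)) in local mode and Some(Right(ClusterClient)) when
+  // running against a remote (e.g. yarn) cluster, see types.ClusterType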
+  def run(): Unit = {
+    cluster match {
+      case Some(Left(miniCluster)) =>
+        miniCluster.submitJob(jobGraph)
+      case Some(Right(yarnCluster)) =>
+        yarnCluster.submitJob(jobGraph, Thread.currentThread().getContextClassLoader)
+      case None =>
+        LOGGER.error("Unable to submit the SQL job: no cluster (local or remote) is available")
+    }
+  }
+}
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/types.scala b/flink/src/main/scala/org/apache/zeppelin/flink/types.scala
new file mode 100644
index 0000000..71cc860
--- /dev/null
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/types.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink
+
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.runtime.minicluster.MiniCluster
+
+object types {
+  type ClusterType = Option[Either[MiniCluster, ClusterClient[_]]]
+}
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/util/DependencyUtils.scala b/flink/src/main/scala/org/apache/zeppelin/flink/util/DependencyUtils.scala
new file mode 100644
index 0000000..a3303c9
--- /dev/null
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/util/DependencyUtils.scala
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.util
+
+import java.io.{File, IOException}
+import java.text.ParseException
+import java.util.UUID
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.ivy.Ivy
+import org.apache.ivy.core.LogOptions
+import org.apache.ivy.core.module.descriptor._
+import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
+import org.apache.ivy.core.report.ResolveReport
+import org.apache.ivy.core.resolve.ResolveOptions
+import org.apache.ivy.core.retrieve.RetrieveOptions
+import org.apache.ivy.core.settings.IvySettings
+import org.apache.ivy.plugins.matcher.GlobPatternMatcher
+import org.apache.ivy.plugins.repository.file.FileRepository
+import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}
+
+object DependencyUtils {
+
+  def resolveMavenDependencies(
+                                packagesExclusions: String,
+                                packages: String,
+                                repositories: String,
+                                ivyRepoPath: String,
+                                ivySettingsPath: Option[String]): String = {
+    val exclusions: Seq[String] =
+      if (!StringUtils.isBlank(packagesExclusions)) {
+        packagesExclusions.split(",")
+      } else {
+        Nil
+      }
+    // Create the IvySettings, either load from file or build defaults
+    val ivySettings = ivySettingsPath match {
+      case Some(path) =>
+        loadIvySettings(path, Option(repositories), Option(ivyRepoPath))
+
+      case None =>
+        buildIvySettings(Option(repositories), Option(ivyRepoPath))
+    }
+
+    resolveMavenCoordinates(packages, ivySettings, exclusions = exclusions)
+  }
+
+  // Exposed for testing
+  var printStream = Console.out
+
+  /**
+    * Represents a Maven Coordinate
+    *
+    * @param groupId    the groupId of the coordinate
+    * @param artifactId the artifactId of the coordinate
+    * @param version    the version of the coordinate
+    */
+  case class MavenCoordinate(groupId: String, artifactId: String, version: String) {
+    override def toString: String = s"$groupId:$artifactId:$version"
+  }
+
+  /**
+    * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
+    * in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
+    *
+    * @param coordinates Comma-delimited string of maven coordinates
+    * @return Sequence of Maven coordinates
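+    *         e.g. "org.apache.flink:flink-json:1.9.0,org.apache.flink/flink-avro:1.9.0" yields
+    *         Seq(MavenCoordinate("org.apache.flink", "flink-json", "1.9.0"),
+    *             MavenCoordinate("org.apache.flink", "flink-avro", "1.9.0"))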
+    */
+  def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
+    coordinates.split(",").map { p =>
+      val splits = p.replace("/", ":").split(":")
+      require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
+        s"'groupId:artifactId:version'. The coordinate provided is: $p")
+      require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " +
+        s"be whitespace. The groupId provided is: ${splits(0)}")
+      require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " +
+        s"be whitespace. The artifactId provided is: ${splits(1)}")
+      require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
+        s"be whitespace. The version provided is: ${splits(2)}")
+      new MavenCoordinate(splits(0), splits(1), splits(2))
+    }
+  }
+
+  /** Path of the local Maven cache. */
+  private def m2Path: File = {
+    new File(System.getProperty("user.home"), ".m2" + File.separator + "repository")
+  }
+
+  /**
+    * Creates the ChainResolver used by Ivy: local m2 cache, local ivy cache and Maven Central
+    *
+    * @param defaultIvyUserDir The default user path for Ivy
+    * @return A ChainResolver used by Ivy to search for and resolve dependencies.
+    */
+  def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = {
+    // We need a chain resolver if we want to check multiple repositories
+    val cr = new ChainResolver
+    cr.setName("flink-list")
+
+    val localM2 = new IBiblioResolver
+    localM2.setM2compatible(true)
+    localM2.setRoot(m2Path.toURI.toString)
+    localM2.setUsepoms(true)
+    localM2.setName("local-m2-cache")
+    cr.add(localM2)
+
+    val localIvy = new FileSystemResolver
+    val localIvyRoot = new File(defaultIvyUserDir, "local")
+    localIvy.setLocal(true)
+    localIvy.setRepository(new FileRepository(localIvyRoot))
+    val ivyPattern = Seq(localIvyRoot.getAbsolutePath, "[organisation]", "[module]", "[revision]",
+      "ivys", "ivy.xml").mkString(File.separator)
+    localIvy.addIvyPattern(ivyPattern)
+    val artifactPattern = Seq(localIvyRoot.getAbsolutePath, "[organisation]", "[module]",
+      "[revision]", "[type]s", "[artifact](-[classifier]).[ext]").mkString(File.separator)
+    localIvy.addArtifactPattern(artifactPattern)
+    localIvy.setName("local-ivy-cache")
+    cr.add(localIvy)
+
+    // the biblio resolver resolves POM declared dependencies
+    val br: IBiblioResolver = new IBiblioResolver
+    br.setM2compatible(true)
+    br.setUsepoms(true)
+    br.setName("central")
+    cr.add(br)
+
+    cr
+  }
+
+  /**
+    * Output a colon-delimited list of paths for the downloaded jars to be added to the classpath
+    *
+    * @param artifacts      Sequence of dependencies that were resolved and retrieved
+    * @param cacheDirectory directory where jars are cached
+    * @return a colon-delimited list of paths for the dependencies
+    */
+  def resolveDependencyPaths(
+                              artifacts: Array[AnyRef],
+                              cacheDirectory: File): String = {
+    artifacts.map { artifactInfo =>
+      val artifact = artifactInfo.asInstanceOf[Artifact].getModuleRevisionId
+      cacheDirectory.getAbsolutePath + File.separator +
+        s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}.jar"
+    }.mkString(":")
+  }
+
+  /** Adds the given maven coordinates to Ivy's module descriptor. */
+  def addDependenciesToIvy(
+                            md: DefaultModuleDescriptor,
+                            artifacts: Seq[MavenCoordinate],
+                            ivyConfName: String): Unit = {
+    artifacts.foreach { mvn =>
+      val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
+      val dd = new DefaultDependencyDescriptor(ri, false, false)
+      dd.addDependencyConfiguration(ivyConfName, ivyConfName + "(runtime)")
+      // scalastyle:off println
+      printStream.println(s"${dd.getDependencyId} added as a dependency")
+      // scalastyle:on println
+      md.addDependency(dd)
+    }
+  }
+
+  /** Add exclusion rules for dependencies already included in the flink-dist */
+  def addExclusionRules(
+                         ivySettings: IvySettings,
+                         ivyConfName: String,
+                         md: DefaultModuleDescriptor): Unit = {
+    // Add scala exclusion rule
+    md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, ivyConfName))
+  }
+
+  /**
+    * Build Ivy Settings using options with default resolvers
+    *
+    * @param remoteRepos Comma-delimited string of remote repositories other than maven central
+    * @param ivyPath     The path to the local ivy repository
+    * @return An IvySettings object
+    */
+  def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String]): IvySettings = {
+    val ivySettings: IvySettings = new IvySettings
+    processIvyPathArg(ivySettings, ivyPath)
+
+    // create a pattern matcher
+    ivySettings.addMatcher(new GlobPatternMatcher)
+    // create the dependency resolvers
+    val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir)
+    ivySettings.addResolver(repoResolver)
+    ivySettings.setDefaultResolver(repoResolver.getName)
+    processRemoteRepoArg(ivySettings, remoteRepos)
+    ivySettings
+  }
+
+  /**
+    * Load Ivy settings from a given filename, using supplied resolvers
+    *
+    * @param settingsFile Path to Ivy settings file
+    * @param remoteRepos  Comma-delimited string of remote repositories other than maven central
+    * @param ivyPath      The path to the local ivy repository
+    * @return An IvySettings object
+    */
+  def loadIvySettings(
+                       settingsFile: String,
+                       remoteRepos: Option[String],
+                       ivyPath: Option[String]): IvySettings = {
+    val file = new File(settingsFile)
+    require(file.exists(), s"Ivy settings file $file does not exist")
+    require(file.isFile(), s"Ivy settings file $file is not a normal file")
+    val ivySettings: IvySettings = new IvySettings
+    try {
+      ivySettings.load(file)
+    } catch {
+      case e@(_: IOException | _: ParseException) =>
+        throw new RuntimeException(s"Failed when loading Ivy settings from $settingsFile", e)
+    }
+    processIvyPathArg(ivySettings, ivyPath)
+    processRemoteRepoArg(ivySettings, remoteRepos)
+    ivySettings
+  }
+
+  /* Set ivy settings for location of cache, if option is supplied */
+  private def processIvyPathArg(ivySettings: IvySettings, ivyPath: Option[String]): Unit = {
+    ivyPath.filterNot(_.trim.isEmpty).foreach { alternateIvyDir =>
+      ivySettings.setDefaultIvyUserDir(new File(alternateIvyDir))
+      ivySettings.setDefaultCache(new File(alternateIvyDir, "cache"))
+    }
+  }
+
+  /* Add any optional additional remote repositories */
+  private def processRemoteRepoArg(ivySettings: IvySettings, remoteRepos: Option[String]): Unit = {
+    remoteRepos.filterNot(_.trim.isEmpty).map(_.split(",")).foreach { repositoryList =>
+      val cr = new ChainResolver
+      cr.setName("user-list")
+
+      // add current default resolver, if any
+      Option(ivySettings.getDefaultResolver).foreach(cr.add)
+
+      // add additional repositories, last resolution in chain takes precedence
+      repositoryList.zipWithIndex.foreach { case (repo, i) =>
+        val brr: IBiblioResolver = new IBiblioResolver
+        brr.setM2compatible(true)
+        brr.setUsepoms(true)
+        brr.setRoot(repo)
+        brr.setName(s"repo-${i + 1}")
+        cr.add(brr)
+        // scalastyle:off println
+        printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
+        // scalastyle:on println
+      }
+
+      ivySettings.addResolver(cr)
+      ivySettings.setDefaultResolver(cr.getName)
+    }
+  }
+
+  /** Creates a default module descriptor (values are dummy strings); also handy in tests. */
+  def getModuleDescriptor: DefaultModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
+    // Include UUID in module name, so multiple clients resolving maven coordinate at the same time
+    // do not modify the same resolution file concurrently.
+    ModuleRevisionId.newInstance("org.apache.flink",
+      s"flink-parent-${UUID.randomUUID.toString}",
+      "1.0"))
+
+  private def clearIvyResolutionFiles(
+                                       mdId: ModuleRevisionId,
+                                       ivySettings: IvySettings,
+                                       ivyConfName: String): Unit = {
+    val currentResolutionFiles = Seq(
+      s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml",
+      s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.xml",
+      s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.properties"
+    )
+    currentResolutionFiles.foreach { filename =>
+      new File(ivySettings.getDefaultCache, filename).delete()
+    }
+  }
+
+  /**
+    * Resolves any dependencies that were supplied through maven coordinates
+    *
+    * @param coordinates Comma-delimited string of maven coordinates
+    * @param ivySettings An IvySettings containing resolvers to use
+    * @param exclusions  Exclusions to apply when resolving transitive dependencies
+    * @return The colon-delimited paths to the jars of the given maven artifacts including their
+    *         transitive dependencies
+    */
+  def resolveMavenCoordinates(
+                               coordinates: String,
+                               ivySettings: IvySettings,
+                               exclusions: Seq[String] = Nil,
+                               isTest: Boolean = false): String = {
+    if (coordinates == null || coordinates.trim.isEmpty) {
+      ""
+    } else {
+      val sysOut = System.out
+      try {
+        // To prevent ivy from logging to system out
+        System.setOut(printStream)
+        val artifacts = extractMavenCoordinates(coordinates)
+        val packagesDirectory: File = new File(ivySettings.getDefaultIvyUserDir, "jars")
+        // scalastyle:off println
+        printStream.println(
+          s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
+        printStream.println(s"The jars for the packages stored in: $packagesDirectory")
+        // scalastyle:on println
+
+        val ivy = Ivy.newInstance(ivySettings)
+        // Set resolve options to download transitive dependencies as well
+        val resolveOptions = new ResolveOptions
+        resolveOptions.setTransitive(true)
+        val retrieveOptions = new RetrieveOptions
+        // Turn downloading and logging off for testing
+        if (isTest) {
+          resolveOptions.setDownload(false)
+          resolveOptions.setLog(LogOptions.LOG_QUIET)
+          retrieveOptions.setLog(LogOptions.LOG_QUIET)
+        } else {
+          resolveOptions.setDownload(true)
+        }
+
+        // Default configuration name for ivy
+        val ivyConfName = "default"
+
+        // A Module descriptor must be specified. Entries are dummy strings
+        val md = getModuleDescriptor
+
+        md.setDefaultConf(ivyConfName)
+
+        // Add exclusion rules for Flink and Scala Library
+        addExclusionRules(ivySettings, ivyConfName, md)
+        // add all supplied maven artifacts as dependencies
+        addDependenciesToIvy(md, artifacts, ivyConfName)
+        exclusions.foreach { e =>
+          md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName))
+        }
+        // resolve dependencies
+        val rr: ResolveReport = ivy.resolve(md, resolveOptions)
+        if (rr.hasError) {
+          throw new RuntimeException(rr.getAllProblemMessages.toString)
+        }
+        // retrieve all resolved dependencies
+        ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
+          packagesDirectory.getAbsolutePath + File.separator +
+            "[organization]_[artifact]-[revision](-[classifier]).[ext]",
+          retrieveOptions.setConfs(Array(ivyConfName)))
+        val paths = resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
+        val mdId = md.getModuleRevisionId
+        clearIvyResolutionFiles(mdId, ivySettings, ivyConfName)
+        paths
+      } finally {
+        System.setOut(sysOut)
+      }
+    }
+  }
+
+  private def createExclusion(
+                               coords: String,
+                               ivySettings: IvySettings,
+                               ivyConfName: String): ExcludeRule = {
+    val c = extractMavenCoordinates(coords)(0)
+    val id = new ArtifactId(new ModuleId(c.groupId, c.artifactId), "*", "*", "*")
+    val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null)
+    rule.addConfiguration(ivyConfName)
+    rule
+  }
+
+}
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/util/TableUtil.scala b/flink/src/main/scala/org/apache/zeppelin/flink/util/TableUtil.scala
new file mode 100644
index 0000000..333939a
--- /dev/null
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/util/TableUtil.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.util
+
+
+import _root_.java.util.{UUID, ArrayList => JArrayList}
+
+import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.internal.{TableEnvironmentImpl, TableImpl}
+import org.apache.flink.table.api.{Table, TableEnvironment}
+import org.apache.flink.table.planner.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.planner.delegation.PlannerBase
+import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType
+import org.apache.flink.table.planner.sinks.{CollectRowTableSink, CollectTableSink}
+import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter
+import org.apache.flink.table.types.logical.TimestampType
+import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
+import org.apache.flink.types.Row
+import org.apache.flink.util.AbstractID
+
+import _root_.scala.collection.JavaConversions._
+import _root_.scala.collection.JavaConverters._
+
+object TableUtil {
+
+  /**
+    * Returns a collection that contains all rows in this Table.
+    *
+    * Note: The difference between print() and collect() is
+    * - print() prints data on workers and collect() collects data to the client.
+    * - You have to call TableEnvironment.execute() to run the job for print(), while collect()
+    * calls execute automatically.
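+    *
+    * For example, `TableUtil.collect(table.asInstanceOf[TableImpl], "collect job")` registers a
+    * CollectRowTableSink, runs the job and returns the rows as a Seq[Row].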
+    */
+  def collect(table: TableImpl): Seq[Row] = collectSink(table, new CollectRowTableSink, None)
+
+  def collect(table: TableImpl, jobName: String): Seq[Row] =
+    collectSink(table, new CollectRowTableSink, Option.apply(jobName))
+
+  def collectAsT[T](table: TableImpl, t: TypeInformation[_], jobName: String = null): Seq[T] =
+    collectSink(
+      table,
+      new CollectTableSink(_ => t.asInstanceOf[TypeInformation[T]]), Option(jobName))
+
+  def collectSink[T](
+      table: TableImpl, sink: CollectTableSink[T], jobName: Option[String] = None): Seq[T] = {
+    // get schema information of table
+    val relNode = toRelNode(table)
+    val rowType = relNode.getRowType
+    val fieldNames = rowType.getFieldNames.asScala.toArray
+    val fieldTypes = rowType.getFieldList.map { field =>
+      val `type` = field.getType match {
+        // converts `TIME ATTRIBUTE(ROWTIME)`/`TIME ATTRIBUTE(PROCTIME)` to `TIMESTAMP(3)` for sink
+        case _: TimeIndicatorRelDataType =>
+          relNode.getCluster
+            .getTypeFactory.asInstanceOf[FlinkTypeFactory]
+            .createFieldTypeFromLogicalType(new TimestampType(false, 3))
+        case t => t
+      }
+      FlinkTypeFactory.toLogicalType(`type`)
+    }.toArray
+    val configuredSink = sink.configure(
+      fieldNames, fieldTypes.map(TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo))
+    collect(table.getTableEnvironment,
+      table, configuredSink.asInstanceOf[CollectTableSink[T]], jobName)
+  }
+
+  /**
+    * Converts operation tree in the given table to a RelNode tree.
+    */
+  def toRelNode(table: Table): RelNode = {
+    val plannerBase = table.asInstanceOf[TableImpl]
+      .getTableEnvironment.asInstanceOf[TableEnvironmentImpl]
+      .getPlanner.asInstanceOf[PlannerBase]
+
+    val method = classOf[PlannerBase].getMethod("getRelBuilder")
+    method.setAccessible(true)
+    method.invoke(plannerBase).asInstanceOf[FlinkRelBuilder]
+      .queryOperation(table.getQueryOperation).build()
+  }
+
+  def collect[T](
+                  tEnv: TableEnvironment,
+                  table: Table,
+                  sink: CollectTableSink[T],
+                  jobName: Option[String]): Seq[T] = {
+
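+    // obtain the underlying StreamExecutionEnvironment from the blink planner via reflection,
+    // since getExecEnv is not exposed through the public TableEnvironment API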
+    val method = classOf[PlannerBase].getMethod("getExecEnv")
+    method.setAccessible(true)
+    val execEnv = method.invoke(tEnv.asInstanceOf[TableEnvironmentImpl]
+      .getPlanner).asInstanceOf[StreamExecutionEnvironment]
+
+    val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
+      .asInstanceOf[TypeInformation[T]]
+      .createSerializer(execEnv.getConfig)
+    val id = new AbstractID().toString
+    sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
+    val sinkName = UUID.randomUUID().toString
+
+    // workaround: switch to the default catalog/database, otherwise the registered sink cannot be found
+    val originalCatalog = tEnv.getCurrentCatalog
+    val originalDatabase = tEnv.getCurrentDatabase
+    try {
+      tEnv.useCatalog("default_catalog")
+      tEnv.useDatabase("default_database")
+      tEnv.registerTableSink(sinkName, sink)
+      tEnv.insertInto(table, sinkName)
+      val res = tEnv.execute(jobName.getOrElse("sql collect job"))
+      val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
+      SerializedListAccumulator.deserializeList(accResult, typeSerializer)
+    } finally {
+      tEnv.useCatalog(originalCatalog)
+      tEnv.useDatabase(originalDatabase)
+    }
+  }
+}
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/BatchSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/BatchSqlInterpreterTest.java
new file mode 100644
index 0000000..cb517c3
--- /dev/null
+++ b/flink/src/test/java/org/apache/zeppelin/flink/BatchSqlInterpreterTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public abstract class BatchSqlInterpreterTest extends FlinkSqlInterpreterTest {
+
+  protected abstract String getPlanner();
+
+  @Override
+  protected Properties getFlinkProperties() throws IOException {
+    Properties p = super.getFlinkProperties();
+    p.setProperty("zeppelin.flink.planner", getPlanner());
+    return p;
+  }
+
+  @Override
+  protected FlinkSqlInterrpeter createFlinkSqlInterpreter(Properties properties) {
+    return new FlinkBatchSqlInterpreter(properties);
+  }
+
+  @Test
+  public void testBatchSQL() throws InterpreterException {
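+    // registerDataSet is only supported by the flink planner's BatchTableEnvironment,
+    // so skip this test when running with the blink planner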
+    if (getPlanner().equals("blink")) {
+      return;
+    }
+    InterpreterResult result = flinkInterpreter.interpret(
+            "val ds = benv.fromElements((1, \"jeff\"), (2, \"andy\"))", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = flinkInterpreter
+            .interpret("btenv.registerDataSet(\"table_1\", ds, 'a, 'b)",
+                    getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    sqlInterpreter.flinkInterpreter.getBatchTableEnvironment().useCatalog("default_catalog");
+    sqlInterpreter.flinkInterpreter.getBatchTableEnvironment().useDatabase("default_database");
+
+    result = sqlInterpreter.interpret("select * from default_catalog.default_database.table_1",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("a\tb\n" +
+            "1\tjeff\n" +
+            "2\tandy\n", appendOutput);
+  }
+
+  //@Test
+  public void testHiveTable() throws InterpreterException {
+    //    hiveShell.execute("create table hive_table (id int, name string)");
+    InterpreterResult result = sqlInterpreter.interpret(
+            "select * from hive_table",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+  }
+
+  //@Test
+  public void testInsertInto() throws InterpreterException {
+    if (getPlanner().equals("flink")) {
+      return;
+    }
+    //    hiveShell.execute("create table table_inserted (id int, name string)");
+    //    hiveShell.executeQuery("show tables");
+    InterpreterResult result = flinkInterpreter.interpret(
+            "val ds = benv.fromElements((1, \"jeff\"), (2, \"andy\"))", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = flinkInterpreter
+            .interpret("btenv.registerDataSet(\"table_2\", ds, 'a, 'b)",
+                    getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = sqlInterpreter.interpret(
+            "insert into table_inserted select * from default_catalog.default_database.table_2",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+  }
+
+  //@Test
+  public void testUDF() throws InterpreterException {
+
+    InterpreterResult result = flinkInterpreter.interpret(
+            "class AddOne extends ScalarFunction {\n" +
+                    "  def eval(a: Int): Int = a + 1\n" +
+                    "}", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = flinkInterpreter.interpret("btenv.registerFunction(\"addOne\", new $AddOne())",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = sqlInterpreter.interpret("INSERT INTO dest SELECT addOne(int_col) FROM source",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+  }
+}
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/BlinkBatchSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/BlinkBatchSqlInterpreterTest.java
new file mode 100644
index 0000000..d9da86e
--- /dev/null
+++ b/flink/src/test/java/org/apache/zeppelin/flink/BlinkBatchSqlInterpreterTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.flink;
+
+
+public class BlinkBatchSqlInterpreterTest extends BatchSqlInterpreterTest {
+  @Override
+  protected String getPlanner() {
+    return "blink";
+  }
+
+}
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
new file mode 100644
index 0000000..62c9079
--- /dev/null
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.flink;
+
+public class FlinkBatchSqlInterpreterTest extends BatchSqlInterpreterTest {
+
+  @Override
+  protected String getPlanner() {
+    return "flink";
+  }
+}
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index 0c42139..a7935fa 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -28,6 +28,7 @@ import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterOutputListener;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.junit.After;
 import org.junit.Before;
@@ -42,6 +43,8 @@ import java.util.Properties;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
 
 public class FlinkInterpreterTest {
 
@@ -53,14 +56,21 @@ public class FlinkInterpreterTest {
   // catch the interpreter output in onUpdate
   private List<InterpreterResultMessageOutput> messageOutput;
 
+
   @Before
   public void setUp() throws InterpreterException {
     Properties p = new Properties();
+    p.setProperty("zeppelin.flink.printREPLOutput", "true");
+    p.setProperty("zeppelin.flink.scala.color", "false");
+    p.setProperty("flink.execution.mode", "local");
+
     interpreter = new FlinkInterpreter(p);
     InterpreterGroup intpGroup = new InterpreterGroup();
     interpreter.setInterpreterGroup(intpGroup);
     interpreter.open();
-    context = InterpreterContext.builder().build();
+    context = InterpreterContext.builder()
+            .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+            .build();
     InterpreterContext.set(context);
   }
 
@@ -70,7 +80,7 @@ public class FlinkInterpreterTest {
   }
 
   @Test
-  public void testBasicScala() throws InterpreterException, IOException {
+  public void testBasicScala() throws InterpreterException {
     InterpreterResult result = interpreter.interpret("val a=\"hello world\"",
         getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -130,21 +140,15 @@ public class FlinkInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
     // case class
-    result = interpreter.interpret(
-        "case class Bank(age:Integer, job:String, marital : String, education : String," +
-            " balance : Integer)\n",
-        getInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-
-    // ZeppelinContext
-    context = getInterpreterContext();
-    result = interpreter.interpret("val ds = benv.fromElements(1,2,3)\nz.show(ds)", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    assertEquals(InterpreterResult.Type.TABLE, messageOutput.get(0).getType());
-    assertEquals("f0\n" +
-        "1\n" +
-        "2\n" +
-        "3\n", messageOutput.get(0).toInterpreterResultMessage().getData());
+    //    result = interpreter.interpret(
+    //            "case class WC(word: String, count: Int)\n" +
+    //            "val wordCounts = benv.fromElements(\n" +
+    //            "WC(\"hello\", 1),\n" +
+    //            "WC(\"world\", 2),\n" +
+    //            "WC(\"world\", 8))\n" +
+    //            "wordCounts.collect()",
+    //        getInterpreterContext());
+    //    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
     context = getInterpreterContext();
     result = interpreter.interpret("z.input(\"name\", \"default_name\")",
@@ -200,20 +204,26 @@ public class FlinkInterpreterTest {
     List<InterpreterCompletion> completions = interpreter.completion("a.", 2,
         getInterpreterContext());
     assertTrue(completions.size() > 0);
-  }
 
+    completions = interpreter.completion("benv.", 5, getInterpreterContext());
+    assertTrue(completions.size() > 0);
+  }
 
-  // Disable it for now as there's extra std output from flink shell.
   @Test
-  public void testWordCount() throws InterpreterException, IOException {
-    interpreter.interpret("val text = benv.fromElements(\"To be or not to be\")",
+  public void testBatchWordCount() throws InterpreterException, IOException {
+    InterpreterResult result = interpreter.interpret(
+            "val data = benv.fromElements(\"hello world\", \"hello flink\", \"hello hadoop\")",
         getInterpreterContext());
-    interpreter.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }" +
-        ".map { (_, 1) }.groupBy(0).sum(1)", getInterpreterContext());
-    InterpreterResult result = interpreter.interpret("counts.print()", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    result = interpreter.interpret(
+            "data.flatMap(line => line.split(\"\\\\s\"))\n" +
+            "  .map(w => (w, 1))\n" +
+            "  .groupBy(0)\n" +
+            "  .sum(1)\n" +
+            "  .print()", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
-    String[] expectedCounts = {"(to,2)", "(be,2)", "(or,1)", "(not,1)"};
+    String[] expectedCounts = {"(hello,3)", "(world,1)", "(flink,1)", "(hadoop,1)"};
     Arrays.sort(expectedCounts);
 
     String[] counts = output.split("\n");
@@ -222,12 +232,50 @@ public class FlinkInterpreterTest {
     assertArrayEquals(expectedCounts, counts);
   }
 
+  @Test
+  public void testStreamWordCount() throws InterpreterException {
+    InterpreterResult result = interpreter.interpret(
+            "val data = senv.fromElements(\"hello world\", \"hello flink\", \"hello hadoop\")",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    result = interpreter.interpret(
+            "data.flatMap(line => line.split(\"\\\\s\"))\n" +
+                    "  .map(w => (w, 1))\n" +
+                    "  .keyBy(0)\n" +
+                    "  .sum(1)\n" +
+                    "  .print()\n" +
+                    "senv.execute()", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    String[] expectedCounts = {"(hello,3)", "(world,1)", "(flink,1)", "(hadoop,1)"};
+    for (String expectedCount : expectedCounts) {
+      assertTrue(output, output.contains(expectedCount));
+    }
+  }
+
+  //@Test
+  public void testStreamUDF() throws InterpreterException {
+    InterpreterResult result = interpreter.interpret(
+            "class MyUpper extends ScalarFunction {\n" +
+                    "  def eval(str: String) = str.toUpperCase\n" +
+                    "}\n" +
+                    "stenv.registerFunction(\"myupper\", new MyUpper())",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    result = interpreter.interpret(
+            "val data = Seq(\"Hello\", \"Flink\")\n" +
+            "val source = senv.fromCollection(data).toTable(stenv, 'word)\n" +
+            "source.select(\"myupper(word)\").print()",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+  }
+
   private InterpreterContext getInterpreterContext() {
     output = "";
     messageOutput = new ArrayList<>();
     InterpreterContext context = InterpreterContext.builder()
-        .setInterpreterOut(new InterpreterOutput(null))
         .setAngularObjectRegistry(new AngularObjectRegistry("flink", null))
+        .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
         .build();
     context.out = new InterpreterOutput(
         new InterpreterOutputListener() {
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkSQLInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkSQLInterpreterTest.java
deleted file mode 100644
index 6993540..0000000
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkSQLInterpreterTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink;
-
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.apache.zeppelin.interpreter.InterpreterOutputListener;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-
-public class FlinkSQLInterpreterTest {
-
-  private FlinkInterpreter interpreter;
-  private FlinkSQLInterpreter sqlInterpreter;
-  private InterpreterContext context;
-
-  // catch the streaming output in onAppend
-  private volatile String output = "";
-  // catch the interpreter output in onUpdate
-  private InterpreterResultMessageOutput messageOutput;
-
-  @Before
-  public void setUp() throws InterpreterException {
-    Properties p = new Properties();
-    interpreter = new FlinkInterpreter(p);
-    sqlInterpreter = new FlinkSQLInterpreter(p);
-    InterpreterGroup intpGroup = new InterpreterGroup();
-    interpreter.setInterpreterGroup(intpGroup);
-    sqlInterpreter.setInterpreterGroup(intpGroup);
-    intpGroup.addInterpreterToSession(interpreter, "session_1");
-    intpGroup.addInterpreterToSession(sqlInterpreter, "session_1");
-
-    interpreter.open();
-    sqlInterpreter.open();
-    context = InterpreterContext.builder().build();
-  }
-
-  @Test
-  public void testSQLInterpreter() throws InterpreterException {
-    InterpreterResult result = interpreter.interpret(
-        "val ds = benv.fromElements((1, \"jeff\"), (2, \"andy\"))", getInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-
-    result = interpreter.interpret("btenv.registerDataSet(\"table_1\", ds)",
-        getInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-
-    result = sqlInterpreter.interpret("select * from table_1", getInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
-    assertEquals("_1\t_2\n" +
-        "1\tjeff\n" +
-        "2\tandy\n", result.message().get(0).getData());
-  }
-
-  private InterpreterContext getInterpreterContext() {
-    output = "";
-    InterpreterContext context = InterpreterContext.builder()
-        .setInterpreterOut(new InterpreterOutput(null))
-        .setAngularObjectRegistry(new AngularObjectRegistry("flink", null))
-        .build();
-    context.out = new InterpreterOutput(
-        new InterpreterOutputListener() {
-          @Override
-          public void onUpdateAll(InterpreterOutput out) {
-
-          }
-
-          @Override
-          public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
-            try {
-              output = out.toInterpreterResultMessage().getData();
-            } catch (IOException e) {
-              e.printStackTrace();
-            }
-          }
-
-          @Override
-          public void onUpdate(int index, InterpreterResultMessageOutput out) {
-            messageOutput = out;
-          }
-        });
-    return context;
-  }
-}
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkSqlInterpreterTest.java
new file mode 100644
index 0000000..e54d71a
--- /dev/null
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkSqlInterpreterTest.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import com.google.common.io.Files;
+//import com.klarna.hiverunner.HiveShell;
+//import com.klarna.hiverunner.annotations.HiveSQL;
+import org.apache.commons.io.IOUtils;
+//import org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
+import org.junit.After;
+import org.junit.Before;
+//import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+//import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+
+//@RunWith(FlinkStandaloneHiveRunner.class)
+public abstract class FlinkSqlInterpreterTest {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FlinkSqlInterpreterTest.class);
+  protected static final String[][] INPUT_DATA = {
+          {"1", "1.1", "hello world", "true"},
+          {"2", "2.3", "hello flink", "true"},
+          {"3", "3.2", "hello hadoop", "false"},
+  };
+
+
+  protected FlinkInterpreter flinkInterpreter;
+  protected FlinkSqlInterrpeter sqlInterpreter;
+
+  // catch the streaming appendOutput in onAppend
+  protected volatile String appendOutput = "";
+  protected volatile InterpreterResult.Type appendOutputType;
+
+  // catch the full interpreter output in onUpdate
+  protected InterpreterResultMessageOutput updatedOutput;
+
+  //  @HiveSQL(files = {})
+  //  protected static HiveShell hiveShell;
+
+
+  protected Properties getFlinkProperties() throws IOException {
+    Properties p = new Properties();
+    p.setProperty("zeppelin.flink.enableHive", "false");
+    p.setProperty("zeppelin.flink.planner", "blink");
+    p.setProperty("taskmanager.managed.memory.size", "32");
+    p.setProperty("zeppelin.flink.hive.version", "2.3.4");
+    File hiveConfDir = Files.createTempDir();
+    //    hiveShell.getHiveConf().writeXml(new FileWriter(new File(hiveConfDir, "hive-site.xml")));
+    p.setProperty("HIVE_CONF_DIR", hiveConfDir.getAbsolutePath());
+    return p;
+  }
+
+  @Before
+  public void setUp() throws InterpreterException, IOException {
+    Properties p = getFlinkProperties();
+    flinkInterpreter = new FlinkInterpreter(p);
+    sqlInterpreter = createFlinkSqlInterpreter(p);
+    InterpreterGroup intpGroup = new InterpreterGroup();
+    flinkInterpreter.setInterpreterGroup(intpGroup);
+    sqlInterpreter.setInterpreterGroup(intpGroup);
+    intpGroup.addInterpreterToSession(flinkInterpreter, "session_1");
+    intpGroup.addInterpreterToSession(sqlInterpreter, "session_1");
+
+    flinkInterpreter.open();
+    sqlInterpreter.open();
+
+    //    hiveShell.execute("drop database if exists test_db CASCADE");
+    //    hiveShell.execute("create database test_db");
+    //    hiveShell.execute("use test_db");
+
+    //    InterpreterResult result = sqlInterpreter.interpret("use database test_db",
+    //            getInterpreterContext());
+    //    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+  }
+
+  @After
+  public void tearDown() throws InterpreterException {
+    flinkInterpreter.close();
+  }
+
+  protected abstract FlinkSqlInterrpeter createFlinkSqlInterpreter(Properties properties);
+
+  //@Test
+  public void testDatabases() throws InterpreterException {
+    InterpreterResult result = sqlInterpreter.interpret("show databases",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TABLE, appendOutputType);
+    assertEquals("database\ndefault\ntest_db\n", appendOutput);
+
+    result = sqlInterpreter.interpret("create database db1",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TEXT, appendOutputType);
+    assertEquals("Database has been created.\n", appendOutput);
+
+    result = sqlInterpreter.interpret("use db1",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = sqlInterpreter.interpret("show tables",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TABLE, appendOutputType);
+    assertEquals("table\n", appendOutput);
+
+    result = sqlInterpreter.interpret(
+            "CREATE TABLE source (msg INT) with (type='csv', path='/tmp')",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = sqlInterpreter.interpret("show tables",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TABLE, appendOutputType);
+    assertEquals("table\nsource\n", appendOutput);
+
+    result = sqlInterpreter.interpret("use `default`",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = sqlInterpreter.interpret("show tables",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TABLE, appendOutputType);
+    assertEquals("table\n", appendOutput);
+
+    result = sqlInterpreter.interpret("drop database db1",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertTrue(result.message().get(0).getData(),
+            result.message().get(0).getData().contains("Database db1 is not empty"));
+
+    result = sqlInterpreter.interpret("drop table db1.source",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = sqlInterpreter.interpret("drop database db1",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = sqlInterpreter.interpret("show databases",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TABLE, appendOutputType);
+    assertEquals("database\ndefault\ntest_db\n", appendOutput);
+  }
+
+  //@Test
+  public void testDescribe() throws InterpreterException {
+    InterpreterResult result = sqlInterpreter.interpret("create database hive.db1",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TEXT, appendOutputType);
+    assertEquals("Database has been created.\n", appendOutput);
+
+    result = sqlInterpreter.interpret("describe database hive.db1", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TEXT, appendOutputType);
+    assertTrue(appendOutput, appendOutput.contains("db1"));
+
+    //TODO(zjffdu) hive and flink share the same namespace for db.
+    //    result = sqlInterpreter.interpret("create database flink.db1",
+    //            getInterpreterContext());
+    //    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    //    assertEquals(InterpreterResult.Type.TEXT, outputType);
+    //    assertEquals("Database has been created.\n", output);
+    //
+    //    result = sqlInterpreter.interpret("describe database flink.db1",
+    // getInterpreterContext());
+    //    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    //    assertEquals(InterpreterResult.Type.TEXT, outputType);
+    //    assertTrue(output, output.contains("db1"));
+
+    result = sqlInterpreter.interpret(
+            "CREATE TABLE source (int_col INT, double_col double, varchar_col varchar, " +
+                    "bool_col boolean) with (type='csv', path='/tmp')",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // TODO(zjffdu) this is a bug of Calcite: the table name has to be
+    // quoted (with backticks) if it is a keyword
+    result = sqlInterpreter.interpret("describe `source`", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TEXT, appendOutputType);
+    assertTrue(appendOutput, appendOutput.contains("name: int_col"));
+  }
+
+  protected InterpreterContext getInterpreterContext() {
+    appendOutput = "";
+    InterpreterContext context = InterpreterContext.builder()
+            .setInterpreterOut(new InterpreterOutput(null))
+            .setAngularObjectRegistry(new AngularObjectRegistry("flink", null))
+            .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+            .build();
+    context.out = new InterpreterOutput(
+        new InterpreterOutputListener() {
+          @Override
+          public void onUpdateAll(InterpreterOutput out) {
+            System.out.println();
+          }
+
+          @Override
+          public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
+            try {
+              appendOutputType = out.toInterpreterResultMessage().getType();
+              appendOutput = out.toInterpreterResultMessage().getData();
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+          }
+
+          @Override
+          public void onUpdate(int index, InterpreterResultMessageOutput out) {
+            updatedOutput = out;
+          }
+        });
+    return context;
+  }
+
+  public static File createInputFile(String data) throws IOException {
+    File file = File.createTempFile("zeppelin-flink-input", ".csv");
+    FileOutputStream out = null;
+    try {
+      out = new FileOutputStream(file);
+      IOUtils.write(data, out);
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+    return file;
+  }
+
+  public static File createInputFile(String[][] data) throws IOException {
+    File file = File.createTempFile("zeppelin-flink-input", ".csv");
+    PrintWriter writer = null;
+    try {
+      writer = new PrintWriter(new FileOutputStream(file));
+      // write each row as comma-separated values
+      int rowCount = data.length;
+      int colCount = data[0].length;
+      for (int i = 0; i < rowCount; ++i) {
+        for (int j = 0; j < colCount; ++j) {
+          writer.print(data[i][j]);
+          if (j != colCount - 1) {
+            writer.print(",");
+          }
+        }
+        // TODO(zjffdu) This is a bug of Flink's CSV sink: it always puts a
+        // line separator at the end,
+        // which is not necessary.
+        writer.print("\n");
+      }
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+    return file;
+  }
+
+  public File createORCFile(int[] values) throws IOException {
+    File file = File.createTempFile("zeppelin-flink-input", ".orc");
+    file.delete();
+    Path path = new Path(file.getAbsolutePath());
+    Configuration conf = new Configuration();
+    conf.set("orc.compress", "snappy");
+    TypeDescription schema = TypeDescription.fromString("struct<msg:int>");
+    Writer writer = OrcFile.createWriter(path,
+            OrcFile.writerOptions(conf)
+                    .setSchema(schema));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    LongColumnVector x = (LongColumnVector) batch.cols[0];
+    for (int i = 0; i < values.length; ++i) {
+      int row = batch.size++;
+      x.vector[row] = values[i];
+      // If the batch is full, write it out and start over.
+      if (batch.size == batch.getMaxSize()) {
+        writer.addRowBatch(batch);
+        batch.reset();
+      }
+    }
+    if (batch.size != 0) {
+      writer.addRowBatch(batch);
+      batch.reset();
+    }
+    writer.close();
+    return file;
+  }
+
+  public File createParquetFile(int[] values,
+                                ParquetProperties.WriterVersion version) throws IOException {
+    File file = File.createTempFile("zeppelin-flink-input", ".par");
+    file.delete();
+    Path path = new Path(file.getAbsolutePath());
+    Configuration conf = new Configuration();
+
+    MessageType schema = MessageTypeParser.parseMessageType(
+            "message test { "
+                    + "required int32 int32_field; "
+                    + "} ");
+    GroupWriteSupport.setSchema(schema, conf);
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+
+    ParquetWriter<Group> writer = new ParquetWriter<Group>(
+            path,
+            new GroupWriteSupport(),
+            CompressionCodecName.UNCOMPRESSED, 1024, 1024, 512, true, false, version, conf);
+    for (int i = 0; i < values.length; i++) {
+      writer.write(f.newGroup()
+              .append("int32_field", values[i]));
+    }
+    writer.close();
+    return file;
+  }
+}
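
FlinkSqlInterpreterTest above is abstract: the SQL interpreter under test comes from
createFlinkSqlInterpreter, and the listener installed in getInterpreterContext captures whatever
the interpreter appends or updates. A minimal sketch of a concrete batch-mode subclass is shown
below; the class name, constructor and test body are illustrative only (they assume the
FlinkBatchSqlInterpreter added by this commit) rather than code taken from this diff.

    package org.apache.zeppelin.flink;

    import org.apache.zeppelin.interpreter.InterpreterException;
    import org.apache.zeppelin.interpreter.InterpreterResult;
    import org.junit.Test;

    import java.util.Properties;

    import static org.junit.Assert.assertEquals;

    public class FlinkBatchSqlInterpreterTest extends FlinkSqlInterpreterTest {

      @Override
      protected FlinkSqlInterrpeter createFlinkSqlInterpreter(Properties properties) {
        // Wire up the batch SQL interpreter added by this commit (constructor assumed).
        return new FlinkBatchSqlInterpreter(properties);
      }

      @Test
      public void testShowDatabases() throws InterpreterException {
        // Smoke test: a successful statement reports SUCCESS and its result arrives
        // as a TABLE through the base class's onAppend callback.
        InterpreterResult result = sqlInterpreter.interpret(
            "show databases", getInterpreterContext());
        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
        assertEquals(InterpreterResult.Type.TABLE, appendOutputType);
      }
    }
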
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
new file mode 100644
index 0000000..0001662
--- /dev/null
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.flink;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest {
+
+  @Override
+  protected FlinkSqlInterrpeter createFlinkSqlInterpreter(Properties properties) {
+    return new FlinkStreamSqlInterpreter(properties);
+  }
+
+  @Test
+  public void testSingleStreamSql() throws IOException, InterpreterException {
+    String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala"));
+    InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    InterpreterContext context = getInterpreterContext();
+    context.getLocalProperties().put("type", "single");
+    result = sqlInterpreter.interpret("select max(rowtime), count(1) " +
+            "from default_catalog.default_database.log", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.HTML, updatedOutput.toInterpreterResultMessage().getType());
+    assertTrue(updatedOutput.toInterpreterResultMessage().getData(),
+            !updatedOutput.toInterpreterResultMessage().getData().isEmpty());
+  }
+
+  @Test
+  public void testRetractStreamSql() throws IOException, InterpreterException {
+    String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala"));
+    InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    InterpreterContext context = getInterpreterContext();
+    context.getLocalProperties().put("type", "retract");
+    result = sqlInterpreter.interpret("select url, count(1) as pv from " +
+            "default_catalog.default_database.log group by url", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TABLE,
+            updatedOutput.toInterpreterResultMessage().getType());
+    assertTrue(updatedOutput.toInterpreterResultMessage().getData(),
+            !updatedOutput.toInterpreterResultMessage().getData().isEmpty());
+  }
+
+  @Test
+  public void testTimeSeriesStreamSql() throws IOException, InterpreterException {
+    String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala"));
+    InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    InterpreterContext context = getInterpreterContext();
+    context.getLocalProperties().put("type", "ts");
+    result = sqlInterpreter.interpret("select TUMBLE_START(rowtime, INTERVAL '5' SECOND) as " +
+            "start_time, url, count(1) as pv from default_catalog.default_database.log group by " +
+            "TUMBLE(rowtime, INTERVAL '5' SECOND), url", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TABLE,
+            updatedOutput.toInterpreterResultMessage().getType());
+    assertTrue(updatedOutput.toInterpreterResultMessage().getData(),
+            !updatedOutput.toInterpreterResultMessage().getData().isEmpty());
+  }
+}
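
The three tests above differ only in the "type" local property, which selects how streaming
results are rendered (a single continuously updated row, a retract table, or a time-series
table) and presumably maps onto the SingleRowStreamSqlJob, RetractStreamSqlJob and
TimeSeriesStreamSqlJob classes added by this commit. A shared helper along these lines could
factor out that pattern; this is a sketch only, not part of the diff:

    // Hypothetical helper inside FlinkStreamSqlInterpreterTest.
    private InterpreterResult runStreamSql(String type, String sql)
        throws InterpreterException {
      InterpreterContext context = getInterpreterContext();
      // Accepted values used by the tests above: "single", "retract", "ts".
      context.getLocalProperties().put("type", type);
      return sqlInterpreter.interpret(sql, context);
    }
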
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
new file mode 100644
index 0000000..9ea149f
--- /dev/null
+++ b/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+
+import com.google.common.io.Files;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class IPyFlinkInterpreterTest {
+
+  private InterpreterGroup intpGroup;
+  private Interpreter interpreter;
+  private RemoteInterpreterEventClient mockIntpEventClient =
+          mock(RemoteInterpreterEventClient.class);
+
+  protected Properties initIntpProperties() {
+    Properties p = new Properties();
+    p.setProperty("zeppelin.pyflink.python", "python");
+    p.setProperty("zeppelin.flink.maxResult", "3");
+    p.setProperty("zeppelin.flink.test", "true");
+    p.setProperty("zeppelin.dep.localrepo", Files.createTempDir().getAbsolutePath());
+    p.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
+    return p;
+  }
+
+  protected void startInterpreter(Properties properties) throws InterpreterException {
+    InterpreterContext context = getInterpreterContext();
+    context.setIntpEventClient(mockIntpEventClient);
+    InterpreterContext.set(context);
+
+    LazyOpenInterpreter flinkInterpreter = new LazyOpenInterpreter(
+        new FlinkInterpreter(properties));
+    intpGroup = new InterpreterGroup();
+    intpGroup.put("session_1", new ArrayList<Interpreter>());
+    intpGroup.get("session_1").add(flinkInterpreter);
+    flinkInterpreter.setInterpreterGroup(intpGroup);
+
+    LazyOpenInterpreter pyFlinkInterpreter =
+        new LazyOpenInterpreter(new PyFlinkInterpreter(properties));
+    intpGroup.get("session_1").add(pyFlinkInterpreter);
+    pyFlinkInterpreter.setInterpreterGroup(intpGroup);
+
+    interpreter = new LazyOpenInterpreter(new IPyFlinkInterpreter(properties));
+    intpGroup.get("session_1").add(interpreter);
+    interpreter.setInterpreterGroup(intpGroup);
+
+    interpreter.open();
+  }
+
+  @Before
+  public void setUp() throws InterpreterException {
+    Properties properties = initIntpProperties();
+    startInterpreter(properties);
+  }
+
+  @After
+  public void tearDown() throws InterpreterException {
+    intpGroup.close();
+  }
+
+  @Test
+  public void testIPyFlink() throws InterpreterException {
+    testBatchPyFlink(interpreter);
+    testStreamPyFlink(interpreter);
+  }
+
+  public static void testBatchPyFlink(Interpreter interpreter) throws InterpreterException {
+    InterpreterContext context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
+    InterpreterResult result = interpreter.interpret(
+        "import tempfile\n" +
+        "import os\n" +
+        "import shutil\n" +
+        "sink_path = tempfile.gettempdir() + '/batch.csv'\n" +
+        "if os.path.exists(sink_path):\n" +
+        "  if os.path.isfile(sink_path):\n" +
+        "    os.remove(sink_path)\n" +
+        "  else:\n" +
+        "    shutil.rmtree(sink_path)\n" +
+        "b_env.set_parallelism(1)\n" +
+        "t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
+        "bt_env.connect(FileSystem().path(sink_path)) \\\n" +
+        "   .with_format(OldCsv()\n" +
+        "     .field_delimiter(',')\n" +
+        "     .field(\"a\", DataTypes.BIGINT())\n" +
+        "     .field(\"b\", DataTypes.STRING())\n" +
+        "     .field(\"c\", DataTypes.STRING())) \\\n" +
+        "   .with_schema(Schema()\n" +
+        "     .field(\"a\", DataTypes.BIGINT())\n" +
+        "     .field(\"b\", DataTypes.STRING())\n" +
+        "     .field(\"c\", DataTypes.STRING())) \\\n" +
+        "   .register_table_sink(\"batch_sink\")\n" +
+        "t.select(\"a + 1, b, c\").insert_into(\"batch_sink\")\n" +
+        "bt_env.execute(\"batch_job\")"
+            , context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+  }
+
+  public static void testStreamPyFlink(Interpreter interpreter) throws InterpreterException {
+    InterpreterContext context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
+    InterpreterResult result = interpreter.interpret(
+          "import tempfile\n" +
+          "import os\n" +
+          "import shutil\n" +
+          "sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
+          "if os.path.exists(sink_path):\n" +
+          "    if os.path.isfile(sink_path):\n" +
+          "      os.remove(sink_path)\n" +
+          "    else:\n" +
+          "      shutil.rmtree(sink_path)\n" +
+          "s_env.set_parallelism(1)\n" +
+          "t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
+          "st_env.connect(FileSystem().path(sink_path)) \\\n" +
+          "    .with_format(OldCsv()\n" +
+          "      .field_delimiter(',')\n" +
+          "      .field(\"a\", DataTypes.BIGINT())\n" +
+          "      .field(\"b\", DataTypes.STRING())\n" +
+          "      .field(\"c\", DataTypes.STRING())) \\\n" +
+          "    .with_schema(Schema()\n" +
+          "      .field(\"a\", DataTypes.BIGINT())\n" +
+          "      .field(\"b\", DataTypes.STRING())\n" +
+          "      .field(\"c\", DataTypes.STRING())) \\\n" +
+          "    .register_table_sink(\"stream_sink\")\n" +
+          "t.select(\"a + 1, b, c\").insert_into(\"stream_sink\")\n" +
+          "st_env.execute(\"stream_job\")"
+            , context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+  }
+
+  private static InterpreterContext createInterpreterContext(
+          RemoteInterpreterEventClient mockRemoteEventClient) {
+    return InterpreterContext.builder()
+        .setNoteId("noteId")
+        .setParagraphId("paragraphId")
+        .setIntpEventClient(mockRemoteEventClient)
+        .setInterpreterOut(new InterpreterOutput(null))
+        .build();
+  }
+
+  protected InterpreterContext getInterpreterContext() {
+    return InterpreterContext.builder()
+            .setNoteId("noteId")
+            .setParagraphId("paragraphId")
+            .setInterpreterOut(new InterpreterOutput(null))
+            .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+            .build();
+  }
+}
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
new file mode 100644
index 0000000..09f435a
--- /dev/null
+++ b/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+
+import com.google.common.io.Files;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
+import org.apache.zeppelin.python.PythonInterpreterTest;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.mockito.Mockito.mock;
+
+
+public class PyFlinkInterpreterTest extends PythonInterpreterTest {
+
+  private RemoteInterpreterEventClient mockRemoteEventClient =
+          mock(RemoteInterpreterEventClient.class);
+
+  @Override
+  public void setUp() throws InterpreterException {
+    Properties properties = new Properties();
+    properties.setProperty("zeppelin.pyflink.python", "python");
+    properties.setProperty("zeppelin.flink.maxResult", "3");
+    properties.setProperty("zeppelin.dep.localrepo", Files.createTempDir().getAbsolutePath());
+    properties.setProperty("zeppelin.pyflink.useIPython", "false");
+    properties.setProperty("zeppelin.flink.test", "true");
+    properties.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
+
+    // create interpreter group
+    intpGroup = new InterpreterGroup();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    InterpreterContext context = InterpreterContext.builder()
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setIntpEventClient(mockRemoteEventClient)
+        .build();
+    InterpreterContext.set(context);
+    LazyOpenInterpreter flinkInterpreter =
+        new LazyOpenInterpreter(new FlinkInterpreter(properties));
+
+    intpGroup.get("note").add(flinkInterpreter);
+    flinkInterpreter.setInterpreterGroup(intpGroup);
+
+    LazyOpenInterpreter iPyFlinkInterpreter =
+        new LazyOpenInterpreter(new IPyFlinkInterpreter(properties));
+    intpGroup.get("note").add(iPyFlinkInterpreter);
+    iPyFlinkInterpreter.setInterpreterGroup(intpGroup);
+
+    interpreter = new LazyOpenInterpreter(new PyFlinkInterpreter(properties));
+    intpGroup.get("note").add(interpreter);
+    interpreter.setInterpreterGroup(intpGroup);
+
+    interpreter.open();
+  }
+
+  @Override
+  public void tearDown() {
+    intpGroup.close();
+    intpGroup = null;
+    interpreter = null;
+  }
+
+  @Test
+  public void testPyFlink() throws InterpreterException {
+    IPyFlinkInterpreterTest.testBatchPyFlink(interpreter);
+    IPyFlinkInterpreterTest.testStreamPyFlink(interpreter);
+  }
+
+  private static InterpreterContext createInterpreterContext(
+          RemoteInterpreterEventClient mockRemoteEventClient) {
+    return InterpreterContext.builder()
+            .setNoteId("noteId")
+            .setParagraphId("paragraphId")
+            .setIntpEventClient(mockRemoteEventClient)
+            .setInterpreterOut(new InterpreterOutput(null))
+            .build();
+  }
+
+}
diff --git a/flink/src/test/resources/flink-conf.yaml b/flink/src/test/resources/flink-conf.yaml
index 1041d0f..33f0e4b 100644
--- a/flink/src/test/resources/flink-conf.yaml
+++ b/flink/src/test/resources/flink-conf.yaml
@@ -51,14 +51,14 @@ taskmanager.heap.mb: 1024
 
 # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
 
-taskmanager.numberOfTaskSlots: 1
+taskmanager.numberOfTaskSlots: 8
 
 # The parallelism used for programs that did not specify and other parallelism.
 
 parallelism.default: 1
 
 # The default file system scheme and authority.
-# 
+#
 # By default file paths without scheme are interpreted relative to the local
 # root file system 'file:///'. Use this to override the default and interpret
 # relative paths relative to a different file system,
@@ -77,9 +77,9 @@ parallelism.default: 1
 # The path where metadata for master recovery is persisted. While ZooKeeper stores
 # the small ground truth for checkpoint and leader election, this location stores
 # the larger objects, like persisted dataflow graphs.
-# 
+#
 # Must be a durable file system that is accessible from all nodes
-# (like HDFS, S3, Ceph, nfs, ...) 
+# (like HDFS, S3, Ceph, nfs, ...)
 #
 # high-availability.storageDir: hdfs:///flink/ha/
 
@@ -118,7 +118,7 @@ parallelism.default: 1
 # state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
 
 # Flag to enable/disable incremental checkpoints for backends that
-# support incremental checkpoints (like the RocksDB state backend). 
+# support incremental checkpoints (like the RocksDB state backend).
 #
 # state.backend.incremental: false
 
@@ -133,7 +133,7 @@ parallelism.default: 1
 # The port under which the web-based runtime monitor listens.
 # A value of -1 deactivates the web server.
 
-rest.port: 8081
+rest.port: 7071
 
 # Flag to specify whether job submission is enabled from the web-based
 # runtime monitor. Uncomment to disable.
@@ -181,10 +181,10 @@ rest.port: 8081
 #
 # classloader.resolve-order: child-first
 
-# The amount of memory going to the network stack. These numbers usually need 
+# The amount of memory going to the network stack. These numbers usually need
 # no tuning. Adjusting them may be necessary in case of an "Insufficient number
 # of network buffers" error. The default min is 64MB, the default max is 1GB.
-# 
+#
 # taskmanager.network.memory.fraction: 0.1
 # taskmanager.network.memory.min: 67108864
 # taskmanager.network.memory.max: 1073741824
diff --git a/flink/src/test/resources/init_stream.scala b/flink/src/test/resources/init_stream.scala
new file mode 100644
index 0000000..a150aeb
--- /dev/null
+++ b/flink/src/test/resources/init_stream.scala
@@ -0,0 +1,45 @@
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
+import java.util.Collections
+import scala.collection.JavaConversions._
+
+senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+senv.enableCheckpointing(1000)
+
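+// The custom source below emits 20 (timestamp, page) events, one per second,
+// starting from 2018-01-01, and checkpoints its counter via ListCheckpointed;
+// the resulting stream is registered at the end as table "log" with an
+// event-time rowtime column.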
+val data = senv.addSource(new SourceFunction[(Long, String)] with ListCheckpointed[java.lang.Long] {
+
+  val pages = Seq("home", "search", "search", "product", "product", "product")
+  var count: Long = 0
+  // startTime is 2018/1/1
+  var startTime: Long = new java.util.Date(2018 - 1900, 0, 1).getTime
+  var sleepInterval = 1000
+
+  override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit = {
+    val lock = ctx.getCheckpointLock
+
+    while (count < 20) {
+      lock.synchronized({
+        ctx.collect((startTime + count * sleepInterval, pages(count.toInt % pages.size)))
+        count += 1
+        Thread.sleep(sleepInterval)
+      })
+    }
+  }
+
+  override def cancel(): Unit = {
+
+  }
+
+  override def snapshotState(checkpointId: Long, timestamp: Long): java.util.List[java.lang.Long] = {
+    Collections.singletonList(count)
+  }
+
+  override def restoreState(state: java.util.List[java.lang.Long]): Unit = {
+    state.foreach(s => count = s)
+  }
+
+}).assignAscendingTimestamps(_._1)
+
+stenv.registerDataStream("log", data, 'time, 'url, 'rowtime.rowtime)
diff --git a/flink/src/test/resources/log4j.properties b/flink/src/test/resources/log4j.properties
index 65b6d36..532fc5e 100644
--- a/flink/src/test/resources/log4j.properties
+++ b/flink/src/test/resources/log4j.properties
@@ -21,4 +21,3 @@ log4j.appender.stdout = org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
 
-log4j.logger.org.apache.zeppelin.flink=WARN
diff --git a/flink/src/test/resources/log4j2.properties b/flink/src/test/resources/log4j2.properties
new file mode 100755
index 0000000..fa3485f
--- /dev/null
+++ b/flink/src/test/resources/log4j2.properties
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+status = INFO
+name = HiveLog4j2
+packages = org.apache.hadoop.hive.ql.log
+
+# list of properties
+property.hive.log.level = INFO
+property.hive.root.logger = console
+property.hive.perflogger.log.level = INFO
+
+# list of all appenders
+appenders = console
+
+# console appender
+appender.console.type = Console
+appender.console.name = console
+appender.console.target = SYSTEM_ERR
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{ISO8601} %5p [%t] %c{2}: %m%n
+
+# list of all loggers
+loggers = NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, PerfLogger
+
+logger.NIOServerCnxn.name = org.apache.zookeeper.server.NIOServerCnxn
+logger.NIOServerCnxn.level = WARN
+
+logger.ClientCnxnSocketNIO.name = org.apache.zookeeper.ClientCnxnSocketNIO
+logger.ClientCnxnSocketNIO.level = WARN
+
+logger.DataNucleus.name = DataNucleus
+logger.DataNucleus.level = ERROR
+
+logger.Datastore.name = Datastore
+logger.Datastore.level = ERROR
+
+logger.JPOX.name = JPOX
+logger.JPOX.level = ERROR
+
+logger.flink.name = org.apache.zeppelin.flink
+logger.flink.level = DEBUG
+
+logger.PerfLogger.name = org.apache.hadoop.hive.ql.log.PerfLogger
+logger.PerfLogger.level = ${sys:hive.perflogger.log.level}
+
+# root logger
+rootLogger.level = ${sys:hive.log.level}
+rootLogger.appenderRefs = root
+rootLogger.appenderRef.root.ref = ${sys:hive.root.logger}
+
diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
index 2066ba4..9a5cb11 100644
--- a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
@@ -242,6 +242,7 @@ public class IPythonInterpreter extends Interpreter {
               .replace("${JVM_GATEWAY_PORT}", jvmGatewayPort + "")
               .replace("${JVM_GATEWAY_ADDRESS}", serverAddress)).build());
       if (response.getStatus() != ExecuteStatus.SUCCESS) {
+        LOGGER.error("Fail to run additional Python init file\n" + response.getOutput());
         throw new IOException("Fail to run additional Python init file: "
             + additionalPythonInitFile + "\n" + response.getOutput());
       }
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
index eb76de3..bfa8348 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
@@ -558,7 +558,7 @@ public class PythonInterpreter extends Interpreter {
       InterpreterResult result = interpret(bootstrapCode + "\n" + "__zeppelin__._displayhook()",
           InterpreterContext.get());
       if (result.code() != Code.SUCCESS) {
-        throw new IOException("Fail to run bootstrap script: " + resourceName);
+        throw new IOException("Fail to run bootstrap script: " + resourceName + "\n" + result);
       }
     } catch (InterpreterException e) {
       throw new IOException(e);
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
index ca393f2..baefabc 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
@@ -17,9 +17,11 @@
 
 package org.apache.zeppelin.integration;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -55,26 +57,28 @@ public class FlinkIntegrationTest {
   private static InterpreterSettingManager interpreterSettingManager;
 
   private String flinkVersion;
+  private String hadoopHome;
   private String flinkHome;
 
   public FlinkIntegrationTest(String flinkVersion) {
     LOGGER.info("Testing FlinkVersion: " + flinkVersion);
     this.flinkVersion = flinkVersion;
     this.flinkHome = DownloadUtils.downloadFlink(flinkVersion);
+    this.hadoopHome = DownloadUtils.downloadHadoop("2.7.3");
   }
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-        {"1.5.1"},
-        {"1.5.2"}
+        {"1.9.0"}
     });
-
   }
 
   @BeforeClass
   public static void setUp() throws IOException {
-    hadoopCluster = new MiniHadoopCluster();
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+    hadoopCluster = new MiniHadoopCluster(conf);
     hadoopCluster.start();
 
     zeppelin = new MiniZeppelin();
@@ -102,6 +106,9 @@ public class FlinkIntegrationTest {
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
     assertTrue(interpreterResult.message().get(0).getData().contains("2"));
 
+    interpreterResult = flinkInterpreter.interpret("val data = benv.fromElements(1, 2, 3)\ndata.collect()", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
+    assertTrue(interpreterResult.message().get(0).getData().contains("1, 2, 3"));
   }
 
   @Test
@@ -121,11 +128,12 @@ public class FlinkIntegrationTest {
   }
 
   // TODO(zjffdu) enable it when make yarn integration test work
-  //  @Test
+  @Test
   public void testYarnMode() throws IOException, InterpreterException, YarnException {
     InterpreterSetting flinkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("flink");
     flinkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
     flinkInterpreterSetting.setProperty("FLINK_HOME", flinkHome);
+    flinkInterpreterSetting.setProperty("PATH", hadoopHome + "/bin:" + System.getenv("PATH"));
     flinkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
     flinkInterpreterSetting.setProperty("flink.execution.mode", "YARN");
     testInterpreterBasics();
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/MiniHadoopCluster.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/MiniHadoopCluster.java
index 3ed3437..94a85a8 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/MiniHadoopCluster.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/MiniHadoopCluster.java
@@ -21,8 +21,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,10 +43,16 @@ public class MiniHadoopCluster {
   private MiniYARNCluster yarnCluster;
   private String configPath = new File("target/tests/hadoop_conf").getAbsolutePath();
 
-  @BeforeClass
+  public MiniHadoopCluster() {
+    this.hadoopConf = new Configuration();
+  }
+
+  public MiniHadoopCluster(Configuration hadoopConf) {
+    this.hadoopConf = hadoopConf;
+  }
+
   public void start() throws IOException {
     LOGGER.info("Starting MiniHadoopCluster ...");
-    this.hadoopConf = new Configuration();
     new File(configPath).mkdirs();
     // start MiniDFSCluster
     this.dfsCluster = new MiniDFSCluster.Builder(hadoopConf)
@@ -112,7 +116,6 @@ public class MiniHadoopCluster {
     LOGGER.info("Save configuration to " + dest);
   }
 
-  @AfterClass
   public void stop() {
     if (this.yarnCluster != null) {
       this.yarnCluster.stop();
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
index fc37918..782ec5b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
@@ -99,6 +99,7 @@ public class StandardInterpreterLauncher extends InterpreterLauncher {
         String flinkHome = context.getProperties().get(key).toString();
         env.put("FLINK_CONF_DIR", flinkHome + "/conf");
         env.put("FLINK_LIB_DIR", flinkHome + "/lib");
+        env.put("FLINK_PLUGINS_DIR", flinkHome + "/plugins");
       }
     }
     env.put("INTERPRETER_GROUP_ID", context.getInterpreterGroupId());
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
index 546790d..b0fdec1 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
@@ -20,6 +20,7 @@ package org.apache.zeppelin.interpreter.integration;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.interpreter.InterpreterException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +47,6 @@ public class DownloadUtils {
     }
   }
 
-
   public static String downloadSpark(String version) {
     String sparkDownloadFolder = downloadFolder + "/spark";
     File targetSparkHomeFolder = new File(sparkDownloadFolder + "/spark-" + version + "-bin-hadoop2.6");
@@ -65,24 +65,54 @@ public class DownloadUtils {
       LOGGER.info("Skip to download flink as it is already downloaded.");
       return targetFlinkHomeFolder.getAbsolutePath();
     }
-    download("flink", version, "-bin-hadoop2.6.tgz");
+    download("flink", version, "-bin-scala_2.11.tgz");
+    // download other dependencies for running flink with yarn and hive
+    try {
+      runShellCommand(new String[]{"wget",
+              "https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.11/"
+                      + version + "/flink-connector-hive_2.11-" + version + ".jar",
+              "-P", targetFlinkHomeFolder + "/lib"});
+      runShellCommand(new String[]{"wget",
+              "https://repo1.maven.org/maven2/org/apache/flink/flink-hadoop-compatibility_2.11/"
+                      + version + "/flink-hadoop-compatibility_2.11-" + version + ".jar",
+              "-P", targetFlinkHomeFolder + "/lib"});
+      runShellCommand(new String[]{"wget",
+              "https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.4/hive-exec-2.3.4.jar",
+              "-P", targetFlinkHomeFolder + "/lib"});
+      runShellCommand(new String[]{"wget",
+              "https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop2-uber/2.7.5-1.8.1/flink-shaded-hadoop2-uber-2.7.5-1.8.1.jar",
+              "-P", targetFlinkHomeFolder + "/lib"});
+    } catch (Exception e) {
+      throw new RuntimeException("Fail to download jar", e);
+    }
     return targetFlinkHomeFolder.getAbsolutePath();
   }
 
+  public static String downloadHadoop(String version) {
+    String hadoopDownloadFolder = downloadFolder + "/hadoop";
+    File targetHadoopHomeFolder = new File(hadoopDownloadFolder + "/hadoop-" + version);
+    if (targetHadoopHomeFolder.exists()) {
+      LOGGER.info("Skip to download hadoop as it is already downloaded.");
+      return targetHadoopHomeFolder.getAbsolutePath();
+    }
+    download("hadoop", version, ".tar.gz", "hadoop/core");
+    return targetHadoopHomeFolder.getAbsolutePath();
+  }
+
   // Try mirrors first, if fails fallback to apache archive
-  private static void download(String project, String version, String postFix) {
+  private static void download(String project, String version, String postFix, String projectPath) {
     String projectDownloadFolder = downloadFolder + "/" + project;
     try {
       String preferredMirror = IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true"));
       File downloadFile = new File(projectDownloadFolder + "/" + project + "-" + version + postFix);
-      String downloadURL = preferredMirror + "/" + project + "/" + project + "-" + version + "/" + project + "-" + version + postFix;
+      String downloadURL = preferredMirror + "/" + projectPath + "/" + project + "-" + version + "/" + project + "-" + version + postFix;
       runShellCommand(new String[]{"wget", downloadURL, "-P", projectDownloadFolder});
       runShellCommand(new String[]{"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", projectDownloadFolder});
     } catch (Exception e) {
       LOGGER.warn("Failed to download " + project + " from mirror site, fallback to use apache archive", e);
       File downloadFile = new File(projectDownloadFolder + "/" + project + "-" + version + postFix);
       String downloadURL =
-              "https://archive.apache.org/dist/" + project + "/" + project +"-"
+              "https://archive.apache.org/dist/" + projectPath + "/" + project +"-"
                       + version
                       + "/" + project + "-"
                       + version
@@ -97,6 +127,10 @@ public class DownloadUtils {
     }
   }
 
+  private static void download(String project, String version, String postFix) {
+    download(project, version, postFix, project);
+  }
+
   private static void runShellCommand(String[] commands) throws IOException, InterruptedException {
     LOGGER.info("Starting shell commands: " + StringUtils.join(commands, " "));
     Process process = Runtime.getRuntime().exec(commands);