Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/06/10 02:55:08 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4816]. Support Flink 1.11

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 50a6fe1  [ZEPPELIN-4816]. Support Flink 1.11
50a6fe1 is described below

commit 50a6fe1e20e07a7f757128c0883bb74cb4bb6148
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed May 13 22:31:11 2020 +0800

    [ZEPPELIN-4816]. Support Flink 1.11
    
    ### What is this PR for?
    Although Flink 1.11 is not released yet, it has already passed its feature freeze, so its API is stable now. This PR makes Zeppelin's Flink interpreter support Flink 1.11. Because of API changes in Flink 1.11, this PR introduces flink-shims, which uses the appropriate API for each supported Flink version.
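
    A minimal usage sketch of the new shim entry point (FlinkShims and FlinkVersion are the classes added in this commit; the wrapper class and the hard-coded version string below are illustrative assumptions only):

        import java.util.Properties;

        import org.apache.zeppelin.flink.FlinkShims;
        import org.apache.zeppelin.flink.FlinkVersion;

        public class ShimLoadingSketch {
          public static void main(String[] args) throws Exception {
            // In the interpreter the version string comes from the Flink runtime at startup;
            // "1.11.0" is hard-coded here only for illustration.
            FlinkVersion version = FlinkVersion.fromVersionString("1.11.0");
            // getInstance() selects Flink110Shims or Flink111Shims from the major/minor version
            // and caches the instance, provided the matching shim jar is on the classpath.
            FlinkShims shims = FlinkShims.getInstance(version, new Properties());
            // All version-specific calls then go through the shim, e.g. shims.collectToList(table).
          }
        }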
    
    ### What type of PR is it?
    [Feature]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://jira.apache.org/jira/browse/ZEPPELIN-4816
    
    ### How should this be tested?
    * CI passes and tested against a Flink 1.11 snapshot
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3782 from zjffdu/ZEPPELIN-4816 and squashes the following commits:
    
    6ce065221 [Jeff Zhang] [ZEPPELIN-4816]. Support Flink 1.11
    
    (cherry picked from commit c1d6297bee690f2c0472a063d22a6c18fc0a9de5)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 bin/common.sh                                      |   2 +-
 conf/log4j2.properties                             |  53 ++
 flink/flink-shims/pom.xml                          |  53 ++
 .../java/org/apache/zeppelin/flink/FlinkShims.java | 107 +++
 .../org/apache/zeppelin/flink/FlinkVersion.java    |  74 ++
 flink/flink1.10-shims/pom.xml                      | 228 ++++++
 .../org/apache/zeppelin/flink/Flink110Shims.java   | 150 ++++
 .../flink/shims111}/CollectStreamTableSink.java    |   2 +-
 .../flink/shims111/Flink110ScalaShims.scala        |  37 +
 flink/flink1.11-shims/pom.xml                      | 221 ++++++
 .../org/apache/zeppelin/flink/Flink111Shims.java   | 172 +++++
 .../flink/shims111}/CollectStreamTableSink.java    |   7 +-
 .../flink/shims111/Flink111ScalaShims.scala        |  36 +
 flink/{ => interpreter}/pom.xml                    | 115 ++-
 .../zeppelin/flink/FlinkBatchSqlInterpreter.java   |   1 -
 .../apache/zeppelin/flink/FlinkInterpreter.java    |  10 +-
 .../apache/zeppelin/flink/FlinkSqlInterrpeter.java |  23 +-
 .../zeppelin/flink/FlinkStreamSqlInterpreter.java  |  16 +-
 .../org/apache/zeppelin/flink/HadoopUtils.java     |   0
 .../apache/zeppelin/flink/IPyFlinkInterpreter.java |   6 +-
 .../java/org/apache/zeppelin/flink/JobManager.java |   0
 .../apache/zeppelin/flink/PyFlinkInterpreter.java  |  31 +-
 .../org/apache/zeppelin/flink/TableEnvFactory.java | 384 ++++++++++
 .../zeppelin/flink/sql/AbstractStreamSqlJob.java   |  24 +-
 .../zeppelin/flink/sql/AppendStreamSqlJob.java     |  10 +-
 .../zeppelin/flink/sql/SingleRowStreamSqlJob.java  |   6 +-
 .../zeppelin/flink/sql/SqlCommandParser.java       |   0
 .../zeppelin/flink/sql/UpdateStreamSqlJob.java     |  11 +-
 .../src/main/resources/interpreter-setting.json    |   0
 .../src/main/resources/python/zeppelin_ipyflink.py |  15 +-
 .../src/main/resources/python/zeppelin_pyflink.py  |  15 +-
 .../org/apache/zeppelin/flink/FlinkExprTyper.scala |   0
 .../zeppelin/flink/FlinkILoopInterpreter.scala     |   0
 .../zeppelin/flink/FlinkScalaInterpreter.scala     | 278 ++++---
 .../zeppelin/flink/FlinkZeppelinContext.scala      |  35 +-
 .../zeppelin/flink/util/DependencyUtils.scala      |   0
 .../flink/FlinkBatchSqlInterpreterTest.java        |  13 +-
 .../zeppelin/flink/FlinkInterpreterTest.java       |   0
 .../flink/FlinkStreamSqlInterpreterTest.java       |   0
 .../zeppelin/flink/IPyFlinkInterpreterTest.java    |  10 +-
 .../zeppelin/flink/PyFlinkInterpreterTest.java     |   0
 .../apache/zeppelin/flink/SqlInterpreterTest.java  |   0
 .../src/test/resources/flink-conf.yaml             |   0
 .../src/test/resources/init_stream.scala           |   0
 .../src/test/resources/log4j.properties            |   5 +-
 flink/pom.xml                                      | 820 +--------------------
 .../org/apache/zeppelin/flink/TableEnvFactory.java | 263 -------
 47 files changed, 1972 insertions(+), 1261 deletions(-)

diff --git a/bin/common.sh b/bin/common.sh
index d9cb812..25cc148 100644
--- a/bin/common.sh
+++ b/bin/common.sh
@@ -145,7 +145,7 @@ export JAVA_OPTS
 
 JAVA_INTP_OPTS="${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}"
 if [[ -z "${ZEPPELIN_SPARK_YARN_CLUSTER}" ]]; then
-    JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
+    JAVA_INTP_OPTS+=" -Dlog4j.configuration='file://${ZEPPELIN_CONF_DIR}/log4j.properties' -Dlog4j.configurationFile='file://${ZEPPELIN_CONF_DIR}/log4j2.properties'"
 else
     JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_yarn_cluster.properties"
 fi
diff --git a/conf/log4j2.properties b/conf/log4j2.properties
new file mode 100644
index 0000000..8e6f949
--- /dev/null
+++ b/conf/log4j2.properties
@@ -0,0 +1,53 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This affects logging for both user code and Flink
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = MainAppender
+
+# Uncomment this if you want to _only_ change Flink's logging
+#logger.flink.name = org.apache.flink
+#logger.flink.level = INFO
+
+# The following lines keep the log level of common libraries/connectors on
+# log level INFO. The root logger does not override this. You have to manually
+# change the log levels here.
+logger.akka.name = akka
+logger.akka.level = INFO
+logger.kafka.name= org.apache.kafka
+logger.kafka.level = INFO
+logger.hadoop.name = org.apache.hadoop
+logger.hadoop.level = INFO
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = INFO
+
+logger.flink.name = org.apache.zeppelin.flink
+logger.flink.level = DEBUG
+
+
+# Log all infos in the given file
+appender.main.name = MainAppender
+appender.main.type = File
+appender.main.append = false
+appender.main.fileName = ${sys:zeppelin.log.file}
+appender.main.layout.type = PatternLayout
+appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
+logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
+logger.netty.level = OFF
diff --git a/flink/flink-shims/pom.xml b/flink/flink-shims/pom.xml
new file mode 100644
index 0000000..5ca0568
--- /dev/null
+++ b/flink/flink-shims/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>flink-parent</artifactId>
+        <groupId>org.apache.zeppelin</groupId>
+        <version>0.9.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.zeppelin</groupId>
+    <artifactId>flink-shims</artifactId>
+    <version>0.9.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+    <name>Zeppelin: Flink Shims</name>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-interpreter-setting</id>
+                        <phase>none</phase>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
new file mode 100644
index 0000000..ef5f0a0
--- /dev/null
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Abstract class for anything that is API-incompatible between Flink versions. It loads the
+ * correct FlinkShims implementation based on the Flink version in use.
+ */
+public abstract class FlinkShims {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FlinkShims.class);
+
+  private static FlinkShims flinkShims;
+
+  protected Properties properties;
+
+  public FlinkShims(Properties properties) {
+    this.properties = properties;
+  }
+
+  private static FlinkShims loadShims(FlinkVersion flinkVersion, Properties properties)
+      throws Exception {
+    Class<?> flinkShimsClass;
+    if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 10) {
+      LOGGER.info("Initializing shims for Flink 1.10");
+      flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink110Shims");
+    } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() >= 11) {
+      LOGGER.info("Initializing shims for Flink 1.11");
+      flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink111Shims");
+    } else {
+      throw new Exception("Flink version: '" + flinkVersion + "' is not supported yet");
+    }
+
+    Constructor c = flinkShimsClass.getConstructor(Properties.class);
+    return (FlinkShims) c.newInstance(properties);
+  }
+
+  /**
+   * Get the singleton FlinkShims instance, creating it on first use.
+   * @param flinkVersion flink version used to choose the shim implementation
+   * @param properties interpreter properties passed to the shim constructor
+   * @return the shared FlinkShims instance
+   */
+  public static FlinkShims getInstance(FlinkVersion flinkVersion,
+                                       Properties properties) throws Exception {
+    if (flinkShims == null) {
+      flinkShims = loadShims(flinkVersion, properties);
+    }
+    return flinkShims;
+  }
+
+  public abstract Object createCatalogManager(Object config);
+
+  public abstract String getPyFlinkPythonPath(Properties properties) throws IOException;
+
+  public abstract Object getCollectStreamTableSink(InetAddress targetAddress,
+                                                   int targetPort,
+                                                   Object serializer);
+
+  public abstract List collectToList(Object table) throws Exception;
+
+  public abstract void startMultipleInsert(Object tblEnv, InterpreterContext context) throws Exception;
+
+  public abstract void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception;
+
+  public abstract boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception;
+
+  public abstract boolean rowEquals(Object row1, Object row2);
+
+  public abstract Object fromDataSet(Object btenv, Object ds);
+
+  public abstract Object toDataSet(Object btenv, Object table);
+
+  public abstract void registerTableFunction(Object btenv, String name, Object tableFunction);
+
+  public abstract void registerAggregateFunction(Object btenv, String name, Object aggregateFunction);
+
+  public abstract void registerTableAggregateFunction(Object btenv, String name, Object tableAggregateFunction);
+
+  public abstract void registerTableSink(Object stenv, String tableName, Object collectTableSink);
+}
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
new file mode 100644
index 0000000..c0566ad
--- /dev/null
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.flink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class FlinkVersion {
+  private static final Logger logger = LoggerFactory.getLogger(FlinkVersion.class);
+
+  private int majorVersion;
+  private int minorVersion;
+  private int patchVersion;
+  private String versionString;
+
+  FlinkVersion(String versionString) {
+    this.versionString = versionString;
+
+    try {
+      int pos = versionString.indexOf('-');
+
+      String numberPart = versionString;
+      if (pos > 0) {
+        numberPart = versionString.substring(0, pos);
+      }
+
+      String versions[] = numberPart.split("\\.");
+      this.majorVersion = Integer.parseInt(versions[0]);
+      this.minorVersion = Integer.parseInt(versions[1]);
+      if (versions.length == 3) {
+        this.patchVersion = Integer.parseInt(versions[2]);
+      }
+
+    } catch (Exception e) {
+      logger.error("Can not recognize Spark version " + versionString +
+          ". Assume it's a future release", e);
+    }
+  }
+
+  public int getMajorVersion() {
+    return majorVersion;
+  }
+
+  public int getMinorVersion() {
+    return minorVersion;
+  }
+
+  public String toString() {
+    return versionString;
+  }
+
+  public static FlinkVersion fromVersionString(String versionString) {
+    return new FlinkVersion(versionString);
+  }
+
+  public boolean isFlink110() {
+    return this.majorVersion == 1 && minorVersion == 10;
+  }
+}
diff --git a/flink/flink1.10-shims/pom.xml b/flink/flink1.10-shims/pom.xml
new file mode 100644
index 0000000..8a60436
--- /dev/null
+++ b/flink/flink1.10-shims/pom.xml
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-parent</artifactId>
+        <groupId>org.apache.zeppelin</groupId>
+        <version>0.9.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.zeppelin</groupId>
+    <artifactId>flink1.10-shims</artifactId>
+    <version>0.9.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+    <name>Zeppelin: Flink1.10 Shims</name>
+
+    <properties>
+        <flink.version>1.10.0</flink.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.12</scala.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.zeppelin</groupId>
+            <artifactId>flink-shims</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.reflections</groupId>
+                    <artifactId>reflections</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-python_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala-shell_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>eclipse-add-source</id>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile-first</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                    <args>
+                        <arg>-unchecked</arg>
+                        <arg>-deprecation</arg>
+                        <arg>-feature</arg>
+                        <arg>-target:jvm-1.8</arg>
+                    </args>
+                    <jvmArgs>
+                        <jvmArg>-Xms1024m</jvmArg>
+                        <jvmArg>-Xmx1024m</jvmArg>
+                        <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg>
+                    </jvmArgs>
+                    <javacArgs>
+                        <javacArg>-source</javacArg>
+                        <javacArg>${java.version}</javacArg>
+                        <javacArg>-target</javacArg>
+                        <javacArg>${java.version}</javacArg>
+                        <javacArg>-Xlint:all,-serial,-path,-options</javacArg>
+                    </javacArgs>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-interpreter-setting</id>
+                        <phase>none</phase>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
new file mode 100644
index 0000000..dec3560
--- /dev/null
+++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.scala.DataSet;
+import org.apache.flink.python.util.ResourceUtil;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableUtils;
+import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.api.scala.BatchTableEnvironment;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.TableAggregateFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.zeppelin.flink.shims111.CollectStreamTableSink;
+import org.apache.zeppelin.flink.shims111.Flink110ScalaShims;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * Shims for flink 1.10
+ */
+public class Flink110Shims extends FlinkShims {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Flink110Shims.class);
+
+  public Flink110Shims(Properties properties) {
+    super(properties);
+  }
+
+  @Override
+  public Object createCatalogManager(Object config) {
+    return new CatalogManager("default_catalog",
+            new GenericInMemoryCatalog("default_catalog", "default_database"));
+  }
+
+
+  @Override
+  public String getPyFlinkPythonPath(Properties properties) throws IOException {
+    String flinkHome = System.getenv("FLINK_HOME");
+    if (flinkHome != null) {
+      File tmpDir = Files.createTempDirectory("zeppelin").toFile();
+      List<File> depFiles = null;
+      try {
+        depFiles = ResourceUtil.extractBuiltInDependencies(tmpDir.getAbsolutePath(), "pyflink", true);
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+      StringBuilder builder = new StringBuilder();
+      for (File file : depFiles) {
+        LOGGER.info("Adding extracted file to PYTHONPATH: " + file.getAbsolutePath());
+        builder.append(file.getAbsolutePath() + ":");
+      }
+      return builder.toString();
+    } else {
+      throw new IOException("No FLINK_HOME is specified");
+    }
+  }
+
+  @Override
+  public Object getCollectStreamTableSink(InetAddress targetAddress, int targetPort, Object serializer) {
+    return new CollectStreamTableSink(targetAddress, targetPort, (TypeSerializer<Tuple2<Boolean, Row >>) serializer);
+  }
+
+  @Override
+  public List collectToList(Object table) throws Exception {
+    return TableUtils.collectToList((Table) table);
+  }
+
+  @Override
+  public void startMultipleInsert(Object tblEnv, InterpreterContext context) throws Exception {
+
+  }
+
+  @Override
+  public void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception {
+    ((TableEnvironment) tblEnv).sqlUpdate(sql);
+  }
+
+  @Override
+  public boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception {
+    ((TableEnvironment) tblEnv).execute(sql);
+    return true;
+  }
+
+  @Override
+  public boolean rowEquals(Object row1, Object row2) {
+    return ((Row)row1).equals((Row) row2);
+  }
+
+  public Object fromDataSet(Object btenv, Object ds) {
+    return Flink110ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds);
+  }
+
+  @Override
+  public Object toDataSet(Object btenv, Object table) {
+    return Flink110ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table);
+  }
+
+  @Override
+  public void registerTableSink(Object stenv, String tableName, Object collectTableSink) {
+    ((TableEnvironment) stenv).registerTableSink(tableName, (TableSink) collectTableSink);
+  }
+
+  @Override
+  public void registerTableFunction(Object btenv, String name, Object tableFunction) {
+    ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (TableFunction) tableFunction);
+  }
+
+  @Override
+  public void registerAggregateFunction(Object btenv, String name, Object aggregateFunction) {
+    ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (AggregateFunction) aggregateFunction);
+  }
+
+  @Override
+  public void registerTableAggregateFunction(Object btenv, String name, Object tableAggregateFunction) {
+    ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (TableAggregateFunction) tableAggregateFunction);
+  }
+}
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/CollectStreamTableSink.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java
similarity index 98%
copy from flink/src/main/java/org/apache/zeppelin/flink/sql/CollectStreamTableSink.java
copy to flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java
index 74911ae..b7a0ea7 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/sql/CollectStreamTableSink.java
+++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.zeppelin.flink.sql;
+package org.apache.zeppelin.flink.shims111;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
diff --git a/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink110ScalaShims.scala b/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink110ScalaShims.scala
new file mode 100644
index 0000000..cfd8894
--- /dev/null
+++ b/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink110ScalaShims.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.shims111
+
+import org.apache.flink.api.scala.{DataSet, FlinkILoop}
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.api.scala.BatchTableEnvironment
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.table.api.scala.internal.{BatchTableEnvironmentImpl, StreamTableEnvironmentImpl}
+
+object Flink110ScalaShims {
+
+  def fromDataSet(btenv: BatchTableEnvironment, ds: DataSet[_]): Table = {
+    btenv.fromDataSet(ds)
+  }
+
+  def toDataSet(btenv: BatchTableEnvironment, table: Table): DataSet[Row] = {
+    btenv.toDataSet[Row](table)
+  }
+}
diff --git a/flink/flink1.11-shims/pom.xml b/flink/flink1.11-shims/pom.xml
new file mode 100644
index 0000000..458e560
--- /dev/null
+++ b/flink/flink1.11-shims/pom.xml
@@ -0,0 +1,221 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-parent</artifactId>
+        <groupId>org.apache.zeppelin</groupId>
+        <version>0.9.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.zeppelin</groupId>
+    <artifactId>flink1.11-shims</artifactId>
+    <version>0.9.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+    <name>Zeppelin: Flink1.11 Shims</name>
+
+    <properties>
+        <flink.version>1.11-SNAPSHOT</flink.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.12</scala.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.zeppelin</groupId>
+            <artifactId>flink-shims</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.reflections</groupId>
+                    <artifactId>reflections</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-python_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala-shell_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>eclipse-add-source</id>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile-first</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                    <args>
+                        <arg>-unchecked</arg>
+                        <arg>-deprecation</arg>
+                        <arg>-feature</arg>
+                        <arg>-target:jvm-1.8</arg>
+                    </args>
+                    <jvmArgs>
+                        <jvmArg>-Xms1024m</jvmArg>
+                        <jvmArg>-Xmx1024m</jvmArg>
+                        <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg>
+                    </jvmArgs>
+                    <javacArgs>
+                        <javacArg>-source</javacArg>
+                        <javacArg>${java.version}</javacArg>
+                        <javacArg>-target</javacArg>
+                        <javacArg>${java.version}</javacArg>
+                        <javacArg>-Xlint:all,-serial,-path,-options</javacArg>
+                    </javacArgs>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-interpreter-setting</id>
+                        <phase>none</phase>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
new file mode 100644
index 0000000..ea11ced
--- /dev/null
+++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.scala.DataSet;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.api.StatementSet;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.TableAggregateFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.zeppelin.flink.shims111.CollectStreamTableSink;
+import org.apache.zeppelin.flink.shims111.Flink111ScalaShims;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Shims for flink 1.11
+ */
+public class Flink111Shims extends FlinkShims {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Flink111Shims.class);
+
+  private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
+
+  public Flink111Shims(Properties properties) {
+    super(properties);
+  }
+
+  @Override
+  public Object createCatalogManager(Object config) {
+    return CatalogManager.newBuilder()
+            .classLoader(Thread.currentThread().getContextClassLoader())
+            .config((ReadableConfig) config)
+            .defaultCatalog("default_catalog",
+                    new GenericInMemoryCatalog("default_catalog", "default_database"))
+            .build();
+  }
+
+  @Override
+  public String getPyFlinkPythonPath(Properties properties) throws IOException {
+    String flinkHome = System.getenv("FLINK_HOME");
+    if (flinkHome != null) {
+      List<File> depFiles = null;
+      depFiles = Arrays.asList(new File(flinkHome + "/opt/python").listFiles());
+      StringBuilder builder = new StringBuilder();
+      for (File file : depFiles) {
+        LOGGER.info("Adding extracted file to PYTHONPATH: " + file.getAbsolutePath());
+        builder.append(file.getAbsolutePath() + ":");
+      }
+      return builder.toString();
+    } else {
+      throw new IOException("No FLINK_HOME is specified");
+    }
+  }
+
+  @Override
+  public Object getCollectStreamTableSink(InetAddress targetAddress, int targetPort, Object serializer) {
+    return new CollectStreamTableSink(targetAddress, targetPort, (TypeSerializer<Tuple2<Boolean, Row>>) serializer);
+  }
+
+  @Override
+  public List collectToList(Object table) throws Exception {
+    return Lists.newArrayList(((Table) table).execute().collect());
+  }
+
+  @Override
+  public void startMultipleInsert(Object tblEnv, InterpreterContext context) throws Exception {
+    StatementSet statementSet = ((TableEnvironment) tblEnv).createStatementSet();
+    statementSetMap.put(context.getParagraphId(), statementSet);
+  }
+
+  @Override
+  public void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception {
+    statementSetMap.get(context.getParagraphId()).addInsertSql(sql);
+  }
+
+  @Override
+  public boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception {
+    JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get();
+    while(!jobClient.getJobStatus().get().isTerminalState()) {
+      LOGGER.debug("Wait for job to finish");
+      Thread.sleep(1000 * 5);
+    }
+    if (jobClient.getJobStatus().get() == JobStatus.CANCELED) {
+      context.out.write("Job is cancelled.\n");
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public boolean rowEquals(Object row1, Object row2) {
+    Row r1 = (Row) row1;
+    Row r2 = (Row) row2;
+    r1.setKind(RowKind.INSERT);
+    r2.setKind(RowKind.INSERT);
+    return r1.equals(r2);
+  }
+
+  @Override
+  public Object fromDataSet(Object btenv, Object ds) {
+    return Flink111ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds);
+  }
+
+  @Override
+  public Object toDataSet(Object btenv, Object table) {
+    return Flink111ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table);
+  }
+
+  @Override
+  public void registerTableSink(Object stenv, String tableName, Object collectTableSink) {
+    ((org.apache.flink.table.api.internal.TableEnvironmentInternal) stenv)
+            .registerTableSinkInternal(tableName, (TableSink) collectTableSink);
+  }
+
+  @Override
+  public void registerTableFunction(Object btenv, String name, Object tableFunction) {
+    ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (TableFunction) tableFunction);
+  }
+
+  @Override
+  public void registerAggregateFunction(Object btenv, String name, Object aggregateFunction) {
+    ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (AggregateFunction) aggregateFunction);
+  }
+
+  @Override
+  public void registerTableAggregateFunction(Object btenv, String name, Object tableAggregateFunction) {
+    ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (TableAggregateFunction) tableAggregateFunction);
+  }
+
+}
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/CollectStreamTableSink.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java
similarity index 95%
rename from flink/src/main/java/org/apache/zeppelin/flink/sql/CollectStreamTableSink.java
rename to flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java
index 74911ae..b98f406 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/sql/CollectStreamTableSink.java
+++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.zeppelin.flink.sql;
+package org.apache.zeppelin.flink.shims111;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
@@ -82,11 +82,6 @@ public class CollectStreamTableSink implements RetractStreamTableSink<Row> {
   }
 
   @Override
-  public void emitDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
-    consumeDataStream(stream);
-  }
-
-  @Override
   public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
     // add sink
     return stream
diff --git a/flink/flink1.11-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink111ScalaShims.scala b/flink/flink1.11-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink111ScalaShims.scala
new file mode 100644
index 0000000..abdaca2
--- /dev/null
+++ b/flink/flink1.11-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink111ScalaShims.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.shims111
+
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
+import org.apache.flink.types.Row
+
+object Flink111ScalaShims {
+
+  def fromDataSet(btenv: BatchTableEnvironment, ds: DataSet[_]): Table = {
+    btenv.fromDataSet(ds)
+  }
+
+  def toDataSet(btenv: BatchTableEnvironment, table: Table): DataSet[Row] = {
+    btenv.toDataSet[Row](table)
+  }
+}
diff --git a/flink/pom.xml b/flink/interpreter/pom.xml
similarity index 88%
copy from flink/pom.xml
copy to flink/interpreter/pom.xml
index 69eb026..bb991a3 100644
--- a/flink/pom.xml
+++ b/flink/interpreter/pom.xml
@@ -21,10 +21,10 @@
   <modelVersion>4.0.0</modelVersion>
 
   <parent>
-    <artifactId>zeppelin-interpreter-parent</artifactId>
+    <artifactId>flink-parent</artifactId>
     <groupId>org.apache.zeppelin</groupId>
     <version>0.9.0-SNAPSHOT</version>
-    <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath>
+    <relativePath>../pom.xml</relativePath>
   </parent>
 
   <groupId>org.apache.zeppelin</groupId>
@@ -32,13 +32,14 @@
   <packaging>jar</packaging>
   <version>0.9.0-SNAPSHOT</version>
   <name>Zeppelin: Flink</name>
-  <description>Zeppelin flink support</description>
+  <description>Zeppelin Flink Interpreter</description>
 
   <properties>
     <!--library versions-->
     <interpreter.name>flink</interpreter.name>
+<!--    <flink.version>1.11-SNAPSHOT</flink.version>-->
     <flink.version>1.10.0</flink.version>
-    <hadoop.version>2.6.5</hadoop.version>
+    <flink.hadoop.version>2.6.5</flink.hadoop.version>
     <hive.version>2.3.4</hive.version>
     <hiverunner.version>4.0.0</hiverunner.version>
     <grpc.version>1.15.0</grpc.version>
@@ -53,6 +54,39 @@
   <dependencies>
 
     <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>flink-shims</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>flink1.10-shims</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>flink1.11-shims</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.zeppelin</groupId>
       <artifactId>zeppelin-python</artifactId>
       <version>${project.version}</version>
@@ -162,6 +196,10 @@
           <artifactId>flink-shaded-hadoop2</artifactId>
         </exclusion>
         <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
           <groupId>org.eclipse.jetty</groupId>
           <artifactId>*</artifactId>
         </exclusion>
@@ -341,6 +379,41 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${flink.hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${flink.hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+      <version>${flink.hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+      <version>${flink.hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${flink.hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
@@ -641,8 +714,10 @@
           https://blogs.oracle.com/poonam/crashes-in-zipgetentry
           https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8191484 -->
           <argLine>-Xmx4096m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
+<!--          <argLine>-Xmx4096m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=6006</argLine>-->
 
           <environmentVariables>
+<!--            <FLINK_HOME>/Users/jzhang/github/flink/build-target</FLINK_HOME>-->
             <FLINK_HOME>${project.build.directory}/flink-${flink.version}</FLINK_HOME>
             <FLINK_CONF_DIR>${project.build.directory}/test-classes</FLINK_CONF_DIR>
           </environmentVariables>
@@ -757,7 +832,7 @@
               <shadedPattern>org.apache.zeppelin.shaded.com.google</shadedPattern>
             </relocation>
           </relocations>
-          <outputFile>${project.basedir}/../interpreter/${interpreter.name}/${project.artifactId}-${project.version}.jar</outputFile>
+          <outputFile>${project.basedir}/../../interpreter/${interpreter.name}/${project.artifactId}-${project.version}.jar</outputFile>
         </configuration>
         <executions>
           <execution>
@@ -770,6 +845,22 @@
       </plugin>
 
       <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-interpreter-setting</id>
+            <phase>package</phase>
+            <goals>
+              <goal>resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/../../../interpreter/${interpreter.name}</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
         <configuration>
@@ -783,6 +874,20 @@
   <profiles>
 
     <profile>
+      <id>flink-1.10</id>
+      <properties>
+        <flink.version>1.10.0</flink.version>
+      </properties>
+    </profile>
+
+    <profile>
+      <id>flink-1.11</id>
+      <properties>
+        <flink.version>1.11-SNAPSHOT</flink.version>
+      </properties>
+    </profile>
+
+    <profile>
       <id>hive2</id>
       <activation>
         <activeByDefault>true</activeByDefault>
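
The pom changes above split the old single flink module into flink/interpreter plus the new flink-shims, flink1.10-shims and flink1.11-shims artifacts, mark the Hadoop dependencies as provided, and add two profiles so the same interpreter source can be built and tested against either Flink line (for example with -Pflink-1.10 or -Pflink-1.11). How FlinkShims.getInstance picks a shim at runtime is not shown in this section; the sketch below is only an illustration, assuming it dispatches on the detected FlinkVersion and that each shim class exposes a (Properties) constructor.

    // Illustrative only: NOT the FlinkShims implementation from this commit.
    // Assumes the shim class names follow the new module names and that each
    // exposes a (Properties) constructor -- both are assumptions.
    package org.apache.zeppelin.flink;

    import java.util.Properties;

    public class ShimsDispatchSketch {
      public static Object loadShims(FlinkVersion flinkVersion, Properties properties) throws Exception {
        String shimsClass = flinkVersion.isFlink110()
            ? "org.apache.zeppelin.flink.Flink110Shims"
            : "org.apache.zeppelin.flink.Flink111Shims";
        return Class.forName(shimsClass)
            .getConstructor(Properties.class)
            .newInstance(properties);
      }
    }
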
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
similarity index 97%
rename from flink/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
index dab4524..f397187 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
@@ -18,7 +18,6 @@
 package org.apache.zeppelin.flink;
 
 import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.scheduler.Scheduler;
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
similarity index 96%
rename from flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index f02c21b..244559f 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.scala.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.scala.StreamTableEnvironment;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
@@ -127,7 +126,7 @@ public class FlinkInterpreter extends Interpreter {
     return this.innerIntp.getStreamExecutionEnvironment();
   }
 
-  StreamTableEnvironment getStreamTableEnvironment() {
+  TableEnvironment getStreamTableEnvironment() {
     return this.innerIntp.getStreamTableEnvironment("blink");
   }
 
@@ -178,6 +177,10 @@ public class FlinkInterpreter extends Interpreter {
     return this.innerIntp;
   }
 
+  public FlinkShims getFlinkShims() {
+    return this.innerIntp.getFlinkShims();
+  }
+
   public void setSavePointIfNecessary(InterpreterContext context) {
     this.innerIntp.setSavePointIfNecessary(context);
   }
@@ -186,4 +189,7 @@ public class FlinkInterpreter extends Interpreter {
     this.innerIntp.setParallelismIfNecessary(context);
   }
 
+  public FlinkVersion getFlinkVersion() {
+    return this.innerIntp.getFlinkVersion();
+  }
 }
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
similarity index 95%
rename from flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index 2332704..1e8e803 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -166,6 +166,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
     ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader();
     try {
       Thread.currentThread().setContextClassLoader(flinkInterpreter.getFlinkScalaShellLoader());
+      flinkInterpreter.createPlannerAgain();
       flinkInterpreter.setParallelismIfNecessary(context);
       flinkInterpreter.setSavePointIfNecessary(context);
       return runSqlList(st, context);
@@ -181,7 +182,9 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
     paragraphTableConfigMap.put(context.getParagraphId(), tableConfig);
 
     try {
+      boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
       List<String> sqls = sqlSplitter.splitSql(st);
+      boolean isFirstInsert = true;
       for (String sql : sqls) {
         Optional<SqlCommandParser.SqlCommandCall> sqlCommand = SqlCommandParser.parse(sql);
         if (!sqlCommand.isPresent()) {
@@ -194,6 +197,13 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
           return new InterpreterResult(InterpreterResult.Code.ERROR);
         }
         try {
+          if (sqlCommand.get().command == SqlCommand.INSERT_INTO ||
+                  sqlCommand.get().command == SqlCommand.INSERT_OVERWRITE) {
+            if (isFirstInsert && runAsOne) {
+              flinkInterpreter.getFlinkShims().startMultipleInsert(tbenv, context);
+              isFirstInsert = false;
+            }
+          }
           callCommand(sqlCommand.get(), context);
           context.out.flush();
         } catch (Throwable e) {
@@ -210,12 +220,12 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
         }
       }
 
-      boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
       if (runAsOne) {
         try {
           lock.lock();
-          this.tbenv.execute(st);
-          context.out.write("Insertion successfully.\n");
+          if (flinkInterpreter.getFlinkShims().executeMultipleInsertInto(st, this.tbenv, context)) {
+            context.out.write("Insertion successfully.\n");
+          }
         } catch (Exception e) {
           LOGGER.error("Fail to execute sql as one job", e);
           return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
@@ -225,6 +235,9 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
           }
         }
       }
+    } catch(Exception e) {
+      LOGGER.error("Fail to execute sql", e);
+      return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
     } finally {
       // reset parallelism
       this.tbenv.getConfig().getConfiguration()
@@ -516,11 +529,13 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
          this.tbenv.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
        }
 
-       this.tbenv.sqlUpdate(sql);
        boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
        if (!runAsOne) {
+         this.tbenv.sqlUpdate(sql);
          this.tbenv.execute(sql);
          context.out.write("Insertion successfully.\n");
+       } else {
+         flinkInterpreter.getFlinkShims().addInsertStatement(sql, this.tbenv, context);
        }
      } catch (Exception e) {
        throw new IOException(e);
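
The reworked runSqlList above reads the runAsOne paragraph property up front, asks the shims layer to start collecting inserts when the first INSERT INTO or INSERT OVERWRITE arrives, buffers the remaining inserts through addInsertStatement, and finally submits them together with executeMultipleInsertInto, so every insert in the paragraph runs as a single Flink job. The shim side of those three calls is not shown here; the sketch below is a plausible reading on top of Flink 1.11's public StatementSet API, not the commit's actual Flink111Shims code, and its simplified signatures (no InterpreterContext argument) are an assumption.

    // Plausible multi-insert contract on top of Flink 1.11's StatementSet API.
    // Simplified signatures; the real shim methods also receive the context.
    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.TableEnvironment;

    class MultiInsertSketch {
      private StatementSet statementSet;

      void startMultipleInsert(TableEnvironment tenv) {
        statementSet = tenv.createStatementSet();   // one StatementSet per paragraph
      }

      void addInsertStatement(String sql) {
        statementSet.addInsertSql(sql);             // buffer each INSERT statement
      }

      boolean executeMultipleInsertInto() {
        statementSet.execute();                     // submit all buffered inserts as one job
        return true;
      }
    }
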
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
similarity index 92%
rename from flink/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
index f4d2319..ab05526 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
@@ -18,12 +18,9 @@
 
 package org.apache.zeppelin.flink;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.zeppelin.flink.sql.UpdateStreamSqlJob;
-import org.apache.zeppelin.flink.sql.SingleRowStreamSqlJob;
 import org.apache.zeppelin.flink.sql.AppendStreamSqlJob;
+import org.apache.zeppelin.flink.sql.SingleRowStreamSqlJob;
+import org.apache.zeppelin.flink.sql.UpdateStreamSqlJob;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
@@ -70,7 +67,8 @@ public class FlinkStreamSqlInterpreter extends FlinkSqlInterrpeter {
               tbenv,
               flinkInterpreter.getJobManager(),
               context,
-              flinkInterpreter.getDefaultParallelism());
+              flinkInterpreter.getDefaultParallelism(),
+              flinkInterpreter.getFlinkShims());
       streamJob.run(sql);
     } else if (streamType.equalsIgnoreCase("append")) {
       AppendStreamSqlJob streamJob = new AppendStreamSqlJob(
@@ -78,7 +76,8 @@ public class FlinkStreamSqlInterpreter extends FlinkSqlInterrpeter {
               flinkInterpreter.getStreamTableEnvironment(),
               flinkInterpreter.getJobManager(),
               context,
-              flinkInterpreter.getDefaultParallelism());
+              flinkInterpreter.getDefaultParallelism(),
+              flinkInterpreter.getFlinkShims());
       streamJob.run(sql);
     } else if (streamType.equalsIgnoreCase("update")) {
       UpdateStreamSqlJob streamJob = new UpdateStreamSqlJob(
@@ -86,7 +85,8 @@ public class FlinkStreamSqlInterpreter extends FlinkSqlInterrpeter {
               flinkInterpreter.getStreamTableEnvironment(),
               flinkInterpreter.getJobManager(),
               context,
-              flinkInterpreter.getDefaultParallelism());
+              flinkInterpreter.getDefaultParallelism(),
+              flinkInterpreter.getFlinkShims());
       streamJob.run(sql);
     } else {
       throw new IOException("Unrecognized stream type: " + streamType);
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
similarity index 100%
rename from flink/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
similarity index 96%
rename from flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
index 7ffafb2..3f7e65f 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
@@ -70,7 +70,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
   protected Map<String, String> setupKernelEnv() throws IOException {
     Map<String, String> envs = super.setupKernelEnv();
     String pythonPath = envs.getOrDefault("PYTHONPATH", "");
-    String pyflinkPythonPath = PyFlinkInterpreter.getPyFlinkPythonPath(properties);
+    String pyflinkPythonPath = flinkInterpreter.getFlinkShims().getPyFlinkPythonPath(properties);
     envs.put("PYTHONPATH", pythonPath + ":" + pyflinkPythonPath);
     return envs;
   }
@@ -142,6 +142,10 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
     return flinkInterpreter.getProgress(context);
   }
 
+  public boolean isFlink110() {
+    return flinkInterpreter.getFlinkVersion().isFlink110();
+  }
+
   public org.apache.flink.api.java.ExecutionEnvironment getJavaExecutionEnvironment() {
     return flinkInterpreter.getExecutionEnvironment().getJavaEnv();
   }
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
similarity index 100%
rename from flink/src/main/java/org/apache/zeppelin/flink/JobManager.java
rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
similarity index 88%
rename from flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
index ace08cb..40d1d11 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
@@ -17,12 +17,11 @@
 
 package org.apache.zeppelin.flink;
 
-import org.apache.flink.python.util.ResourceUtil;
-import org.apache.zeppelin.interpreter.ZeppelinContext;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.ZeppelinContext;
 import org.apache.zeppelin.python.IPythonInterpreter;
 import org.apache.zeppelin.python.PythonInterpreter;
 import org.slf4j.Logger;
@@ -33,7 +32,6 @@ import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.nio.file.Files;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -164,32 +162,11 @@ public class PyFlinkInterpreter extends PythonInterpreter {
   protected Map<String, String> setupPythonEnv() throws IOException {
     Map<String, String> envs = super.setupPythonEnv();
     String pythonPath = envs.getOrDefault("PYTHONPATH", "");
-    String pyflinkPythonPath = getPyFlinkPythonPath(properties);
+    String pyflinkPythonPath = flinkInterpreter.getFlinkShims().getPyFlinkPythonPath(properties);
     envs.put("PYTHONPATH", pythonPath + ":" + pyflinkPythonPath);
     return envs;
   }
 
-  public static String getPyFlinkPythonPath(Properties properties) throws IOException {
-    String flinkHome = System.getenv("FLINK_HOME");
-    if (flinkHome != null) {
-      File tmpDir = Files.createTempDirectory("zeppelin").toFile();
-      List<File> depFiles = null;
-      try {
-        depFiles = ResourceUtil.extractBuiltInDependencies(tmpDir.getAbsolutePath(), "pyflink", true);
-      } catch (InterruptedException e) {
-        throw new IOException(e);
-      }
-      StringBuilder builder = new StringBuilder();
-      for (File file : depFiles) {
-        LOGGER.info("Adding extracted file to PYTHONPATH: " + file.getAbsolutePath());
-        builder.append(file.getAbsolutePath() + ":");
-      }
-      return builder.toString();
-    } else {
-      throw new IOException("No FLINK_HOME is specified");
-    }
-  }
-
   @Override
   protected IPythonInterpreter getIPythonInterpreter() throws InterpreterException {
     return getInterpreterInTheSameSessionByClassName(IPyFlinkInterpreter.class, false);
@@ -213,6 +190,10 @@ public class PyFlinkInterpreter extends PythonInterpreter {
     return flinkInterpreter.getProgress(context);
   }
 
+  public boolean isFlink110() {
+    return flinkInterpreter.getFlinkVersion().isFlink110();
+  }
+
   public org.apache.flink.api.java.ExecutionEnvironment getJavaExecutionEnvironment() {
     return flinkInterpreter.getExecutionEnvironment().getJavaEnv();
   }
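
getPyFlinkPythonPath moves out of PyFlinkInterpreter and behind FlinkShims because the removed code depends on Flink 1.10's ResourceUtil.extractBuiltInDependencies, which is not available in the same form in 1.11. The deleted method shows the 1.10 behaviour: extract the built-in pyflink dependencies into a temp directory and join their paths. What the 1.11 shim does instead is not part of this section; a reasonable sketch, assuming it simply points PYTHONPATH at the python archives shipped under $FLINK_HOME/opt/python, looks like this:

    // Assumed 1.11-style behaviour, not the committed Flink111Shims code:
    // put the archives under $FLINK_HOME/opt/python (pyflink.zip, py4j-*.zip, ...)
    // on the PYTHONPATH instead of extracting them.
    import java.io.File;
    import java.io.IOException;
    import java.util.Properties;

    class PyFlinkPathSketch {
      static String getPyFlinkPythonPath(Properties properties) throws IOException {
        String flinkHome = System.getenv("FLINK_HOME");
        if (flinkHome == null) {
          throw new IOException("No FLINK_HOME is specified");
        }
        File pythonLibDir = new File(flinkHome, "opt/python");   // location is an assumption
        File[] archives = pythonLibDir.listFiles();
        if (archives == null) {
          throw new IOException("No python libs found under " + pythonLibDir);
        }
        StringBuilder builder = new StringBuilder();
        for (File archive : archives) {
          builder.append(archive.getAbsolutePath()).append(":");
        }
        return builder.toString();
      }
    }
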
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
new file mode 100644
index 0000000..75d06bb
--- /dev/null
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.factories.ComponentFactoryService;
+import org.apache.flink.table.module.ModuleManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.Map;
+
+/**
+ * Factory class for creating Flink table environments for different purposes:
+ * 1. java/scala
+ * 2. stream table / batch table
+ * 3. flink planner / blink planner
+ *
+ */
+public class TableEnvFactory {
+
+  private static Logger LOGGER = LoggerFactory.getLogger(TableEnvFactory.class);
+
+  private FlinkVersion flinkVersion;
+  private FlinkShims flinkShims;
+  private Executor executor;
+  private org.apache.flink.api.scala.ExecutionEnvironment benv;
+  private org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv;
+  private TableConfig tblConfig;
+  private CatalogManager catalogManager;
+  private ModuleManager moduleManager;
+  private FunctionCatalog flinkFunctionCatalog;
+  private FunctionCatalog blinkFunctionCatalog;
+
+  public TableEnvFactory(FlinkVersion flinkVersion,
+                         FlinkShims flinkShims,
+                         org.apache.flink.api.scala.ExecutionEnvironment env,
+                         org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv,
+                         TableConfig tblConfig,
+                         CatalogManager catalogManager,
+                         ModuleManager moduleManager,
+                         FunctionCatalog flinkFunctionCatalog,
+                         FunctionCatalog blinkFunctionCatalog) {
+    this.flinkVersion = flinkVersion;
+    this.flinkShims = flinkShims;
+    this.benv = env;
+    this.senv = senv;
+    this.tblConfig = tblConfig;
+    this.catalogManager = catalogManager;
+    this.moduleManager = moduleManager;
+    this.flinkFunctionCatalog = flinkFunctionCatalog;
+    this.blinkFunctionCatalog = blinkFunctionCatalog;
+  }
+
+  public TableEnvironment createScalaFlinkBatchTableEnvironment() {
+    try {
+      Class clazz = null;
+      if (flinkVersion.isFlink110()) {
+        clazz = Class
+                .forName("org.apache.flink.table.api.scala.internal.BatchTableEnvironmentImpl");
+      } else {
+        clazz = Class
+                .forName("org.apache.flink.table.api.bridge.scala.internal.BatchTableEnvironmentImpl");
+      }
+      Constructor constructor = clazz
+              .getConstructor(
+                      org.apache.flink.api.scala.ExecutionEnvironment.class,
+                      TableConfig.class,
+                      CatalogManager.class,
+                      ModuleManager.class);
+
+      return (TableEnvironment)
+              constructor.newInstance(benv, tblConfig, catalogManager, moduleManager);
+    } catch (Exception e) {
+      throw new TableException("Fail to createScalaFlinkBatchTableEnvironment", e);
+    }
+  }
+
+  public TableEnvironment createScalaFlinkStreamTableEnvironment(EnvironmentSettings settings) {
+    try {
+      Map<String, String> executorProperties = settings.toExecutorProperties();
+      Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
+
+      Map<String, String> plannerProperties = settings.toPlannerProperties();
+      Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+              .create(
+                      plannerProperties,
+                      executor,
+                      tblConfig,
+                      flinkFunctionCatalog,
+                      catalogManager);
+
+      Class clazz = null;
+      if (flinkVersion.isFlink110()) {
+        clazz = Class
+                .forName("org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl");
+      } else {
+        clazz = Class
+                .forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl");
+      }
+      Constructor constructor = clazz
+              .getConstructor(
+                      CatalogManager.class,
+                      ModuleManager.class,
+                      FunctionCatalog.class,
+                      TableConfig.class,
+                      org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class,
+                      Planner.class,
+                      Executor.class,
+                      boolean.class);
+      return (TableEnvironment) constructor.newInstance(catalogManager,
+              moduleManager,
+              flinkFunctionCatalog,
+              tblConfig,
+              senv,
+              planner,
+              executor,
+              settings.isStreamingMode());
+
+    } catch (Exception e) {
+      throw new TableException("Fail to createScalaFlinkStreamTableEnvironment", e);
+    }
+  }
+
+  public TableEnvironment createJavaFlinkBatchTableEnvironment() {
+    try {
+      Class<?> clazz = null;
+      if (flinkVersion.isFlink110()) {
+        clazz = Class
+                .forName("org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl");
+      } else {
+        clazz = Class
+                .forName("org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl");
+      }
+
+      Constructor con = clazz.getConstructor(
+              ExecutionEnvironment.class,
+              TableConfig.class,
+              CatalogManager.class,
+              ModuleManager.class);
+      return (TableEnvironment) con.newInstance(
+              benv.getJavaEnv(),
+              tblConfig,
+              catalogManager,
+              moduleManager);
+    } catch (Throwable t) {
+      throw new TableException("Create BatchTableEnvironment failed.", t);
+    }
+  }
+
+  public TableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentSettings settings) {
+
+    try {
+      Map<String, String> executorProperties = settings.toExecutorProperties();
+      Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
+
+      Map<String, String> plannerProperties = settings.toPlannerProperties();
+      Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+              .create(plannerProperties, executor, tblConfig, flinkFunctionCatalog, catalogManager);
+
+      Class clazz = null;
+      if (flinkVersion.isFlink110()) {
+        clazz = Class
+                .forName("org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl");
+      } else {
+        clazz = Class
+                .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
+      }
+      Constructor constructor = clazz
+              .getConstructor(
+                      CatalogManager.class,
+                      ModuleManager.class,
+                      FunctionCatalog.class,
+                      TableConfig.class,
+                      org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class,
+                      Planner.class,
+                      Executor.class,
+                      boolean.class);
+      return (TableEnvironment) constructor.newInstance(catalogManager,
+              moduleManager,
+              flinkFunctionCatalog,
+              tblConfig,
+              senv.getJavaEnv(),
+              planner,
+              executor,
+              settings.isStreamingMode());
+
+    } catch (Exception e) {
+      throw new TableException("Fail to createJavaFlinkStreamTableEnvironment", e);
+    }
+  }
+
+  public TableEnvironment createScalaBlinkStreamTableEnvironment(EnvironmentSettings settings) {
+
+    try {
+      Map<String, String> executorProperties = settings.toExecutorProperties();
+      Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
+
+      Map<String, String> plannerProperties = settings.toPlannerProperties();
+      Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+              .create(
+                      plannerProperties,
+                      executor,
+                      tblConfig,
+                      blinkFunctionCatalog,
+                      catalogManager);
+
+
+      Class clazz = null;
+      if (flinkVersion.isFlink110()) {
+        clazz = Class
+                .forName("org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl");
+      } else {
+        clazz = Class
+                .forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl");
+      }
+      Constructor constructor = clazz
+              .getConstructor(
+                      CatalogManager.class,
+                      ModuleManager.class,
+                      FunctionCatalog.class,
+                      TableConfig.class,
+                      org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class,
+                      Planner.class,
+                      Executor.class,
+                      boolean.class);
+      return (TableEnvironment) constructor.newInstance(catalogManager,
+              moduleManager,
+              blinkFunctionCatalog,
+              tblConfig,
+              senv,
+              planner,
+              executor,
+              settings.isStreamingMode());
+    } catch (Exception e) {
+      throw new TableException("Fail to createScalaBlinkStreamTableEnvironment", e);
+    }
+  }
+
+  public TableEnvironment createJavaBlinkStreamTableEnvironment(EnvironmentSettings settings) {
+
+    try {
+      Map<String, String> executorProperties = settings.toExecutorProperties();
+      Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
+
+      Map<String, String> plannerProperties = settings.toPlannerProperties();
+      Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+              .create(plannerProperties, executor, tblConfig, blinkFunctionCatalog, catalogManager);
+
+      Class clazz = null;
+      if (flinkVersion.isFlink110()) {
+        clazz = Class
+                .forName("org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl");
+      } else {
+        clazz = Class
+                .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
+      }
+      Constructor constructor = clazz
+              .getConstructor(
+                      CatalogManager.class,
+                      ModuleManager.class,
+                      FunctionCatalog.class,
+                      TableConfig.class,
+                      org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class,
+                      Planner.class,
+                      Executor.class,
+                      boolean.class);
+      return (TableEnvironment) constructor.newInstance(catalogManager,
+              moduleManager,
+              blinkFunctionCatalog,
+              tblConfig,
+              senv.getJavaEnv(),
+              planner,
+              executor,
+              settings.isStreamingMode());
+    } catch (Exception e) {
+      throw new TableException("Fail to createJavaBlinkStreamTableEnvironment", e);
+    }
+  }
+
+  public TableEnvironment createJavaBlinkBatchTableEnvironment(
+          EnvironmentSettings settings) {
+    try {
+      final Map<String, String> executorProperties = settings.toExecutorProperties();
+      executor = lookupExecutor(executorProperties, senv.getJavaEnv());
+      final Map<String, String> plannerProperties = settings.toPlannerProperties();
+      final Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+              .create(plannerProperties, executor, tblConfig, blinkFunctionCatalog, catalogManager);
+
+      Class clazz = null;
+      if (flinkVersion.isFlink110()) {
+        clazz = Class
+                .forName("org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl");
+      } else {
+        clazz = Class
+                .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
+      }
+      Constructor constructor = clazz.getConstructor(
+                      CatalogManager.class,
+                      ModuleManager.class,
+                      FunctionCatalog.class,
+                      TableConfig.class,
+                      StreamExecutionEnvironment.class,
+                      Planner.class,
+                      Executor.class,
+                      boolean.class);
+      return (TableEnvironment) constructor.newInstance(
+              catalogManager,
+              moduleManager,
+              blinkFunctionCatalog,
+              tblConfig,
+              senv.getJavaEnv(),
+              planner,
+              executor,
+              settings.isStreamingMode());
+    } catch (Exception e) {
+      LOGGER.info(ExceptionUtils.getStackTrace(e));
+      throw new TableException("Fail to createJavaBlinkBatchTableEnvironment", e);
+    }
+  }
+
+
+  public void createPlanner(EnvironmentSettings settings) {
+    Map<String, String> executorProperties = settings.toExecutorProperties();
+    Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
+
+    Map<String, String> plannerProperties = settings.toPlannerProperties();
+    ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+            .create(
+                    plannerProperties,
+                    executor,
+                    tblConfig,
+                    blinkFunctionCatalog,
+                    catalogManager);
+  }
+
+  private static Executor lookupExecutor(
+          Map<String, String> executorProperties,
+          StreamExecutionEnvironment executionEnvironment) {
+    try {
+      ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
+      Method createMethod = executorFactory.getClass()
+              .getMethod("create", Map.class, StreamExecutionEnvironment.class);
+
+      return (Executor) createMethod.invoke(
+              executorFactory,
+              executorProperties,
+              executionEnvironment);
+    } catch (Exception e) {
+      throw new TableException(
+              "Could not instantiate the executor. Make sure a planner module is on the classpath",
+              e);
+    }
+  }
+}
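
TableEnvFactory concentrates the one place where the interpreter has to name planner-internal classes: the concrete StreamTableEnvironmentImpl / BatchTableEnvironmentImpl classes live under org.apache.flink.table.api.{java,scala}.internal in 1.10 but under org.apache.flink.table.api.bridge.{java,scala}.internal in 1.11, so each create method resolves the class name for the detected version and then drives the same constructor reflectively. A minimal usage sketch follows; the settings values are illustrative, and the factory arguments are the ones assembled in FlinkScalaInterpreter later in this diff.

    // Illustrative usage of the factory above: build blink streaming settings
    // and ask for the scala-flavoured blink stream TableEnvironment.
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    class TableEnvFactoryUsageSketch {
      static TableEnvironment createBlinkStreamEnv(TableEnvFactory factory) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .inStreamingMode()
            .useBlinkPlanner()
            .build();
        return factory.createScalaBlinkStreamTableEnvironment(settings);
      }
    }
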
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
similarity index 90%
rename from flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
index fd47175..2d98ef7 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
@@ -26,15 +26,15 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
 import org.apache.flink.streaming.experimental.SocketStreamIterator;
-import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
 import org.apache.flink.types.Row;
+import org.apache.zeppelin.flink.FlinkShims;
 import org.apache.zeppelin.flink.JobManager;
 import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,18 +64,21 @@ public abstract class AbstractStreamSqlJob {
   protected Object resultLock = new Object();
   protected volatile boolean enableToRefresh = true;
   protected int defaultParallelism;
+  protected FlinkShims flinkShims;
   protected ScheduledExecutorService refreshScheduler = Executors.newScheduledThreadPool(1);
 
   public AbstractStreamSqlJob(StreamExecutionEnvironment senv,
                               TableEnvironment stenv,
                               JobManager jobManager,
                               InterpreterContext context,
-                              int defaultParallelism) {
+                              int defaultParallelism,
+                              FlinkShims flinkShims) {
     this.senv = senv;
     this.stenv = stenv;
     this.jobManager = jobManager;
     this.context = context;
     this.defaultParallelism = defaultParallelism;
+    this.flinkShims = flinkShims;
   }
 
   private static TableSchema removeTimeAttributes(TableSchema schema) {
@@ -97,7 +100,8 @@ public abstract class AbstractStreamSqlJob {
 
   public String run(String st) throws IOException {
     Table table = stenv.sqlQuery(st);
-    String tableName = "UnnamedTable_" + st + "_" + SQL_INDEX.getAndIncrement();
+    String tableName = "UnnamedTable_" +
+            SQL_INDEX.getAndIncrement();
     return run(table, tableName);
   }
 
@@ -125,9 +129,10 @@ public abstract class AbstractStreamSqlJob {
       // pass binding address and port such that sink knows where to send to
       LOGGER.debug("Collecting data at address: " + iterator.getBindAddress() +
               ":" + iterator.getPort());
-      CollectStreamTableSink collectTableSink =
-              new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer);
-      collectTableSink = collectTableSink.configure(
+      RetractStreamTableSink collectTableSink =
+              (RetractStreamTableSink) flinkShims.getCollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer);
+             // new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer);
+      collectTableSink = (RetractStreamTableSink) collectTableSink.configure(
               outputType.getFieldNames(), outputType.getFieldTypes());
 
       // workaround, otherwise it won't find the sink properly
@@ -136,8 +141,8 @@ public abstract class AbstractStreamSqlJob {
       try {
         stenv.useCatalog("default_catalog");
         stenv.useDatabase("default_database");
-        stenv.registerTableSink(tableName, collectTableSink);
-        table.insertInto(new StreamQueryConfig(), tableName);
+        flinkShims.registerTableSink(stenv, tableName, collectTableSink);
+        table.insertInto(tableName);
       } finally {
         stenv.useCatalog(originalCatalog);
         stenv.useDatabase(originalDatabase);
@@ -205,6 +210,7 @@ public abstract class AbstractStreamSqlJob {
       try {
         while (isRunning && iterator.hasNext()) {
           final Tuple2<Boolean, Row> change = iterator.next();
+          LOGGER.info(change.f0 + ", " + change.f1);
           processRecord(change);
         }
       } catch (Throwable e) {
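
The hunks above replace the direct CollectStreamTableSink construction with a sink obtained from FlinkShims, since the sink's concrete class differs between the two shim modules, and keep the rest of the flow unchanged: register the sink under a generated name, insert the query result into it, and drain the socket iterator on a background thread. Each drained element is a Tuple2<Boolean, Row> retract message, true for an added row and false for a retracted one. The sketch below shows the materialization idea in isolation; it loosely mirrors what the *StreamSqlJob subclasses later in this diff do, and the class and field names are illustrative only.

    // Minimal materialization of a retract stream: f0 == true adds the row,
    // f0 == false removes a previously emitted copy of the row.
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.types.Row;

    class RetractMaterializationSketch {
      private final List<Row> materializedTable = new ArrayList<>();

      void processRecord(Tuple2<Boolean, Row> change) {
        if (change.f0) {
          materializedTable.add(change.f1);
        } else {
          materializedTable.remove(change.f1);   // removes the first equal row
        }
      }
    }
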
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
similarity index 93%
rename from flink/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
index ba5b4fe..f1eb997 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
@@ -19,10 +19,11 @@
 package org.apache.zeppelin.flink.sql;
 
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.scala.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.StringUtils;
+import org.apache.zeppelin.flink.FlinkShims;
 import org.apache.zeppelin.flink.JobManager;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.tabledata.TableDataUtils;
@@ -42,11 +43,12 @@ public class AppendStreamSqlJob extends AbstractStreamSqlJob {
   private long tsWindowThreshold;
 
   public AppendStreamSqlJob(StreamExecutionEnvironment senv,
-                            StreamTableEnvironment stEnv,
+                            TableEnvironment stEnv,
                             JobManager jobManager,
                             InterpreterContext context,
-                            int defaultParallelism) {
-    super(senv, stEnv, jobManager, context, defaultParallelism);
+                            int defaultParallelism,
+                            FlinkShims flinkShims) {
+    super(senv, stEnv, jobManager, context, defaultParallelism, flinkShims);
     this.tsWindowThreshold = Long.parseLong(context.getLocalProperties()
             .getOrDefault("threshold", 1000 * 60 * 60 + ""));
   }
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
similarity index 93%
rename from flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
index 567b940..05d8a4f 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
@@ -21,6 +21,7 @@ package org.apache.zeppelin.flink.sql;
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.types.Row;
+import org.apache.zeppelin.flink.FlinkShims;
 import org.apache.zeppelin.flink.JobManager;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.tabledata.TableDataUtils;
@@ -40,8 +41,9 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob {
                                TableEnvironment stenv,
                                JobManager jobManager,
                                InterpreterContext context,
-                               int defaultParallelism) {
-    super(senv, stenv, jobManager, context, defaultParallelism);
+                               int defaultParallelism,
+                               FlinkShims flinkShims) {
+    super(senv, stenv, jobManager, context, defaultParallelism, flinkShims);
     this.template = context.getLocalProperties().getOrDefault("template", "{0}");
   }
 
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java
similarity index 100%
rename from flink/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java
rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
similarity index 90%
rename from flink/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
index 0353d89..805afee 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
@@ -19,9 +19,10 @@
 package org.apache.zeppelin.flink.sql;
 
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
-import org.apache.flink.table.api.scala.StreamTableEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.StringUtils;
+import org.apache.zeppelin.flink.FlinkShims;
 import org.apache.zeppelin.flink.JobManager;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.tabledata.TableDataUtils;
@@ -40,11 +41,12 @@ public class UpdateStreamSqlJob extends AbstractStreamSqlJob {
   private List<Row> lastSnapshot = new ArrayList<>();
 
   public UpdateStreamSqlJob(StreamExecutionEnvironment senv,
-                            StreamTableEnvironment stEnv,
+                            TableEnvironment stEnv,
                             JobManager jobManager,
                             InterpreterContext context,
-                            int defaultParallelism) {
-    super(senv, stEnv, jobManager, context, defaultParallelism);
+                            int defaultParallelism,
+                            FlinkShims flinkShims) {
+    super(senv, stEnv, jobManager, context, defaultParallelism, flinkShims);
   }
 
   @Override
@@ -64,6 +66,7 @@ public class UpdateStreamSqlJob extends AbstractStreamSqlJob {
     LOGGER.debug("processDelete: " + row.toString());
     for (int i = 0; i < materializedTable.size(); i++) {
       if (materializedTable.get(i).equals(row)) {
+        LOGGER.debug("real processDelete: " + row.toString());
         materializedTable.remove(i);
         break;
       }
diff --git a/flink/src/main/resources/interpreter-setting.json b/flink/interpreter/src/main/resources/interpreter-setting.json
similarity index 100%
rename from flink/src/main/resources/interpreter-setting.json
rename to flink/interpreter/src/main/resources/interpreter-setting.json
diff --git a/flink/src/main/resources/python/zeppelin_ipyflink.py b/flink/interpreter/src/main/resources/python/zeppelin_ipyflink.py
similarity index 78%
rename from flink/src/main/resources/python/zeppelin_ipyflink.py
rename to flink/interpreter/src/main/resources/python/zeppelin_ipyflink.py
index fe94c9f..50efbea 100644
--- a/flink/src/main/resources/python/zeppelin_ipyflink.py
+++ b/flink/interpreter/src/main/resources/python/zeppelin_ipyflink.py
@@ -46,11 +46,18 @@ pyflink.java_gateway.import_flink_view(gateway)
 pyflink.java_gateway.install_exception_handler()
 
 b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
-bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"), True)
-bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"), False)
 s_env = StreamExecutionEnvironment(intp.getJavaStreamExecutionEnvironment())
-st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"), True)
-st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"), False)
+
+if intp.isFlink110():
+    bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"), True)
+    bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"), False)
+    st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"), True)
+    st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"), False)
+else:
+    bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"))
+    bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"))
+    st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"))
+    st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"))
 
 class IPyFlinkZeppelinContext(PyZeppelinContext):
 
diff --git a/flink/src/main/resources/python/zeppelin_pyflink.py b/flink/interpreter/src/main/resources/python/zeppelin_pyflink.py
similarity index 76%
rename from flink/src/main/resources/python/zeppelin_pyflink.py
rename to flink/interpreter/src/main/resources/python/zeppelin_pyflink.py
index 8a401b2..542ab8f 100644
--- a/flink/src/main/resources/python/zeppelin_pyflink.py
+++ b/flink/interpreter/src/main/resources/python/zeppelin_pyflink.py
@@ -35,11 +35,18 @@ pyflink.java_gateway.import_flink_view(gateway)
 pyflink.java_gateway.install_exception_handler()
 
 b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
-bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"), True)
-bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"), False)
 s_env = StreamExecutionEnvironment(intp.getJavaStreamExecutionEnvironment())
-st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"), True)
-st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"), False)
+
+if intp.isFlink110():
+  bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"), True)
+  bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"), False)
+  st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"), True)
+  st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"), False)
+else:
+  bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"))
+  bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"))
+  st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"))
+  st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"))
 
 from zeppelin_context import PyZeppelinContext
 
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala
similarity index 100%
rename from flink/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala
rename to flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala
similarity index 100%
rename from flink/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala
rename to flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
similarity index 81%
rename from flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
rename to flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 7922c99..3c3162e 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit
 import java.util.jar.JarFile
 
 import org.apache.commons.lang3.StringUtils
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.scala.FlinkShell.{ExecutionMode, _}
 import org.apache.flink.api.scala.{ExecutionEnvironment, FlinkILoop}
@@ -36,12 +37,11 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironmentFactory, StreamExecutionEnvironment => JStreamExecutionEnvironment}
 import org.apache.flink.api.java.{ExecutionEnvironmentFactory, ExecutionEnvironment => JExecutionEnvironment}
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
+import org.apache.flink.runtime.util.EnvironmentInformation
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl
-import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
 import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableEnvironment}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
 import org.apache.flink.table.catalog.hive.HiveCatalog
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableAggregateFunction, TableFunction}
 import org.apache.flink.table.module.ModuleManager
@@ -82,11 +82,11 @@ class FlinkScalaInterpreter(val properties: Properties) {
 
   // TableEnvironment of blink planner
   private var btenv: TableEnvironment = _
-  private var stenv: StreamTableEnvironment = _
+  private var stenv: TableEnvironment = _
 
   // TableEnvironment of flink planner
-  private var btenv_2: BatchTableEnvironment = _
-  private var stenv_2: StreamTableEnvironment = _
+  private var btenv_2: TableEnvironment = _
+  private var stenv_2: TableEnvironment = _
 
   // PyFlink depends on java version of TableEnvironment,
   // so need to create java version of TableEnvironment
@@ -97,13 +97,52 @@ class FlinkScalaInterpreter(val properties: Properties) {
   private var java_stenv_2: TableEnvironment = _
 
   private var z: FlinkZeppelinContext = _
+  private var flinkVersion: FlinkVersion = _
+  private var flinkShims: FlinkShims = _
   private var jmWebUrl: String = _
   private var jobManager: JobManager = _
-  private var defaultParallelism = 1;
-  private var defaultSqlParallelism = 1;
+  private var defaultParallelism = 1
+  private var defaultSqlParallelism = 1
   private var userJars: Seq[String] = _
 
   def open(): Unit = {
+    val config = initFlinkConfig()
+    createFlinkILoop(config)
+    createTableEnvs()
+    setTableEnvConfig()
+
+    // init ZeppelinContext
+    this.z = new FlinkZeppelinContext(this, new InterpreterHookRegistry(),
+      Integer.parseInt(properties.getProperty("zeppelin.flink.maxResult", "1000")))
+    val modifiers = new java.util.ArrayList[String]()
+    modifiers.add("@transient")
+    this.bind("z", z.getClass().getCanonicalName(), z, modifiers);
+
+    this.jobManager = new JobManager(this.z, jmWebUrl)
+
+    // register JobListener
+    val jobListener = new FlinkJobListener()
+    this.benv.registerJobListener(jobListener)
+    this.senv.registerJobListener(jobListener)
+
+    // register hive catalog
+    if (properties.getProperty("zeppelin.flink.enableHive", "false").toBoolean) {
+      LOGGER.info("Hive is enabled, registering hive catalog.")
+      registerHiveCatalog()
+    } else {
+      LOGGER.info("Hive is disabled.")
+    }
+
+    // load udf jar
+    val udfJars = properties.getProperty("flink.udf.jars", "")
+    if (!StringUtils.isBlank(udfJars)) {
+      udfJars.split(",").foreach(jar => {
+        loadUDFJar(jar)
+      })
+    }
+  }
+
+  private def initFlinkConfig(): Config = {
     val flinkHome = properties.getProperty("FLINK_HOME", sys.env.getOrElse("FLINK_HOME", ""))
     val flinkConfDir = properties.getProperty("FLINK_CONF_DIR", sys.env.getOrElse("FLINK_CONF_DIR", ""))
     val hadoopConfDir = properties.getProperty("HADOOP_CONF_DIR", sys.env.getOrElse("HADOOP_CONF_DIR", ""))
@@ -115,6 +154,10 @@ class FlinkScalaInterpreter(val properties: Properties) {
     LOGGER.info("YARN_CONF_DIR: " + yarnConfDir)
     LOGGER.info("HIVE_CONF_DIR: " + hiveConfDir)
 
+    this.flinkVersion = new FlinkVersion(EnvironmentInformation.getVersion)
+    LOGGER.info("Using flink: " + flinkVersion)
+    this.flinkShims = FlinkShims.getInstance(flinkVersion, properties)
+
     this.configuration = GlobalConfiguration.loadConfiguration(flinkConfDir)
 
     mode = ExecutionMode.withName(
@@ -180,6 +223,10 @@ class FlinkScalaInterpreter(val properties: Properties) {
         .copy(port = Some(Integer.parseInt(port)))
     }
 
+    config
+  }
+
+  private def createFlinkILoop(config: Config): Unit = {
     val printReplOutput = properties.getProperty("zeppelin.flink.printREPLOutput", "true").toBoolean
     val replOut = if (printReplOutput) {
       new JPrintWriter(interpreterOutput, true)
@@ -234,6 +281,10 @@ class FlinkScalaInterpreter(val properties: Properties) {
         Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
         val repl = new FlinkILoop(configuration, config.externalJars, None, replOut)
         (repl, cluster)
+      } catch {
+        case e: Exception =>
+          LOGGER.error(ExceptionUtils.getStackTrace(e))
+          throw e
       } finally {
         Thread.currentThread().setContextClassLoader(classLoader)
       }
@@ -241,6 +292,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
 
     this.flinkILoop = iLoop
     this.cluster = cluster
+
     val settings = new Settings()
     settings.usejavacp.value = true
     settings.Yreplsync.value = true
@@ -259,6 +311,42 @@ class FlinkScalaInterpreter(val properties: Properties) {
       // set execution environment
       flinkILoop.intp.bind("benv", flinkILoop.scalaBenv)
       flinkILoop.intp.bind("senv", flinkILoop.scalaSenv)
+
+      val packageImports = Seq[String](
+        "org.apache.flink.core.fs._",
+        "org.apache.flink.core.fs.local._",
+        "org.apache.flink.api.common.io._",
+        "org.apache.flink.api.common.aggregators._",
+        "org.apache.flink.api.common.accumulators._",
+        "org.apache.flink.api.common.distributions._",
+        "org.apache.flink.api.common.operators._",
+        "org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint",
+        "org.apache.flink.api.common.functions._",
+        "org.apache.flink.api.java.io._",
+        "org.apache.flink.api.java.aggregation._",
+        "org.apache.flink.api.java.functions._",
+        "org.apache.flink.api.java.operators._",
+        "org.apache.flink.api.java.sampling._",
+        "org.apache.flink.api.scala._",
+        "org.apache.flink.api.scala.utils._",
+        "org.apache.flink.streaming.api.scala._",
+        "org.apache.flink.streaming.api.windowing.time._",
+        "org.apache.flink.types.Row"
+      )
+
+      flinkILoop.intp.interpret("import " + packageImports.mkString(", "))
+
+      if (flinkVersion.isFlink110) {
+        flinkILoop.intp.interpret("import org.apache.flink.table.api.scala._")
+      } else {
+        flinkILoop.intp.interpret("import org.apache.flink.table.api._")
+        flinkILoop.intp.interpret("import org.apache.flink.table.api.bridge.scala._")
+      }
+
+      flinkILoop.intp.interpret("import org.apache.flink.table.functions.ScalarFunction")
+      flinkILoop.intp.interpret("import org.apache.flink.table.functions.AggregateFunction")
+      flinkILoop.intp.interpret("import org.apache.flink.table.functions.TableFunction")
+      flinkILoop.intp.interpret("import org.apache.flink.table.functions.TableAggregateFunction")
     }
 
     val in0 = getField(flinkILoop, "scala$tools$nsc$interpreter$ILoop$$in0")
@@ -280,43 +368,47 @@ class FlinkScalaInterpreter(val properties: Properties) {
     this.senv.setParallelism(configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM))
 
     setAsContext()
+  }
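
createFlinkILoop switches the scala-shell imports on flinkVersion.isFlink110 because the scala bridge classes live under org.apache.flink.table.api.scala in 1.10 but under org.apache.flink.table.api.bridge.scala in 1.11. A hedged sketch of how such a check can be derived from the version string; the real FlinkVersion class in this patch may parse differently:

    object VersionGatedImports {
      final case class Version(major: Int, minor: Int) {
        def isFlink110: Boolean = major == 1 && minor == 10
      }

      // e.g. "1.10.1" -> Version(1, 10); "1.11-SNAPSHOT" -> Version(1, 11)
      def parse(s: String): Version = {
        val parts = s.split("[.-]")
        Version(parts(0).toInt, parts(1).toInt)
      }

      // Imports handed to the scala shell, depending on the detected version.
      def tableApiImports(v: Version): Seq[String] =
        if (v.isFlink110) Seq("org.apache.flink.table.api.scala._")
        else Seq("org.apache.flink.table.api._", "org.apache.flink.table.api.bridge.scala._")
    }
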
 
+  private def createTableEnvs(): Unit = {
     val originalClassLoader = Thread.currentThread().getContextClassLoader
     try {
       Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
       val tblConfig = new TableConfig
       tblConfig.getConfiguration.addAll(configuration)
       // Step 1.1 Initialize the CatalogManager if required.
-      val catalogManager = new CatalogManager("default_catalog",
-        new GenericInMemoryCatalog("default_catalog", "default_database"));
+      val catalogManager = flinkShims.createCatalogManager(tblConfig.getConfiguration).asInstanceOf[CatalogManager]
       // Step 1.2 Initialize the ModuleManager if required.
       val moduleManager = new ModuleManager();
       // Step 1.3 Initialize the FunctionCatalog if required.
       val flinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager);
       val blinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager);
 
-      this.tblEnvFactory = new TableEnvFactory(this.benv, this.senv, tblConfig,
+      this.tblEnvFactory = new TableEnvFactory(this.flinkVersion, this.flinkShims, this.benv, this.senv, tblConfig,
         catalogManager, moduleManager, flinkFunctionCatalog, blinkFunctionCatalog)
 
+      val modifiers = new java.util.ArrayList[String]()
+      modifiers.add("@transient")
+
       // blink planner
       var btEnvSetting = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build()
       this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting);
-      flinkILoop.intp.bind("btenv", this.btenv.asInstanceOf[StreamTableEnvironmentImpl])
+      flinkILoop.bind("btenv", btenv.getClass().getCanonicalName(), btenv, List("@transient"))
       this.java_btenv = this.btenv
 
       var stEnvSetting =
         EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
       this.stenv = tblEnvFactory.createScalaBlinkStreamTableEnvironment(stEnvSetting)
-      flinkILoop.intp.bind("stenv", this.stenv)
+      flinkILoop.bind("stenv", stenv.getClass().getCanonicalName(), stenv, List("@transient"))
       this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting)
 
       // flink planner
       this.btenv_2 = tblEnvFactory.createScalaFlinkBatchTableEnvironment()
-      flinkILoop.intp.bind("btenv_2", this.btenv_2)
+      flinkILoop.bind("btenv_2", btenv_2.getClass().getCanonicalName(), btenv_2, List("@transient"))
       stEnvSetting =
         EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build()
       this.stenv_2 = tblEnvFactory.createScalaFlinkStreamTableEnvironment(stEnvSetting)
-      flinkILoop.intp.bind("stenv_2", this.stenv_2)
+      flinkILoop.bind("stenv_2", stenv_2.getClass().getCanonicalName(), stenv_2, List("@transient"))
 
       this.java_btenv_2 = tblEnvFactory.createJavaFlinkBatchTableEnvironment()
       btEnvSetting = EnvironmentSettings.newInstance.useOldPlanner.inStreamingMode.build
@@ -324,7 +416,9 @@ class FlinkScalaInterpreter(val properties: Properties) {
     } finally {
       Thread.currentThread().setContextClassLoader(originalClassLoader)
     }
+  }
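
Outside Zeppelin, the same blink-planner environments are normally obtained straight from the public Flink API; the TableEnvFactory indirection above is there largely because Zeppelin shares the CatalogManager, ModuleManager and the two FunctionCatalogs across all of the environments it creates. A REPL-style sketch of the plain API route, assuming the Flink 1.11 package layout referenced by the imports earlier in this patch:

    // Assumes the Flink 1.11 package layout (bridge classes under table.api.bridge.scala).
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.EnvironmentSettings
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val blinkStreamSettings = EnvironmentSettings.newInstance()
      .inStreamingMode()
      .useBlinkPlanner()
      .build()
    // Roughly what stenv above provides, minus the shared catalog and function state.
    val stenv = StreamTableEnvironment.create(senv, blinkStreamSettings)
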
 
+  private def setTableEnvConfig(): Unit = {
     this.properties.asScala.filter(e => e._1.startsWith("table.exec"))
       .foreach(e => {
         this.btenv.getConfig.getConfiguration.setString(e._1, e._2)
@@ -347,97 +441,24 @@ class FlinkScalaInterpreter(val properties: Properties) {
       this.benv.getConfig.disableSysoutLogging()
       this.senv.getConfig.disableSysoutLogging()
     }
+  }
 
-    flinkILoop.interpret("import org.apache.flink.api.scala._")
-    flinkILoop.interpret("import org.apache.flink.table.api.scala._")
-    flinkILoop.interpret("import org.apache.flink.types.Row")
-    flinkILoop.interpret("import org.apache.flink.table.functions.ScalarFunction")
-    flinkILoop.interpret("import org.apache.flink.table.functions.AggregateFunction")
-    flinkILoop.interpret("import org.apache.flink.table.functions.TableFunction")
-
-    this.z = new FlinkZeppelinContext(this, new InterpreterHookRegistry(),
-      Integer.parseInt(properties.getProperty("zeppelin.flink.maxResult", "1000")))
-    val modifiers = new java.util.ArrayList[String]()
-    modifiers.add("@transient")
-    this.bind("z", z.getClass().getCanonicalName(), z, modifiers);
-
-    this.jobManager = new JobManager(this.z, jmWebUrl)
-
-    val jobListener = new JobListener {
-      override def onJobSubmitted(jobClient: JobClient, e: Throwable): Unit = {
-        if (e != null) {
-          LOGGER.warn("Fail to submit job")
-        } else {
-          if (InterpreterContext.get() == null) {
-            LOGGER.warn("Job {} is submitted but unable to associate this job to paragraph, " +
-              "as InterpreterContext is null", jobClient.getJobID)
-          } else {
-            LOGGER.info("Job {} is submitted for paragraph {}", Array(jobClient.getJobID,
-              InterpreterContext.get().getParagraphId): _ *)
-            jobManager.addJob(InterpreterContext.get(), jobClient)
-            if (jmWebUrl != null) {
-              jobManager.sendFlinkJobUrl(InterpreterContext.get());
-            } else {
-              LOGGER.error("Unable to link JobURL, because JobManager weburl is null")
-            }
-          }
-        }
-      }
-
-      override def onJobExecuted(jobExecutionResult: JobExecutionResult, e: Throwable): Unit = {
-        if (e != null) {
-          LOGGER.warn("Fail to execute job")
-        } else {
-          LOGGER.info("Job {} is executed with time {} seconds", jobExecutionResult.getJobID,
-            jobExecutionResult.getNetRuntime(TimeUnit.SECONDS))
-        }
-        if (InterpreterContext.get() != null) {
-          jobManager.removeJob(InterpreterContext.get().getParagraphId)
-        } else {
-          if (e == null) {
-            LOGGER.warn("Unable to remove this job {}, as InterpreterContext is null",
-              jobExecutionResult.getJobID)
-          }
-        }
-      }
-    }
-
-    this.benv.registerJobListener(jobListener)
-    this.senv.registerJobListener(jobListener)
-
-    // register hive catalog
-    if (properties.getProperty("zeppelin.flink.enableHive", "false").toBoolean) {
-      LOGGER.info("Hive is enabled, registering hive catalog.")
-      val hiveConfDir =
-        properties.getOrDefault("HIVE_CONF_DIR", System.getenv("HIVE_CONF_DIR")).toString
-      if (hiveConfDir == null) {
-        throw new InterpreterException("HIVE_CONF_DIR is not specified");
-      }
-      val database = properties.getProperty("zeppelin.flink.hive.database", "default")
-      if (database == null) {
-        throw new InterpreterException("default database is not specified, " +
-          "please set zeppelin.flink.hive.database")
-      }
-      val hiveVersion = properties.getProperty("zeppelin.flink.hive.version", "2.3.4")
-      val hiveCatalog = new HiveCatalog("hive", database, hiveConfDir, hiveVersion)
-      this.btenv.registerCatalog("hive", hiveCatalog)
-      this.btenv.useCatalog("hive")
-      this.btenv.useDatabase("default")
-      this.btenv.loadModule("hive", new HiveModule(hiveVersion))
-    } else {
-      LOGGER.info("Hive is disabled.")
-    }
-
-    // load udf jar
-    val udfJars = properties.getProperty("flink.udf.jars", "")
-    if (!StringUtils.isBlank(udfJars)) {
-      udfJars.split(",").foreach(jar => {
-        loadUDFJar(jar)
-      })
+  private def registerHiveCatalog(): Unit = {
+    val hiveConfDir =
+      properties.getOrDefault("HIVE_CONF_DIR", System.getenv("HIVE_CONF_DIR")).toString
+    if (hiveConfDir == null) {
+      throw new InterpreterException("HIVE_CONF_DIR is not specified");
     }
+    val database = properties.getProperty("zeppelin.flink.hive.database", "default")
+    val hiveVersion = properties.getProperty("zeppelin.flink.hive.version", "2.3.4")
+    val hiveCatalog = new HiveCatalog("hive", database, hiveConfDir, hiveVersion)
+    this.btenv.registerCatalog("hive", hiveCatalog)
+    this.btenv.useCatalog("hive")
+    this.btenv.useDatabase(database)
+    this.btenv.loadModule("hive", new HiveModule(hiveVersion))
   }
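
For reference, the properties consulted by registerHiveCatalog above, with illustrative values (HIVE_CONF_DIR is expected to point at the directory that contains hive-site.xml):

    zeppelin.flink.enableHive        true
    HIVE_CONF_DIR                    /etc/hive/conf     (illustrative path)
    zeppelin.flink.hive.database     default
    zeppelin.flink.hive.version      2.3.4
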
 
-  def loadUDFJar(jar: String): Unit = {
+  private def loadUDFJar(jar: String): Unit = {
     LOGGER.info("Loading UDF Jar: " + jar)
     val jarFile = new JarFile(jar)
     val entries = jarFile.entries
@@ -459,13 +480,13 @@ class FlinkScalaInterpreter(val properties: Properties) {
             btenv.registerFunction(c.getSimpleName, scalarUDF)
           } else if (udf.isInstanceOf[TableFunction[_]]) {
             val tableUDF = udf.asInstanceOf[TableFunction[_]]
-            (btenv.asInstanceOf[StreamTableEnvironmentImpl]).registerFunction(c.getSimpleName, tableUDF)
+            flinkShims.registerTableFunction(btenv, c.getSimpleName, tableUDF)
           } else if (udf.isInstanceOf[AggregateFunction[_,_]]) {
             val aggregateUDF = udf.asInstanceOf[AggregateFunction[_,_]]
-            (btenv.asInstanceOf[StreamTableEnvironmentImpl]).registerFunction(c.getSimpleName, aggregateUDF)
+            flinkShims.registerAggregateFunction(btenv, c.getSimpleName, aggregateUDF)
           } else if (udf.isInstanceOf[TableAggregateFunction[_,_]]) {
             val tableAggregateUDF = udf.asInstanceOf[TableAggregateFunction[_,_]]
-            (btenv.asInstanceOf[StreamTableEnvironmentImpl]).registerFunction(c.getSimpleName, tableAggregateUDF)
+            flinkShims.registerTableAggregateFunction(btenv, c.getSimpleName, tableAggregateUDF)
           } else {
             LOGGER.warn("No UDF definition found in class file: " + je.getName)
           }
@@ -477,7 +498,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
     }
   }
 
-  def setAsContext(): Unit = {
+  private def setAsContext(): Unit = {
     val streamFactory = new StreamExecutionEnvironmentFactory() {
       override def createExecutionEnvironment = senv.getJavaEnv
     }
@@ -693,7 +714,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
       this.btenv_2
   }
 
-  def getStreamTableEnvironment(planner: String = "blink"): StreamTableEnvironment = {
+  def getStreamTableEnvironment(planner: String = "blink"): TableEnvironment = {
     if (planner == "blink")
       this.stenv
     else
@@ -765,6 +786,49 @@ class FlinkScalaInterpreter(val properties: Properties) {
 
   def getFlinkILoop = flinkILoop
 
+  def getFlinkShims = flinkShims
+
+  def getFlinkVersion = flinkVersion
+
+  class FlinkJobListener extends JobListener {
+
+    override def onJobSubmitted(jobClient: JobClient, e: Throwable): Unit = {
+      if (e != null) {
+        LOGGER.warn("Failed to submit job")
+      } else {
+        if (InterpreterContext.get() == null) {
+          LOGGER.warn("Job {} is submitted but unable to associate this job to paragraph, " +
+            "as InterpreterContext is null", jobClient.getJobID)
+        } else {
+          LOGGER.info("Job {} is submitted for paragraph {}", Array(jobClient.getJobID,
+            InterpreterContext.get().getParagraphId): _ *)
+          jobManager.addJob(InterpreterContext.get(), jobClient)
+          if (jmWebUrl != null) {
+            jobManager.sendFlinkJobUrl(InterpreterContext.get());
+          } else {
+            LOGGER.error("Unable to link job URL because the JobManager web URL is null")
+          }
+        }
+      }
+    }
+
+    override def onJobExecuted(jobExecutionResult: JobExecutionResult, e: Throwable): Unit = {
+      if (e != null) {
+        LOGGER.warn("Failed to execute job", e)
+      } else {
+        LOGGER.info("Job {} finished in {} seconds", jobExecutionResult.getJobID,
+          jobExecutionResult.getNetRuntime(TimeUnit.SECONDS))
+      }
+      if (InterpreterContext.get() != null) {
+        jobManager.removeJob(InterpreterContext.get().getParagraphId)
+      } else {
+        if (e == null) {
+          LOGGER.warn("Unable to remove this job {}, as InterpreterContext is null",
+            jobExecutionResult.getJobID)
+        }
+      }
+    }
+  }
 }
 
 
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
similarity index 85%
rename from flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
rename to flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
index 146ec63..b3a964a 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
@@ -21,12 +21,11 @@ package org.apache.zeppelin.flink
 import java.io.IOException
 import java.util.concurrent.atomic.AtomicInteger
 
-import com.google.common.collect.Maps
 import org.apache.flink.api.scala.DataSet
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.table.api.internal.TableImpl
-import org.apache.flink.table.api.{Table, TableEnvironment, TableUtils}
-import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
+import org.apache.flink.table.api.Table
 import org.apache.flink.types.Row
 import org.apache.flink.util.StringUtils
 import org.apache.zeppelin.annotation.ZeppelinApi
@@ -103,13 +102,17 @@ class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter,
   override def showData(obj: Any, maxResult: Int): String = {
     if (obj.isInstanceOf[DataSet[_]]) {
       val ds = obj.asInstanceOf[DataSet[_]]
-      val btenv = flinkInterpreter.getBatchTableEnvironment("flink").asInstanceOf[BatchTableEnvironment]
-      val table = btenv.fromDataSet(ds)
+      val btenv = flinkInterpreter.getBatchTableEnvironment("flink")
+
+      val table = flinkInterpreter.getFlinkShims.fromDataSet(btenv, ds).asInstanceOf[Table]
       val columnNames: Array[String] = table.getSchema.getFieldNames
-      val dsRows: DataSet[Row] = btenv.toDataSet[Row](table)
+      val dsRows: DataSet[Row] = flinkInterpreter.getFlinkShims.toDataSet(btenv, table).asInstanceOf[DataSet[Row]]
       showTable(columnNames, dsRows.first(maxResult + 1).collect())
     } else if (obj.isInstanceOf[Table]) {
-      val rows = JavaConversions.asScalaBuffer(TableUtils.collectToList(obj.asInstanceOf[TableImpl])).toSeq
+      val rows = JavaConversions.asScalaBuffer(
+        flinkInterpreter.getFlinkShims.collectToList(obj.asInstanceOf[TableImpl]).asInstanceOf[java.util.List[Row]]).toSeq
       val columnNames = obj.asInstanceOf[Table].getSchema.getFieldNames
       showTable(columnNames, rows)
     } else {
@@ -119,35 +122,39 @@ class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter,
 
   def showFlinkTable(table: Table): String = {
     val columnNames: Array[String] = table.getSchema.getFieldNames
-    val dsRows: DataSet[Row] = flinkInterpreter.getJavaBatchTableEnvironment("flink")
-      .asInstanceOf[BatchTableEnvironment].toDataSet[Row](table)
+    val btenv = flinkInterpreter.getJavaBatchTableEnvironment("flink")
+    val dsRows: DataSet[Row] = flinkInterpreter.getFlinkShims.toDataSet(btenv, table).asInstanceOf[DataSet[Row]]
     showTable(columnNames, dsRows.first(maxResult + 1).collect())
   }
 
   def showBlinkTable(table: Table): String = {
-    val rows = JavaConversions.asScalaBuffer(TableUtils.collectToList(table.asInstanceOf[TableImpl])).toSeq
+    val rows = JavaConversions.asScalaBuffer(
+      flinkInterpreter.getFlinkShims.collectToList(table.asInstanceOf[TableImpl]).asInstanceOf[java.util.List[Row]]).toSeq
     val columnNames = table.getSchema.getFieldNames
     showTable(columnNames, rows)
   }
 
   def show(table: Table, streamType: String, configs: Map[String, String] = Map.empty): Unit = {
-    val stenv = flinkInterpreter.getStreamTableEnvironment()
+    val stenv = flinkInterpreter.getJavaStreamTableEnvironment("blink")
     val context = InterpreterContext.get()
     configs.foreach(e => context.getLocalProperties.put(e._1, e._2))
     val tableName = "UnnamedTable_" + context.getParagraphId.replace("-", "_") + "_" + SQL_INDEX.getAndIncrement()
     if (streamType.equalsIgnoreCase("single")) {
       val streamJob = new SingleRowStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment,
-        stenv, flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism)
+        table.asInstanceOf[TableImpl].getTableEnvironment,
+        flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism, flinkInterpreter.getFlinkShims)
       streamJob.run(table, tableName)
     }
     else if (streamType.equalsIgnoreCase("append")) {
       val streamJob = new AppendStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment,
-        stenv, flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism)
+        table.asInstanceOf[TableImpl].getTableEnvironment,
+        flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism, flinkInterpreter.getFlinkShims)
       streamJob.run(table, tableName)
     }
     else if (streamType.equalsIgnoreCase("update")) {
       val streamJob = new UpdateStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment,
-        stenv, flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism)
+        table.asInstanceOf[TableImpl].getTableEnvironment,
+        flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism, flinkInterpreter.getFlinkShims)
       streamJob.run(table, tableName)
     }
     else throw new IOException("Unrecognized stream type: " + streamType)
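
From the notebook side, the stream type above is chosen by the second argument of z.show. An illustrative %flink paragraph; the table and field names are made up:

    // Hypothetical paragraph: "clicks" and its columns are made-up names.
    val t = stenv.sqlQuery("select url, count(1) as pv from clicks group by url")
    z.show(t, "update")   // "single" and "append" are the other accepted stream types
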
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/util/DependencyUtils.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/util/DependencyUtils.scala
similarity index 100%
rename from flink/src/main/scala/org/apache/zeppelin/flink/util/DependencyUtils.scala
rename to flink/interpreter/src/main/scala/org/apache/zeppelin/flink/util/DependencyUtils.scala
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
similarity index 97%
rename from flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
rename to flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
index 03efa9b..b3c3ae4 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -107,14 +107,15 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
                     "    return s.upper()", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
+    context = getInterpreterContext();
     result = pyFlinkInterpreter.interpret("bt_env.register_function(\"python_upper\", " +
                     "udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))",
-            getInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    resultMessages = context.out.toInterpreterResultMessage();
-    assertEquals(1, resultMessages.size());
-    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
-    assertEquals("add_one\n2\n3\n", resultMessages.get(0).getData());
+            context);
+    assertEquals(result.toString(), InterpreterResult.Code.SUCCESS, result.code());
+    //    resultMessages = context.out.toInterpreterResultMessage();
+    //    assertEquals(1, resultMessages.size());
+    //    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    //    assertEquals("add_one\n2\n3\n", resultMessages.get(0).getData());
 
     // select which use python udf
     context = getInterpreterContext();
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
similarity index 100%
rename from flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
rename to flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
similarity index 100%
rename from flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
rename to flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
similarity index 98%
rename from flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
rename to flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
index e857508..9e9b46b 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
@@ -171,7 +171,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
         "     .field(\"a\", DataTypes.BIGINT())\n" +
         "     .field(\"b\", DataTypes.STRING())\n" +
         "     .field(\"c\", DataTypes.STRING())) \\\n" +
-        "   .register_table_sink(\"batch_sink\")\n" +
+        "   .create_temporary_table(\"batch_sink\")\n" +
         "t.select(\"a + 1, b, c\").insert_into(\"batch_sink\")\n" +
         "bt_env.execute(\"batch_job\")"
             , context);
@@ -201,7 +201,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
             "      .field(\"a\", DataTypes.STRING())\n" +
             "      .field(\"b\", DataTypes.BIGINT())\n" +
             "      .field(\"c\", DataTypes.BIGINT())) \\\n" +
-            "    .register_table_sink(\"batch_sink4\")\n" +
+            "    .create_temporary_table(\"batch_sink4\")\n" +
             "t.group_by(\"c\").select(\"c, sum(a), count(b)\").insert_into(\"batch_sink4\")\n" +
             "bt_env.execute(\"batch_job4\")"
             , context);
@@ -242,7 +242,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
             "      .field(\"a\", DataTypes.BIGINT())\n" +
             "      .field(\"b\", DataTypes.STRING())\n" +
             "      .field(\"c\", DataTypes.STRING())) \\\n" +
-            "    .register_table_sink(\"batch_sink3\")\n" +
+            "    .create_temporary_table(\"batch_sink3\")\n" +
             "t.select(\"a, addOne(a), c\").insert_into(\"batch_sink3\")\n" +
             "bt_env.execute(\"batch_job3\")"
             , context);
@@ -311,11 +311,11 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
             "      .field(\"a\", DataTypes.BIGINT())\n" +
             "      .field(\"b\", DataTypes.STRING())\n" +
             "      .field(\"c\", DataTypes.STRING())) \\\n" +
-            "    .register_table_sink(\"stream_sink\")\n" +
+            "    .create_temporary_table(\"stream_sink\")\n" +
             "t.select(\"a + 1, b, c\").insert_into(\"stream_sink\")\n" +
             "st_env.execute(\"stream_job\")"
             , context);
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
   }
 
   public static void testSingleStreamTableApi(Interpreter interpreter,
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
similarity index 100%
rename from flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
rename to flink/interpreter/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
similarity index 100%
rename from flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
rename to flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
diff --git a/flink/src/test/resources/flink-conf.yaml b/flink/interpreter/src/test/resources/flink-conf.yaml
similarity index 100%
rename from flink/src/test/resources/flink-conf.yaml
rename to flink/interpreter/src/test/resources/flink-conf.yaml
diff --git a/flink/src/test/resources/init_stream.scala b/flink/interpreter/src/test/resources/init_stream.scala
similarity index 100%
rename from flink/src/test/resources/init_stream.scala
rename to flink/interpreter/src/test/resources/init_stream.scala
diff --git a/flink/src/test/resources/log4j.properties b/flink/interpreter/src/test/resources/log4j.properties
similarity index 90%
rename from flink/src/test/resources/log4j.properties
rename to flink/interpreter/src/test/resources/log4j.properties
index 24ec949..23680df 100644
--- a/flink/src/test/resources/log4j.properties
+++ b/flink/interpreter/src/test/resources/log4j.properties
@@ -21,7 +21,8 @@ log4j.appender.stdout = org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
 
-log4j.logger.org.apache.hive=WARN
-log4j.logger.org.apache.flink=WARN
+log4j.logger.org.apache.hive=INFO
+log4j.logger.org.apache.flink=INFO
 log4j.logger.org.apache.zeppelin.flink=DEBUG
+log4j.logger.org.apache.zeppelin.python=DEBUG
 
diff --git a/flink/pom.xml b/flink/pom.xml
index 69eb026..e197e22 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -17,798 +17,52 @@
   -->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>zeppelin-interpreter-parent</artifactId>
+        <groupId>org.apache.zeppelin</groupId>
+        <version>0.9.0-SNAPSHOT</version>
+        <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath>
+    </parent>
 
-  <parent>
-    <artifactId>zeppelin-interpreter-parent</artifactId>
     <groupId>org.apache.zeppelin</groupId>
+    <artifactId>flink-parent</artifactId>
+    <packaging>pom</packaging>
     <version>0.9.0-SNAPSHOT</version>
-    <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath>
-  </parent>
-
-  <groupId>org.apache.zeppelin</groupId>
-  <artifactId>zeppelin-flink</artifactId>
-  <packaging>jar</packaging>
-  <version>0.9.0-SNAPSHOT</version>
-  <name>Zeppelin: Flink</name>
-  <description>Zeppelin flink support</description>
-
-  <properties>
-    <!--library versions-->
-    <interpreter.name>flink</interpreter.name>
-    <flink.version>1.10.0</flink.version>
-    <hadoop.version>2.6.5</hadoop.version>
-    <hive.version>2.3.4</hive.version>
-    <hiverunner.version>4.0.0</hiverunner.version>
-    <grpc.version>1.15.0</grpc.version>
-
-    <scala.macros.version>2.0.1</scala.macros.version>
-    <scala.binary.version>2.11</scala.binary.version>
-    <scala.version>2.11.12</scala.version>
-
-    <flink.bin.download.url>https://archive.apache.org/dist/flink/flink-${flink.version}/flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.bin.download.url>
-  </properties>
-
-  <dependencies>
-
-    <dependency>
-      <groupId>org.apache.zeppelin</groupId>
-      <artifactId>zeppelin-python</artifactId>
-      <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>io.atomix</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>zeppelin-interpreter</artifactId>
-      <version>${project.version}</version>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>io.atomix</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.grpc</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>zeppelin-python</artifactId>
-      <version>${project.version}</version>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>io.atomix</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.grpc</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.jline</groupId>
-      <artifactId>jline-terminal</artifactId>
-      <version>3.9.0</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.jline</groupId>
-      <artifactId>jline-reader</artifactId>
-      <version>3.9.0</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-python_${scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-core</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-clients_${scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-runtime_${scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-yarn_${scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.flink</groupId>
-          <artifactId>flink-shaded-hadoop2</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.eclipse.jetty</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-scala_${scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-scala-shell_${scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-java_2.11</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-scala_2.11</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-java</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-connector-hive_2.11</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-connector-hive_2.11</artifactId>
-      <version>${flink.version}</version>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-    </dependency>
-
-    <!-- hadoop compatibility dependency -->
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.ivy</groupId>
-      <artifactId>ivy</artifactId>
-      <version>2.4.0</version>
-    </dependency>
-
-    <dependency>
-      <groupId>oro</groupId>
-      <!-- oro is needed by ivy, but only listed as an optional dependency, so we include it. -->
-      <artifactId>oro</artifactId>
-      <version>2.0.8</version>
-    </dependency>
-
-
-    <dependency>
-      <groupId>com.google.code.gson</groupId>
-      <artifactId>gson</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-      <version>${scala.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-compiler</artifactId>
-      <version>${scala.version}</version>
-    </dependency>
+    <name>Zeppelin: Flink Parent</name>
+    <description>Zeppelin Flink Support</description>
 
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-reflect</artifactId>
-      <version>${scala.version}</version>
-    </dependency>
+    <modules>
+        <module>interpreter</module>
+        <module>flink-shims</module>
+        <module>flink1.10-shims</module>
+        <module>flink1.11-shims</module>
+    </modules>
 
-    <dependency>
-      <groupId>com.mashape.unirest</groupId>
-      <artifactId>unirest-java</artifactId>
-      <version>1.4.9</version>
-    </dependency>
+    <dependencies>
 
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hive</groupId>
-          <artifactId>hive-metastore</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hive</groupId>
-          <artifactId>hive-exec</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hive</groupId>
-          <artifactId>hive-exec</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hive</groupId>
-          <artifactId>hive-metastore</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-planner-blink_2.11</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.reflections</groupId>
-          <artifactId>reflections</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-planner_2.11</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.powermock</groupId>
-      <artifactId>powermock-api-mockito</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.powermock</groupId>
-      <artifactId>powermock-module-junit4</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-metastore</artifactId>
-      <version>${hive.version}</version>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <artifactId>hadoop-auth</artifactId>
-          <groupId>org.apache.hadoop</groupId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty-all</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.protobuf</groupId>
-          <artifactId>protobuf-java</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-exec</artifactId>
-      <version>${hive.version}</version>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.calcite</groupId>
-          <artifactId>calcite-core</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.calcite</groupId>
-          <artifactId>calcite-druid</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.calcite.avatica</groupId>
-          <artifactId>avatica</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-codec</groupId>
-          <artifactId>commons-codec</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-httpclient</groupId>
-          <artifactId>commons-httpclient</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-io</groupId>
-          <artifactId>commons-io</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.logging.log4j</groupId>
-          <artifactId>log4j-1.2-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.logging.log4j</groupId>
-          <artifactId>log4j-slf4j-impl</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.zookeeper</groupId>
-          <artifactId>zookeeper</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.curator</groupId>
-          <artifactId>curator-framework</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.curator</groupId>
-          <artifactId>apache-curator</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.code.gson</groupId>
-          <artifactId>gson</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>jline</groupId>
-          <artifactId>jline</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty-all</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.protobuf</groupId>
-          <artifactId>protobuf-java</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hive.hcatalog</groupId>
-      <artifactId>hive-webhcat-java-client</artifactId>
-      <version>${hive.version}</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.calcite</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.jms</groupId>
-          <artifactId>jms</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-contrib</artifactId>
-      <version>${hive.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hive.hcatalog</groupId>
-      <artifactId>hive-hcatalog-core</artifactId>
-      <version>${hive.version}</version>
-      <scope>test</scope>
-      <type>test-jar</type>
-      <exclusions>
-        <exclusion>
-          <groupId>jline</groupId>
-          <artifactId>jline</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty-all</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>net.jodah</groupId>
-      <artifactId>concurrentunit</artifactId>
-      <version>0.4.4</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>com.klarna</groupId>
-      <artifactId>hiverunner</artifactId>
-      <version>4.0.0</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>net.jodah</groupId>
-      <artifactId>concurrentunit</artifactId>
-      <version>0.4.4</version>
-      <scope>test</scope>
-    </dependency>
-
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>net.alchim31.maven</groupId>
-        <artifactId>scala-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>eclipse-add-source</id>
-            <goals>
-              <goal>add-source</goal>
-            </goals>
-          </execution>
-          <execution>
-            <id>scala-compile-first</id>
-            <phase>process-resources</phase>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-          </execution>
-          <execution>
-            <id>scala-test-compile-first</id>
-            <phase>process-test-resources</phase>
-            <goals>
-              <goal>testCompile</goal>
-            </goals>
-          </execution>
-        </executions>
-        <configuration>
-          <scalaVersion>${scala.version}</scalaVersion>
-          <!--<recompileMode>incremental</recompileMode>-->
-          <!--<useZincServer>true</useZincServer>-->
-          <args>
-            <arg>-unchecked</arg>
-            <arg>-deprecation</arg>
-            <arg>-feature</arg>
-            <arg>-target:jvm-1.8</arg>
-          </args>
-          <jvmArgs>
-            <jvmArg>-Xms1024m</jvmArg>
-            <jvmArg>-Xmx1024m</jvmArg>
-            <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg>
-          </jvmArgs>
-          <javacArgs>
-            <javacArg>-source</javacArg>
-            <javacArg>${java.version}</javacArg>
-            <javacArg>-target</javacArg>
-            <javacArg>${java.version}</javacArg>
-            <javacArg>-Xlint:all,-serial,-path,-options</javacArg>
-          </javacArgs>
-        </configuration>
-      </plugin>
-
-      <!-- include flink by default -->
-      <plugin>
-        <groupId>com.googlecode.maven-download-plugin</groupId>
-        <artifactId>download-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>download-flink-files</id>
-            <phase>validate</phase>
-            <goals>
-              <goal>wget</goal>
-            </goals>
-            <configuration>
-              <readTimeOut>60000</readTimeOut>
-              <retries>5</retries>
-              <url>${flink.bin.download.url}</url>
-              <unpack>true</unpack>
-              <outputDirectory>${project.build.directory}</outputDirectory>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <skip>false</skip>
-          <forkMode>always</forkMode>
-          <forkCount>1</forkCount>
-          <reuseForks>false</reuseForks>
-          <!-- set sun.zip.disableMemoryMapping=true because of
-          https://blogs.oracle.com/poonam/crashes-in-zipgetentry
-          https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8191484 -->
-          <argLine>-Xmx4096m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
-
-          <environmentVariables>
-            <FLINK_HOME>${project.build.directory}/flink-${flink.version}</FLINK_HOME>
-            <FLINK_CONF_DIR>${project.build.directory}/test-classes</FLINK_CONF_DIR>
-          </environmentVariables>
-        </configuration>
-      </plugin>
-
-      <!-- Eclipse Integration -->
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-eclipse-plugin</artifactId>
-        <configuration>
-          <downloadSources>true</downloadSources>
-          <projectnatures>
-            <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-            <projectnature>org.eclipse.jdt.core.javanature</projectnature>
-          </projectnatures>
-          <buildcommands>
-            <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-          </buildcommands>
-          <classpathContainers>
-            <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-          </classpathContainers>
-          <sourceIncludes>
-            <sourceInclude>**/*.scala</sourceInclude>
-            <sourceInclude>**/*.java</sourceInclude>
-          </sourceIncludes>
-        </configuration>
-      </plugin>
-
-      <!-- Adding scala source directories to build path -->
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-        <executions>
-          <!-- Add src/main/scala to eclipse build path -->
-          <execution>
-            <id>add-source</id>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>add-source</goal>
-            </goals>
-            <configuration>
-              <sources>
-            <source>src/main/scala</source>
-              </sources>
-            </configuration>
-          </execution>
-          <!-- Add src/test/scala to eclipse build path -->
-          <execution>
-            <id>add-test-source</id>
-            <phase>generate-test-sources</phase>
-            <goals>
-              <goal>add-test-source</goal>
-            </goals>
-            <configuration>
-              <sources>
-            <source>src/test/scala</source>
-              </sources>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-
-      <plugin>
-        <artifactId>maven-enforcer-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <artifactId>maven-dependency-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <artifactId>maven-resources-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <configuration>
-          <filters>
-            <filter>
-              <artifact>*:*</artifact>
-              <excludes>
-                <exclude>org/datanucleus/**</exclude>
-                <exclude>META-INF/*.SF</exclude>
-                <exclude>META-INF/*.DSA</exclude>
-                <exclude>META-INF/*.RSA</exclude>
-              </excludes>
-            </filter>
-          </filters>
-
-          <artifactSet>
-            <excludes>
-              <exclude>org.scala-lang:scala-library</exclude>
-              <exclude>org.scala-lang:scala-compiler</exclude>
-              <exclude>org.scala-lang:scala-reflect</exclude>
-              <exclude>org.apache.flink:*</exclude>
-            </excludes>
-          </artifactSet>
-
-          <transformers>
-            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
-            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
-              <resource>reference.conf</resource>
-            </transformer>
-          </transformers>
-          <relocations>
-            <relocation>
-              <pattern>io.netty</pattern>
-              <shadedPattern>org.apache.zeppelin.shaded.io.netty</shadedPattern>
-            </relocation>
-            <relocation>
-              <pattern>com.google</pattern>
-              <shadedPattern>org.apache.zeppelin.shaded.com.google</shadedPattern>
-            </relocation>
-          </relocations>
-          <outputFile>${project.basedir}/../interpreter/${interpreter.name}/${project.artifactId}-${project.version}.jar</outputFile>
-        </configuration>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>shade</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-checkstyle-plugin</artifactId>
-        <configuration>
-          <skip>true</skip>
-        </configuration>
-      </plugin>
-
-    </plugins>
-  </build>
-
-  <profiles>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
 
-    <profile>
-      <id>hive2</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
-      <properties>
-        <hive.version>2.3.4</hive.version>
-        <hiverunner.version>4.0.0</hiverunner.version>
-      </properties>
-    </profile>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
 
-    <profile>
-      <id>hive1</id>
-      <properties>
-        <hive.version>1.2.1</hive.version>
-        <hiverunner.version>3.2.1</hiverunner.version>
-      </properties>
-      <dependencies>
         <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-          <version>2.7.5</version>
-          <scope>provided</scope>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
         </dependency>
-      </dependencies>
-    </profile>
 
-  </profiles>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
 
-</project>
+</project>
\ No newline at end of file
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
deleted file mode 100644
index 6720bf2..0000000
--- a/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.delegation.Executor;
-import org.apache.flink.table.delegation.ExecutorFactory;
-import org.apache.flink.table.delegation.Planner;
-import org.apache.flink.table.delegation.PlannerFactory;
-import org.apache.flink.table.factories.ComponentFactoryService;
-import org.apache.flink.table.module.ModuleManager;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.util.Map;
-
-/**
- * Factory class for creating Flink table environments for different purposes:
- * 1. java/scala
- * 2. stream table / batch table
- * 3. flink planner / blink planner
- *
- */
-public class TableEnvFactory {
-
-  private Executor executor;
-  private org.apache.flink.api.scala.ExecutionEnvironment benv;
-  private org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv;
-  private TableConfig tblConfig;
-  private CatalogManager catalogManager;
-  private ModuleManager moduleManager;
-  private FunctionCatalog flinkFunctionCatalog;
-  private FunctionCatalog blinkFunctionCatalog;
-
-  public TableEnvFactory(org.apache.flink.api.scala.ExecutionEnvironment env,
-                         org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv,
-                         TableConfig tblConfig,
-                         CatalogManager catalogManager,
-                         ModuleManager moduleManager,
-                         FunctionCatalog flinkFunctionCatalog,
-                         FunctionCatalog blinkFunctionCatalog) {
-    this.benv = env;
-    this.senv = senv;
-    this.tblConfig = tblConfig;
-    this.catalogManager = catalogManager;
-    this.moduleManager = moduleManager;
-    this.flinkFunctionCatalog = flinkFunctionCatalog;
-    this.blinkFunctionCatalog = blinkFunctionCatalog;
-  }
-
-  public org.apache.flink.table.api.scala.BatchTableEnvironment createScalaFlinkBatchTableEnvironment() {
-    try {
-      Class clazz = Class
-              .forName("org.apache.flink.table.api.scala.internal.BatchTableEnvironmentImpl");
-      Constructor constructor = clazz
-              .getConstructor(
-                      org.apache.flink.api.scala.ExecutionEnvironment.class,
-                      TableConfig.class,
-                      CatalogManager.class,
-                      ModuleManager.class);
-
-      return (org.apache.flink.table.api.scala.BatchTableEnvironment)
-              constructor.newInstance(benv, tblConfig, catalogManager, moduleManager);
-    } catch (Exception e) {
-      throw new TableException("Fail to createScalaFlinkBatchTableEnvironment", e);
-    }
-  }
-
-  public org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl
-  createScalaFlinkStreamTableEnvironment(EnvironmentSettings settings) {
-
-    Map<String, String> executorProperties = settings.toExecutorProperties();
-    Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
-
-    Map<String, String> plannerProperties = settings.toPlannerProperties();
-    Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
-            .create(
-                    plannerProperties,
-                    executor,
-                    tblConfig,
-                    flinkFunctionCatalog,
-                    catalogManager);
-
-    return new org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl(
-            catalogManager,
-            moduleManager,
-            flinkFunctionCatalog,
-            tblConfig,
-            senv,
-            planner,
-            executor,
-            settings.isStreamingMode()
-    );
-  }
-
-  public org.apache.flink.table.api.java.BatchTableEnvironment createJavaFlinkBatchTableEnvironment() {
-    try {
-      Class<?> clazz =
-              Class.forName("org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl");
-      Constructor con = clazz.getConstructor(
-              ExecutionEnvironment.class, TableConfig.class, CatalogManager.class, ModuleManager.class);
-      return (org.apache.flink.table.api.java.BatchTableEnvironment) con.newInstance(
-              benv.getJavaEnv(), tblConfig, catalogManager, moduleManager);
-    } catch (Throwable t) {
-      throw new TableException("Create BatchTableEnvironment failed.", t);
-    }
-  }
-
-  public StreamTableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentSettings settings) {
-
-    if (!settings.isStreamingMode()) {
-      throw new TableException(
-              "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
-    }
-
-    Map<String, String> executorProperties = settings.toExecutorProperties();
-    Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
-
-    Map<String, String> plannerProperties = settings.toPlannerProperties();
-    Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
-            .create(plannerProperties, executor, tblConfig, flinkFunctionCatalog, catalogManager);
-
-    return new StreamTableEnvironmentImpl(
-            catalogManager,
-            moduleManager,
-            flinkFunctionCatalog,
-            tblConfig,
-            senv.getJavaEnv(),
-            planner,
-            executor,
-            settings.isStreamingMode()
-    );
-  }
-
-  public org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl
-  createScalaBlinkStreamTableEnvironment(EnvironmentSettings settings) {
-
-    Map<String, String> executorProperties = settings.toExecutorProperties();
-    Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
-
-    Map<String, String> plannerProperties = settings.toPlannerProperties();
-    Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
-            .create(
-                    plannerProperties,
-                    executor,
-                    tblConfig,
-                    blinkFunctionCatalog,
-                    catalogManager);
-
-    return new org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl(
-            catalogManager,
-            moduleManager,
-            blinkFunctionCatalog,
-            tblConfig,
-            senv,
-            planner,
-            executor,
-            settings.isStreamingMode());
-  }
-
-  public void createPlanner(EnvironmentSettings settings) {
-    Map<String, String> executorProperties = settings.toExecutorProperties();
-    Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
-
-    Map<String, String> plannerProperties = settings.toPlannerProperties();
-    ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
-            .create(
-                    plannerProperties,
-                    executor,
-                    tblConfig,
-                    blinkFunctionCatalog,
-                    catalogManager);
-  }
-
-  public StreamTableEnvironment createJavaBlinkStreamTableEnvironment(
-          EnvironmentSettings settings) {
-
-    if (!settings.isStreamingMode()) {
-      throw new TableException(
-              "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
-    }
-
-    Map<String, String> executorProperties = settings.toExecutorProperties();
-    Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
-
-    Map<String, String> plannerProperties = settings.toPlannerProperties();
-    Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
-            .create(plannerProperties, executor, tblConfig, blinkFunctionCatalog, catalogManager);
-
-    return new StreamTableEnvironmentImpl(
-            catalogManager,
-            moduleManager,
-            blinkFunctionCatalog,
-            tblConfig,
-            senv.getJavaEnv(),
-            planner,
-            executor,
-            settings.isStreamingMode()
-    );
-  }
-
-  public TableEnvironment createJavaBlinkBatchTableEnvironment(
-          EnvironmentSettings settings) {
-    final Map<String, String> executorProperties = settings.toExecutorProperties();
-    executor = lookupExecutor(executorProperties, senv.getJavaEnv());
-    final Map<String, String> plannerProperties = settings.toPlannerProperties();
-    final Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
-            .create(plannerProperties, executor, tblConfig, blinkFunctionCatalog, catalogManager);
-
-    return new StreamTableEnvironmentImpl(
-            catalogManager,
-            moduleManager,
-            blinkFunctionCatalog,
-            tblConfig,
-            senv.getJavaEnv(),
-            planner,
-            executor,
-            settings.isStreamingMode());
-  }
-
-  private static Executor lookupExecutor(
-          Map<String, String> executorProperties,
-          StreamExecutionEnvironment executionEnvironment) {
-    try {
-      ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
-      Method createMethod = executorFactory.getClass()
-              .getMethod("create", Map.class, StreamExecutionEnvironment.class);
-
-      return (Executor) createMethod.invoke(
-              executorFactory,
-              executorProperties,
-              executionEnvironment);
-    } catch (Exception e) {
-      throw new TableException(
-              "Could not instantiate the executor. Make sure a planner module is on the classpath",
-              e);
-    }
-  }
-}
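The deleted TableEnvFactory above covered every combination of Java/Scala, stream/batch and flink/blink planner environments. Below is a minimal usage sketch of that (now removed) class, assuming a fully constructed factory instance is passed in; it is illustrative only and not part of this commit.

    package org.apache.zeppelin.flink;

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    // Illustrative only: drives the removed TableEnvFactory. The factory argument
    // is assumed to have been built with the execution environments, TableConfig,
    // CatalogManager, ModuleManager and FunctionCatalogs shown in its constructor.
    public class TableEnvFactoryUsageSketch {

      public static StreamTableEnvironment blinkStreamTableEnv(TableEnvFactory factory) {
        // Blink planner in streaming mode; createJavaBlinkStreamTableEnvironment
        // throws a TableException for non-streaming settings.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        return factory.createJavaBlinkStreamTableEnvironment(settings);
      }
    }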