Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/06/05 23:11:49 UTC

[zeppelin] branch master updated: [ZEPPELIN-5339] Support scala 2.12 for flink

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 927e99b  [ZEPPELIN-5339] Support scala 2.12 for flink
927e99b is described below

commit 927e99b2ac80d4312796d9708271798701284c20
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Jun 3 14:05:39 2021 +0800

    [ZEPPELIN-5339] Support scala 2.12 for flink
    
    ### What is this PR for?
    
    This adds Scala 2.12 support to the Flink interpreter. Here are the main changes:
    
    1. Copy `FlinkILoop`, `FlinkShell`, `ScalaShellEnvironment`, `ScalaShellStreamEnvironment` & `JarHelper` from the Flink project. This is necessary because the Flink Scala shell currently supports only Scala 2.11, so we copy these classes here and adapt them to work with Scala 2.12 as well.
    2. Introduce 2 modules: `flink-scala-2.11` & `flink-scala-2.12`. Each of them builds its own interpreter jar, and `FlinkInterpreterLauncher` chooses the right interpreter jar based on the Scala version of the current Flink distribution; at runtime `FlinkInterpreter` then loads the matching inner interpreter class (see the sketch below).
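
    A condensed sketch of that runtime selection (the full version is in `FlinkInterpreter.java` in this diff; error handling omitted):

    ```java
    // Condensed from FlinkInterpreter#loadFlinkScalaInterpreter below.
    // Assumes: properties is a java.util.Properties and classLoader a
    // java.net.URLClassLoader, as in FlinkInterpreter#open.
    String scalaVersion = scala.util.Properties.versionString()
        .contains("version 2.12") ? "2.12" : "2.11";
    String className = "2.12".equals(scalaVersion)
        ? "org.apache.zeppelin.flink.FlinkScala212Interpreter"
        : "org.apache.zeppelin.flink.FlinkScala211Interpreter";
    FlinkScalaInterpreter innerIntp = (FlinkScalaInterpreter)
        Class.forName(className)
            .getConstructor(Properties.class, URLClassLoader.class)
            .newInstance(properties, classLoader);
    ```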
    
    ### What type of PR is it?
    [ Feature ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5339
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #4128 from zjffdu/ZEPPELIN-5339 and squashes the following commits:
    
    f3ed38fc1 [Jeff Zhang] add README for flink interpreter
    2db26ca93 [Jeff Zhang] address comment
    a8a3b5bdb [Jeff Zhang] minior update
    336220ecf [Jeff Zhang] [ZEPPELIN-5339] Support scala 2.12 for flink
---
 .github/workflows/core.yml                         |   8 +-
 bin/interpreter.sh                                 |  10 +-
 flink/README.md                                    |  53 +++++
 flink/flink-scala-2.11/flink-scala-parent          |   1 +
 flink/flink-scala-2.11/pom.xml                     |  96 +++++++++
 .../org/apache/zeppelin/flink/FlinkExprTyper.scala |   0
 .../zeppelin/flink/FlinkILoopInterpreter.scala     |   0
 .../zeppelin/flink/FlinkScala211Interpreter.scala  |  54 +++++
 flink/flink-scala-2.12/flink-scala-parent          |   1 +
 flink/flink-scala-2.12/pom.xml                     |  96 +++++++++
 .../zeppelin/flink/FlinkScala212Interpreter.scala  |  53 +++++
 flink/{interpreter => flink-scala-parent}/pom.xml  | 236 ++++++++++++++-------
 .../zeppelin/flink/FlinkBatchSqlInterpreter.java   |   4 +-
 .../apache/zeppelin/flink/FlinkInterpreter.java    |  64 ++++--
 .../apache/zeppelin/flink/FlinkSqlInterrpeter.java |   8 +-
 .../zeppelin/flink/FlinkStreamSqlInterpreter.java  |   0
 .../org/apache/zeppelin/flink/HadoopUtils.java     |   0
 .../apache/zeppelin/flink/IPyFlinkInterpreter.java |   2 +-
 .../java/org/apache/zeppelin/flink/JobManager.java |   5 +-
 .../apache/zeppelin/flink/PyFlinkInterpreter.java  |   2 +-
 .../org/apache/zeppelin/flink/TableEnvFactory.java |   0
 .../flink/YarnApplicationExecutionEnvironment.java |   1 +
 .../flink/YarnApplicationStreamEnvironment.java    |   1 +
 .../apache/zeppelin/flink/internal/JarHelper.java  | 197 +++++++++++++++++
 .../flink/internal/ScalaShellEnvironment.java      |  87 ++++++++
 .../internal/ScalaShellStreamEnvironment.java      |  93 ++++++++
 .../zeppelin/flink/sql/AbstractStreamSqlJob.java   |   0
 .../zeppelin/flink/sql/AppendStreamSqlJob.java     |   0
 .../zeppelin/flink/sql/SingleRowStreamSqlJob.java  |   0
 .../zeppelin/flink/sql/UpdateStreamSqlJob.java     |   0
 .../src/main/resources/interpreter-setting.json    |   0
 .../src/main/resources/python/zeppelin_ipyflink.py |   0
 .../src/main/resources/python/zeppelin_pyflink.py  |   0
 .../zeppelin/flink/FlinkScalaInterpreter.scala     |  97 ++++-----
 .../zeppelin/flink/FlinkZeppelinContext.scala      |  11 +-
 .../zeppelin/flink/internal/FlinkILoop.scala       | 230 ++++++++++++++++++++
 .../zeppelin/flink/internal}/FlinkShell.scala      |   7 +-
 .../flink/FlinkBatchSqlInterpreterTest.java        |   2 +-
 .../zeppelin/flink/FlinkInterpreterTest.java       |   4 +-
 .../flink/FlinkStreamSqlInterpreterTest.java       |   0
 .../zeppelin/flink/IPyFlinkInterpreterTest.java    |   4 +-
 .../java/org/apache/zeppelin/flink/JavaLower.java  |   0
 .../java/org/apache/zeppelin/flink/JavaUpper.java  |   0
 .../org/apache/zeppelin/flink/JobManagerTest.java  |   0
 .../zeppelin/flink/PyFlinkInterpreterTest.java     |   0
 .../apache/zeppelin/flink/SqlInterpreterTest.java  |   0
 .../src/test/resources/flink-conf.yaml             |   0
 .../src/test/resources/init_stream.scala           |   0
 .../src/test/resources/log4j.properties            |   0
 .../src/test/resources/log4j2.properties           |  49 +++++
 .../zeppelin/flink/FlinkScalaInterpreterTest.scala |   7 +-
 .../org/apache/zeppelin/flink/FlinkVersion.java    |   2 +-
 flink/flink1.10-shims/pom.xml                      |  31 +--
 .../flink/shims110/Flink110ScalaShims.scala        |   4 +-
 flink/flink1.11-shims/pom.xml                      |  32 ++-
 flink/flink1.12-shims/pom.xml                      |  32 ++-
 flink/flink1.13-shims/pom.xml                      |  32 ++-
 .../org/apache/zeppelin/flink/FlinkILoop.scala     | 117 ----------
 flink/pom.xml                                      |   7 +-
 zeppelin-interpreter-integration/pom.xml           |   3 +
 .../zeppelin/integration/FlinkIntegrationTest.java |   7 +-
 .../integration/FlinkIntegrationTest110.java       |   7 +-
 .../integration/FlinkIntegrationTest111.java       |   7 +-
 .../integration/FlinkIntegrationTest112.java       |   7 +-
 .../integration/FlinkIntegrationTest113.java       |   7 +-
 .../integration/ZSessionIntegrationTest.java       |   2 +-
 .../integration/ZeppelinFlinkClusterTest.java      |   2 +-
 .../launcher/FlinkInterpreterLauncher.java         |  45 +++-
 .../interpreter/integration/DownloadUtils.java     |  14 +-
 69 files changed, 1423 insertions(+), 418 deletions(-)

diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index 475d569..b8df0ac 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -135,7 +135,7 @@ jobs:
           R -e "IRkernel::installspec()"
       - name: install environment
         run: |
-          mvn install -DskipTests -DskipRat -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/spark-dependencies,markdown,flink-cmd,flink/interpreter,jdbc,shell -am
+          mvn install -DskipTests -DskipRat -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/spark-dependencies,markdown,flink-cmd,flink/flink-scala-2.11,flink/flink-scala-2.12,jdbc,shell -am
           mvn package -DskipRat -pl zeppelin-plugins -amd -DskipTests -B
       - name: run tests
         run: mvn test -DskipRat -pl zeppelin-interpreter-integration -Pintegration -DfailIfNoTests=false -Dtest=ZeppelinClientIntegrationTest,ZeppelinClientWithAuthIntegrationTest,ZSessionIntegrationTest
@@ -144,7 +144,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        flink: [ 110, 111, 112, 113]
+        flink: [110, 111, 112, 113]
     steps:
       - name: Checkout
         uses: actions/checkout@v2
@@ -172,10 +172,10 @@ jobs:
           auto-activate-base: false
       - name: install environment
         run: |
-          mvn install -DskipTests -DskipRat -am -pl flink/interpreter,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -B
+          mvn install -DskipTests -DskipRat -am -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -B
           mvn clean package -pl zeppelin-plugins -amd -DskipTests -B
       - name: run tests
-        run: mvn test -DskipRat -pl flink/interpreter,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -DfailIfNoTests=false -B -Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest${{ matrix.flink }},ZeppelinFlinkClusterTest${{ matrix.flink }}
+        run: mvn test -DskipRat -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -DfailIfNoTests=false -B -Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest${{ matrix.flink }},ZeppelinFlinkClusterTest${{ matrix.flink }}
   run-spark-intergration-test:
     runs-on: ubuntu-18.04
     steps:
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index d6edaf2..30384b6 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -118,12 +118,17 @@ if [[ -d "${ZEPPELIN_HOME}/zeppelin-zengine/target/test-classes" ]]; then
 fi
 
 addJarInDirForIntp "${ZEPPELIN_HOME}/zeppelin-interpreter-shaded/target"
-addJarInDirForIntp "${INTERPRETER_DIR}"
 
 HOSTNAME=$(hostname)
 ZEPPELIN_SERVER=org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer
 
 INTERPRETER_ID=$(basename "${INTERPRETER_DIR}")
+if [[ "${INTERPRETER_ID}" != "flink" ]]; then
+  # Don't add the interpreter jar for flink; FlinkInterpreterLauncher will choose the right interpreter
+  # jar based on the Scala version of the current FLINK_HOME.
+  addJarInDirForIntp "${INTERPRETER_DIR}"
+fi
+
 ZEPPELIN_PID="${ZEPPELIN_PID_DIR}/zeppelin-interpreter-${INTP_GROUP_ID}-${ZEPPELIN_IDENT_STRING}-${HOSTNAME}-${PORT}.pid"
 
 if [[ "${ZEPPELIN_INTERPRETER_LAUNCHER}" == "yarn" ]]; then
@@ -247,8 +252,7 @@ elif [[ "${INTERPRETER_ID}" == "flink" ]]; then
   addEachJarInDirRecursiveForIntp "${FLINK_HOME}/lib"
 
   FLINK_PYTHON_JAR=$(find "${FLINK_HOME}/opt" -name 'flink-python_*.jar')
-  ZEPPELIN_INTP_CLASSPATH+=":${FLINK_PYTHON_JAR}"
-  FLINK_APP_JAR="$(ls "${ZEPPELIN_HOME}"/interpreter/flink/zeppelin-flink-*.jar)"
+  ZEPPELIN_INTP_CLASSPATH+=":${FLINK_PYTHON_JAR}:${FLINK_APP_JAR}"
 
   if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
     ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
diff --git a/flink/README.md b/flink/README.md
new file mode 100644
index 0000000..e8e7dd9
--- /dev/null
+++ b/flink/README.md
@@ -0,0 +1,53 @@
+# Flink Interpreter
+
+This is the documentation for Zeppelin developers who want to work on the Flink interpreter.
+
+## Development Guide 
+
+### Project Structure
+
+The Flink interpreter is more complex than other interpreters (such as jdbc or shell). Currently it consists of the following 8 modules:
+* flink-shims
+* flink1.10-shims
+* flink1.11-shims
+* flink1.12-shims
+* flink1.13-shims
+* flink-scala-parent
+* flink-scala-2.11
+* flink-scala-2.12
+
+The first 5 modules adapt the interpreter to different Flink versions, because there are API changes between Flink versions.
+`flink-shims` is the parent module of the other shims modules.
+At runtime the Flink interpreter loads the matching shims implementation based on the current Flink version (see `FlinkShims#loadShims`, sketched below).
+ 
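+A minimal sketch of the shims-loading idea (illustrative only: the real logic lives in `FlinkShims#loadShims`; the shims class names and constructor signature below are assumptions patterned on the `shims110` package that appears in this repo):
+
+```java
+// Illustrative sketch, not the actual FlinkShims#loadShims implementation.
+// Map the detected Flink version to a shims class and load it reflectively,
+// so only the matching flink1.xx-shims jar is needed at runtime.
+// Assumes flinkVersion.toString() yields something like "1.12.2".
+String className;
+if (flinkVersion.toString().startsWith("1.10")) {
+  className = "org.apache.zeppelin.flink.shims110.Flink110Shims";   // assumed name
+} else if (flinkVersion.toString().startsWith("1.11")) {
+  className = "org.apache.zeppelin.flink.shims111.Flink111Shims";   // assumed name
+} else {
+  className = "org.apache.zeppelin.flink.shims112.Flink112Shims";   // ...and so on
+}
+FlinkShims shims = (FlinkShims) Class.forName(className)
+    .getConstructor(Properties.class)    // constructor signature assumed
+    .newInstance(properties);
+```
+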
+The remaining 3 modules adapt the interpreter to different Scala versions (Apache Flink supports two Scala versions: 2.11 & 2.12).
+`flink-scala-parent` is the parent module of `flink-scala-2.11` and `flink-scala-2.12` and contains the code shared by both.
+There is a symlink folder `flink-scala-parent` under both `flink-scala-2.11` and `flink-scala-2.12`.
+When you run a Maven build of the Flink interpreter, the source code in `flink-scala-parent` is not compiled directly; instead,
+it is compiled against each Scala version while building `flink-scala-2.11` & `flink-scala-2.12` (see `build-helper-maven-plugin` in `pom.xml`).
+Both `flink-scala-2.11` and `flink-scala-2.12` build a Flink interpreter jar, and `FlinkInterpreterLauncher` in `zeppelin-plugins/launcher/flink` chooses
+the right jar based on the Scala version of the Flink distribution, as sketched below.
+
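+A rough sketch of that jar selection (illustrative only: the jar naming pattern `zeppelin-flink-<version>-<scala>.jar` comes from the shade-plugin config in `flink-scala-parent/pom.xml`, but deriving the Scala version from the `flink-dist` jar name is an assumption, not necessarily what `FlinkInterpreterLauncher` does):
+
+```java
+// Rough sketch, not the actual FlinkInterpreterLauncher implementation.
+import java.io.File;
+
+public class ChooseFlinkInterpreterJar {
+  public static File choose(File flinkHome, File zeppelinHome) {
+    // Assumed heuristic: flink-dist_2.12-1.13.0.jar in FLINK_HOME/lib -> "2.12"
+    File[] dist = new File(flinkHome, "lib")
+        .listFiles(f -> f.getName().startsWith("flink-dist"));
+    String scalaVersion = dist[0].getName().contains("_2.12") ? "2.12" : "2.11";
+    // Pick the interpreter jar built by the matching flink-scala-2.1x module,
+    // e.g. interpreter/flink/zeppelin-flink-0.10.0-SNAPSHOT-2.12.jar
+    File[] candidates = new File(zeppelinHome, "interpreter/flink")
+        .listFiles(f -> f.getName().startsWith("zeppelin-flink-")
+            && f.getName().endsWith("-" + scalaVersion + ".jar"));
+    return candidates[0];
+  }
+}
+```
+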
+### Work in IDE
+
+Because of the complex project structure of the Flink interpreter, extra configuration is needed to make it work in an IDE.
+Here we take IntelliJ as an example (other IDEs should be similar).
+
+The key point is that the Flink interpreter can only work with one Scala version at a time in the IDE.
+So we have to exclude the sources of the other Scala version when working with a specific one.
+
+#### Make it work with scala-2.11
+
+1. Exclude the source code folders (java/scala) of `flink-scala-parent` (right-click the folders -> Mark Directory As -> Excluded)
+2. Include the source code folders (java/scala) of `flink/flink-scala-2.11/flink-scala-parent` (right-click the folders -> Mark Directory As -> Source Root)
+
+#### Make it work with scala-2.12
+
+1. Exclude the source code folders (java/scala) of `flink-scala-parent` (right-click the folders -> Mark Directory As -> Excluded)
+2. Include the source code folders (java/scala) of `flink/flink-scala-2.12/flink-scala-parent` (right-click the folders -> Mark Directory As -> Source Root)
+
+
+#### How to run unit test in IDE
+
+Take `FlinkInterpreterTest` as an example: you need to set the environment variables `FLINK_HOME`, `FLINK_CONF_DIR`, and `ZEPPELIN_HOME`.
+See `maven-surefire-plugin` in `pom.xml` for more details.
diff --git a/flink/flink-scala-2.11/flink-scala-parent b/flink/flink-scala-2.11/flink-scala-parent
new file mode 120000
index 0000000..3dfa859
--- /dev/null
+++ b/flink/flink-scala-2.11/flink-scala-parent
@@ -0,0 +1 @@
+../flink-scala-parent
\ No newline at end of file
diff --git a/flink/flink-scala-2.11/pom.xml b/flink/flink-scala-2.11/pom.xml
new file mode 100644
index 0000000..cf8231a
--- /dev/null
+++ b/flink/flink-scala-2.11/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.zeppelin</groupId>
+    <artifactId>flink-scala-parent</artifactId>
+    <version>0.10.0-SNAPSHOT</version>
+    <relativePath>../flink-scala-parent/pom.xml</relativePath>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.zeppelin</groupId>
+  <artifactId>flink-scala-2.11</artifactId>
+  <version>0.10.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+  <name>Zeppelin: Flink Interpreter Scala_2.11</name>
+
+  <properties>
+    <flink.scala.version>2.11.12</flink.scala.version>
+    <flink.scala.binary.version>2.11</flink.scala.binary.version>
+    <flink.scala.compile.version>${flink.scala.version}</flink.scala.compile.version>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>com.googlecode.maven-download-plugin</groupId>
+        <artifactId>download-maven-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-resources-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala b/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala
similarity index 100%
rename from flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala
rename to flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala b/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala
similarity index 100%
rename from flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala
rename to flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala
diff --git a/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkScala211Interpreter.scala b/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkScala211Interpreter.scala
new file mode 100644
index 0000000..66b74bd
--- /dev/null
+++ b/flink/flink-scala-2.11/src/main/scala/org/apache/zeppelin/flink/FlinkScala211Interpreter.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink
+
+import java.io.File
+import java.net.URLClassLoader
+import java.util.Properties
+
+import org.apache.zeppelin.interpreter.InterpreterContext
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
+
+import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter.{IMain, JPrintWriter}
+
+
+class FlinkScala211Interpreter(override val properties: Properties,
+                               override val flinkScalaClassLoader: URLClassLoader)
+  extends FlinkScalaInterpreter(properties, flinkScalaClassLoader) {
+
+  override def completion(buf: String,
+                          cursor: Int,
+                          context: InterpreterContext): java.util.List[InterpreterCompletion] = {
+    val completions = scalaCompletion.completer().complete(buf.substring(0, cursor), cursor).candidates
+      .map(e => new InterpreterCompletion(e, e, null))
+    scala.collection.JavaConversions.seqAsJavaList(completions)
+  }
+
+  override def createIMain(settings: Settings, out: JPrintWriter): IMain = new FlinkILoopInterpreter(settings, out)
+
+  override def createSettings(): Settings = {
+    val settings = new Settings()
+    // Don't call settings#embeddedDefaults for scala-2.11; otherwise it can cause weird errors
+    settings.usejavacp.value = true
+    settings.Yreplsync.value = true
+    settings.classpath.value = userJars.mkString(File.pathSeparator)
+    settings
+  }
+}
diff --git a/flink/flink-scala-2.12/flink-scala-parent b/flink/flink-scala-2.12/flink-scala-parent
new file mode 120000
index 0000000..3dfa859
--- /dev/null
+++ b/flink/flink-scala-2.12/flink-scala-parent
@@ -0,0 +1 @@
+../flink-scala-parent
\ No newline at end of file
diff --git a/flink/flink-scala-2.12/pom.xml b/flink/flink-scala-2.12/pom.xml
new file mode 100644
index 0000000..4e01ec4
--- /dev/null
+++ b/flink/flink-scala-2.12/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.zeppelin</groupId>
+    <artifactId>flink-scala-parent</artifactId>
+    <version>0.10.0-SNAPSHOT</version>
+    <relativePath>../flink-scala-parent/pom.xml</relativePath>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.zeppelin</groupId>
+  <artifactId>flink-scala-2.12</artifactId>
+  <version>0.10.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+  <name>Zeppelin: Flink Interpreter Scala_2.12</name>
+
+  <properties>
+    <flink.scala.version>2.12.7</flink.scala.version>
+    <flink.scala.binary.version>2.12</flink.scala.binary.version>
+    <flink.scala.compile.version>${flink.scala.version}</flink.scala.compile.version>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>com.googlecode.maven-download-plugin</groupId>
+        <artifactId>download-maven-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-resources-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScala212Interpreter.scala b/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScala212Interpreter.scala
new file mode 100644
index 0000000..f1bfcfe
--- /dev/null
+++ b/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScala212Interpreter.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink
+
+import java.io.File
+import java.net.URLClassLoader
+import java.util.Properties
+
+import org.apache.zeppelin.interpreter.InterpreterContext
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
+
+import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter.{IMain, JPrintWriter}
+
+class FlinkScala212Interpreter(override val properties: Properties,
+                               override val flinkScalaClassLoader: URLClassLoader)
+  extends FlinkScalaInterpreter(properties, flinkScalaClassLoader) {
+
+  override def completion(buf: String,
+                          cursor: Int,
+                          context: InterpreterContext): java.util.List[InterpreterCompletion] = {
+    val completions = scalaCompletion.complete(buf.substring(0, cursor), cursor).candidates
+      .map(e => new InterpreterCompletion(e, e, null))
+    scala.collection.JavaConversions.seqAsJavaList(completions)
+  }
+
+  override def createIMain(settings: Settings, out: JPrintWriter): IMain = new IMain(settings, out)
+
+  override def createSettings(): Settings = {
+    val settings = new Settings()
+    settings.embeddedDefaults(flinkScalaClassLoader)
+    settings.usejavacp.value = true
+    settings.Yreplsync.value = true
+    settings.classpath.value = userJars.mkString(File.pathSeparator)
+    settings
+  }
+}
diff --git a/flink/interpreter/pom.xml b/flink/flink-scala-parent/pom.xml
similarity index 83%
rename from flink/interpreter/pom.xml
rename to flink/flink-scala-parent/pom.xml
index 642cb59..abb29c1 100644
--- a/flink/interpreter/pom.xml
+++ b/flink/flink-scala-parent/pom.xml
@@ -28,11 +28,11 @@
   </parent>
 
   <groupId>org.apache.zeppelin</groupId>
-  <artifactId>zeppelin-flink</artifactId>
-  <packaging>jar</packaging>
+  <artifactId>flink-scala-parent</artifactId>
+  <packaging>pom</packaging>
   <version>0.10.0-SNAPSHOT</version>
-  <name>Zeppelin: Flink</name>
-  <description>Zeppelin Flink Interpreter</description>
+  <name>Zeppelin: Flink Scala Parent</name>
+  <description>Zeppelin Flink Scala Parent</description>
 
   <properties>
     <!--library versions-->
@@ -43,20 +43,9 @@
     <hiverunner.version>4.0.0</hiverunner.version>
     <grpc.version>1.15.0</grpc.version>
 
-    <scala.macros.version>2.0.1</scala.macros.version>
-    <scala.binary.version>2.11</scala.binary.version>
-    <scala.version>2.11.12</scala.version>
-
-    <flink.bin.download.url>https://archive.apache.org/dist/flink/flink-${flink.version}/flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.bin.download.url>
+    <flink.bin.download.url>https://archive.apache.org/dist/flink/flink-${flink.version}/flink-${flink.version}-bin-scala_${flink.scala.binary.version}.tgz</flink.bin.download.url>
   </properties>
 
-  <repositories>
-    <repository>
-      <id>conjars repo</id>
-      <name>conjars repo</name>
-      <url>https://conjars.org/repo/</url>
-    </repository>
-  </repositories>
   
   <dependencies>
 
@@ -156,7 +145,7 @@
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-python_${scala.binary.version}</artifactId>
+      <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
@@ -170,21 +159,21 @@
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-clients_${scala.binary.version}</artifactId>
+      <artifactId>flink-clients_${flink.scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+      <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-yarn_${scala.binary.version}</artifactId>
+      <artifactId>flink-yarn_${flink.scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
       <exclusions>
@@ -209,41 +198,35 @@
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
+      <artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+      <artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-scala_${scala.binary.version}</artifactId>
+      <artifactId>flink-scala_${flink.scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-scala-shell_${scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-java_2.11</artifactId>
+      <artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-scala_2.11</artifactId>
+      <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
@@ -257,14 +240,14 @@
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-connector-hive_2.11</artifactId>
+      <artifactId>flink-connector-hive_${flink.scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-connector-hive_2.11</artifactId>
+      <artifactId>flink-connector-hive_${flink.scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <classifier>tests</classifier>
       <scope>test</scope>
@@ -273,7 +256,7 @@
     <!-- hadoop compatibility dependency -->
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+      <artifactId>flink-hadoop-compatibility_${flink.scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
@@ -300,19 +283,22 @@
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
-      <version>${scala.version}</version>
+      <version>${flink.scala.version}</version>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-compiler</artifactId>
-      <version>${scala.version}</version>
+      <version>${flink.scala.version}</version>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-reflect</artifactId>
-      <version>${scala.version}</version>
+      <version>${flink.scala.version}</version>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
@@ -323,7 +309,7 @@
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
+      <artifactId>flink-connector-hive_${flink.scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
       <exclusions>
@@ -339,7 +325,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
+      <artifactId>flink-connector-hive_${flink.scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <classifier>tests</classifier>
       <scope>test</scope>
@@ -357,7 +343,7 @@
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-planner-blink_2.11</artifactId>
+      <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
       <exclusions>
@@ -370,7 +356,7 @@
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-planner_2.11</artifactId>
+      <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
@@ -454,6 +440,10 @@
           <groupId>com.google.protobuf</groupId>
           <artifactId>protobuf-java</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-slf4j-impl</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
 
@@ -604,6 +594,10 @@
           <artifactId>netty-all</artifactId>
         </exclusion>
         <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-slf4j-impl</artifactId>
+        </exclusion>
+        <exclusion>
           <groupId>org.pentaho</groupId>
           <artifactId>pentaho-aggdesigner-algorithm</artifactId>
         </exclusion>
@@ -636,15 +630,110 @@
 
     <dependency>
       <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_2.11</artifactId>
+      <artifactId>scalatest_${flink.scala.binary.version}</artifactId>
       <version>3.0.8</version>
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
-    <plugins>
+    <pluginManagement>
+      <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-java-sources</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>${project.basedir}/../flink-scala-parent/src/main/java</source>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>add-scala-sources</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>${project.basedir}/../flink-scala-parent/src/main/scala</source>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>add-java-test-sources</id>
+            <phase>generate-test-sources</phase>
+            <goals>
+              <goal>add-test-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>${project.basedir}/../flink-scala-parent/src/test/java</source>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>add-scala-test-sources</id>
+            <phase>generate-test-sources</phase>
+            <goals>
+              <goal>add-test-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>${project.basedir}/../flink-scala-parent/src/test/scala</source>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>add-resource</id>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>add-resource</goal>
+            </goals>
+            <configuration>
+              <resources>
+                <resource>
+                  <directory>${project.basedir}/../flink-scala-parent/src/main/resources</directory>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>add-test-resource</id>
+            <phase>generate-test-resources</phase>
+            <goals>
+              <goal>add-test-resource</goal>
+            </goals>
+            <configuration>
+              <resources>
+                <resource>
+                  <directory>${project.basedir}/../flink-scala-parent/src/test/resources</directory>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
       <plugin>
         <groupId>net.alchim31.maven</groupId>
         <artifactId>scala-maven-plugin</artifactId>
@@ -671,14 +760,11 @@
           </execution>
         </executions>
         <configuration>
-          <scalaVersion>${scala.version}</scalaVersion>
-          <!--<recompileMode>incremental</recompileMode>-->
-          <!--<useZincServer>true</useZincServer>-->
+          <scalaVersion>${flink.scala.version}</scalaVersion>
           <args>
             <arg>-unchecked</arg>
             <arg>-deprecation</arg>
             <arg>-feature</arg>
-            <arg>-target:jvm-1.8</arg>
           </args>
           <jvmArgs>
             <jvmArg>-Xms1024m</jvmArg>
@@ -731,6 +817,7 @@
 <!--          <argLine>-Xmx4096m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=6006</argLine>-->
 
           <environmentVariables>
+            <ZEPPELIN_HOME>${basedir}/../../</ZEPPELIN_HOME>
             <FLINK_HOME>${project.build.directory}/flink-${flink.version}</FLINK_HOME>
             <FLINK_CONF_DIR>${project.build.directory}/test-classes</FLINK_CONF_DIR>
           </environmentVariables>
@@ -807,6 +894,31 @@
       <plugin>
         <artifactId>maven-dependency-plugin</artifactId>
       </plugin>
+
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-interpreter-setting</id>
+            <phase>package</phase>
+            <goals>
+              <goal>resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/../../../interpreter/${interpreter.name}</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
@@ -848,7 +960,7 @@
               <shadedPattern>org.apache.zeppelin.shaded.com.google</shadedPattern>
             </relocation>
           </relocations>
-          <outputFile>${project.basedir}/../../interpreter/${interpreter.name}/${project.artifactId}-${project.version}.jar</outputFile>
+          <outputFile>${project.basedir}/../../interpreter/${interpreter.name}/zeppelin-flink-${project.version}-${flink.scala.binary.version}.jar</outputFile>
         </configuration>
         <executions>
           <execution>
@@ -859,32 +971,8 @@
           </execution>
         </executions>
       </plugin>
-
-      <plugin>
-        <artifactId>maven-resources-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>copy-interpreter-setting</id>
-            <phase>package</phase>
-            <goals>
-              <goal>resources</goal>
-            </goals>
-            <configuration>
-              <outputDirectory>${project.build.directory}/../../../interpreter/${interpreter.name}</outputDirectory>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-checkstyle-plugin</artifactId>
-        <configuration>
-          <skip>true</skip>
-        </configuration>
-      </plugin>
-
-    </plugins>
+      </plugins>
+    </pluginManagement>
   </build>
 
   <profiles>
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
similarity index 96%
rename from flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
index f48dcb5..90fae60 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
@@ -20,6 +20,7 @@ package org.apache.zeppelin.flink;
 import org.apache.flink.table.api.Table;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.ZeppelinContext;
 import org.apache.zeppelin.scheduler.Scheduler;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
 
@@ -28,7 +29,7 @@ import java.util.Properties;
 
 public class FlinkBatchSqlInterpreter extends FlinkSqlInterrpeter {
 
-  private FlinkZeppelinContext z;
+  private ZeppelinContext z;
 
   public FlinkBatchSqlInterpreter(Properties properties) {
     super(properties);
@@ -56,7 +57,6 @@ public class FlinkBatchSqlInterpreter extends FlinkSqlInterrpeter {
   @Override
   public void callInnerSelect(String sql, InterpreterContext context) throws IOException {
     Table table = this.tbenv.sqlQuery(sql);
-    z.setCurrentSql(sql);
     String result = z.showData(table);
     context.out.write(result);
   }
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
similarity index 75%
rename from flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index d79fb2e..d629ba4 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -17,6 +17,7 @@
 
 package org.apache.zeppelin.flink;
 
+
 import org.apache.flink.api.scala.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
@@ -25,45 +26,76 @@ import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.ZeppelinContext;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.URLClassLoader;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 /**
- * Interpreter for flink scala. It delegates all the function to FlinkScalaInterpreter.
+ * Interpreter for Flink Scala. It delegates all functions to FlinkScalaInterpreter,
+ * which is implemented on top of the Flink Scala shell underneath.
  */
 public class FlinkInterpreter extends Interpreter {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(FlinkInterpreter.class);
 
+  private Map<String, String> innerInterpreterClassMap = new HashMap<>();
   private FlinkScalaInterpreter innerIntp;
-  private FlinkZeppelinContext z;
+  private ZeppelinContext z;
 
   public FlinkInterpreter(Properties properties) {
     super(properties);
+    innerInterpreterClassMap.put("2.11", "org.apache.zeppelin.flink.FlinkScala211Interpreter");
+    innerInterpreterClassMap.put("2.12", "org.apache.zeppelin.flink.FlinkScala212Interpreter");
   }
 
-  private void checkScalaVersion() throws InterpreterException {
+  private String extractScalaVersion() throws InterpreterException {
     String scalaVersionString = scala.util.Properties.versionString();
     LOGGER.info("Using Scala: " + scalaVersionString);
     if (scalaVersionString.contains("version 2.11")) {
-      return;
+      return "2.11";
+    } else if (scalaVersionString.contains("version 2.12")) {
+      return "2.12";
     } else {
       throw new InterpreterException("Unsupported scala version: " + scalaVersionString +
-              ", Only scala 2.11 is supported");
+              ", Only scala 2.11/2.12 is supported");
     }
   }
 
   @Override
   public void open() throws InterpreterException {
-    checkScalaVersion();
-    
-    this.innerIntp = new FlinkScalaInterpreter(getProperties());
-    this.innerIntp.open();
-    this.z = this.innerIntp.getZeppelinContext();
+    try {
+      this.innerIntp = loadFlinkScalaInterpreter();
+      this.innerIntp.open();
+      this.z = this.innerIntp.getZeppelinContext();
+    } catch (InterpreterException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new InterpreterException("Fail to open FlinkInterpreter", e);
+    }
+  }
+
+  /**
+   * Load FlinkScalaInterpreter based on the runtime scala version.
+   *
+   * @return FlinkScalaInterpreter
+   * @throws Exception
+   */
+  private FlinkScalaInterpreter loadFlinkScalaInterpreter() throws Exception {
+    String scalaVersion = extractScalaVersion();
+    ClassLoader flinkScalaClassLoader = FlinkScalaInterpreter.class.getClassLoader();
+    String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion);
+    Class clazz = Class.forName(innerIntpClassName);
+
+    return (FlinkScalaInterpreter)
+            clazz.getConstructor(Properties.class, URLClassLoader.class)
+                    .newInstance(getProperties(), flinkScalaClassLoader);
   }
 
   @Override
@@ -88,7 +120,7 @@ public class FlinkInterpreter extends Interpreter {
       Thread.currentThread().setContextClassLoader(getFlinkScalaShellLoader());
       createPlannerAgain();
       setParallelismIfNecessary(context);
-      setSavepointIfNecessary(context);
+      setSavepointPathIfNecessary(context);
       return innerIntp.interpret(st, context);
     } finally {
       Thread.currentThread().setContextClassLoader(originClassLoader);
@@ -165,23 +197,19 @@ public class FlinkInterpreter extends Interpreter {
     return innerIntp.getFlinkScalaShellLoader();
   }
 
-  FlinkZeppelinContext getZeppelinContext() {
+  ZeppelinContext getZeppelinContext() {
     return this.z;
   }
 
   Configuration getFlinkConfiguration() {
-    return this.innerIntp.getConfiguration();
-  }
-
-  public FlinkScalaInterpreter getInnerIntp() {
-    return this.innerIntp;
+    return this.innerIntp.getFlinkConfiguration();
   }
 
   public FlinkShims getFlinkShims() {
     return this.innerIntp.getFlinkShims();
   }
 
-  public void setSavepointIfNecessary(InterpreterContext context) {
+  public void setSavepointPathIfNecessary(InterpreterContext context) {
     this.innerIntp.setSavepointPathIfNecessary(context);
   }
 
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
similarity index 98%
rename from flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index 16f2187..b4ce193 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -24,16 +24,15 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.JobListener;
-import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.zeppelin.flink.sql.SqlCommandParser;
 import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
 import org.apache.zeppelin.interpreter.AbstractInterpreter;
-import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -74,7 +73,7 @@ public abstract class FlinkSqlInterrpeter extends AbstractInterpreter {
 
   @Override
   public void open() throws InterpreterException {
-    sqlCommandParser = new SqlCommandParser(flinkInterpreter.getFlinkShims(), tbenv);
+    this.sqlCommandParser = new SqlCommandParser(flinkInterpreter.getFlinkShims(), tbenv);
     this.sqlSplitter = new SqlSplitter();
     JobListener jobListener = new JobListener() {
       @Override
@@ -107,7 +106,7 @@ public abstract class FlinkSqlInterrpeter extends AbstractInterpreter {
       Thread.currentThread().setContextClassLoader(flinkInterpreter.getFlinkScalaShellLoader());
       flinkInterpreter.createPlannerAgain();
       flinkInterpreter.setParallelismIfNecessary(context);
-      flinkInterpreter.setSavepointIfNecessary(context);
+      flinkInterpreter.setSavepointPathIfNecessary(context);
       return runSqlList(st, context);
     } finally {
       Thread.currentThread().setContextClassLoader(originClassLoader);
@@ -508,6 +507,7 @@ public abstract class FlinkSqlInterrpeter extends AbstractInterpreter {
       throw new IOException(key + " is not a valid table/sql config, please check link: " +
               "https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html");
     }
+    LOGGER.info("Set table config: {}={}", key, value);
     this.tbenv.getConfig().getConfiguration().setString(key, value);
   }
 
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
similarity index 100%
rename from flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
similarity index 100%
rename from flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
similarity index 98%
rename from flink/interpreter/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
index 9271cd5..763795f 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
@@ -89,7 +89,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
         throw new InterpreterException("Fail to initJavaThread: " +
                 result.toString());
       }
-      flinkInterpreter.setSavepointIfNecessary(context);
+      flinkInterpreter.setSavepointPathIfNecessary(context);
       flinkInterpreter.setParallelismIfNecessary(context);
       return super.internalInterpret(st, context);
     } finally {
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
similarity index 98%
rename from flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
index 50bfcc6..9614ee9 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -49,16 +49,13 @@ public class JobManager {
   private Map<String, JobClient> jobs = new HashMap<>();
   private ConcurrentHashMap<JobID, FlinkJobProgressPoller> jobProgressPollerMap =
           new ConcurrentHashMap<>();
-  private FlinkZeppelinContext z;
   private String flinkWebUrl;
   private String displayedFlinkWebUrl;
   private Properties properties;
 
-  public JobManager(FlinkZeppelinContext z,
-                    String flinkWebUrl,
+  public JobManager(String flinkWebUrl,
                     String displayedFlinkWebUrl,
                     Properties properties) {
-    this.z = z;
     this.flinkWebUrl = flinkWebUrl;
     this.displayedFlinkWebUrl = displayedFlinkWebUrl;
     this.properties = properties;
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
similarity index 99%
rename from flink/interpreter/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
index cc9ec7f..d27f0fa 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
@@ -115,7 +115,7 @@ public class PyFlinkInterpreter extends PythonInterpreter {
                     result.toString());
           }
         }
-        flinkInterpreter.setSavepointIfNecessary(context);
+        flinkInterpreter.setSavepointPathIfNecessary(context);
         flinkInterpreter.setParallelismIfNecessary(context);
       }
       return super.interpret(st, context);
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
similarity index 100%
rename from flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/YarnApplicationExecutionEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationExecutionEnvironment.java
similarity index 98%
rename from flink/interpreter/src/main/java/org/apache/zeppelin/flink/YarnApplicationExecutionEnvironment.java
rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationExecutionEnvironment.java
index 2cdfa9e..2cfd0e3 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/YarnApplicationExecutionEnvironment.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationExecutionEnvironment.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
+import org.apache.zeppelin.flink.internal.FlinkILoop;
 
 import java.io.File;
 import java.net.MalformedURLException;
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java
similarity index 98%
rename from flink/interpreter/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java
rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java
index f7f3dcb..ca9089f 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.zeppelin.flink.internal.FlinkILoop;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/JarHelper.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/JarHelper.java
new file mode 100644
index 0000000..07b5033
--- /dev/null
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/JarHelper.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.internal;
+
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.jar.JarEntry;
+import java.util.jar.JarInputStream;
+import java.util.jar.JarOutputStream;
+
+/**
+ * This class is copied from the Flink project; the Flink Scala shell only supports
+ * Scala 2.11, so we copied it here to support Scala 2.12 as well.
+ */
+public class JarHelper {
+  // ========================================================================
+  // Constants
+
+  private static final int BUFFER_SIZE = 2156;
+
+  // ========================================================================
+  // Variables
+
+  private byte[] mBuffer = new byte[BUFFER_SIZE];
+
+  private int mByteCount = 0;
+
+  private boolean mVerbose = false;
+
+  private String mDestJarName = "";
+
+  // ========================================================================
+  // Constructor
+
+  /**
+   * Instantiates a new JarHelper.
+   */
+  public JarHelper() {
+  }
+
+  // ========================================================================
+  // Public methods
+
+  /**
+   * Jars a given directory or single file into the given destination jar file.
+   */
+  public void jarDir(File dirOrFile2Jar, File destJar) throws IOException {
+
+    if (dirOrFile2Jar == null || destJar == null) {
+      throw new IllegalArgumentException();
+    }
+
+    mDestJarName = destJar.getCanonicalPath();
+    FileOutputStream fout = new FileOutputStream(destJar);
+    JarOutputStream jout = new JarOutputStream(fout);
+    // jout.setLevel(0);
+    try {
+      jarDir(dirOrFile2Jar, jout, null);
+    } finally {
+      jout.close();
+      fout.close();
+    }
+  }
+
+  /**
+   * Unjars a given jar file into a given directory.
+   */
+  public void unjarDir(File jarFile, File destDir) throws IOException {
+    FileInputStream fis = new FileInputStream(jarFile);
+    try {
+      unjar(fis, destDir);
+    } finally {
+      fis.close();
+    }
+  }
+
+  /**
+   * Given an InputStream on a jar file, unjars the contents into the given directory.
+   */
+  public void unjar(InputStream in, File destDir) throws IOException {
+    BufferedOutputStream dest = null;
+    JarInputStream jis = new JarInputStream(in);
+    JarEntry entry;
+    while ((entry = jis.getNextJarEntry()) != null) {
+      if (entry.isDirectory()) {
+        File dir = new File(destDir, entry.getName());
+        dir.mkdir();
+        if (entry.getTime() != -1) {
+          dir.setLastModified(entry.getTime());
+        }
+        continue;
+      }
+      int count;
+      byte[] data = new byte[BUFFER_SIZE];
+      File destFile = new File(destDir, entry.getName());
+      if (mVerbose) {
+        System.out.println("unjarring " + destFile + " from " + entry.getName());
+      }
+      FileOutputStream fos = new FileOutputStream(destFile);
+      dest = new BufferedOutputStream(fos, BUFFER_SIZE);
+      try {
+        while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
+          dest.write(data, 0, count);
+        }
+        dest.flush();
+      } finally {
+        dest.close();
+      }
+      if (entry.getTime() != -1) {
+        destFile.setLastModified(entry.getTime());
+      }
+    }
+    jis.close();
+  }
+
+  public void setVerbose(boolean b) {
+    mVerbose = b;
+  }
+
+  // ========================================================================
+  // Private methods
+
+  private static final char SEP = '/';
+
+  /**
+   * Recursively jars up the given path under the given directory.
+   */
+  private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path) throws IOException {
+    if (mVerbose) {
+      System.out.println("checking " + dirOrFile2jar);
+    }
+    if (dirOrFile2jar.isDirectory()) {
+      String[] dirList = dirOrFile2jar.list();
+      String subPath = (path == null) ? "" : (path + dirOrFile2jar.getName() + SEP);
+      if (path != null) {
+        JarEntry je = new JarEntry(subPath);
+        je.setTime(dirOrFile2jar.lastModified());
+        jos.putNextEntry(je);
+        jos.flush();
+        jos.closeEntry();
+      }
+      for (int i = 0; i < dirList.length; i++) {
+        File f = new File(dirOrFile2jar, dirList[i]);
+        jarDir(f, jos, subPath);
+      }
+    } else if (dirOrFile2jar.exists()) {
+      if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName)) {
+        if (mVerbose) {
+          System.out.println("skipping " + dirOrFile2jar.getPath());
+        }
+        return;
+      }
+
+      if (mVerbose) {
+        System.out.println("adding " + dirOrFile2jar.getPath());
+      }
+      FileInputStream fis = new FileInputStream(dirOrFile2jar);
+      try {
+        JarEntry entry = new JarEntry(path + dirOrFile2jar.getName());
+        entry.setTime(dirOrFile2jar.lastModified());
+        jos.putNextEntry(entry);
+        while ((mByteCount = fis.read(mBuffer)) != -1) {
+          jos.write(mBuffer, 0, mByteCount);
+          if (mVerbose) {
+            System.out.println("wrote " + mByteCount + " bytes");
+          }
+        }
+        jos.flush();
+        jos.closeEntry();
+      } finally {
+        fis.close();
+      }
+    }
+  }
+}
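
For reference, a minimal sketch of how this copied helper is driven (the paths below are hypothetical; this mirrors what `FlinkILoop.writeFilesToDisk` does with the shell's compiled classes):

```scala
import java.io.File
import org.apache.zeppelin.flink.internal.JarHelper

object JarHelperRoundTrip {
  def main(args: Array[String]): Unit = {
    val compiledClasses = new File("/tmp/scala_shell_commands")  // hypothetical input directory
    val destJar = new File("/tmp/scala_shell_commands.jar")      // hypothetical output jar

    val jh = new JarHelper
    jh.setVerbose(true)                             // print each entry as it is written
    jh.jarDir(compiledClasses, destJar)             // package the directory into a jar
    jh.unjarDir(destJar, new File("/tmp/unpacked")) // and restore it elsewhere
  }
}
```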
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellEnvironment.java
new file mode 100644
index 0000000..ebd9f1d
--- /dev/null
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellEnvironment.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *	 http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.internal;
+
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.util.JarUtils;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is copied from the Flink project; the Flink Scala shell only supports
+ * Scala 2.11, so we copied it here to support Scala 2.12 as well.
+ */
+public class ScalaShellEnvironment extends ExecutionEnvironment {
+
+  /**
+   * The jar files that need to be attached to each job.
+   */
+  private final List<URL> jarFiles;
+
+  /**
+   * Reference to the Scala shell, for access to its virtual directory.
+   */
+  private final FlinkILoop flinkILoop;
+
+  public ScalaShellEnvironment(
+          final Configuration configuration,
+          final FlinkILoop flinkILoop,
+          final String... jarFiles) {
+    super(configuration);
+    this.flinkILoop = checkNotNull(flinkILoop);
+    this.jarFiles = checkNotNull(JarUtils.getJarFiles(jarFiles));
+  }
+
+  @Override
+  public JobClient executeAsync(String jobName) throws Exception {
+    updateDependencies();
+    return super.executeAsync(jobName);
+  }
+
+  private void updateDependencies() throws Exception {
+    final Configuration configuration = getConfiguration();
+    final List<URL> updatedJarFiles = getUpdatedJarFiles();
+    ConfigUtils.encodeCollectionToConfig(
+            configuration, PipelineOptions.JARS, updatedJarFiles, URL::toString);
+  }
+
+  private List<URL> getUpdatedJarFiles() throws MalformedURLException {
+    final URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
+    final List<URL> allJarFiles = new ArrayList<>(jarFiles);
+    allJarFiles.add(jarUrl);
+    return allJarFiles;
+  }
+
+  public static void resetContextEnvironments() {
+    ExecutionEnvironment.resetContextEnvironment();
+  }
+}
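
The essential bookkeeping here is the `PipelineOptions.JARS` update: every `executeAsync()` re-encodes the current jar list (the external jars plus the jar of freshly compiled shell classes) into the job configuration, so the cluster can load user code. A standalone sketch of that encoding, with hypothetical jar URLs:

```scala
import java.net.URL
import java.util.Arrays
import org.apache.flink.configuration.{ConfigUtils, Configuration, PipelineOptions}

val configuration = new Configuration()
val jars = Arrays.asList(
  new URL("file:///tmp/external-udfs.jar"),        // hypothetical user jar
  new URL("file:///tmp/scala_shell_commands.jar")) // jar written by FlinkILoop
ConfigUtils.encodeCollectionToConfig(
  configuration, PipelineOptions.JARS, jars, (url: URL) => url.toString)
println(configuration.get(PipelineOptions.JARS))   // the two file: URLs as strings
```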
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java
new file mode 100644
index 0000000..448c34b
--- /dev/null
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *	 http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.internal;
+
+
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.util.JarUtils;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is copied from the Flink project; the Flink Scala shell only supports
+ * Scala 2.11, so we copied it here to support Scala 2.12 as well.
+ */
+public class ScalaShellStreamEnvironment extends StreamExecutionEnvironment {
+
+  /**
+   * The jar files that need to be attached to each job.
+   */
+  private final List<URL> jarFiles;
+
+  /**
+   * Reference to the Scala shell, for access to its virtual directory.
+   */
+  private final FlinkILoop flinkILoop;
+
+  public ScalaShellStreamEnvironment(
+          final Configuration configuration,
+          final FlinkILoop flinkILoop,
+          final String... jarFiles) {
+    super(configuration);
+    this.flinkILoop = checkNotNull(flinkILoop);
+    this.jarFiles = checkNotNull(JarUtils.getJarFiles(jarFiles));
+  }
+
+  @Override
+  public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
+    updateDependencies();
+    return super.executeAsync(streamGraph);
+  }
+
+  private void updateDependencies() throws Exception {
+    final Configuration configuration = getConfiguration();
+    final List<URL> updatedJarFiles = getUpdatedJarFiles();
+    ConfigUtils.encodeCollectionToConfig(
+            configuration, PipelineOptions.JARS, updatedJarFiles, URL::toString);
+  }
+
+  public Configuration getClientConfiguration() {
+    return getConfiguration();
+  }
+
+  private List<URL> getUpdatedJarFiles() throws MalformedURLException {
+    final URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
+    final List<URL> allJarFiles = new ArrayList<>(jarFiles);
+    allJarFiles.add(jarUrl);
+    return allJarFiles;
+  }
+
+  public static void resetContextEnvironments() {
+    StreamExecutionEnvironment.resetContextEnvironment();
+  }
+}
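
Note that Flink allows only one context-environment factory per JVM, which is why `FlinkILoop` (below) clears both factories before building fresh environments; a sketch of that reset sequence:

```scala
import org.apache.zeppelin.flink.internal.{ScalaShellEnvironment, ScalaShellStreamEnvironment}

// Clear any previously installed context environments so a restarted shell
// session can install its own batch and stream environments.
ScalaShellEnvironment.resetContextEnvironments()
ScalaShellStreamEnvironment.resetContextEnvironments()
```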
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
similarity index 100%
rename from flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
similarity index 100%
rename from flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
similarity index 100%
rename from flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
similarity index 100%
rename from flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
diff --git a/flink/interpreter/src/main/resources/interpreter-setting.json b/flink/flink-scala-parent/src/main/resources/interpreter-setting.json
similarity index 100%
rename from flink/interpreter/src/main/resources/interpreter-setting.json
rename to flink/flink-scala-parent/src/main/resources/interpreter-setting.json
diff --git a/flink/interpreter/src/main/resources/python/zeppelin_ipyflink.py b/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py
similarity index 100%
rename from flink/interpreter/src/main/resources/python/zeppelin_ipyflink.py
rename to flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py
diff --git a/flink/interpreter/src/main/resources/python/zeppelin_pyflink.py b/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py
similarity index 100%
rename from flink/interpreter/src/main/resources/python/zeppelin_pyflink.py
rename to flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
similarity index 94%
rename from flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
rename to flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 459ff7b..e0e97ba 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -18,7 +18,7 @@
 
 package org.apache.zeppelin.flink
 
-import java.io.{BufferedReader, File}
+import java.io.File
 import java.net.{URL, URLClassLoader}
 import java.nio.file.Files
 import java.util.Properties
@@ -40,14 +40,14 @@ import org.apache.flink.runtime.util.EnvironmentInformation
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableEnvironment}
-import org.apache.flink.table.catalog.CatalogManager
 import org.apache.flink.table.catalog.hive.HiveCatalog
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableAggregateFunction, TableFunction}
-import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.module.hive.HiveModule
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli
 import org.apache.zeppelin.dep.DependencyResolver
-import org.apache.zeppelin.flink.FlinkShell._
+import org.apache.zeppelin.flink.internal.FlinkShell._
+import org.apache.zeppelin.flink.internal.FlinkILoop
+import org.apache.zeppelin.interpreter.Interpreter.FormType
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
 import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterException, InterpreterHookRegistry, InterpreterResult}
@@ -56,22 +56,22 @@ import org.slf4j.{Logger, LoggerFactory}
 import scala.collection.JavaConversions
 import scala.collection.JavaConverters._
 import scala.tools.nsc.Settings
-import scala.tools.nsc.interpreter.Completion.ScalaCompleter
-import scala.tools.nsc.interpreter.{JPrintWriter, SimpleReader}
+import scala.tools.nsc.interpreter.{Completion, IMain, IR, JPrintWriter, Results, SimpleReader}
 
 /**
  * It instantiates the Flink Scala shell and creates env, senv, btenv and stenv.
  *
  * @param properties
  */
-class FlinkScalaInterpreter(val properties: Properties) {
+abstract class FlinkScalaInterpreter(val properties: Properties,
+                                     val flinkScalaClassLoader: URLClassLoader) {
 
   private lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
 
-  private var flinkILoop: FlinkILoop = _
+  protected var flinkILoop: FlinkILoop = _
   private var cluster: Option[ClusterClient[_]] = _
 
-  private var scalaCompleter: ScalaCompleter = _
+  protected var scalaCompletion: Completion = _
   private val interpreterOutput = new InterpreterOutputStream(LOGGER)
   private var configuration: Configuration = _
 
@@ -111,12 +111,13 @@ class FlinkScalaInterpreter(val properties: Properties) {
   private var defaultSqlParallelism = 1
 
   // flink.execution.jars + flink.execution.packages + flink.udf.jars
-  private var userJars: Seq[String] = _
+  protected var userJars: Seq[String] = _
   // flink.udf.jars
   private var userUdfJars: Seq[String] = _
 
 
   def open(): Unit = {
+
     val config = initFlinkConfig()
     createFlinkILoop(config)
     createTableEnvs()
@@ -125,10 +126,8 @@ class FlinkScalaInterpreter(val properties: Properties) {
     // init ZeppelinContext
     this.z = new FlinkZeppelinContext(this, new InterpreterHookRegistry(),
       Integer.parseInt(properties.getProperty("zeppelin.flink.maxResult", "1000")))
-    val modifiers = new java.util.ArrayList[String]()
-    modifiers.add("@transient")
-    this.bind("z", z.getClass().getCanonicalName(), z, modifiers);
-    this.jobManager = new JobManager(this.z, jmWebUrl, displayedJMWebUrl, properties)
+    this.bind("z", z.getClass().getCanonicalName(), z, List("@transient"))
+    this.jobManager = new JobManager(jmWebUrl, displayedJMWebUrl, properties)
 
     // register JobListener
     val jobListener = new FlinkJobListener()
@@ -166,6 +165,10 @@ class FlinkScalaInterpreter(val properties: Properties) {
     }
   }
 
+  def createIMain(settings: Settings, out: JPrintWriter): IMain
+
+  def createSettings(): Settings
+
   private def initFlinkConfig(): Config = {
 
     this.flinkVersion = new FlinkVersion(EnvironmentInformation.getVersion)
@@ -300,7 +303,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
               // for some cloud vendors, the yarn address may be mapped to some other address.
               val yarnAddress = properties.getProperty("flink.webui.yarn.address")
               if (!StringUtils.isBlank(yarnAddress)) {
-                this.displayedJMWebUrl = replaceYarnAddress(this.jmWebUrl, yarnAddress)
+                this.displayedJMWebUrl = FlinkScalaInterpreter.replaceYarnAddress(this.jmWebUrl, yarnAddress)
               }
             } else {
               this.jmWebUrl = clusterClient.getWebInterfaceURL
@@ -354,10 +357,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
     this.flinkILoop = iLoop
     this.cluster = cluster
 
-    val settings = new Settings()
-    settings.usejavacp.value = true
-    settings.Yreplsync.value = true
-    settings.classpath.value = userJars.mkString(File.pathSeparator)
+    val settings = createSettings()
 
     val outputDir = Files.createTempDirectory("flink-repl");
     val interpArguments = List(
@@ -367,7 +367,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
     settings.processArguments(interpArguments, true)
 
     flinkILoop.settings = settings
-    flinkILoop.intp = new FlinkILoopInterpreter(settings, replOut)
+    flinkILoop.intp = createIMain(settings, replOut)
     flinkILoop.intp.beQuietDuring {
       // set execution environment
       flinkILoop.intp.bind("benv", flinkILoop.scalaBenv)
@@ -410,8 +410,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
       flinkILoop.intp.interpret("import org.apache.flink.table.functions.TableAggregateFunction")
     }
 
-    val in0 = getField(flinkILoop, "scala$tools$nsc$interpreter$ILoop$$in0")
-      .asInstanceOf[Option[BufferedReader]]
+    val in0 = None
     val reader = in0.fold(flinkILoop.chooseReader(settings))(r =>
       SimpleReader(r, replOut, interactive = true))
 
@@ -419,7 +418,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
     flinkILoop.initializeSynchronous()
     flinkILoop.intp.setContextClassLoader()
     reader.postInit()
-    this.scalaCompleter = reader.completion.completer()
+    this.scalaCompletion = reader.completion
 
     this.benv = flinkILoop.scalaBenv
     this.senv = flinkILoop.scalaSenv
@@ -441,9 +440,6 @@ class FlinkScalaInterpreter(val properties: Properties) {
       this.tblEnvFactory = new TableEnvFactory(this.flinkVersion, this.flinkShims,
         this.benv, this.senv, tableConfig)
 
-      val modifiers = new java.util.ArrayList[String]()
-      modifiers.add("@transient")
-
       // blink planner
       var btEnvSetting = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build()
       this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting, getFlinkClassLoader);
@@ -574,33 +570,18 @@ class FlinkScalaInterpreter(val properties: Properties) {
     method.invoke(null, batchFactory);
   }
 
-  // for use in java side
-  protected def bind(name: String,
-                     tpe: String,
-                     value: Object,
-                     modifier: java.util.List[String]): Unit = {
-    flinkILoop.beQuietDuring {
-      flinkILoop.bind(name, tpe, value, modifier.asScala.toList)
-    }
-  }
-
   protected def bind(name: String,
                      tpe: String,
                      value: Object,
                      modifier: List[String]): Unit = {
     flinkILoop.beQuietDuring {
-      flinkILoop.bind(name, tpe, value, modifier)
+      val result = flinkILoop.bind(name, tpe, value, modifier)
+      if (result != IR.Success) {
+        throw new Exception("Fail to bind variable: " + name)
+      }
     }
   }
 
-  protected def completion(buf: String,
-                           cursor: Int,
-                           context: InterpreterContext): java.util.List[InterpreterCompletion] = {
-    val completions = scalaCompleter.complete(buf.substring(0, cursor), cursor).candidates
-      .map(e => new InterpreterCompletion(e, e, null))
-    JavaConversions.seqAsJavaList(completions)
-  }
-
   protected def callMethod(obj: Object, name: String): Object = {
     callMethod(obj, name, Array.empty[Class[_]], Array.empty[Object])
   }
@@ -613,12 +594,6 @@ class FlinkScalaInterpreter(val properties: Properties) {
     method.invoke(obj, parameters: _ *)
   }
 
-  protected def getField(obj: Object, name: String): Object = {
-    val field = obj.getClass.getField(name)
-    field.setAccessible(true)
-    field.get(obj)
-  }
-
   /**
    * This is just a workaround to make table api work in multiple threads.
    */
@@ -881,7 +856,9 @@ class FlinkScalaInterpreter(val properties: Properties) {
 
   def getZeppelinContext = this.z
 
-  def getConfiguration = this.configuration
+  def getFlinkConfiguration = this.configuration
+
+  def getFormType() = FormType.NATIVE
 
   def getCluster = cluster
 
@@ -931,14 +908,10 @@ class FlinkScalaInterpreter(val properties: Properties) {
     }
   }
 
-  def replaceYarnAddress(webURL: String, yarnAddress: String): String = {
-    val pattern = "(https?://.*:\\d+)(.*)".r
-    val pattern(prefix, remaining) = webURL
-    yarnAddress + remaining
-  }
-
   def getUserJars:java.util.List[String] = JavaConversions.seqAsJavaList(userJars)
 
+  def completion(buf: String, cursor: Int, context: InterpreterContext): java.util.List[InterpreterCompletion]
+
   private def getConfigurationOfStreamExecutionEnv(): Configuration = {
     val getConfigurationMethod = classOf[JStreamExecutionEnvironment].getDeclaredMethod("getConfiguration")
     getConfigurationMethod.setAccessible(true)
@@ -946,4 +919,12 @@ class FlinkScalaInterpreter(val properties: Properties) {
   }
 }
 
+object FlinkScalaInterpreter {
+  def replaceYarnAddress(webURL: String, yarnAddress: String): String = {
+    val pattern = "(https?://.*:\\d+)(.*)".r
+    val pattern(prefix, remaining) = webURL
+    yarnAddress + remaining
+  }
+}
+
 
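Moving replaceYarnAddress into the companion object also makes its behavior easy to check in isolation; a worked example (consistent with FlinkScalaInterpreterTest below):

```scala
// The regex splits the web URL into a "scheme://host:port" prefix and the
// remaining path; only the prefix is swapped for the configured yarn address.
val pattern = "(https?://.*:\\d+)(.*)".r
def replaceYarnAddress(webURL: String, yarnAddress: String): String = {
  val pattern(prefix, remaining) = webURL
  yarnAddress + remaining
}
assert(replaceYarnAddress("http://localhost:8081/proxy/app_1",
  "http://my-server:9090/gateway") == "http://my-server:9090/gateway/proxy/app_1")
```
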
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
similarity index 96%
rename from flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
rename to flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
index 69bfa33..517c67a 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
@@ -22,10 +22,8 @@ import java.io.IOException
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.flink.api.scala.DataSet
-import org.apache.flink.streaming.api.scala._
 import org.apache.flink.table.api.internal.TableImpl
 import org.apache.flink.table.api.Table
-//import org.apache.flink.table.api.scala.BatchTableEnvironment
 import org.apache.flink.types.Row
 import org.apache.flink.util.StringUtils
 import org.apache.zeppelin.annotation.ZeppelinApi
@@ -46,7 +44,6 @@ class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter,
                            val maxResult2: Int) extends ZeppelinContext(hooks2, maxResult2) {
 
   private val SQL_INDEX = new AtomicInteger(0)
-  private var currentSql: String = _
 
   private val interpreterClassMap = Map(
     "flink" -> "org.apache.zeppelin.flink.FlinkInterpreter",
@@ -58,9 +55,6 @@ class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter,
 
   private val supportedClasses = Seq(classOf[DataSet[_]], classOf[Table])
 
-  def setCurrentSql(sql: String): Unit = {
-    this.currentSql = sql
-  }
 
   override def getSupportedClasses: _root_.java.util.List[Class[_]] =
     JavaConversions.seqAsJavaList(supportedClasses)
@@ -102,13 +96,10 @@ class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter,
   override def showData(obj: Any, maxResult: Int): String = {
     if (obj.isInstanceOf[DataSet[_]]) {
       val ds = obj.asInstanceOf[DataSet[_]]
-      val btenv = flinkInterpreter.getBatchTableEnvironment("flink")//.asInstanceOf[BatchTableEnvironment]
-
+      val btenv = flinkInterpreter.getBatchTableEnvironment("flink")
       val table =  flinkInterpreter.getFlinkShims.fromDataSet(btenv, ds).asInstanceOf[Table]
-        //btenv.fromDataSet(ds)
       val columnNames: Array[String] = table.getSchema.getFieldNames
       val dsRows: DataSet[Row] = flinkInterpreter.getFlinkShims.toDataSet(btenv, table).asInstanceOf[DataSet[Row]]
-        //        btenv.toDataSet[Row](table)
       showTable(columnNames, dsRows.first(maxResult + 1).collect())
     } else if (obj.isInstanceOf[Table]) {
       val rows = JavaConversions.asScalaBuffer(
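
For context, the cleaned-up showData path above is what a notebook paragraph exercises; a minimal sketch (`benv` and `z` are the variables the interpreter binds on open()):

```scala
import org.apache.flink.api.scala._

val data = benv.fromElements((1, "a"), (2, "b"))
z.show(data)  // converted to a Table via the shims and rendered as a TABLE result
```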
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
new file mode 100644
index 0000000..27ab381
--- /dev/null
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.internal
+
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.util.AbstractID
+import java.io.{BufferedReader, File, FileOutputStream, IOException}
+import java.net.URL
+
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
+import org.apache.flink.api.java.{ExecutionEnvironment => JExecutionEnvironment}
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.core.execution.PipelineExecutorServiceLoader
+import org.apache.zeppelin.flink.{FlinkScalaInterpreter, YarnApplicationExecutionEnvironment, YarnApplicationStreamEnvironment}
+import FlinkShell.ExecutionMode
+
+import scala.tools.nsc.interpreter._
+
+
+class FlinkILoop(
+                  val flinkConfig: Configuration,
+                  val externalJars: Option[Array[String]],
+                  in0: Option[BufferedReader],
+                  out0: JPrintWriter,
+                  mode: ExecutionMode.Value,
+                  jenv: JExecutionEnvironment,
+                  jsenv: JStreamExecutionEnvironment,
+                  flinkScalaInterpreter: FlinkScalaInterpreter)
+  extends ILoop(in0, out0) {
+
+
+  // remote environment
+  private val (remoteBenv: ScalaShellEnvironment,
+  remoteSenv: ScalaShellStreamEnvironment) = {
+    ScalaShellEnvironment.resetContextEnvironments()
+    ScalaShellStreamEnvironment.resetContextEnvironments()
+    // create our environment that submits against the cluster (local or remote)
+    val remoteBenv = new ScalaShellEnvironment(
+      flinkConfig,
+      this,
+      this.getExternalJars(): _*)
+    val remoteSenv = new ScalaShellStreamEnvironment(
+      flinkConfig,
+      this,
+      getExternalJars(): _*)
+
+    (remoteBenv, remoteSenv)
+  }
+
+  // local environment
+  val (
+    scalaBenv: ExecutionEnvironment,
+    scalaSenv: StreamExecutionEnvironment
+    ) = {
+    if (mode == ExecutionMode.YARN_APPLICATION) {
+      // For yarn application mode, the ExecutionEnvironment & StreamExecutionEnvironment have already been
+      // created by Flink itself; here we just read their internals via reflection and reconstruct them.
+      val scalaBenv = new ExecutionEnvironment(new YarnApplicationExecutionEnvironment(
+        getExecutionEnvironmentField(jenv, "executorServiceLoader").asInstanceOf[PipelineExecutorServiceLoader],
+        getExecutionEnvironmentField(jenv, "configuration").asInstanceOf[Configuration],
+        getExecutionEnvironmentField(jenv, "userClassloader").asInstanceOf[ClassLoader],
+        this,
+        flinkScalaInterpreter
+      ))
+      val scalaSenv = new StreamExecutionEnvironment(new YarnApplicationStreamEnvironment(
+        getStreamExecutionEnvironmentField(jsenv, "executorServiceLoader").asInstanceOf[PipelineExecutorServiceLoader],
+        getStreamExecutionEnvironmentField(jsenv, "configuration").asInstanceOf[Configuration],
+        getStreamExecutionEnvironmentField(jsenv, "userClassloader").asInstanceOf[ClassLoader],
+        this,
+        flinkScalaInterpreter
+      ))
+      (scalaBenv, scalaSenv)
+    } else {
+      val scalaBenv = new ExecutionEnvironment(remoteBenv)
+      val scalaSenv = new StreamExecutionEnvironment(remoteSenv)
+      (scalaBenv, scalaSenv)
+    }
+  }
+
+  /**
+   * creates a temporary directory to store compiled console files
+   */
+  private val tmpDirBase: File = {
+    // get unique temporary folder:
+    val abstractID: String = new AbstractID().toString
+    val tmpDir: File = new File(
+      System.getProperty("java.io.tmpdir"),
+      "scala_shell_tmp-" + abstractID)
+    if (!tmpDir.exists) {
+      tmpDir.mkdir
+    }
+    tmpDir
+  }
+
+  // scala_shell commands
+  private val tmpDirShell: File = {
+    if (mode == ExecutionMode.YARN_APPLICATION) {
+      new File(".", "scala_shell_commands")
+    } else {
+      new File(tmpDirBase, "scala_shell_commands")
+    }
+  }
+
+  // scala shell jar file name
+  private val tmpJarShell: File = {
+    new File(tmpDirBase, "scala_shell_commands.jar")
+  }
+
+  private val packageImports = Seq[String](
+    "org.apache.flink.core.fs._",
+    "org.apache.flink.core.fs.local._",
+    "org.apache.flink.api.common.io._",
+    "org.apache.flink.api.common.aggregators._",
+    "org.apache.flink.api.common.accumulators._",
+    "org.apache.flink.api.common.distributions._",
+    "org.apache.flink.api.common.operators._",
+    "org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint",
+    "org.apache.flink.api.common.functions._",
+    "org.apache.flink.api.java.io._",
+    "org.apache.flink.api.java.aggregation._",
+    "org.apache.flink.api.java.functions._",
+    "org.apache.flink.api.java.operators._",
+    "org.apache.flink.api.java.sampling._",
+    "org.apache.flink.api.scala._",
+    "org.apache.flink.api.scala.utils._",
+    "org.apache.flink.streaming.api.scala._",
+    "org.apache.flink.streaming.api.windowing.time._",
+    "org.apache.flink.table.api._",
+    "org.apache.flink.table.api.bridge.scala._",
+    "org.apache.flink.types.Row"
+  )
+
+  override def createInterpreter(): Unit = {
+    super.createInterpreter()
+
+    intp.beQuietDuring {
+      // import dependencies
+      intp.interpret("import " + packageImports.mkString(", "))
+
+      // set execution environment
+      intp.bind("benv", this.scalaBenv)
+      intp.bind("senv", this.scalaSenv)
+    }
+  }
+
+  /**
+   * Packages the compiled classes of the current shell session into a Jar file for execution
+   * on a Flink cluster.
+   *
+   * @return The path of the created Jar file
+   */
+  def writeFilesToDisk(): File = {
+    if (!tmpDirShell.exists()) {
+      if (!tmpDirShell.mkdirs()) {
+        throw new IOException("Fail to create tmp dir: " +
+          tmpDirShell.getAbsolutePath + " for scala shell")
+      }
+    }
+    val vd = intp.virtualDirectory
+
+    val vdIt = vd.iterator
+
+    for (fi <- vdIt) {
+      if (fi.isDirectory) {
+
+        val fiIt = fi.iterator
+
+        for (f <- fiIt) {
+
+          // directory for compiled line
+          val lineDir = new File(tmpDirShell.getAbsolutePath, fi.name)
+          lineDir.mkdirs()
+
+          // compiled classes for commands from shell
+          val writeFile = new File(lineDir.getAbsolutePath, f.name)
+          val outputStream = new FileOutputStream(writeFile)
+          val inputStream = f.input
+
+          // copy file contents
+          org.apache.commons.io.IOUtils.copy(inputStream, outputStream)
+
+          inputStream.close()
+          outputStream.close()
+        }
+      }
+    }
+
+    val compiledClasses = new File(tmpDirShell.getAbsolutePath)
+
+    val jarFilePath = new File(tmpJarShell.getAbsolutePath)
+
+    val jh: JarHelper = new JarHelper
+    jh.jarDir(compiledClasses, jarFilePath)
+
+    jarFilePath
+  }
+
+  def getExternalJars(): Array[String] = externalJars.getOrElse(Array.empty[String])
+
+  private def getExecutionEnvironmentField(obj: Object, name: String): Object = {
+    val field = classOf[JExecutionEnvironment].getDeclaredField(name)
+    field.setAccessible(true)
+    field.get(obj)
+  }
+
+  private def getStreamExecutionEnvironmentField(obj: Object, name: String): Object = {
+    val field = classOf[JStreamExecutionEnvironment].getDeclaredField(name)
+    field.setAccessible(true)
+    field.get(obj)
+  }
+}
+
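
A side note on the temp-directory scheme above: AbstractID yields a random 128-bit id rendered as hex, so concurrent shell sessions on one machine never collide on their compiled-classes directories. In isolation:

```scala
import java.io.File
import org.apache.flink.util.AbstractID

val tmpDir = new File(System.getProperty("java.io.tmpdir"),
  "scala_shell_tmp-" + new AbstractID().toString)
if (!tmpDir.exists) tmpDir.mkdir()
println(tmpDir.getAbsolutePath)  // e.g. /tmp/scala_shell_tmp-<32 hex chars>
```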
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkShell.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala
similarity index 98%
rename from flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkShell.scala
rename to flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala
index f26d6b1..437d188 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkShell.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala
@@ -16,18 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.zeppelin.flink
+package org.apache.zeppelin.flink.internal
 
-import java.io._
+import java.io.BufferedReader
 
 import org.apache.flink.annotation.Internal
-import org.apache.flink.client.cli.{CliFrontend, CliFrontendParser, CustomCommandLine}
+import org.apache.flink.client.cli.{CliFrontend, CliFrontendParser}
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
 import org.apache.flink.client.deployment.executors.RemoteExecutor
 import org.apache.flink.client.program.{ClusterClient, MiniClusterClient}
 import org.apache.flink.configuration._
 import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}
 import org.apache.flink.yarn.executors.YarnSessionClusterExecutor
+import org.apache.zeppelin.flink.FlinkShims
 
 import scala.collection.mutable.ArrayBuffer
 
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
similarity index 99%
rename from flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
rename to flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
index 6077ff5..ce627c1 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -75,7 +75,7 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
     result =
             flinkInterpreter.interpret("z.show(btenv.sqlQuery(\"select * from source_table\"))", context);
     resultMessages = context.out.toInterpreterResultMessage();
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
     assertEquals(1, resultMessages.size());
     assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
     assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData());
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
similarity index 99%
rename from flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
rename to flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index 7a97656..ceced31 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -326,8 +326,8 @@ public class FlinkInterpreterTest {
     thread.start();
 
     // the streaming job will run for 20 seconds. check init_stream.scala
-    // sleep 10 seconds to make sure the job is started but not finished
-    Thread.sleep(10 * 1000);
+    // sleep 20 seconds to make sure the job is started but not finished
+    Thread.sleep(20 * 1000);
 
     InterpreterContext context = getInterpreterContext();
     context.getLocalProperties().put("type", "update");
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
similarity index 100%
rename from flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
rename to flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
similarity index 99%
rename from flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
rename to flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
index 6a8d71f..fb562cd 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
@@ -398,8 +398,8 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
     thread.start();
 
     // the streaming job will run for 20 seconds. check init_stream.scala
-    // sleep 10 seconds to make sure the job is started but not finished
-    Thread.sleep(10 * 1000);
+    // sleep 20 seconds to make sure the job is started but not finished
+    Thread.sleep(20 * 1000);
 
     InterpreterContext context = createInterpreterContext();
     context.getLocalProperties().put("type", "update");
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JavaLower.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/JavaLower.java
similarity index 100%
rename from flink/interpreter/src/test/java/org/apache/zeppelin/flink/JavaLower.java
rename to flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/JavaLower.java
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JavaUpper.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/JavaUpper.java
similarity index 100%
rename from flink/interpreter/src/test/java/org/apache/zeppelin/flink/JavaUpper.java
rename to flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/JavaUpper.java
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java
similarity index 100%
rename from flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java
rename to flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
similarity index 100%
rename from flink/interpreter/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
rename to flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
similarity index 100%
rename from flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
rename to flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
diff --git a/flink/interpreter/src/test/resources/flink-conf.yaml b/flink/flink-scala-parent/src/test/resources/flink-conf.yaml
similarity index 100%
rename from flink/interpreter/src/test/resources/flink-conf.yaml
rename to flink/flink-scala-parent/src/test/resources/flink-conf.yaml
diff --git a/flink/interpreter/src/test/resources/init_stream.scala b/flink/flink-scala-parent/src/test/resources/init_stream.scala
similarity index 100%
rename from flink/interpreter/src/test/resources/init_stream.scala
rename to flink/flink-scala-parent/src/test/resources/init_stream.scala
diff --git a/flink/interpreter/src/test/resources/log4j.properties b/flink/flink-scala-parent/src/test/resources/log4j.properties
similarity index 100%
rename from flink/interpreter/src/test/resources/log4j.properties
rename to flink/flink-scala-parent/src/test/resources/log4j.properties
diff --git a/flink/flink-scala-parent/src/test/resources/log4j2.properties b/flink/flink-scala-parent/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..d003c42
--- /dev/null
+++ b/flink/flink-scala-parent/src/test/resources/log4j2.properties
@@ -0,0 +1,49 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This affects logging for both user code and Flink
+rootLogger.level = INFO
+#rootLogger.appenderRef.file.ref = MainAppender
+
+# Uncomment this if you want to _only_ change Flink's logging
+#logger.flink.name = org.apache.flink
+#logger.flink.level = INFO
+
+# The following lines keep the log level of common libraries/connectors on
+# log level INFO. The root logger does not override this. You have to manually
+# change the log levels here.
+logger.akka.name = akka
+logger.akka.level = INFO
+logger.kafka.name= org.apache.kafka
+logger.kafka.level = INFO
+logger.hadoop.name = org.apache.hadoop
+logger.hadoop.level = INFO
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = INFO
+
+## Log all infos in the given file
+#appender.main.name = MainAppender
+#appender.main.type = File
+#appender.main.append = false
+#appender.main.fileName = ${sys:zeppelin.log.file}
+#appender.main.layout.type = PatternLayout
+#appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
+logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
+logger.netty.level = OFF
diff --git a/flink/interpreter/src/test/scala/org/apache/zeppelin/flink/FlinkScalaInterpreterTest.scala b/flink/flink-scala-parent/src/test/scala/org/apache/zeppelin/flink/FlinkScalaInterpreterTest.scala
similarity index 84%
rename from flink/interpreter/src/test/scala/org/apache/zeppelin/flink/FlinkScalaInterpreterTest.scala
rename to flink/flink-scala-parent/src/test/scala/org/apache/zeppelin/flink/FlinkScalaInterpreterTest.scala
index 6c084c5..7e326d8 100644
--- a/flink/interpreter/src/test/scala/org/apache/zeppelin/flink/FlinkScalaInterpreterTest.scala
+++ b/flink/flink-scala-parent/src/test/scala/org/apache/zeppelin/flink/FlinkScalaInterpreterTest.scala
@@ -26,16 +26,15 @@ import org.scalatest.FunSuite
 class FlinkScalaInterpreterTest extends FunSuite {
 
   test("testReplaceYarnAddress") {
-    val flinkScalaInterpreter = new FlinkScalaInterpreter(new Properties())
-    var targetURL = flinkScalaInterpreter.replaceYarnAddress("http://localhost:8081",
+    var targetURL = FlinkScalaInterpreter.replaceYarnAddress("http://localhost:8081",
       "http://my-server:9090/gateway")
     assertEquals("http://my-server:9090/gateway", targetURL)
 
-    targetURL = flinkScalaInterpreter.replaceYarnAddress("https://localhost:8081/",
+    targetURL = FlinkScalaInterpreter.replaceYarnAddress("https://localhost:8081/",
       "https://my-server:9090/gateway")
     assertEquals("https://my-server:9090/gateway/", targetURL)
 
-    targetURL = flinkScalaInterpreter.replaceYarnAddress("https://localhost:8081/proxy/app_1",
+    targetURL = FlinkScalaInterpreter.replaceYarnAddress("https://localhost:8081/proxy/app_1",
       "https://my-server:9090/gateway")
     assertEquals("https://my-server:9090/gateway/proxy/app_1", targetURL)
   }
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
index dcf32d0..f4d4cca 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
@@ -29,7 +29,7 @@ public class FlinkVersion {
   private int version;
   private String versionString;
 
-  FlinkVersion(String versionString) {
+  public FlinkVersion(String versionString) {
     this.versionString = versionString;
 
     try {
diff --git a/flink/flink1.10-shims/pom.xml b/flink/flink1.10-shims/pom.xml
index b0f41af..df9e884 100644
--- a/flink/flink1.10-shims/pom.xml
+++ b/flink/flink1.10-shims/pom.xml
@@ -34,8 +34,6 @@
 
     <properties>
         <flink.version>${flink1.10.version}</flink.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.12</scala.version>
     </properties>
 
     <dependencies>
@@ -55,14 +53,14 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients_2.11</artifactId>
+            <artifactId>flink-clients_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime_2.11</artifactId>
+            <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -76,28 +74,28 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-scala_2.11</artifactId>
+            <artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
+            <artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
+            <artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-scala_2.11</artifactId>
+            <artifactId>flink-scala_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -111,7 +109,7 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-scala_2.11</artifactId>
+            <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -125,7 +123,7 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner-blink_2.11</artifactId>
+            <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
             <exclusions>
@@ -138,21 +136,14 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner_2.11</artifactId>
+            <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-python_2.11</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-scala-shell_2.11</artifactId>
+            <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -188,7 +179,7 @@
                     </execution>
                 </executions>
                 <configuration>
-                    <scalaVersion>${scala.version}</scalaVersion>
+                    <scalaVersion>${flink.scala.version}</scalaVersion>
                     <args>
                         <arg>-unchecked</arg>
                         <arg>-deprecation</arg>
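
The same edit recurs in each shim pom below: the hard-coded 2.11 properties are dropped and every scala-suffixed artifactId is parameterized on flink.scala.binary.version, which flink/pom.xml (further down in this diff) defaults to 2.11. A sketch of how the substitution plays out, with the property value as an assumption:

    // With <flink.scala.binary.version>2.11</flink.scala.binary.version> in the parent pom,
    // "flink-clients_${flink.scala.binary.version}" resolves to "flink-clients_2.11";
    // building with -Dflink.scala.binary.version=2.12 flips every suffixed artifact at once.
    String scalaBinaryVersion = "2.11";                        // parent default
    String artifactId = "flink-clients_" + scalaBinaryVersion; // -> flink-clients_2.11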
diff --git a/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims110/Flink110ScalaShims.scala b/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims110/Flink110ScalaShims.scala
index b9978f5..9be7b8a 100644
--- a/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims110/Flink110ScalaShims.scala
+++ b/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims110/Flink110ScalaShims.scala
@@ -18,12 +18,12 @@
 
 package org.apache.zeppelin.flink.shims110
 
-import org.apache.flink.api.scala.{DataSet, FlinkILoop}
+import org.apache.flink.api.scala.DataSet
 import org.apache.flink.table.api.Table
 import org.apache.flink.table.api.scala.BatchTableEnvironment
 import org.apache.flink.types.Row
 import org.apache.flink.streaming.api.scala._
-import org.apache.flink.table.api.scala.internal.{BatchTableEnvironmentImpl, StreamTableEnvironmentImpl}
+
 
 object Flink110ScalaShims {
 
diff --git a/flink/flink1.11-shims/pom.xml b/flink/flink1.11-shims/pom.xml
index 874025c..7504ca6 100644
--- a/flink/flink1.11-shims/pom.xml
+++ b/flink/flink1.11-shims/pom.xml
@@ -34,8 +34,6 @@
 
     <properties>
         <flink.version>${flink1.11.version}</flink.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.12</scala.version>
     </properties>
 
     <dependencies>
@@ -55,14 +53,14 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients_2.11</artifactId>
+            <artifactId>flink-clients_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime_2.11</artifactId>
+            <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -70,42 +68,42 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-scala_2.11</artifactId>
+            <artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
+            <artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
+            <artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-scala_2.11</artifactId>
+            <artifactId>flink-scala_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_2.11</artifactId>
+            <artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-scala_2.11</artifactId>
+            <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -119,7 +117,7 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner-blink_2.11</artifactId>
+            <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
             <exclusions>
@@ -132,24 +130,18 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner_2.11</artifactId>
+            <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-python_2.11</artifactId>
+            <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-scala-shell_2.11</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
     </dependencies>
 
     <build>
@@ -181,7 +173,7 @@
                     </execution>
                 </executions>
                 <configuration>
-                    <scalaVersion>${scala.version}</scalaVersion>
+                    <scalaVersion>${flink.scala.version}</scalaVersion>
                     <args>
                         <arg>-unchecked</arg>
                         <arg>-deprecation</arg>
diff --git a/flink/flink1.12-shims/pom.xml b/flink/flink1.12-shims/pom.xml
index b4b4300..244e54e 100644
--- a/flink/flink1.12-shims/pom.xml
+++ b/flink/flink1.12-shims/pom.xml
@@ -34,8 +34,6 @@
 
     <properties>
         <flink.version>${flink1.12.version}</flink.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.12</scala.version>
     </properties>
 
     <dependencies>
@@ -55,14 +53,14 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients_2.11</artifactId>
+            <artifactId>flink-clients_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime_2.11</artifactId>
+            <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -70,42 +68,42 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-scala_2.11</artifactId>
+            <artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
+            <artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
+            <artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-scala_2.11</artifactId>
+            <artifactId>flink-scala_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_2.11</artifactId>
+            <artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-scala_2.11</artifactId>
+            <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -119,7 +117,7 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner-blink_2.11</artifactId>
+            <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
             <exclusions>
@@ -132,24 +130,18 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner_2.11</artifactId>
+            <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-python_2.11</artifactId>
+            <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-scala-shell_2.11</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
     </dependencies>
 
     <build>
@@ -181,7 +173,7 @@
                     </execution>
                 </executions>
                 <configuration>
-                    <scalaVersion>${scala.version}</scalaVersion>
+                    <scalaVersion>${flink.scala.version}</scalaVersion>
                     <args>
                         <arg>-unchecked</arg>
                         <arg>-deprecation</arg>
diff --git a/flink/flink1.13-shims/pom.xml b/flink/flink1.13-shims/pom.xml
index e7ef24e..0c91557 100644
--- a/flink/flink1.13-shims/pom.xml
+++ b/flink/flink1.13-shims/pom.xml
@@ -34,8 +34,6 @@
 
     <properties>
         <flink.version>${flink1.13.version}</flink.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.12</scala.version>
     </properties>
 
     <dependencies>
@@ -55,14 +53,14 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients_2.11</artifactId>
+            <artifactId>flink-clients_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime_2.11</artifactId>
+            <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -70,42 +68,42 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-scala_2.11</artifactId>
+            <artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
+            <artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
+            <artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-scala_2.11</artifactId>
+            <artifactId>flink-scala_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_2.11</artifactId>
+            <artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-scala_2.11</artifactId>
+            <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -119,7 +117,7 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner-blink_2.11</artifactId>
+            <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
             <exclusions>
@@ -132,24 +130,18 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner_2.11</artifactId>
+            <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-python_2.11</artifactId>
+            <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-scala-shell_2.11</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
     </dependencies>
 
     <build>
@@ -181,7 +173,7 @@
                     </execution>
                 </executions>
                 <configuration>
-                    <scalaVersion>${scala.version}</scalaVersion>
+                    <scalaVersion>${flink.scala.version}</scalaVersion>
                     <args>
                         <arg>-unchecked</arg>
                         <arg>-deprecation</arg>
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkILoop.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkILoop.scala
deleted file mode 100644
index 5a96e32..0000000
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkILoop.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink
-
-import java.io.{BufferedReader, File}
-
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.api.java.{ScalaShellEnvironment, ScalaShellStreamEnvironment, ExecutionEnvironment => JExecutionEnvironment}
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.core.execution.PipelineExecutorServiceLoader
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
-import org.apache.zeppelin.flink.FlinkShell.ExecutionMode
-
-import scala.tools.nsc.interpreter._
-
-
-class FlinkILoop(
-    override val flinkConfig: Configuration,
-    override val externalJars: Option[Array[String]],
-    in0: Option[BufferedReader],
-    out0: JPrintWriter,
-    mode: ExecutionMode.Value,
-    jenv: JExecutionEnvironment,
-    jsenv: JStreamExecutionEnvironment,
-    flinkScalaInterpreter: FlinkScalaInterpreter) extends org.apache.flink.api.scala.FlinkILoop(flinkConfig, externalJars, in0, out0) {
-
-
-  override def writeFilesToDisk(): File = {
-    if (mode == ExecutionMode.YARN_APPLICATION) {
-      // write jars to current working directory in yarn application mode.
-      val tmpDirShellField = classOf[org.apache.flink.api.scala.FlinkILoop]
-        .getDeclaredField("org$apache$flink$api$scala$FlinkILoop$$tmpDirShell")
-      tmpDirShellField.setAccessible(true)
-      tmpDirShellField.set(this, new File("scala_shell_commands"));
-
-      val tmpJarShellField = classOf[org.apache.flink.api.scala.FlinkILoop].getDeclaredField("tmpJarShell")
-      tmpJarShellField.setAccessible(true)
-      tmpJarShellField.set(this, new File("scala_shell_commands.jar"));
-    } else {
-      // create tmpDirBase again in case it was deleted by the system, because it is in the system temp folder.
-      val field = classOf[org.apache.flink.api.scala.FlinkILoop].getDeclaredField("tmpDirBase")
-      field.setAccessible(true)
-      val tmpDir = field.get(this).asInstanceOf[File]
-      if (!tmpDir.exists()) {
-        tmpDir.mkdir()
-      }
-    }
-
-    super.writeFilesToDisk()
-  }
-
-  override val (
-    scalaBenv: ExecutionEnvironment,
-    scalaSenv: StreamExecutionEnvironment
-    ) = {
-    if (mode == ExecutionMode.YARN_APPLICATION) {
-      // For yarn application mode, ExecutionEnvironment & StreamExecutionEnvironment have already been created
-      // by flink itself; here we just fetch them via reflection and reconstruct them.
-      val scalaBenv = new ExecutionEnvironment(new YarnApplicationExecutionEnvironment(
-        getExecutionEnvironmentField(jenv, "executorServiceLoader").asInstanceOf[PipelineExecutorServiceLoader],
-        getExecutionEnvironmentField(jenv, "configuration").asInstanceOf[Configuration],
-        getExecutionEnvironmentField(jenv, "userClassloader").asInstanceOf[ClassLoader],
-        this,
-        flinkScalaInterpreter
-      ))
-      val scalaSenv = new StreamExecutionEnvironment(new YarnApplicationStreamEnvironment(
-        getStreamExecutionEnvironmentField(jsenv, "executorServiceLoader").asInstanceOf[PipelineExecutorServiceLoader],
-        getStreamExecutionEnvironmentField(jsenv, "configuration").asInstanceOf[Configuration],
-        getStreamExecutionEnvironmentField(jsenv, "userClassloader").asInstanceOf[ClassLoader],
-        this,
-        flinkScalaInterpreter
-      ))
-      (scalaBenv, scalaSenv)
-    } else {
-      val scalaBenv = new ExecutionEnvironment(
-        getSuperFlinkILoopField("remoteBenv").asInstanceOf[ScalaShellEnvironment])
-      val scalaSenv = new StreamExecutionEnvironment(
-        getSuperFlinkILoopField("remoteSenv").asInstanceOf[ScalaShellStreamEnvironment])
-      (scalaBenv, scalaSenv)
-    }
-  }
-
-  private def getExecutionEnvironmentField(obj: Object, name: String): Object = {
-    val field = classOf[JExecutionEnvironment].getDeclaredField(name)
-    field.setAccessible(true)
-    field.get(obj)
-  }
-
-  private def getStreamExecutionEnvironmentField(obj: Object, name: String): Object = {
-    val field = classOf[JStreamExecutionEnvironment].getDeclaredField(name)
-    field.setAccessible(true)
-    field.get(obj)
-  }
-
-  private def getSuperFlinkILoopField(name: String): Object = {
-    val field = classOf[org.apache.flink.api.scala.FlinkILoop].getDeclaredField(name)
-    field.setAccessible(true)
-    field.get(this)
-  }
-}
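
The deleted FlinkILoop leaned on one reflection idiom throughout: open a private field declared on a Flink class, then read or overwrite it. A self-contained Java sketch of that pattern (the helper and class names are illustrative; the real fields are the ones named above, e.g. tmpDirBase and remoteBenv):

    import java.lang.reflect.Field;

    final class PrivateFieldAccess {
      // Read a private field declared on declaringClass, the same way the removed
      // writeFilesToDisk() and scalaBenv/scalaSenv overrides did.
      static Object getPrivateField(Object target, Class<?> declaringClass, String name)
          throws ReflectiveOperationException {
        Field field = declaringClass.getDeclaredField(name);
        field.setAccessible(true);
        return field.get(target);
      }
    }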
diff --git a/flink/pom.xml b/flink/pom.xml
index 2ee34e7..ee3ba23 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -35,7 +35,9 @@
     <description>Zeppelin Flink Support</description>
 
     <modules>
-        <module>interpreter</module>
+        <module>flink-scala-parent</module>
+        <module>flink-scala-2.11</module>
+        <module>flink-scala-2.12</module>
         <module>flink-shims</module>
         <module>flink1.10-shims</module>
         <module>flink1.11-shims</module>
@@ -48,6 +50,9 @@
         <flink1.11.version>1.11.3</flink1.11.version>
         <flink1.12.version>1.12.0</flink1.12.version>
         <flink1.13.version>1.13.0</flink1.13.version>
+
+        <flink.scala.version>2.11.12</flink.scala.version>
+        <flink.scala.binary.version>2.11</flink.scala.binary.version>
     </properties>
 
     <dependencies>
diff --git a/zeppelin-interpreter-integration/pom.xml b/zeppelin-interpreter-integration/pom.xml
index c89b1e3..07c30aa 100644
--- a/zeppelin-interpreter-integration/pom.xml
+++ b/zeppelin-interpreter-integration/pom.xml
@@ -181,6 +181,9 @@
           <forkCount>1</forkCount>
           <reuseForks>false</reuseForks>
           <argLine>-Xmx3072m</argLine>
+          <environmentVariables>
+            <ZEPPELIN_HOME>${basedir}/../</ZEPPELIN_HOME>
+          </environmentVariables>
         </configuration>
       </plugin>
       <plugin>
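
Exporting ZEPPELIN_HOME into the forked surefire JVM lets the integration tests resolve files relative to the repository root instead of the module directory. An illustrative consumer (not necessarily how the tests read it; "interpreter/flink" is an example subpath):

    String zeppelinHome = System.getenv("ZEPPELIN_HOME");
    java.io.File flinkInterpreterDir = new java.io.File(zeppelinHome, "interpreter/flink");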
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
index 21b1f66..17a4c8c 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
@@ -59,13 +59,16 @@ public abstract class FlinkIntegrationTest {
   private static InterpreterSettingManager interpreterSettingManager;
 
   private String flinkVersion;
+  private String scalaVersion;
   private String hadoopHome;
   private String flinkHome;
 
-  public FlinkIntegrationTest(String flinkVersion) {
+  public FlinkIntegrationTest(String flinkVersion, String scalaVersion) {
     LOGGER.info("Testing FlinkVersion: " + flinkVersion);
+    LOGGER.info("Testing ScalaVersion: " + scalaVersion);
     this.flinkVersion = flinkVersion;
-    this.flinkHome = DownloadUtils.downloadFlink(flinkVersion);
+    this.scalaVersion = scalaVersion;
+    this.flinkHome = DownloadUtils.downloadFlink(flinkVersion, scalaVersion);
     this.hadoopHome = DownloadUtils.downloadHadoop("2.7.7");
   }
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
index ab68068..21f5292 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
@@ -29,11 +29,12 @@ public class FlinkIntegrationTest110 extends FlinkIntegrationTest {
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"1.10.3"}
+            {"1.10.3", "2.11"},
+            {"1.10.3", "2.12"}
     });
   }
 
-  public FlinkIntegrationTest110(String flinkVersion) {
-    super(flinkVersion);
+  public FlinkIntegrationTest110(String flinkVersion, String scalaVersion) {
+    super(flinkVersion, scalaVersion);
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java
index a85bb5f..66fe6d8 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java
@@ -29,11 +29,12 @@ public class FlinkIntegrationTest111 extends FlinkIntegrationTest {
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"1.11.3"}
+            {"1.11.3", "2.11"},
+            {"1.11.3", "2.12"}
     });
   }
 
-  public FlinkIntegrationTest111(String flinkVersion) {
-    super(flinkVersion);
+  public FlinkIntegrationTest111(String flinkVersion, String scalaVersion) {
+    super(flinkVersion, scalaVersion);
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest112.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest112.java
index 56b318b..29c38b6 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest112.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest112.java
@@ -29,11 +29,12 @@ public class FlinkIntegrationTest112 extends FlinkIntegrationTest {
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"1.12.0"}
+            {"1.12.0", "2.11"},
+            {"1.12.0", "2.12"}
     });
   }
 
-  public FlinkIntegrationTest112(String flinkVersion) {
-    super(flinkVersion);
+  public FlinkIntegrationTest112(String flinkVersion, String scalaVersion) {
+    super(flinkVersion, scalaVersion);
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest113.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest113.java
index 9d48949..dc8cbc4 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest113.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest113.java
@@ -29,11 +29,12 @@ public class FlinkIntegrationTest113 extends FlinkIntegrationTest {
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"1.13.0"}
+            {"1.13.0", "2.11"},
+            {"1.13.0", "2.12"}
     });
   }
 
-  public FlinkIntegrationTest113(String flinkVersion) {
-    super(flinkVersion);
+  public FlinkIntegrationTest113(String flinkVersion, String scalaVersion) {
+    super(flinkVersion, scalaVersion);
   }
 }
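
Each subclass above now lists its (flinkVersion, scalaVersion) pairs by hand. A hypothetical, slightly more compact variant would derive the matrix programmatically, e.g. for the 1.13 tests (assumes the same imports as the files above: java.util.List, java.util.ArrayList, java.util.Arrays):

    // Hypothetical alternative to the hand-written pairs; behavior is identical.
    @Parameterized.Parameters
    public static List<Object[]> data() {
      List<Object[]> params = new ArrayList<>();
      for (String scalaVersion : Arrays.asList("2.11", "2.12")) {
        params.add(new Object[]{"1.13.0", scalaVersion});
      }
      return params;
    }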
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
index 0e9fd90..07bb036 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
@@ -69,7 +69,7 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi {
 
     notebook = TestUtils.getInstance(Notebook.class);
     sparkHome = DownloadUtils.downloadSpark("2.4.4", "2.7");
-    flinkHome = DownloadUtils.downloadFlink("1.10.1");
+    flinkHome = DownloadUtils.downloadFlink("1.10.1", "2.11");
   }
 
   @AfterClass
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java
index 930e59f..cae651e 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java
@@ -48,7 +48,7 @@ public abstract class ZeppelinFlinkClusterTest extends AbstractTestRestApi {
   public ZeppelinFlinkClusterTest(String flinkVersion) throws Exception {
     this.flinkVersion = flinkVersion;
     LOGGER.info("Testing FlinkVersion: " + flinkVersion);
-    this.flinkHome = DownloadUtils.downloadFlink(flinkVersion);
+    this.flinkHome = DownloadUtils.downloadFlink(flinkVersion, "2.11");
   }
 
   @BeforeClass
diff --git a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
index f18e569..21379f6 100644
--- a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -59,9 +60,45 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
       updateEnvsForYarnApplicationMode(envs, context);
     }
 
+    String flinkAppJar = chooseFlinkAppJar(flinkHome);
+    LOGGER.info("Choose FLINK_APP_JAR: {}", flinkAppJar);
+    envs.put("FLINK_APP_JAR", flinkAppJar);
     return envs;
   }
 
+  private String chooseFlinkAppJar(String flinkHome) throws IOException {
+    File flinkLibFolder = new File(flinkHome, "lib");
+    List<File> flinkDistFiles =
+            Arrays.stream(flinkLibFolder.listFiles(file -> file.getName().contains("flink-dist_")))
+                    .collect(Collectors.toList());
+    if (flinkDistFiles.size() > 1) {
+      throw new IOException("More than 1 flink-dist files: " +
+              flinkDistFiles.stream()
+                      .map(file -> file.getAbsolutePath())
+                      .collect(Collectors.joining(",")));
+    }
+    String scalaVersion = "2.11";
+    if (flinkDistFiles.get(0).getName().contains("2.12")) {
+      scalaVersion = "2.12";
+    }
+    final String flinkScalaVersion = scalaVersion;
+    File flinkInterpreterFolder =
+            new File(ZeppelinConfiguration.create().getInterpreterDir(), "flink");
+    List<File> flinkScalaJars =
+            Arrays.stream(flinkInterpreterFolder
+                    .listFiles(file -> file.getName().endsWith(".jar")))
+            .filter(file -> file.getName().contains(flinkScalaVersion))
+            .collect(Collectors.toList());
+    if (flinkScalaJars.size() > 1) {
+      throw new IOException("More than 1 flink scala files: " +
+              flinkScalaJars.stream()
+                      .map(file -> file.getAbsolutePath())
+                      .collect(Collectors.joining(",")));
+    }
+
+    return flinkScalaJars.get(0).getAbsolutePath();
+  }
+
   private String updateEnvsForFlinkHome(Map<String, String> envs,
                                       InterpreterLaunchContext context) throws IOException {
     String flinkHome = context.getProperties().getProperty("FLINK_HOME");
@@ -83,7 +120,9 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
   }
 
   private void updateEnvsForYarnApplicationMode(Map<String, String> envs,
-                                                InterpreterLaunchContext context) {
+                                                InterpreterLaunchContext context)
+          throws IOException {
+
     envs.put("ZEPPELIN_FLINK_YARN_APPLICATION", "true");
 
     StringBuilder flinkYarnApplicationConfBuilder = new StringBuilder();
@@ -91,7 +130,7 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
     List<String> yarnShipFiles = getYarnShipFiles(context);
     if (!yarnShipFiles.isEmpty()) {
       flinkYarnApplicationConfBuilder.append(
-              " -D yarn.ship-files=" + yarnShipFiles.stream().collect(Collectors.joining(",")));
+              " -D yarn.ship-files=" + yarnShipFiles.stream().collect(Collectors.joining(";")));
     }
 
     // set yarn.application.name
@@ -119,7 +158,7 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
     envs.put("ZEPPELIN_FLINK_YARN_APPLICATION_CONF", flinkYarnApplicationConfBuilder.toString());
   }
 
-  private List<String> getYarnShipFiles(InterpreterLaunchContext context) {
+  private List<String> getYarnShipFiles(InterpreterLaunchContext context) throws IOException {
     // Extract yarn.ship-files, add hive-site.xml automatically if hive is enabled
     // and HIVE_CONF_DIR is specified
     List<String> yarnShipFiles = new ArrayList<>();
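
chooseFlinkAppJar infers the scala version from the flink-dist jar in $FLINK_HOME/lib and then picks the matching interpreter jar from the flink interpreter folder. As committed it assumes both listFiles calls find at least one match; a hedged sketch of the same detection with explicit null/empty checks (a sketch only, not the committed method):

    import java.io.File;
    import java.io.IOException;

    static String detectFlinkScalaVersion(String flinkHome) throws IOException {
      File[] distJars = new File(flinkHome, "lib")
              .listFiles(f -> f.getName().contains("flink-dist_"));
      if (distJars == null || distJars.length == 0) {
        throw new IOException("No flink-dist jar found under " + flinkHome + "/lib");
      }
      if (distJars.length > 1) {
        throw new IOException("More than one flink-dist jar found under " + flinkHome + "/lib");
      }
      return distJars[0].getName().contains("2.12") ? "2.12" : "2.11";
    }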
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
index 49d3785..bf07fcf 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
@@ -59,23 +59,23 @@ public class DownloadUtils {
     return targetSparkHomeFolder.getAbsolutePath();
   }
 
-  public static String downloadFlink(String version) {
+  public static String downloadFlink(String flinkVersion, String scalaVersion) {
     String flinkDownloadFolder = downloadFolder + "/flink";
-    File targetFlinkHomeFolder = new File(flinkDownloadFolder + "/flink-" + version);
+    File targetFlinkHomeFolder = new File(flinkDownloadFolder + "/flink-" + flinkVersion);
     if (targetFlinkHomeFolder.exists()) {
       LOGGER.info("Skip to download flink as it is already downloaded.");
       return targetFlinkHomeFolder.getAbsolutePath();
     }
-    download("flink", version, "-bin-scala_2.11.tgz");
+    download("flink", flinkVersion, "-bin-scala_" + scalaVersion + ".tgz");
     // download other dependencies for running flink with yarn and hive
     try {
       runShellCommand(new String[]{"wget",
-              "https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.11/"
-                      + version + "/flink-connector-hive_2.11-" + version + ".jar",
+              "https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_" + scalaVersion + "/"
+                      + flinkVersion + "/flink-connector-hive_" + scalaVersion + "-" + flinkVersion + ".jar",
               "-P", targetFlinkHomeFolder + "/lib"});
       runShellCommand(new String[]{"wget",
-              "https://repo1.maven.org/maven2/org/apache/flink/flink-hadoop-compatibility_2.11/"
-                      + version + "/flink-hadoop-compatibility_2.11-" + version + ".jar",
+              "https://repo1.maven.org/maven2/org/apache/flink/flink-hadoop-compatibility_" + scalaVersion + "/"
+                      + flinkVersion + "/flink-hadoop-compatibility_" + scalaVersion + "-" + flinkVersion + ".jar",
               "-P", targetFlinkHomeFolder + "/lib"});
       runShellCommand(new String[]{"wget",
               "https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.4/hive-exec-2.3.4.jar",