Posted to commits@zeppelin.apache.org by zj...@apache.org on 2023/01/31 12:17:41 UTC

[zeppelin] branch master updated: [ZEPPELIN-5844] Support flink 1.16 (#4506)

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 521acca985 [ZEPPELIN-5844] Support flink 1.16 (#4506)
521acca985 is described below

commit 521acca98570b1e64b4c3ae569aad5da8423b0f7
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Jan 31 20:17:30 2023 +0800

    [ZEPPELIN-5844] Support flink 1.16 (#4506)
    
    * [ZEPPELIN-5844] Support flink 1.16
    
    * Update github action
    
    * Fix CI
    
    * save
    
    * Fix flink test
    
    * Remove -B in core.yaml
---
 .github/workflows/core.yml                         |  14 +-
 flink/flink-scala-parent/pom.xml                   |  73 ++-
 .../org/apache/zeppelin/flink/TableEnvFactory.java | 186 +------
 .../internal/ScalaShellStreamEnvironment.java      |  10 +
 .../zeppelin/flink/FlinkScalaInterpreter.scala     |  27 +-
 .../zeppelin/flink/internal/FlinkILoop.scala       |  43 +-
 .../flink/FlinkStreamSqlInterpreterTest.java       |  10 +-
 .../{init_stream.scala => init_stream.scala2}      |   0
 .../java/org/apache/zeppelin/flink/FlinkShims.java |  31 +-
 .../org/apache/zeppelin/flink/Flink112Shims.java   |  59 ++-
 .../org/apache/zeppelin/flink/Flink113Shims.java   |  59 ++-
 .../org/apache/zeppelin/flink/Flink114Shims.java   |  59 ++-
 .../org/apache/zeppelin/flink/Flink115Shims.java   |  60 ++-
 flink/flink1.16-shims/pom.xml                      | 207 ++++++++
 .../org/apache/zeppelin/flink/Flink116Shims.java}  | 105 +++-
 .../zeppelin/flink/Flink116SqlInterpreter.java     | 590 +++++++++++++++++++++
 .../java/org/apache/zeppelin/flink/PrintUtils.java | 318 +++++++++++
 .../zeppelin/flink/TimestampStringUtils.java       | 143 +++++
 .../flink/shims116/CollectStreamTableSink.java     |  97 ++++
 flink/pom.xml                                      |  10 +
 testing/env_python_3_with_flink_116.yml            |  29 +
 21 files changed, 1886 insertions(+), 244 deletions(-)
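
At a high level, this change adds a new flink1.16-shims module, extends the FlinkShims abstraction with table-environment factory methods, and routes user jars and the scala-shell classloader through TableEnvFactory. A minimal sketch of the version dispatch that picks up the new shims class (simplified from the FlinkShims.java hunk further below; the reflective instantiation step is assumed to follow the existing pattern of the 1.12-1.15 shims):

    // Sketch only: FlinkShims selects a version-specific implementation by class name.
    Class<?> flinkShimsClass;
    if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 16) {
      // New in this commit: Flink 1.16 gets its own shims implementation.
      flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink116Shims");
    } else {
      throw new Exception("Flink version: '" + flinkVersion + "' is not supported yet");
    }
    // The selected class is then instantiated reflectively and used behind the
    // version-agnostic FlinkShims API (see the abstract methods added below).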

diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index 4d806acf87..76380103da 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -233,7 +233,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        flink: [112, 113, 114, 115]
+        flink: [112, 113, 114, 115, 116]
     steps:
       - name: Checkout
         uses: actions/checkout@v3
@@ -256,12 +256,12 @@ jobs:
           restore-keys: |
             ${{ runner.os }}-zeppelin-
       - name: install environment for flink before 1.15 (exclusive)
-        if: matrix.flink != '115'
+        if: matrix.flink < '115'
         run: |
           ./mvnw install -DskipTests -am -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration ${MAVEN_ARGS}
           ./mvnw clean package -pl zeppelin-plugins -amd -DskipTests ${MAVEN_ARGS}
       - name: install environment for flink after 1.15 (inclusive)
-        if: matrix.flink == '115'
+        if: matrix.flink >= '115'
         run: |
           ./mvnw install -DskipTests -am -pl flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration ${MAVEN_ARGS}
           ./mvnw clean package -pl zeppelin-plugins -amd -DskipTests ${MAVEN_ARGS}
@@ -277,10 +277,10 @@ jobs:
           auto-activate-base: false
           use-mamba: true
       - name: run tests for flink before 1.15 (exclusive)
-        if: matrix.flink != '115'
-        run: ./mvnw verify -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -am -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration -DfailIfNoTests=false -Dtest=org.apache.zeppelin.flink.*Test,FlinkIntegrationTest${{ matrix.flink }} ${MAVEN_ARGS}
-      - name: run tests for flink before 1.15 (inclusive)
-        if: matrix.flink == '115'
+        if: matrix.flink < '115'
+        run: ./mvnw verify -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration -DfailIfNoTests=false -Dtest=org.apache.zeppelin.flink.*Test,FlinkIntegrationTest${{ matrix.flink }} ${MAVEN_ARGS}
+      - name: run tests for flink after 1.15 (inclusive)
+        if: matrix.flink >= '115'
         run: ./mvnw verify -pl flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -am -Phadoop2 -Pintegration -DfailIfNoTests=false -Dtest=org.apache.zeppelin.flink.*Test,FlinkIntegrationTest${{ matrix.flink }} ${MAVEN_ARGS}
       - name: Print zeppelin logs
         if: always()
diff --git a/flink/flink-scala-parent/pom.xml b/flink/flink-scala-parent/pom.xml
index e9f364a162..8bbeebd26f 100644
--- a/flink/flink-scala-parent/pom.xml
+++ b/flink/flink-scala-parent/pom.xml
@@ -79,6 +79,12 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>flink1.16-shims</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.zeppelin</groupId>
       <artifactId>zeppelin-python</artifactId>
@@ -138,13 +144,6 @@
       </exclusions>
     </dependency>
 
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-core</artifactId>
@@ -931,6 +930,12 @@
             </exclusion>
           </exclusions>
         </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
       </dependencies>
     </profile>
 
@@ -970,6 +975,12 @@
             </exclusion>
           </exclusions>
         </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
       </dependencies>
     </profile>
 
@@ -991,6 +1002,12 @@
           <version>${flink.version}</version>
           <scope>provided</scope>
         </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
       </dependencies>
     </profile>
 
@@ -1015,6 +1032,48 @@
           <version>${flink.version}</version>
           <scope>provided</scope>
         </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+
+    <profile>
+      <id>flink-116</id>
+      <properties>
+        <flink.version>${flink1.16.version}</flink.version>
+        <flink.scala.version>2.12.7</flink.scala.version>
+        <flink.scala.binary.version>2.12</flink.scala.binary.version>
+        <flink.library.scala.suffix></flink.library.scala.suffix>
+      </properties>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-runtime</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-sql-client</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-python</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
       </dependencies>
     </profile>
 
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
index 5ec2de96eb..0328ca3936 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
@@ -20,6 +20,7 @@ package org.apache.zeppelin.flink;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableConfig;
@@ -34,6 +35,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Constructor;
+import java.net.URL;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Factory class for creating flink table env for different purpose:
@@ -51,6 +55,8 @@ public class TableEnvFactory {
   private org.apache.flink.api.scala.ExecutionEnvironment benv;
   private org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv;
 
+  private List<URL> userJars;
+
   /***********************************************************************
   Should use different TableConfig for different kinds of table_env
   otherwise it will cause conflicts after flink 1.13
@@ -73,7 +79,8 @@ public class TableEnvFactory {
                          FlinkShims flinkShims,
                          org.apache.flink.api.scala.ExecutionEnvironment env,
                          org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv,
-                         TableConfig streamTableConfig) {
+                         TableConfig streamTableConfig,
+                         List<URL> userJars) {
 
     this.flinkVersion = flinkVersion;
     this.flinkShims = flinkShims;
@@ -94,7 +101,11 @@ public class TableEnvFactory {
     this.oldPlannerCatalogManager = (CatalogManager) flinkShims.createCatalogManager(
             this.oldPlannerStreamTableConfig.getConfiguration());
     this.moduleManager = new ModuleManager();
-    this.functionCatalog = (FunctionCatalog) flinkShims.createFunctionCatalog(streamTableConfig, catalogManager, moduleManager);
+    this.functionCatalog = (FunctionCatalog) flinkShims.createFunctionCatalog(streamTableConfig,
+            catalogManager,
+            moduleManager,
+            userJars);
+    this.userJars = userJars;
   }
 
   public TableEnvironment createScalaFlinkBatchTableEnvironment() {
@@ -138,178 +149,19 @@ public class TableEnvFactory {
   }
 
   public TableEnvironment createScalaBlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
-
-    try {
-      ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
-              classLoader, settings, senv.getJavaEnv(),
-              streamTableConfig, moduleManager, functionCatalog, catalogManager);
-      Planner planner = (Planner) pair.left;
-      Executor executor = (Executor) pair.right;
-
-      Class<?> clazz = Class
-                .forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl");
-      try {
-        Constructor<?> constructor = clazz
-                .getConstructor(
-                        CatalogManager.class,
-                        ModuleManager.class,
-                        FunctionCatalog.class,
-                        TableConfig.class,
-                        org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class,
-                        Planner.class,
-                        Executor.class,
-                        boolean.class);
-        return (TableEnvironment) constructor.newInstance(catalogManager,
-                moduleManager,
-                functionCatalog,
-                streamTableConfig,
-                senv,
-                planner,
-                executor,
-                settings.isStreamingMode());
-      } catch (NoSuchMethodException e) {
-        // Flink 1.11.1 change the constructor signature, FLINK-18419
-        Constructor<?> constructor = clazz
-                .getConstructor(
-                        CatalogManager.class,
-                        ModuleManager.class,
-                        FunctionCatalog.class,
-                        TableConfig.class,
-                        org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class,
-                        Planner.class,
-                        Executor.class,
-                        boolean.class,
-                        ClassLoader.class);
-        return (TableEnvironment) constructor.newInstance(catalogManager,
-                moduleManager,
-                functionCatalog,
-                streamTableConfig,
-                senv,
-                planner,
-                executor,
-                settings.isStreamingMode(),
-                classLoader);
-      }
-    } catch (Exception e) {
-      throw new TableException("Fail to createScalaBlinkStreamTableEnvironment", e);
-    }
+    return (TableEnvironment) flinkShims.createScalaBlinkStreamTableEnvironment(settings,
+            senv.getJavaEnv(), streamTableConfig, moduleManager, functionCatalog, catalogManager, userJars, classLoader);
   }
 
   public TableEnvironment createJavaBlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
-    try {
-      ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
-              classLoader, settings, senv.getJavaEnv(),
-              streamTableConfig, moduleManager, functionCatalog, catalogManager);
-      Planner planner = (Planner) pair.left;
-      Executor executor = (Executor) pair.right;
-
-      Class<?> clazz = Class
-                .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
-
-      try {
-        Constructor<?> constructor = clazz
-                .getConstructor(
-                        CatalogManager.class,
-                        ModuleManager.class,
-                        FunctionCatalog.class,
-                        TableConfig.class,
-                        org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class,
-                        Planner.class,
-                        Executor.class,
-                        boolean.class);
-        return (TableEnvironment) constructor.newInstance(catalogManager,
-                moduleManager,
-                functionCatalog,
-                streamTableConfig,
-                senv.getJavaEnv(),
-                planner,
-                executor,
-                settings.isStreamingMode());
-      } catch (NoSuchMethodException e) {
-        // Flink 1.11.1 change the constructor signature, FLINK-18419
-        Constructor constructor = clazz
-                .getConstructor(
-                        CatalogManager.class,
-                        ModuleManager.class,
-                        FunctionCatalog.class,
-                        TableConfig.class,
-                        org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class,
-                        Planner.class,
-                        Executor.class,
-                        boolean.class,
-                        ClassLoader.class);
-        return (TableEnvironment) constructor.newInstance(catalogManager,
-                moduleManager,
-                functionCatalog,
-                streamTableConfig,
-                senv.getJavaEnv(),
-                planner,
-                executor,
-                settings.isStreamingMode(),
-                classLoader);
-      }
-    } catch (Exception e) {
-      throw new TableException("Fail to createJavaBlinkStreamTableEnvironment", e);
-    }
+    return (TableEnvironment) flinkShims.createJavaBlinkStreamTableEnvironment(settings,
+            senv.getJavaEnv(), streamTableConfig, moduleManager, functionCatalog, catalogManager, userJars, classLoader);
   }
 
   public TableEnvironment createJavaBlinkBatchTableEnvironment(
           EnvironmentSettings settings, ClassLoader classLoader) {
-    try {
-      ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
-              classLoader, settings, senv.getJavaEnv(),
-              batchTableConfig, moduleManager, functionCatalog, catalogManager);
-      Planner planner = (Planner) pair.left;
-      Executor executor = (Executor) pair.right;
-
-      Class<?> clazz = Class
-                .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
-      try {
-        Constructor<?> constructor = clazz.getConstructor(
-                CatalogManager.class,
-                ModuleManager.class,
-                FunctionCatalog.class,
-                TableConfig.class,
-                StreamExecutionEnvironment.class,
-                Planner.class,
-                Executor.class,
-                boolean.class);
-        return (TableEnvironment) constructor.newInstance(
-                catalogManager,
-                moduleManager,
-                functionCatalog,
-                batchTableConfig,
-                senv.getJavaEnv(),
-                planner,
-                executor,
-                settings.isStreamingMode());
-      } catch (NoSuchMethodException e) {
-        // Flink 1.11.1 change the constructor signature, FLINK-18419
-        Constructor<?> constructor = clazz.getConstructor(
-                CatalogManager.class,
-                ModuleManager.class,
-                FunctionCatalog.class,
-                TableConfig.class,
-                StreamExecutionEnvironment.class,
-                Planner.class,
-                Executor.class,
-                boolean.class,
-                ClassLoader.class);
-        return (TableEnvironment) constructor.newInstance(
-                catalogManager,
-                moduleManager,
-                functionCatalog,
-                batchTableConfig,
-                senv.getJavaEnv(),
-                planner,
-                executor,
-                settings.isStreamingMode(),
-                classLoader);
-      }
-    } catch (Exception e) {
-      LOGGER.info(ExceptionUtils.getStackTrace(e));
-      throw new TableException("Fail to createJavaBlinkBatchTableEnvironment", e);
-    }
+    return (TableEnvironment) flinkShims.createJavaBlinkStreamTableEnvironment(settings,
+            senv.getJavaEnv(), batchTableConfig, moduleManager, functionCatalog, catalogManager, userJars, classLoader);
   }
 
   public void createStreamPlanner(EnvironmentSettings settings) {
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java
index bee40a0f69..ae9fd3d0f0 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java
@@ -59,11 +59,21 @@ public class ScalaShellStreamEnvironment extends StreamExecutionEnvironment {
           final Configuration configuration,
           final FlinkILoop flinkILoop,
           final FlinkVersion flinkVersion,
+          final ClassLoader classLoader,
           final String... jarFiles) {
     super(configuration);
     this.flinkILoop = checkNotNull(flinkILoop);
     this.flinkVersion = checkNotNull(flinkVersion);
     this.jarFiles = checkNotNull(JarUtils.getJarFiles(jarFiles));
+    if (flinkVersion.newerThanOrEqual(FlinkVersion.fromVersionString("1.16"))) {
+      try {
+        Field field = StreamExecutionEnvironment.class.getDeclaredField("userClassloader");
+        field.setAccessible(true);
+        field.set(this, classLoader);
+      } catch (NoSuchFieldException | IllegalAccessException e) {
+        throw new RuntimeException("Unable to set userClassLoader", e);
+      }
+    }
   }
 
   @Override
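
Flink 1.16 keeps the user classloader in a private field of StreamExecutionEnvironment that appears to have no public setter, so the scala-shell environment injects it reflectively (see the hunk above). A generalized sketch of that pattern; the helper name is hypothetical, and only the field name and target class come from the diff:

    // Hypothetical helper: write a private field via reflection when no setter exists.
    static void setPrivateField(Object target, Class<?> declaringClass,
                                String fieldName, Object value) {
      try {
        java.lang.reflect.Field field = declaringClass.getDeclaredField(fieldName);
        field.setAccessible(true);
        field.set(target, value);
      } catch (NoSuchFieldException | IllegalAccessException e) {
        throw new RuntimeException("Unable to set " + fieldName, e);
      }
    }
    // Usage as in ScalaShellStreamEnvironment for Flink >= 1.16:
    // setPrivateField(this, StreamExecutionEnvironment.class, "userClassloader", classLoader);
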
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 40adcffeae..d5bf2fc14e 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -365,6 +365,8 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
 
     flinkILoop.settings = settings
     flinkILoop.intp = createIMain(settings, replOut)
+    flinkILoop.initEnvironments()
+
     flinkILoop.intp.beQuietDuring {
       // set execution environment
       flinkILoop.intp.bind("benv", flinkILoop.scalaBenv)
@@ -424,29 +426,29 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
   private def createTableEnvs(): Unit = {
     val originalClassLoader = Thread.currentThread().getContextClassLoader
     try {
-      Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
+      Thread.currentThread().setContextClassLoader(getFlinkScalaShellLoader)
       val tableConfig = new TableConfig
       tableConfig.getConfiguration.addAll(configuration)
 
       this.tblEnvFactory = new TableEnvFactory(this.flinkVersion, this.flinkShims,
-        this.benv, this.senv, tableConfig)
+        this.benv, this.senv, tableConfig, this.userJars.map(new URL(_)).asJava)
 
       // blink planner
-      var btEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder()
+      val btEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder()
         .asInstanceOf[EnvironmentSettings.Builder]
         .inBatchMode()
         .build()
-      this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting, getFlinkClassLoader);
+      this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting, getFlinkScalaShellLoader);
       flinkILoop.bind("btenv", btenv.getClass().getCanonicalName(), btenv, List("@transient"))
       this.java_btenv = this.btenv
 
-      var stEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder()
+      val stEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder()
         .asInstanceOf[EnvironmentSettings.Builder]
         .inStreamingMode()
         .build()
-      this.stenv = tblEnvFactory.createScalaBlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader)
+      this.stenv = tblEnvFactory.createScalaBlinkStreamTableEnvironment(stEnvSetting, getFlinkScalaShellLoader)
       flinkILoop.bind("stenv", stenv.getClass().getCanonicalName(), stenv, List("@transient"))
-      this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader)
+      this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting, getFlinkScalaShellLoader)
 
       if (!flinkVersion.isAfterFlink114()) {
         // flink planner is not supported after flink 1.14
@@ -506,7 +508,7 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
 
     val udfPackages = properties.getProperty("flink.udf.jars.packages", "").split(",").toSet
     val urls = Array(new URL("jar:file:" + jar + "!/"))
-    val cl = new URLClassLoader(urls, getFlinkScalaShellLoader)
+    val cl = new URLClassLoader(urls, getFlinkClassLoader)
 
     while (entries.hasMoreElements) {
       val je = entries.nextElement
@@ -590,7 +592,7 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
   def createPlannerAgain(): Unit = {
     val originalClassLoader = Thread.currentThread().getContextClassLoader
     try {
-      Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
+      Thread.currentThread().setContextClassLoader(getFlinkScalaShellLoader)
       val stEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder()
         .asInstanceOf[EnvironmentSettings.Builder]
         .inStreamingMode()
@@ -844,8 +846,11 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
   def getJobManager = this.jobManager
 
   def getFlinkScalaShellLoader: ClassLoader = {
-    val userCodeJarFile = this.flinkILoop.writeFilesToDisk()
-    new URLClassLoader(Array(userCodeJarFile.toURL) ++ userJars.map(e => new File(e).toURL))
+    if (this.flinkILoop == null) {
+      getFlinkClassLoader
+    } else {
+      flinkILoop.classLoader;
+    }
   }
 
   private def getFlinkClassLoader: ClassLoader = {
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
index b133487749..b50135b91e 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
@@ -49,52 +49,46 @@ class FlinkILoop(
 
   private lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
 
-  // remote environment
-  private val (remoteBenv: ScalaShellEnvironment,
-  remoteSenv: ScalaShellStreamEnvironment) = {
+  var remoteBenv: ScalaShellEnvironment = null
+  var remoteSenv: ScalaShellStreamEnvironment = null
+  var scalaBenv: ExecutionEnvironment = null
+  var scalaSenv: StreamExecutionEnvironment = null
+
+  def initEnvironments(): Unit = {
     ScalaShellEnvironment.resetContextEnvironments()
     ScalaShellStreamEnvironment.resetContextEnvironments()
     // create our environment that submits against the cluster (local or remote)
-    val remoteBenv = new ScalaShellEnvironment(
+    remoteBenv = new ScalaShellEnvironment(
       flinkConfig,
       this,
       this.getExternalJars(): _*)
-    val remoteSenv = new ScalaShellStreamEnvironment(
+    remoteSenv = new ScalaShellStreamEnvironment(
       flinkConfig,
       this,
       flinkScalaInterpreter.getFlinkVersion,
+      this.classLoader,
       getExternalJars(): _*)
 
-    (remoteBenv,remoteSenv)
-  }
-
-  // local environment
-  val (
-    scalaBenv: ExecutionEnvironment,
-    scalaSenv: StreamExecutionEnvironment
-    ) = {
     if (ExecutionMode.isApplicationMode(mode)) {
       // For yarn application mode, ExecutionEnvironment & StreamExecutionEnvironment has already been created
       // by flink itself, we here just try get them via reflection and reconstruct them.
-      val scalaBenv = new ExecutionEnvironment(new ApplicationModeExecutionEnvironment(
+      scalaBenv = new ExecutionEnvironment(new ApplicationModeExecutionEnvironment(
         getExecutionEnvironmentField(jenv, "executorServiceLoader").asInstanceOf[PipelineExecutorServiceLoader],
         getExecutionEnvironmentField(jenv, "configuration").asInstanceOf[Configuration],
         getExecutionEnvironmentField(jenv, "userClassloader").asInstanceOf[ClassLoader],
         this,
         flinkScalaInterpreter
       ))
-      val scalaSenv = new StreamExecutionEnvironment(new ApplicationModeStreamEnvironment(
+      scalaSenv = new StreamExecutionEnvironment(new ApplicationModeStreamEnvironment(
         getStreamExecutionEnvironmentField(jsenv, "executorServiceLoader").asInstanceOf[PipelineExecutorServiceLoader],
         getStreamExecutionEnvironmentField(jsenv, "configuration").asInstanceOf[Configuration],
         getStreamExecutionEnvironmentField(jsenv, "userClassloader").asInstanceOf[ClassLoader],
         this,
         flinkScalaInterpreter
       ))
-      (scalaBenv, scalaSenv)
     } else {
-      val scalaBenv = new ExecutionEnvironment(remoteBenv)
-      val scalaSenv = new StreamExecutionEnvironment(remoteSenv)
-      (scalaBenv, scalaSenv)
+      scalaBenv = new ExecutionEnvironment(remoteBenv)
+      scalaSenv = new StreamExecutionEnvironment(remoteSenv)
     }
   }
 
@@ -183,28 +177,20 @@ class FlinkILoop(
       }
     }
     val vd = intp.virtualDirectory
-
     val vdIt = vd.iterator
-
     for (fi <- vdIt) {
       if (fi.isDirectory) {
-
         val fiIt = fi.iterator
-
         for (f <- fiIt) {
-
           // directory for compiled line
           val lineDir = new File(tmpDirShell.getAbsolutePath, fi.name)
           lineDir.mkdirs()
-
           // compiled classes for commands from shell
           val writeFile = new File(lineDir.getAbsolutePath, f.name)
           val outputStream = new FileOutputStream(writeFile)
           val inputStream = f.input
-
           // copy file contents
           org.apache.commons.io.IOUtils.copy(inputStream, outputStream)
-
           inputStream.close()
           outputStream.close()
         }
@@ -212,12 +198,9 @@ class FlinkILoop(
     }
 
     val compiledClasses = new File(tmpDirShell.getAbsolutePath)
-
     val jarFilePath = new File(tmpJarShell.getAbsolutePath)
-
     val jh: JarHelper = new JarHelper
     jh.jarDir(compiledClasses, jarFilePath)
-
     jarFilePath
   }
 
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index 49b609d745..ca49a35a15 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -408,9 +408,9 @@ public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest {
   @Test
   public void testStreamUDF() throws IOException, InterpreterException {
     String initStreamScalaScript = getInitStreamScript(100);
-    InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
-            getInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    InterpreterContext context = getInterpreterContext();
+    InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript, context);
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
 
     result = flinkInterpreter.interpret(
             "class MyUpper extends ScalarFunction {\n" +
@@ -418,7 +418,7 @@ public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest {
                     "}\n" + "stenv.registerFunction(\"myupper\", new MyUpper())", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
-    InterpreterContext context = getInterpreterContext();
+    context = getInterpreterContext();
     result = sqlInterpreter.interpret("select myupper(url), count(1) as pv from " +
             "log group by url", context);
     assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
@@ -692,7 +692,7 @@ public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest {
   }
 
   public static String getInitStreamScript(int sleep_interval) throws IOException {
-    return IOUtils.toString(FlinkStreamSqlInterpreterTest.class.getResource("/init_stream.scala"))
+    return IOUtils.toString(FlinkStreamSqlInterpreterTest.class.getResource("/init_stream.scala2"))
             .replace("{{sleep_interval}}", sleep_interval + "");
   }
 }
diff --git a/flink/flink-scala-parent/src/test/resources/init_stream.scala b/flink/flink-scala-parent/src/test/resources/init_stream.scala2
similarity index 100%
rename from flink/flink-scala-parent/src/test/resources/init_stream.scala
rename to flink/flink-scala-parent/src/test/resources/init_stream.scala2
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index a473a0f888..a7b6e9871a 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.net.InetAddress;
+import java.net.URL;
 import java.util.List;
 import java.util.Properties;
 
@@ -65,6 +66,9 @@ public abstract class FlinkShims {
     } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 15) {
       LOGGER.info("Initializing shims for Flink 1.15");
       flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink115Shims");
+    } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 16) {
+      LOGGER.info("Initializing shims for Flink 1.16");
+      flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink116Shims");
     } else {
       throw new Exception("Flink version: '" + flinkVersion + "' is not supported yet");
     }
@@ -91,7 +95,10 @@ public abstract class FlinkShims {
     return flinkVersion;
   }
 
-  public abstract Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager);
+  public abstract Object createFunctionCatalog(Object tableConfig,
+                                               Object catalogManager,
+                                               Object moduleManager,
+                                               List<URL> jars);
 
   public abstract void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext);
 
@@ -149,6 +156,28 @@ public abstract class FlinkShims {
           ClassLoader classLoader, Object environmentSettings, Object sEnv,
           Object tableConfig, Object moduleManager, Object functionCatalog, Object catalogManager);
 
+  public abstract Object createResourceManager(List<URL> jars, Object tableConfig);
+
+  public abstract Object createScalaBlinkStreamTableEnvironment(
+          Object environmentSettingsObj,
+          Object senvObj,
+          Object tableConfigObj,
+          Object moduleManagerObj,
+          Object functionCatalogObj,
+          Object catalogManagerObj,
+          List<URL> jars,
+          ClassLoader classLoader);
+
+  public abstract Object createJavaBlinkStreamTableEnvironment(
+          Object environmentSettingsObj,
+          Object senvObj,
+          Object tableConfigObj,
+          Object moduleManagerObj,
+          Object functionCatalogObj,
+          Object catalogManagerObj,
+          List<URL> jars,
+          ClassLoader classLoader);
+
   public abstract Object createBlinkPlannerEnvSettingBuilder();
 
   public abstract Object createOldPlannerEnvSettingBuilder();
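
On the caller side, TableEnvFactory (earlier in this diff) no longer builds StreamTableEnvironmentImpl via reflection itself; it forwards everything to these new shim methods. A simplified view of the call, using the names from the TableEnvFactory hunk above:

    // Simplified from TableEnvFactory#createJavaBlinkStreamTableEnvironment in this commit:
    // each version-specific shim creates the planner/executor and the table environment.
    TableEnvironment javaStreamTableEnv =
        (TableEnvironment) flinkShims.createJavaBlinkStreamTableEnvironment(
            settings,            // EnvironmentSettings
            senv.getJavaEnv(),   // java StreamExecutionEnvironment
            streamTableConfig,   // TableConfig
            moduleManager,
            functionCatalog,
            catalogManager,
            userJars,            // List<URL>, newly threaded through the factory
            classLoader);
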
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
index 5088a7f623..187eea0b4d 100644
--- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
@@ -66,6 +66,7 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
+import java.net.URL;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -96,7 +97,12 @@ public class Flink112Shims extends FlinkShims {
   }
 
   @Override
-  public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager) {
+  public Object createResourceManager(List<URL> jars, Object tableConfig) {
+    return null;
+  }
+
+  @Override
+  public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List<URL> jars) {
     return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager);
   }
 
@@ -105,6 +111,57 @@ public class Flink112Shims extends FlinkShims {
     // do nothing
   }
 
+  @Override
+  public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj,
+                                                       Object senvObj,
+                                                       Object tableConfigObj,
+                                                       Object moduleManagerObj,
+                                                       Object functionCatalogObj,
+                                                       Object catalogManagerObj,
+                                                       List<URL> jars,
+                                                       ClassLoader classLoader) {
+    EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj;
+    StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj;
+    TableConfig tableConfig = (TableConfig) tableConfigObj;
+    ModuleManager moduleManager = (ModuleManager) moduleManagerObj;
+    FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj;
+    CatalogManager catalogManager = (CatalogManager) catalogManagerObj;
+    ImmutablePair<Object, Object> pair = createPlannerAndExecutor(
+            classLoader, environmentSettings, senv,
+            tableConfig, moduleManager, functionCatalog, catalogManager);
+    Planner planner = (Planner) pair.left;
+    Executor executor = (Executor) pair.right;
+
+    return new org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl(catalogManager,
+            moduleManager,
+            functionCatalog, tableConfig, new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(senv),
+            planner, executor, environmentSettings.isStreamingMode(), classLoader);
+  }
+
+  @Override
+  public Object createJavaBlinkStreamTableEnvironment(Object environmentSettingsObj,
+                                                      Object senvObj,
+                                                      Object tableConfigObj,
+                                                      Object moduleManagerObj,
+                                                      Object functionCatalogObj,
+                                                      Object catalogManagerObj,
+                                                      List<URL> jars,
+                                                      ClassLoader classLoader) {
+    EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj;
+    StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj;
+    TableConfig tableConfig = (TableConfig) tableConfigObj;
+    ModuleManager moduleManager = (ModuleManager) moduleManagerObj;
+    FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj;
+    CatalogManager catalogManager = (CatalogManager) catalogManagerObj;
+    ImmutablePair<Object, Object> pair = createPlannerAndExecutor(
+            classLoader, environmentSettings, senv,
+            tableConfig, moduleManager, functionCatalog, catalogManager);
+    Planner planner = (Planner) pair.left;
+    Executor executor = (Executor) pair.right;
+
+    return new StreamTableEnvironmentImpl(catalogManager, moduleManager,
+            functionCatalog, tableConfig, senv, planner, executor, environmentSettings.isStreamingMode(), classLoader);
+  }
   @Override
   public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) {
     return new StreamExecutionEnvironmentFactory() {
diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
index 1df3f92951..440b245796 100644
--- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
+++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
@@ -69,6 +69,7 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
+import java.net.URL;
 import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.List;
@@ -99,7 +100,12 @@ public class Flink113Shims extends FlinkShims {
   }
 
   @Override
-  public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager) {
+  public Object createResourceManager(List<URL> jars, Object tableConfig) {
+    return null;
+  }
+
+  @Override
+  public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List<URL> jars) {
     return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager);
   }
 
@@ -108,6 +114,57 @@ public class Flink113Shims extends FlinkShims {
     // do nothing
   }
 
+  @Override
+  public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj,
+                                                       Object senvObj,
+                                                       Object tableConfigObj,
+                                                       Object moduleManagerObj,
+                                                       Object functionCatalogObj,
+                                                       Object catalogManagerObj,
+                                                       List<URL> jars,
+                                                       ClassLoader classLoader) {
+    EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj;
+    StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj;
+    TableConfig tableConfig = (TableConfig) tableConfigObj;
+    ModuleManager moduleManager = (ModuleManager) moduleManagerObj;
+    FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj;
+    CatalogManager catalogManager = (CatalogManager) catalogManagerObj;
+    ImmutablePair<Object, Object> pair = createPlannerAndExecutor(
+            classLoader, environmentSettings, senv,
+            tableConfig, moduleManager, functionCatalog, catalogManager);
+    Planner planner = (Planner) pair.left;
+    Executor executor = (Executor) pair.right;
+
+    return new org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl(catalogManager,
+            moduleManager,
+            functionCatalog, tableConfig, new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(senv),
+            planner, executor, environmentSettings.isStreamingMode(), classLoader);
+  }
+
+  @Override
+  public Object createJavaBlinkStreamTableEnvironment(Object environmentSettingsObj,
+                                                      Object senvObj,
+                                                      Object tableConfigObj,
+                                                      Object moduleManagerObj,
+                                                      Object functionCatalogObj,
+                                                      Object catalogManagerObj,
+                                                      List<URL> jars,
+                                                      ClassLoader classLoader) {
+    EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj;
+    StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj;
+    TableConfig tableConfig = (TableConfig) tableConfigObj;
+    ModuleManager moduleManager = (ModuleManager) moduleManagerObj;
+    FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj;
+    CatalogManager catalogManager = (CatalogManager) catalogManagerObj;
+    ImmutablePair<Object, Object> pair = createPlannerAndExecutor(
+            classLoader, environmentSettings, senv,
+            tableConfig, moduleManager, functionCatalog, catalogManager);
+    Planner planner = (Planner) pair.left;
+    Executor executor = (Executor) pair.right;
+
+    return new StreamTableEnvironmentImpl(catalogManager, moduleManager,
+            functionCatalog, tableConfig, senv, planner, executor, environmentSettings.isStreamingMode(), classLoader);
+  }
   @Override
   public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) {
     return new StreamExecutionEnvironmentFactory() {
diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
index 9660c9f469..475be0da7e 100644
--- a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
+++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
@@ -66,6 +66,7 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
+import java.net.URL;
 import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.List;
@@ -95,7 +96,12 @@ public class Flink114Shims extends FlinkShims {
   }
 
   @Override
-  public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager) {
+  public Object createResourceManager(List<URL> jars, Object tableConfig) {
+    return null;
+  }
+
+  @Override
+  public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List<URL> jars) {
     return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager);
   }
 
@@ -104,6 +110,57 @@ public class Flink114Shims extends FlinkShims {
     // do nothing
   }
 
+  @Override
+  public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj,
+                                                       Object senvObj,
+                                                       Object tableConfigObj,
+                                                       Object moduleManagerObj,
+                                                       Object functionCatalogObj,
+                                                       Object catalogManagerObj,
+                                                       List<URL> jars,
+                                                       ClassLoader classLoader) {
+    EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj;
+    StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj;
+    TableConfig tableConfig = (TableConfig) tableConfigObj;
+    ModuleManager moduleManager = (ModuleManager) moduleManagerObj;
+    FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj;
+    CatalogManager catalogManager = (CatalogManager) catalogManagerObj;
+    ImmutablePair<Object, Object> pair = createPlannerAndExecutor(
+            classLoader, environmentSettings, senv,
+            tableConfig, moduleManager, functionCatalog, catalogManager);
+    Planner planner = (Planner) pair.left;
+    Executor executor = (Executor) pair.right;
+
+    return new org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl(catalogManager,
+            moduleManager,
+            functionCatalog, tableConfig, new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(senv),
+            planner, executor, environmentSettings.isStreamingMode(), classLoader);
+  }
+
+  @Override
+  public Object createJavaBlinkStreamTableEnvironment(Object environmentSettingsObj,
+                                                      Object senvObj,
+                                                      Object tableConfigObj,
+                                                      Object moduleManagerObj,
+                                                      Object functionCatalogObj,
+                                                      Object catalogManagerObj,
+                                                      List<URL> jars,
+                                                      ClassLoader classLoader) {
+    EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj;
+    StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj;
+    TableConfig tableConfig = (TableConfig) tableConfigObj;
+    ModuleManager moduleManager = (ModuleManager) moduleManagerObj;
+    FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj;
+    CatalogManager catalogManager = (CatalogManager) catalogManagerObj;
+    ImmutablePair<Object, Object> pair = createPlannerAndExecutor(
+            classLoader, environmentSettings, senv,
+            tableConfig, moduleManager, functionCatalog, catalogManager);
+    Planner planner = (Planner) pair.left;
+    Executor executor = (Executor) pair.right;
+
+    return new StreamTableEnvironmentImpl(catalogManager, moduleManager,
+            functionCatalog, tableConfig, senv, planner, executor, environmentSettings.isStreamingMode(), classLoader);
+  }
 
   @Override
   public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) {
diff --git a/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java
index 98c250773d..4ed8abf3af 100644
--- a/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java
+++ b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java
@@ -65,6 +65,7 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
+import java.net.URL;
 import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.List;
@@ -94,7 +95,12 @@ public class Flink115Shims extends FlinkShims {
   }
 
   @Override
-  public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager) {
+  public Object createResourceManager(List<URL> jars, Object tableConfig) {
+    return null;
+  }
+
+  @Override
+  public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List<URL> jars) {
     return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager);
   }
 
@@ -103,6 +109,58 @@ public class Flink115Shims extends FlinkShims {
     // do nothing
   }
 
+  @Override
+  public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj,
+                                                       Object senvObj,
+                                                       Object tableConfigObj,
+                                                       Object moduleManagerObj,
+                                                       Object functionCatalogObj,
+                                                       Object catalogManagerObj,
+                                                       List<URL> jars,
+                                                       ClassLoader classLoader) {
+    EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj;
+    StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj;
+    TableConfig tableConfig = (TableConfig) tableConfigObj;
+    ModuleManager moduleManager = (ModuleManager) moduleManagerObj;
+    FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj;
+    CatalogManager catalogManager = (CatalogManager) catalogManagerObj;
+    ImmutablePair<Object, Object> pair = createPlannerAndExecutor(
+            classLoader, environmentSettings, senv,
+            tableConfig, moduleManager, functionCatalog, catalogManager);
+    Planner planner = (Planner) pair.left;
+    Executor executor = (Executor) pair.right;
+
+    return new org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl(catalogManager,
+            moduleManager,
+            functionCatalog, tableConfig, new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(senv),
+            planner, executor, environmentSettings.isStreamingMode(), classLoader);
+  }
+
+  @Override
+  public Object createJavaBlinkStreamTableEnvironment(Object environmentSettingsObj,
+                                                      Object senvObj,
+                                                      Object tableConfigObj,
+                                                      Object moduleManagerObj,
+                                                      Object functionCatalogObj,
+                                                      Object catalogManagerObj,
+                                                      List<URL> jars,
+                                                      ClassLoader classLoader) {
+    EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj;
+    StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj;
+    TableConfig tableConfig = (TableConfig) tableConfigObj;
+    ModuleManager moduleManager = (ModuleManager) moduleManagerObj;
+    FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj;
+    CatalogManager catalogManager = (CatalogManager) catalogManagerObj;
+    ImmutablePair<Object, Object> pair = createPlannerAndExecutor(
+            classLoader, environmentSettings, senv,
+            tableConfig, moduleManager, functionCatalog, catalogManager);
+    Planner planner = (Planner) pair.left;
+    Executor executor = (Executor) pair.right;
+
+    return new StreamTableEnvironmentImpl(catalogManager, moduleManager,
+            functionCatalog, tableConfig, senv, planner, executor, environmentSettings.isStreamingMode(), classLoader);
+  }
+
   @Override
   public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) {
     return new StreamExecutionEnvironmentFactory() {
diff --git a/flink/flink1.16-shims/pom.xml b/flink/flink1.16-shims/pom.xml
new file mode 100644
index 0000000000..756f64f571
--- /dev/null
+++ b/flink/flink1.16-shims/pom.xml
@@ -0,0 +1,207 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-parent</artifactId>
+        <groupId>org.apache.zeppelin</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.zeppelin</groupId>
+    <artifactId>flink1.16-shims</artifactId>
+    <version>0.11.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+    <name>Zeppelin: Flink1.16 Shims</name>
+
+    <properties>
+        <flink.version>${flink1.16.version}</flink.version>
+        <flink.scala.binary.version>2.12</flink.scala.binary.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.zeppelin</groupId>
+            <artifactId>flink-shims</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_${flink.scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-python</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-client</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>eclipse-add-source</id>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile-first</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <scalaVersion>${flink.scala.version}</scalaVersion>
+                    <args>
+                        <arg>-unchecked</arg>
+                        <arg>-deprecation</arg>
+                        <arg>-feature</arg>
+                        <arg>-target:jvm-1.8</arg>
+                    </args>
+                    <jvmArgs>
+                        <jvmArg>-Xms1024m</jvmArg>
+                        <jvmArg>-Xmx1024m</jvmArg>
+                        <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg>
+                    </jvmArgs>
+                    <javacArgs>
+                        <javacArg>-source</javacArg>
+                        <javacArg>${java.version}</javacArg>
+                        <javacArg>-target</javacArg>
+                        <javacArg>${java.version}</javacArg>
+                        <javacArg>-Xlint:all,-serial,-path,-options</javacArg>
+                    </javacArgs>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-interpreter-setting</id>
+                        <phase>none</phase>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116Shims.java
similarity index 68%
copy from flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java
copy to flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116Shims.java
index 98c250773d..b96e5f5e42 100644
--- a/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java
+++ b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116Shims.java
@@ -40,6 +40,9 @@ import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.client.resource.ClientResourceManager;
+import org.apache.flink.table.client.util.ClientClassloaderUtil;
+import org.apache.flink.table.client.util.ClientWrapperClassLoader;
 import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.Planner;
@@ -51,11 +54,13 @@ import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.FlinkException;
-import org.apache.zeppelin.flink.shims115.CollectStreamTableSink;
+import org.apache.zeppelin.flink.shims116.CollectStreamTableSink;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.slf4j.Logger;
@@ -65,6 +70,7 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
+import java.net.URL;
 import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.List;
@@ -72,30 +78,44 @@ import java.util.Properties;
 
 
 /**
- * Shims for flink 1.15
+ * Shims for flink 1.16
  */
-public class Flink115Shims extends FlinkShims {
+public class Flink116Shims extends FlinkShims {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(Flink115Shims.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(Flink116Shims.class);
 
-  private Flink115SqlInterpreter batchSqlInterpreter;
-  private Flink115SqlInterpreter streamSqlInterpreter;
+  private Flink116SqlInterpreter batchSqlInterpreter;
+  private Flink116SqlInterpreter streamSqlInterpreter;
 
-  public Flink115Shims(FlinkVersion flinkVersion, Properties properties) {
+  public Flink116Shims(FlinkVersion flinkVersion, Properties properties) {
     super(flinkVersion, properties);
   }
 
   public void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext) {
-    this.batchSqlInterpreter = new Flink115SqlInterpreter(flinkSqlContext, true);
+    this.batchSqlInterpreter = new Flink116SqlInterpreter(flinkSqlContext, true);
   }
 
   public void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext) {
-    this.streamSqlInterpreter = new Flink115SqlInterpreter(flinkSqlContext, false);
+    this.streamSqlInterpreter = new Flink116SqlInterpreter(flinkSqlContext, false);
   }
 
   @Override
-  public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager) {
-    return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager);
+  public Object createResourceManager(List<URL> jars, Object tableConfig) {
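+    // Wrap the user jars in a ClientWrapperClassLoader and expose them through a
+    // ClientResourceManager, so the function catalog and table environments built by
+    // this shim can resolve classes coming from those jars.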
+    Configuration configuration = ((TableConfig) tableConfig).getConfiguration().clone();
+    ClientWrapperClassLoader userClassLoader =
+            new ClientWrapperClassLoader(
+                    ClientClassloaderUtil.buildUserClassLoader(
+                            jars,
+                            Thread.currentThread().getContextClassLoader(),
+                            new Configuration(configuration)),
+                    configuration);
+    return new ClientResourceManager(configuration, userClassLoader);
+  }
+
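+  // In Flink 1.16 the FunctionCatalog constructor also takes the ResourceManager that
+  // tracks the user jars, so one is built from the session jars before creating the catalog.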
+  @Override
+  public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List<URL> jars) {
+    ResourceManager resourceManager = (ResourceManager) createResourceManager(jars, (TableConfig) tableConfig);
+    return new FunctionCatalog((TableConfig) tableConfig, resourceManager, (CatalogManager) catalogManager, (ModuleManager) moduleManager);
   }
 
   @Override
@@ -103,6 +123,62 @@ public class Flink115Shims extends FlinkShims {
     // do nothing
   }
 
+  @Override
+  public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj,
+                                                       Object senvObj,
+                                                       Object tableConfigObj,
+                                                       Object moduleManagerObj,
+                                                       Object functionCatalogObj,
+                                                       Object catalogManagerObj,
+                                                       List<URL> jars,
+                                                       ClassLoader classLoader) {
+    EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj;
+    StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj;
+    TableConfig tableConfig = (TableConfig) tableConfigObj;
+    ModuleManager moduleManager = (ModuleManager) moduleManagerObj;
+    FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj;
+    CatalogManager catalogManager = (CatalogManager) catalogManagerObj;
+    ImmutablePair<Object, Object> pair = createPlannerAndExecutor(
+            classLoader, environmentSettings, senv,
+            tableConfig, moduleManager, functionCatalog, catalogManager);
+    Planner planner = (Planner) pair.left;
+    Executor executor = (Executor) pair.right;
+
+    ResourceManager resourceManager = (ResourceManager) createResourceManager(jars, tableConfig);
+
+    return new org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl(catalogManager,
+            moduleManager, resourceManager,
+            functionCatalog, tableConfig, new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(senv),
+            planner, executor, environmentSettings.isStreamingMode());
+  }
+
+  @Override
+  public Object createJavaBlinkStreamTableEnvironment(Object environmentSettingsObj,
+                                                      Object senvObj,
+                                                      Object tableConfigObj,
+                                                      Object moduleManagerObj,
+                                                      Object functionCatalogObj,
+                                                      Object catalogManagerObj,
+                                                      List<URL> jars,
+                                                      ClassLoader classLoader) {
+    EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj;
+    StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj;
+    TableConfig tableConfig = (TableConfig) tableConfigObj;
+    ModuleManager moduleManager = (ModuleManager) moduleManagerObj;
+    FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj;
+    CatalogManager catalogManager = (CatalogManager) catalogManagerObj;
+    ImmutablePair<Object, Object> pair = createPlannerAndExecutor(
+            classLoader, environmentSettings, senv,
+            tableConfig, moduleManager, functionCatalog, catalogManager);
+    Planner planner = (Planner) pair.left;
+    Executor executor = (Executor) pair.right;
+
+    ResourceManager resourceManager = (ResourceManager) createResourceManager(jars, tableConfig);
+
+    return new StreamTableEnvironmentImpl(catalogManager, moduleManager, resourceManager,
+            functionCatalog, tableConfig, senv, planner, executor, environmentSettings.isStreamingMode());
+  }
+
   @Override
   public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) {
     return new StreamExecutionEnvironmentFactory() {
@@ -261,7 +337,11 @@ public class Flink115Shims extends FlinkShims {
 
   @Override
   public boolean isTimeIndicatorType(Object type) {
-    return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type);
+    return type instanceof TimeIndicatorTypeInfo;
   }
 
   private Object lookupExecutor(ClassLoader classLoader,
@@ -292,6 +372,7 @@ public class Flink115Shims extends FlinkShims {
     Executor executor = (Executor) lookupExecutor(classLoader, environmentSettings, sEnv);
     Planner planner = PlannerFactoryUtil.createPlanner(executor,
             (TableConfig) tableConfig,
+            Thread.currentThread().getContextClassLoader(),
             (ModuleManager) moduleManager,
             (CatalogManager) catalogManager,
             (FunctionCatalog) functionCatalog);
diff --git a/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116SqlInterpreter.java b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116SqlInterpreter.java
new file mode 100644
index 0000000000..e4f098cea7
--- /dev/null
+++ b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116SqlInterpreter.java
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.SqlParserException;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.*;
+import org.apache.flink.table.operations.command.HelpOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.ddl.*;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.ZeppelinContext;
+import org.apache.zeppelin.interpreter.util.SqlSplitter;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+
+public class Flink116SqlInterpreter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Flink116SqlInterpreter.class);
+  private static final String CMD_DESC_DELIMITER = "\t\t";
+
+  /**
+   * SQL Client HELP command helper class.
+   */
+  private static final class SQLCliCommandsDescriptions {
+    private int commandMaxLength;
+    private final Map<String, String> commandsDescriptions;
+
+    public SQLCliCommandsDescriptions() {
+      this.commandsDescriptions = new LinkedHashMap<>();
+      this.commandMaxLength = -1;
+    }
+
+    public SQLCliCommandsDescriptions commandDescription(String command, String description) {
+      Preconditions.checkState(
+              StringUtils.isNotBlank(command), "content of command must not be empty.");
+      Preconditions.checkState(
+              StringUtils.isNotBlank(description),
+              "content of command's description must not be empty.");
+      this.updateMaxCommandLength(command.length());
+      this.commandsDescriptions.put(command, description);
+      return this;
+    }
+
+    private void updateMaxCommandLength(int newLength) {
+      Preconditions.checkState(newLength > 0);
+      if (this.commandMaxLength < newLength) {
+        this.commandMaxLength = newLength;
+      }
+    }
+
+    public AttributedString build() {
+      AttributedStringBuilder attributedStringBuilder = new AttributedStringBuilder();
+      if (!this.commandsDescriptions.isEmpty()) {
+        this.commandsDescriptions.forEach(
+                (cmd, cmdDesc) -> {
+                  attributedStringBuilder
+                          .style(AttributedStyle.DEFAULT.bold())
+                          .append(
+                                  String.format(
+                                          String.format("%%-%ds", commandMaxLength), cmd))
+                          .append(CMD_DESC_DELIMITER)
+                          .style(AttributedStyle.DEFAULT)
+                          .append(cmdDesc)
+                          .append('\n');
+                });
+      }
+      return attributedStringBuilder.toAttributedString();
+    }
+  }
+
+  private static final AttributedString SQL_CLI_COMMANDS_DESCRIPTIONS =
+          new SQLCliCommandsDescriptions()
+                  .commandDescription("HELP", "Prints the available commands.")
+                  .commandDescription(
+                          "SET",
+                          "Sets a session configuration property. Syntax: \"SET '<key>'='<value>';\". Use \"SET;\" for listing all properties.")
+                  .commandDescription(
+                          "RESET",
+                          "Resets a session configuration property. Syntax: \"RESET '<key>';\". Use \"RESET;\" to reset all session properties.")
+                  .commandDescription(
+                          "INSERT INTO",
+                          "Inserts the results of a SQL SELECT query into a declared table sink.")
+                  .commandDescription(
+                          "INSERT OVERWRITE",
+                          "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.")
+                  .commandDescription(
+                          "SELECT", "Executes a SQL SELECT query on the Flink cluster.")
+                  .commandDescription(
+                          "EXPLAIN",
+                          "Describes the execution plan of a query or table with the given name.")
+                  .commandDescription(
+                          "BEGIN STATEMENT SET",
+                          "Begins a statement set. Syntax: \"BEGIN STATEMENT SET;\"")
+                  .commandDescription("END", "Ends a statement set. Syntax: \"END;\"")
+                  // (TODO) zjffdu, ADD/REMOVE/SHOW JAR
+                  .build();
+
+  // --------------------------------------------------------------------------------------------
+
+  public static final AttributedString MESSAGE_HELP =
+          new AttributedStringBuilder()
+                  .append("The following commands are available:\n\n")
+                  .append(SQL_CLI_COMMANDS_DESCRIPTIONS)
+                  .style(AttributedStyle.DEFAULT.underline())
+                  .append("\nHint")
+                  .style(AttributedStyle.DEFAULT)
+                  .append(
+                          ": Make sure that a statement ends with \";\" for finalizing (multi-line) statements.")
+                  // About Documentation Link.
+                  .style(AttributedStyle.DEFAULT)
+                  .append(
+                          "\nYou can also type any Flink SQL statement, please visit https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/overview/ for more details.")
+                  .toAttributedString();
+
+  private static final String MESSAGE_NO_STATEMENT_IN_STATEMENT_SET = "No statement in the statement set, skip submit.";
+
+  private FlinkSqlContext flinkSqlContext;
+  private TableEnvironment tbenv;
+  private ZeppelinContext z;
+  private Parser sqlParser;
+  private SqlSplitter sqlSplitter;
+  // paragraphId -> list of ModifyOperation, used for statement sets written in either of two syntaxes:
+  // 1. runAsOne=true
+  // 2. begin statement set;
+  //    ...
+  //    end;
+  private Map<String, List<ModifyOperation>> statementOperationsMap = new HashMap<>();
+  private boolean isBatch;
+  private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
+
+
+  public Flink116SqlInterpreter(FlinkSqlContext flinkSqlContext, boolean isBatch) {
+    this.flinkSqlContext = flinkSqlContext;
+    this.isBatch = isBatch;
+    if (isBatch) {
+      this.tbenv = (TableEnvironment) flinkSqlContext.getBtenv();
+    } else {
+      this.tbenv = (TableEnvironment) flinkSqlContext.getStenv();
+    }
+    this.z = (ZeppelinContext) flinkSqlContext.getZeppelinContext();
+    this.sqlParser = ((TableEnvironmentInternal) tbenv).getParser();
+    this.sqlSplitter = new SqlSplitter();
+    JobListener jobListener = new JobListener() {
+      @Override
+      public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
+        if (lock.isHeldByCurrentThread()) {
+          lock.unlock();
+          LOGGER.info("UnLock JobSubmitLock");
+        }
+      }
+
+      @Override
+      public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
+
+      }
+    };
+
+    ((ExecutionEnvironment) flinkSqlContext.getBenv()).registerJobListener(jobListener);
+    ((StreamExecutionEnvironment) flinkSqlContext.getSenv()).registerJobListener(jobListener);
+  }
+
+  public InterpreterResult runSqlList(String st, InterpreterContext context) {
+    try {
+      boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
+      if (runAsOne) {
+        statementOperationsMap.put(context.getParagraphId(), new ArrayList<>());
+      }
+
+      String jobName = context.getLocalProperties().get("jobName");
+      if (StringUtils.isNotBlank(jobName)) {
+        tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName);
+      }
+
+      List<String> sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList());
+      for (String sql : sqls) {
+        List<Operation> operations = null;
+        try {
+          operations = sqlParser.parse(sql);
+        } catch (SqlParserException e) {
+          context.out.write("%text Invalid SQL statement: " + sql + "\n");
+          context.out.write(MESSAGE_HELP.toString());
+          return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString());
+        }
+
+        try {
+          callOperation(sql, operations.get(0), context);
+          context.out.flush();
+        } catch (Throwable e) {
+          LOGGER.error("Fail to run sql: " + sql, e);
+          try {
+            context.out.write("%text Fail to run sql command: " +
+                    sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n");
+          } catch (IOException ex) {
+            LOGGER.warn("Unexpected exception:", ex);
+            return new InterpreterResult(InterpreterResult.Code.ERROR,
+                    ExceptionUtils.getStackTrace(e));
+          }
+          return new InterpreterResult(InterpreterResult.Code.ERROR);
+        }
+      }
+
+      if (runAsOne && !statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()).isEmpty()) {
+        try {
+          lock.lock();
+          List<ModifyOperation> modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>());
+          if (!modifyOperations.isEmpty()) {
+            callInserts(modifyOperations, context);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Fail to execute sql as one job", e);
+          return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
+        } finally {
+          if (lock.isHeldByCurrentThread()) {
+            lock.unlock();
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Fail to execute sql", e);
+      return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
+    } finally {
+      statementOperationsMap.remove(context.getParagraphId());
+    }
+
+    return new InterpreterResult(InterpreterResult.Code.SUCCESS);
+  }
+
+  private void callOperation(String sql, Operation operation, InterpreterContext context) throws IOException {
+    if (operation instanceof HelpOperation) {
+      // HELP
+      callHelp(context);
+    } else if (operation instanceof SetOperation) {
+      // SET
+      callSet((SetOperation) operation, context);
+    } else if (operation instanceof ModifyOperation) {
+      // INSERT INTO/OVERWRITE
+      callInsert((ModifyOperation) operation, context);
+    } else if (operation instanceof QueryOperation) {
+      // SELECT
+      callSelect(sql, (QueryOperation) operation, context);
+    } else if (operation instanceof ExplainOperation) {
+      // EXPLAIN
+      callExplain((ExplainOperation) operation, context);
+    } else if (operation instanceof BeginStatementSetOperation) {
+      // BEGIN STATEMENT SET
+      callBeginStatementSet(context);
+    } else if (operation instanceof EndStatementSetOperation) {
+      // END
+      callEndStatementSet(context);
+    } else if (operation instanceof ShowCreateTableOperation) {
+      // SHOW CREATE TABLE
+      callShowCreateTable((ShowCreateTableOperation) operation, context);
+    } else if (operation instanceof ShowCatalogsOperation) {
+      callShowCatalogs(context);
+    } else if (operation instanceof ShowCurrentCatalogOperation) {
+      callShowCurrentCatalog(context);
+    } else if (operation instanceof UseCatalogOperation) {
+      callUseCatalog(((UseCatalogOperation) operation).getCatalogName(), context);
+    } else if (operation instanceof CreateCatalogOperation) {
+      callDDL(sql, context, "Catalog has been created.");
+    } else if (operation instanceof DropCatalogOperation) {
+      callDDL(sql, context, "Catalog has been dropped.");
+    } else if (operation instanceof UseDatabaseOperation) {
+      UseDatabaseOperation useDBOperation = (UseDatabaseOperation) operation;
+      callUseDatabase(useDBOperation.getDatabaseName(), context);
+    } else if (operation instanceof CreateDatabaseOperation) {
+      callDDL(sql, context, "Database has been created.");
+    } else if (operation instanceof DropDatabaseOperation) {
+      callDDL(sql, context, "Database has been removed.");
+    } else if (operation instanceof AlterDatabaseOperation) {
+      callDDL(sql, context, "Alter database succeeded!");
+    } else if (operation instanceof ShowDatabasesOperation) {
+      callShowDatabases(context);
+    } else if (operation instanceof ShowCurrentDatabaseOperation) {
+      callShowCurrentDatabase(context);
+    } else if (operation instanceof CreateTableOperation || operation instanceof CreateTableASOperation) {
+      callDDL(sql, context, "Table has been created.");
+    } else if (operation instanceof AlterTableOperation) {
+      callDDL(sql, context, "Alter table succeeded!");
+    } else if (operation instanceof DropTableOperation) {
+      callDDL(sql, context, "Table has been dropped.");
+    } else if (operation instanceof DescribeTableOperation) {
+      DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation;
+      callDescribe(describeTableOperation.getSqlIdentifier().getObjectName(), context);
+    } else if (operation instanceof ShowTablesOperation) {
+      callShowTables(context);
+    } else if (operation instanceof CreateViewOperation) {
+      callDDL(sql, context, "View has been created.");
+    } else if (operation instanceof DropViewOperation) {
+      callDDL(sql, context, "View has been dropped.");
+    } else if (operation instanceof AlterViewOperation) {
+      callDDL(sql, context, "Alter view succeeded!");
+    } else if (operation instanceof CreateCatalogFunctionOperation || operation instanceof CreateTempSystemFunctionOperation) {
+      callDDL(sql, context, "Function has been created.");
+    } else if (operation instanceof DropCatalogFunctionOperation || operation instanceof DropTempSystemFunctionOperation) {
+      callDDL(sql, context, "Function has been removed.");
+    } else if (operation instanceof AlterCatalogFunctionOperation) {
+      callDDL(sql, context, "Alter function succeeded!");
+    } else if (operation instanceof ShowFunctionsOperation) {
+      callShowFunctions(context);
+    } else if (operation instanceof ShowModulesOperation) {
+      callShowModules(context);
+    } else if (operation instanceof ShowPartitionsOperation) {
+      ShowPartitionsOperation showPartitionsOperation = (ShowPartitionsOperation) operation;
+      callShowPartitions(showPartitionsOperation.asSummaryString(), context);
+    } else {
+      throw new IOException(operation.getClass().getName() + " is not supported");
+    }
+  }
+
+
+  private void callHelp(InterpreterContext context) throws IOException {
+    context.out.write(MESSAGE_HELP.toString() + "\n");
+  }
+
+  private void callInsert(ModifyOperation operation, InterpreterContext context) throws IOException {
+    if (statementOperationsMap.containsKey(context.getParagraphId())) {
+      List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId());
+      modifyOperations.add(operation);
+    } else {
+      callInserts(Collections.singletonList(operation), context);
+    }
+  }
+
+  private void callInserts(List<ModifyOperation> operations, InterpreterContext context) throws IOException {
+    if (!isBatch) {
+      context.getLocalProperties().put("flink.streaming.insert_into", "true");
+    }
+    TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
+    checkState(tableResult.getJobClient().isPresent());
+    try {
+      tableResult.await();
+      JobClient jobClient = tableResult.getJobClient().get();
+      if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
+        context.out.write("Insertion succeeded.\n");
+      } else {
+        throw new IOException("Job failed, " + jobClient.getJobExecutionResult().get().toString());
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Flink job was interrupted", e);
+    } catch (ExecutionException e) {
+      throw new IOException("Flink job failed", e);
+    }
+  }
+
+  private void callShowCreateTable(ShowCreateTableOperation showCreateTableOperation, InterpreterContext context) throws IOException {
+    try {
+      lock.lock();
+      TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(showCreateTableOperation);
+      String explanation =
+              Objects.requireNonNull(tableResult.collect().next().getField(0)).toString();
+      context.out.write(explanation + "\n");
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
+  }
+
+  private void callExplain(ExplainOperation explainOperation, InterpreterContext context) throws IOException {
+    try {
+      lock.lock();
+      TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(explainOperation);
+      String explanation =
+              Objects.requireNonNull(tableResult.collect().next().getField(0)).toString();
+      context.out.write(explanation + "\n");
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
+  }
+
+  public void callSelect(String sql, QueryOperation queryOperation, InterpreterContext context) throws IOException {
+    try {
+      lock.lock();
+      if (isBatch) {
+        callBatchInnerSelect(sql, context);
+      } else {
+        callStreamInnerSelect(sql, context);
+      }
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
+  }
+
+  public void callBatchInnerSelect(String sql, InterpreterContext context) throws IOException {
+    Table table = this.tbenv.sqlQuery(sql);
+    String result = z.showData(table);
+    context.out.write(result);
+  }
+
+  public void callStreamInnerSelect(String sql, InterpreterContext context) throws IOException {
+    flinkSqlContext.getStreamSqlSelectConsumer().accept(sql);
+  }
+
+  public void callSet(SetOperation setOperation, InterpreterContext context) throws IOException {
+    if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
+      // set a property
+      String key = setOperation.getKey().get().trim();
+      String value = setOperation.getValue().get().trim();
+      this.tbenv.getConfig().getConfiguration().setString(key, value);
+      LOGGER.info("Set table config: {}={}", key, value);
+    } else {
+      // show all properties
+      final Map<String, String> properties = this.tbenv.getConfig().getConfiguration().toMap();
+      List<String> prettyEntries = new ArrayList<>();
+      for (String key : properties.keySet()) {
+        prettyEntries.add(
+                String.format(
+                        "'%s' = '%s'",
+                        EncodingUtils.escapeSingleQuotes(key),
+                        EncodingUtils.escapeSingleQuotes(properties.get(key))));
+      }
+      prettyEntries.sort(String::compareTo);
+      prettyEntries.forEach(entry -> {
+        try {
+          context.out.write(entry + "\n");
+        } catch (IOException e) {
+          LOGGER.warn("Fail to write output", e);
+        }
+      });
+    }
+  }
+
+  private void callBeginStatementSet(InterpreterContext context) throws IOException {
+    statementOperationsMap.put(context.getParagraphId(), new ArrayList<>());
+  }
+
+  private void callEndStatementSet(InterpreterContext context) throws IOException {
+    List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId());
+    if (modifyOperations != null && !modifyOperations.isEmpty()) {
+      callInserts(modifyOperations, context);
+    } else {
+      context.out.write(MESSAGE_NO_STATEMENT_IN_STATEMENT_SET);
+    }
+  }
+
+  private void callUseCatalog(String catalog, InterpreterContext context) throws IOException {
+    tbenv.executeSql("USE CATALOG `" + catalog + "`");
+  }
+
+  private void callUseDatabase(String databaseName,
+                               InterpreterContext context) throws IOException {
+    this.tbenv.executeSql("USE `" + databaseName + "`");
+  }
+
+  private void callShowCatalogs(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Catalogs");
+    List<String> catalogs = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + "\n");
+  }
+
+  private void callShowCurrentCatalog(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Current Catalog");
+    String catalog = tableResult.collect().next().getField(0).toString();
+    context.out.write("%text current catalog: " + catalog + "\n");
+  }
+
+  private void callShowDatabases(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Databases");
+    List<String> databases = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table database\n" + StringUtils.join(databases, "\n") + "\n");
+  }
+
+  private void callShowCurrentDatabase(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Current Database");
+    String database = tableResult.collect().next().getField(0).toString();
+    context.out.write("%text current database: " + database + "\n");
+  }
+
+  private void callShowTables(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Tables");
+    List<String> tables = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .filter(tbl -> !tbl.startsWith("UnnamedTable"))
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table table\n" + StringUtils.join(tables, "\n") + "\n");
+  }
+
+  private void callShowFunctions(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Functions");
+    List<String> functions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table function\n" + StringUtils.join(functions, "\n") + "\n");
+  }
+
+  private void callShowModules(InterpreterContext context) throws IOException {
+    String[] modules = this.tbenv.listModules();
+    context.out.write("%table modules\n" + StringUtils.join(modules, "\n") + "\n");
+  }
+
+  private void callShowPartitions(String sql, InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql(sql);
+    List<String> partitions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table partitions\n" + StringUtils.join(partitions, "\n") + "\n");
+  }
+
+  private void callDDL(String sql, InterpreterContext context, String message) throws IOException {
+    try {
+      lock.lock();
+      this.tbenv.executeSql(sql);
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
+    context.out.write(message + "\n");
+  }
+
+  private void callDescribe(String name, InterpreterContext context) throws IOException {
+    TableResult tableResult = null;
+    try {
+      tableResult = tbenv.executeSql("DESCRIBE " + name);
+    } catch (Exception e) {
+      throw new IOException("Fail to describe table: " + name, e);
+    }
+    CloseableIterator<Row> result = tableResult.collect();
+    StringBuilder builder = new StringBuilder();
+    builder.append("Column\tType\n");
+    while (result.hasNext()) {
+      Row row = result.next();
+      builder.append(row.getField(0) + "\t" + row.getField(1) + "\n");
+    }
+    context.out.write("%table\n" + builder.toString());
+  }
+}
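
(Illustrative note, not from the patch: the interpreter above groups INSERT statements into a
single Flink job in two ways, matching the statementOperationsMap comment. A Zeppelin paragraph
would typically look like one of the following; the interpreter binding and sink names are
assumptions for the example.)

    %flink.ssql(runAsOne=true)
    INSERT INTO sink_a SELECT ...;
    INSERT INTO sink_b SELECT ...;

    %flink.ssql
    BEGIN STATEMENT SET;
    INSERT INTO sink_a SELECT ...;
    INSERT INTO sink_b SELECT ...;
    END;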
diff --git a/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java
new file mode 100644
index 0000000000..a35ad3a6cd
--- /dev/null
+++ b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.*;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.StringUtils;
+
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.zeppelin.flink.TimestampStringUtils.*;
+
+/**
+ * Copied from the Flink project with minor modifications.
+ */
+public class PrintUtils {
+
+  public static final String NULL_COLUMN = "(NULL)";
+  private static final String COLUMN_TRUNCATED_FLAG = "...";
+
+  private PrintUtils() {}
+
+
+  public static String[] rowToString(
+          Row row, ResolvedSchema resolvedSchema, ZoneId sessionTimeZone) {
+    return rowToString(row, NULL_COLUMN, false, resolvedSchema, sessionTimeZone);
+  }
+
+  public static String[] rowToString(
+          Row row,
+          String nullColumn,
+          boolean printRowKind,
+          ResolvedSchema resolvedSchema,
+          ZoneId sessionTimeZone) {
+    final int len = printRowKind ? row.getArity() + 1 : row.getArity();
+    final List<String> fields = new ArrayList<>(len);
+    if (printRowKind) {
+      fields.add(row.getKind().shortString());
+    }
+    for (int i = 0; i < row.getArity(); i++) {
+      final Object field = row.getField(i);
+      final LogicalType fieldType =
+              resolvedSchema.getColumnDataTypes().get(i).getLogicalType();
+      if (field == null) {
+        fields.add(nullColumn);
+      } else {
+        fields.add(
+                StringUtils.arrayAwareToString(
+                        formattedTimestamp(field, fieldType, sessionTimeZone)));
+      }
+    }
+    return fields.toArray(new String[0]);
+  }
+
+  /**
+   * Normalizes field that contains TIMESTAMP, TIMESTAMP_LTZ and TIME type data.
+   *
+   * <p>This method also supports nested type ARRAY, ROW, MAP.
+   */
+  private static Object formattedTimestamp(
+          Object field, LogicalType fieldType, ZoneId sessionTimeZone) {
+    final LogicalTypeRoot typeRoot = fieldType.getTypeRoot();
+    if (field == null) {
+      return "null";
+    }
+    switch (typeRoot) {
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        return formatTimestampField(field, fieldType, sessionTimeZone);
+      case TIME_WITHOUT_TIME_ZONE:
+        return formatTimeField(field);
+      case ARRAY:
+        LogicalType elementType = ((ArrayType) fieldType).getElementType();
+        if (field instanceof List) {
+          List<?> array = (List<?>) field;
+          Object[] formattedArray = new Object[array.size()];
+          for (int i = 0; i < array.size(); i++) {
+            formattedArray[i] =
+                    formattedTimestamp(array.get(i), elementType, sessionTimeZone);
+          }
+          return formattedArray;
+        } else if (field.getClass().isArray()) {
+          // primitive type
+          if (field.getClass() == byte[].class) {
+            byte[] array = (byte[]) field;
+            Object[] formattedArray = new Object[array.length];
+            for (int i = 0; i < array.length; i++) {
+              formattedArray[i] =
+                      formattedTimestamp(array[i], elementType, sessionTimeZone);
+            }
+            return formattedArray;
+          } else if (field.getClass() == short[].class) {
+            short[] array = (short[]) field;
+            Object[] formattedArray = new Object[array.length];
+            for (int i = 0; i < array.length; i++) {
+              formattedArray[i] =
+                      formattedTimestamp(array[i], elementType, sessionTimeZone);
+            }
+            return formattedArray;
+          } else if (field.getClass() == int[].class) {
+            int[] array = (int[]) field;
+            Object[] formattedArray = new Object[array.length];
+            for (int i = 0; i < array.length; i++) {
+              formattedArray[i] =
+                      formattedTimestamp(array[i], elementType, sessionTimeZone);
+            }
+            return formattedArray;
+          } else if (field.getClass() == long[].class) {
+            long[] array = (long[]) field;
+            Object[] formattedArray = new Object[array.length];
+            for (int i = 0; i < array.length; i++) {
+              formattedArray[i] =
+                      formattedTimestamp(array[i], elementType, sessionTimeZone);
+            }
+            return formattedArray;
+          } else if (field.getClass() == float[].class) {
+            float[] array = (float[]) field;
+            Object[] formattedArray = new Object[array.length];
+            for (int i = 0; i < array.length; i++) {
+              formattedArray[i] =
+                      formattedTimestamp(array[i], elementType, sessionTimeZone);
+            }
+            return formattedArray;
+          } else if (field.getClass() == double[].class) {
+            double[] array = (double[]) field;
+            Object[] formattedArray = new Object[array.length];
+            for (int i = 0; i < array.length; i++) {
+              formattedArray[i] =
+                      formattedTimestamp(array[i], elementType, sessionTimeZone);
+            }
+            return formattedArray;
+          } else if (field.getClass() == boolean[].class) {
+            boolean[] array = (boolean[]) field;
+            Object[] formattedArray = new Object[array.length];
+            for (int i = 0; i < array.length; i++) {
+              formattedArray[i] =
+                      formattedTimestamp(array[i], elementType, sessionTimeZone);
+            }
+            return formattedArray;
+          } else if (field.getClass() == char[].class) {
+            char[] array = (char[]) field;
+            Object[] formattedArray = new Object[array.length];
+            for (int i = 0; i < array.length; i++) {
+              formattedArray[i] =
+                      formattedTimestamp(array[i], elementType, sessionTimeZone);
+            }
+            return formattedArray;
+          } else {
+            // non-primitive type
+            Object[] array = (Object[]) field;
+            Object[] formattedArray = new Object[array.length];
+            for (int i = 0; i < array.length; i++) {
+              formattedArray[i] =
+                      formattedTimestamp(array[i], elementType, sessionTimeZone);
+            }
+            return formattedArray;
+          }
+        } else {
+          return field;
+        }
+      case ROW:
+        if (fieldType instanceof RowType && field instanceof Row) {
+          Row row = (Row) field;
+          Row formattedRow = new Row(row.getKind(), row.getArity());
+          for (int i = 0; i < ((RowType) fieldType).getFields().size(); i++) {
+            LogicalType type = ((RowType) fieldType).getFields().get(i).getType();
+            formattedRow.setField(
+                    i, formattedTimestamp(row.getField(i), type, sessionTimeZone));
+          }
+          return formattedRow;
+
+        } else if (fieldType instanceof RowType && field instanceof RowData) {
+          RowData rowData = (RowData) field;
+          Row formattedRow = new Row(rowData.getRowKind(), rowData.getArity());
+          for (int i = 0; i < ((RowType) fieldType).getFields().size(); i++) {
+            LogicalType type = ((RowType) fieldType).getFields().get(i).getType();
+            RowData.FieldGetter fieldGetter = RowData.createFieldGetter(type, i);
+            formattedRow.setField(
+                    i,
+                    formattedTimestamp(
+                            fieldGetter.getFieldOrNull(rowData),
+                            type,
+                            sessionTimeZone));
+          }
+          return formattedRow;
+        } else {
+          return field;
+        }
+      case MAP:
+        LogicalType keyType = ((MapType) fieldType).getKeyType();
+        LogicalType valueType = ((MapType) fieldType).getValueType();
+        if (fieldType instanceof MapType && field instanceof Map) {
+          Map<Object, Object> map = ((Map) field);
+          Map<Object, Object> formattedMap = new HashMap<>(map.size());
+          for (Object key : map.keySet()) {
+            formattedMap.put(
+                    formattedTimestamp(key, keyType, sessionTimeZone),
+                    formattedTimestamp(map.get(key), valueType, sessionTimeZone));
+          }
+          return formattedMap;
+        } else if (fieldType instanceof MapType && field instanceof MapData) {
+          MapData map = ((MapData) field);
+          Map<Object, Object> formattedMap = new HashMap<>(map.size());
+          Object[] keyArray =
+                  (Object[]) formattedTimestamp(map.keyArray(), keyType, sessionTimeZone);
+          Object[] valueArray =
+                  (Object[])
+                          formattedTimestamp(
+                                  map.valueArray(), valueType, sessionTimeZone);
+          for (int i = 0; i < keyArray.length; i++) {
+            formattedMap.put(keyArray[i], valueArray[i]);
+          }
+          return formattedMap;
+        } else {
+          return field;
+        }
+      default:
+        return field;
+    }
+  }
+
+  /**
+   * Formats the print content of TIMESTAMP and TIMESTAMP_LTZ type data, consider the user
+   * configured time zone.
+   */
+  private static Object formatTimestampField(
+          Object timestampField, LogicalType fieldType, ZoneId sessionTimeZone) {
+    switch (fieldType.getTypeRoot()) {
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        final int precision = getPrecision(fieldType);
+        if (timestampField instanceof java.sql.Timestamp) {
+          // conversion between java.sql.Timestamp and TIMESTAMP_WITHOUT_TIME_ZONE
+          return timestampToString(
+                  ((Timestamp) timestampField).toLocalDateTime(), precision);
+        } else if (timestampField instanceof java.time.LocalDateTime) {
+          return timestampToString(((LocalDateTime) timestampField), precision);
+        } else if (timestampField instanceof TimestampData) {
+          return timestampToString(
+                  ((TimestampData) timestampField).toLocalDateTime(), precision);
+        } else {
+          return timestampField;
+        }
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        Instant instant = null;
+        if (timestampField instanceof java.time.Instant) {
+          instant = ((Instant) timestampField);
+        } else if (timestampField instanceof java.sql.Timestamp) {
+          Timestamp timestamp = ((Timestamp) timestampField);
+          // conversion between java.sql.Timestamp and TIMESTAMP_WITH_LOCAL_TIME_ZONE
+          instant =
+                  TimestampData.fromEpochMillis(
+                                  timestamp.getTime(), timestamp.getNanos() % 1000_000)
+                          .toInstant();
+        } else if (timestampField instanceof TimestampData) {
+          instant = ((TimestampData) timestampField).toInstant();
+        } else if (timestampField instanceof Integer) {
+          instant = Instant.ofEpochSecond((Integer) timestampField);
+        } else if (timestampField instanceof Long) {
+          instant = Instant.ofEpochMilli((Long) timestampField);
+        }
+        if (instant != null) {
+          return timestampToString(
+                  instant.atZone(sessionTimeZone).toLocalDateTime(),
+                  getPrecision(fieldType));
+        } else {
+          return timestampField;
+        }
+      default:
+        return timestampField;
+    }
+  }
+
+  /** Formats the print content of TIME type data. */
+  private static Object formatTimeField(Object timeField) {
+    if (timeField.getClass().isAssignableFrom(int.class) || timeField instanceof Integer) {
+      return unixTimeToString((int) timeField);
+    } else if (timeField.getClass().isAssignableFrom(long.class) || timeField instanceof Long) {
+      return unixTimeToString(((Long) timeField).intValue());
+    } else if (timeField instanceof Time) {
+      return unixTimeToString(timeToInternal((Time) timeField));
+    } else if (timeField instanceof LocalTime) {
+      return unixTimeToString(localTimeToUnixDate((LocalTime) timeField));
+    } else {
+      return timeField;
+    }
+  }
+}
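
(Illustrative sketch, not from the patch: how PrintUtils.rowToString can be exercised. The
schema, column names and values are invented for the example; the expected output follows the
trailing-zero trimming implemented in TimestampStringUtils.)

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.types.Row;
import org.apache.zeppelin.flink.PrintUtils;

import java.time.LocalDateTime;
import java.time.ZoneId;

public class PrintUtilsExample {
  public static void main(String[] args) {
    ResolvedSchema schema = ResolvedSchema.of(
            Column.physical("id", DataTypes.INT()),
            Column.physical("event_time", DataTypes.TIMESTAMP(3)));
    Row row = Row.of(1, LocalDateTime.of(2023, 1, 31, 20, 17, 30, 123_000_000));
    // TIMESTAMP/TIMESTAMP_LTZ/TIME fields are rendered with the session time zone,
    // other fields fall back to their default string form.
    String[] cells = PrintUtils.rowToString(row, schema, ZoneId.of("UTC"));
    // cells -> ["1", "2023-01-31 20:17:30.123"]
    System.out.println(String.join(" | ", cells));
  }
}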
diff --git a/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java
new file mode 100644
index 0000000000..c52104e45a
--- /dev/null
+++ b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import java.sql.Time;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.TimeZone;
+
+/**
+ * Copied from the Flink project with minor modifications.
+ */
+public class TimestampStringUtils {
+
+  private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+  private TimestampStringUtils() {
+    // Static utility class; not meant to be instantiated.
+  }
+
+  public static String timestampToString(LocalDateTime ldt, int precision) {
+    String fraction = pad(9, (long) ldt.getNano());
+    // Trim trailing zeros from the 9-digit nano fraction down to the requested precision.
+    while (fraction.length() > precision && fraction.endsWith("0")) {
+      fraction = fraction.substring(0, fraction.length() - 1);
+    }
+
+    StringBuilder ymdhms = ymdhms(
+            new StringBuilder(),
+            ldt.getYear(), ldt.getMonthValue(), ldt.getDayOfMonth(),
+            ldt.getHour(), ldt.getMinute(), ldt.getSecond());
+    if (fraction.length() > 0) {
+      ymdhms.append(".").append(fraction);
+    }
+
+    return ymdhms.toString();
+  }
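+
+  // Example (a sketch): timestampToString(LocalDateTime.of(2023, 1, 31, 20, 17, 30, 123_000_000), 3)
+  // returns "2023-01-31 20:17:30.123"; with precision 0 and zero nanos it returns "2023-01-31 20:17:30".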
+
+  private static String pad(int length, long v) {
+    StringBuilder s = new StringBuilder(Long.toString(v));
+
+    while(s.length() < length) {
+      s.insert(0, "0");
+    }
+
+    return s.toString();
+  }
+
+  private static StringBuilder hms(StringBuilder b, int h, int m, int s) {
+    int2(b, h);
+    b.append(':');
+    int2(b, m);
+    b.append(':');
+    int2(b, s);
+    return b;
+  }
+
+  private static StringBuilder ymdhms(
+          StringBuilder b, int year, int month, int day, int h, int m, int s) {
+    ymd(b, year, month, day);
+    b.append(' ');
+    hms(b, h, m, s);
+    return b;
+  }
+
+  private static StringBuilder ymd(StringBuilder b, int year, int month, int day) {
+    int4(b, year);
+    b.append('-');
+    int2(b, month);
+    b.append('-');
+    int2(b, day);
+    return b;
+  }
+
+  private static void int4(StringBuilder buf, int i) {
+    buf.append((char) ('0' + i / 1000 % 10));
+    buf.append((char) ('0' + i / 100 % 10));
+    buf.append((char) ('0' + i / 10 % 10));
+    buf.append((char) ('0' + i % 10));
+  }
+
+  private static void int2(StringBuilder buf, int i) {
+    buf.append((char) ('0' + i / 10 % 10));
+    buf.append((char) ('0' + i % 10));
+  }
+
+  public static String unixTimeToString(int time) {
+    StringBuilder buf = new StringBuilder(8);
+    unixTimeToString(buf, time, 0);
+    return buf.toString();
+  }
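+
+  // Example (a sketch): unixTimeToString(49_530_000) returns "13:45:30"
+  // (49,530,000 milliseconds past midnight).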
+
+  private static void unixTimeToString(StringBuilder buf, int time, int precision) {
+    while(time < 0) {
+      time = (int)((long)time + 86400000L);
+    }
+
+    int h = time / 3600000;
+    int time2 = time % 3600000;
+    int m = time2 / 60000;
+    int time3 = time2 % 60000;
+    int s = time3 / 1000;
+    int ms = time3 % 1000;
+    int2(buf, h);
+    buf.append(':');
+    int2(buf, m);
+    buf.append(':');
+    int2(buf, s);
+    if (precision > 0) {
+      buf.append('.');
+
+      while(precision > 0) {
+        buf.append((char)(48 + ms / 100));
+        ms %= 100;
+        ms *= 10;
+        if (ms == 0) {
+          break;
+        }
+
+        --precision;
+      }
+    }
+
+  }
+
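+  /** Converts a java.sql.Time to milliseconds since midnight in the local time zone. */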
+  public static int timeToInternal(Time time) {
+    long ts = time.getTime() + (long)LOCAL_TZ.getOffset(time.getTime());
+    return (int)(ts % 86400000L);
+  }
+
+  public static int localTimeToUnixDate(LocalTime time) {
+    return time.getHour() * 3600000
+            + time.getMinute() * 60000
+            + time.getSecond() * 1000
+            + time.getNano() / 1000000;
+  }
+}
diff --git a/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/shims116/CollectStreamTableSink.java b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/shims116/CollectStreamTableSink.java
new file mode 100644
index 0000000000..cf7968e7e6
--- /dev/null
+++ b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/shims116/CollectStreamTableSink.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.shims116;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.experimental.CollectSink;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.UUID;
+
+/**
+ * Table sink for collecting the results locally using sockets.
+ */
+public class CollectStreamTableSink implements RetractStreamTableSink<Row> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class);
+
+  private final InetAddress targetAddress;
+  private final int targetPort;
+  private final TypeSerializer<Tuple2<Boolean, Row>> serializer;
+
+  private String[] fieldNames;
+  private TypeInformation<?>[] fieldTypes;
+
+  public CollectStreamTableSink(InetAddress targetAddress,
+                                int targetPort,
+                                TypeSerializer<Tuple2<Boolean, Row>> serializer) {
+    LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort);
+    this.targetAddress = targetAddress;
+    this.targetPort = targetPort;
+    this.serializer = serializer;
+  }
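+
+  // Example (a sketch; names are hypothetical): new CollectStreamTableSink(InetAddress.getLocalHost(),
+  // port, tupleSerializer), where tupleSerializer handles Tuple2<Boolean, Row> retract messages.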
+
+  @Override
+  public String[] getFieldNames() {
+    return fieldNames;
+  }
+
+  @Override
+  public TypeInformation<?>[] getFieldTypes() {
+    return fieldTypes;
+  }
+
+  @Override
+  public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+    final CollectStreamTableSink copy =
+            new CollectStreamTableSink(targetAddress, targetPort, serializer);
+    copy.fieldNames = fieldNames;
+    copy.fieldTypes = fieldTypes;
+    return copy;
+  }
+
+  @Override
+  public TypeInformation<Row> getRecordType() {
+    return Types.ROW_NAMED(fieldNames, fieldTypes);
+  }
+
+  @Override
+  public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
+    // add sink
+    return stream
+            .addSink(new CollectSink<>(targetAddress, targetPort, serializer))
+            .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID())
+            .setParallelism(1);
+  }
+
+  @Override
+  public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
+    return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
+  }
+}
diff --git a/flink/pom.xml b/flink/pom.xml
index d432293e2f..5ac374ce37 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -40,6 +40,7 @@
         <module>flink1.13-shims</module>
         <module>flink1.14-shims</module>
         <module>flink1.15-shims</module>
+        <module>flink1.16-shims</module>
     </modules>
 
     <properties>
@@ -47,12 +48,21 @@
         <flink1.13.version>1.13.2</flink1.13.version>
         <flink1.14.version>1.14.0</flink1.14.version>
         <flink1.15.version>1.15.1</flink1.15.version>
+        <flink1.16.version>1.16.0</flink1.16.version>
 
         <flink.scala.version>2.11.12</flink.scala.version>
         <flink.scala.binary.version>2.11</flink.scala.binary.version>
     </properties>
 
     <profiles>
+        <profile>
+            <id>flink-116</id>
+            <!-- Flink 1.16 only support scala 2.12-->
+            <modules>
+                <module>flink-scala-2.12</module>
+            </modules>
+        </profile>
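+        <!-- A sketch of building against the flink-116 profile, mirroring the CI setup:
+             ./mvnw install -DskipTests -pl flink/flink-scala-2.12 -am -Pflink-116 -->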
+
         <profile>
             <id>flink-115</id>
             <!-- Flink 1.15 only support scala 2.12-->
diff --git a/testing/env_python_3_with_flink_116.yml b/testing/env_python_3_with_flink_116.yml
new file mode 100644
index 0000000000..9b35aba642
--- /dev/null
+++ b/testing/env_python_3_with_flink_116.yml
@@ -0,0 +1,29 @@
+name: python_3_with_flink
+channels:
+  - conda-forge
+  - defaults
+dependencies:
+  - pycodestyle
+  - scipy
+  - numpy=1.19.5
+  - grpcio
+  - protobuf
+  - pandasql
+  - ipython
+  - ipython_genutils
+  - ipykernel
+  - jupyter_client=5
+  - hvplot
+  - plotnine
+  - seaborn
+  - intake
+  - intake-parquet
+  - intake-xarray
+  - altair
+  - vega_datasets
+  - plotly
+  - jinja2=3.0.3
+  - pip
+  - pip:
+      - apache-flink==1.16.0
+
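+# A sketch of creating this conda environment locally:
+#   conda env create -f testing/env_python_3_with_flink_116.yml
+#   conda activate python_3_with_flink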