Posted to commits@zeppelin.apache.org by jo...@apache.org on 2021/10/05 23:52:11 UTC

[zeppelin] branch master updated: [ZEPPELIN-5469] Support Flink 1.14

This is an automated email from the ASF dual-hosted git repository.

jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e63b4f  [ZEPPELIN-5469] Support Flink 1.14
1e63b4f is described below

commit 1e63b4ff8d3476391a6db5e28ba59b0fe460c8e2
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon Jul 26 18:29:32 2021 +0800

    [ZEPPELIN-5469] Support Flink 1.14
    
    ### What is this PR for?
    
    This PR adds support for Flink 1.14. Main changes:
    * Add a new module, flink1.14-shims
    * Add a new profile, flink-114. (Flink 1.14 changed its module structure, so some Flink dependencies move into the version profiles. For example, there is no `flink-runtime_${flink.scala.binary.version}` module in Flink 1.14; it became `flink-runtime`.)
    * The legacy Flink planner is not supported in Flink 1.14, so there is no `bt_env_2` or `st_env_2` in Flink 1.14 (see the sketch below).
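
    As an illustration only (not the literal interpreter code), a minimal sketch of
    how the legacy-planner environments are now gated; `flinkVersion` and the
    factory calls are the ones changed in FlinkScalaInterpreter/TableEnvFactory below:

        // Create the legacy ("old" planner) table environments only on
        // versions before 1.14, where that planner still exists.
        if (!flinkVersion.isAfterFlink114()) {
          this.btenv_2 = tblEnvFactory.createScalaFlinkBatchTableEnvironment();
          this.stenv_2 = tblEnvFactory.createScalaFlinkStreamTableEnvironment(
              EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build(),
              classLoader);
        }
        // On Flink 1.14+ only the blink-planner environments (btenv/stenv) remain.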
    
    ### What type of PR is it?
    [ Feature ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5469
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #4195 from zjffdu/ZEPPELIN-5469 and squashes the following commits:
    
    465b8778b [Jeff Zhang] [ZEPPELIN-5469] Support Flink 1.14
---
 .github/workflows/core.yml                         |   2 +-
 flink/flink-scala-parent/pom.xml                   | 190 +++++++++++++++++---
 .../apache/zeppelin/flink/IPyFlinkInterpreter.java |   4 +
 .../apache/zeppelin/flink/PyFlinkInterpreter.java  |   4 +
 .../org/apache/zeppelin/flink/TableEnvFactory.java | 112 ++++--------
 .../flink/YarnApplicationStreamEnvironment.java    |  19 +-
 .../internal/ScalaShellStreamEnvironment.java      |  31 +++-
 .../zeppelin/flink/sql/AbstractStreamSqlJob.java   |   7 +-
 .../zeppelin/flink/sql/AppendStreamSqlJob.java     |  33 ++--
 .../src/main/resources/python/zeppelin_ipyflink.py |  12 +-
 .../src/main/resources/python/zeppelin_pyflink.py  |  13 +-
 .../zeppelin/flink/FlinkScalaInterpreter.scala     |  30 ++--
 .../zeppelin/flink/FlinkZeppelinContext.scala      |  14 +-
 .../zeppelin/flink/internal/FlinkILoop.scala       |   1 +
 .../flink/FlinkBatchSqlInterpreterTest.java        |  24 +--
 .../zeppelin/flink/FlinkInterpreterTest.java       |  11 +-
 .../zeppelin/flink/IPyFlinkInterpreterTest.java    |  29 ++-
 .../zeppelin/flink/PyFlinkInterpreterTest.java     |  18 +-
 .../java/org/apache/zeppelin/flink/FlinkShims.java |  22 ++-
 .../org/apache/zeppelin/flink/FlinkVersion.java    |   4 +
 .../org/apache/zeppelin/flink/Flink110Shims.java   |  55 +++++-
 .../org/apache/zeppelin/flink/Flink111Shims.java   |  56 +++++-
 .../org/apache/zeppelin/flink/Flink112Shims.java   |  55 +++++-
 .../org/apache/zeppelin/flink/Flink113Shims.java   |  54 +++++-
 flink/flink1.14-shims/pom.xml                      | 199 +++++++++++++++++++++
 .../org/apache/zeppelin/flink/Flink114Shims.java}  |  72 ++++++--
 .../flink/shims114/CollectStreamTableSink.java     |  97 ++++++++++
 flink/pom.xml                                      |   2 +
 testing/env_python_3_with_flink_114.yml            |  27 +++
 .../integration/FlinkIntegrationTest114.java       |  40 +++++
 30 files changed, 1023 insertions(+), 214 deletions(-)

diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index f65e832..1777c29 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -148,7 +148,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        flink: [110, 111, 112, 113]
+        flink: [110, 111, 112, 113, 114]
     steps:
       - name: Checkout
         uses: actions/checkout@v2
diff --git a/flink/flink-scala-parent/pom.xml b/flink/flink-scala-parent/pom.xml
index d970540..b51ed70 100644
--- a/flink/flink-scala-parent/pom.xml
+++ b/flink/flink-scala-parent/pom.xml
@@ -80,6 +80,12 @@
 
     <dependency>
       <groupId>org.apache.zeppelin</groupId>
+      <artifactId>flink1.14-shims</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
       <artifactId>zeppelin-python</artifactId>
       <version>${project.version}</version>
       <exclusions>
@@ -160,13 +166,6 @@
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
       <artifactId>flink-yarn_${flink.scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
@@ -206,6 +205,13 @@
 
     <dependency>
       <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
       <artifactId>flink-scala_${flink.scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
@@ -321,26 +327,6 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.reflections</groupId>
-          <artifactId>reflections</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <version>${flink.hadoop.version}</version>
@@ -624,7 +610,6 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
-
   </dependencies>
 
   <build>
@@ -950,6 +935,38 @@
       <properties>
         <flink.version>${flink1.10.version}</flink.version>
       </properties>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.reflections</groupId>
+              <artifactId>reflections</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+      </dependencies>
     </profile>
 
     <profile>
@@ -957,6 +974,38 @@
       <properties>
         <flink.version>${flink1.11.version}</flink.version>
       </properties>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.reflections</groupId>
+              <artifactId>reflections</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+      </dependencies>
     </profile>
 
     <profile>
@@ -964,6 +1013,38 @@
       <properties>
         <flink.version>${flink1.12.version}</flink.version>
       </properties>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.reflections</groupId>
+              <artifactId>reflections</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+      </dependencies>
     </profile>
 
     <profile>
@@ -971,6 +1052,59 @@
       <properties>
         <flink.version>${flink1.13.version}</flink.version>
       </properties>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.reflections</groupId>
+              <artifactId>reflections</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+      </dependencies>
+    </profile>
+
+    <profile>
+      <id>flink-114</id>
+      <properties>
+        <flink.version>${flink1.14.version}</flink.version>
+      </properties>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-runtime</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+      </dependencies>
     </profile>
 
     <profile>
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
index 763795f..25ee255 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
@@ -61,6 +61,10 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
     opened = true;
   }
 
+  public boolean isAfterFlink114() {
+    return flinkInterpreter.getFlinkVersion().isAfterFlink114();
+  }
+
   @Override
   public ZeppelinContext buildZeppelinContext() {
     return flinkInterpreter.getZeppelinContext();
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
index d27f0fa..4162cc3 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
@@ -186,6 +186,10 @@ public class PyFlinkInterpreter extends PythonInterpreter {
     return flinkInterpreter.getFlinkVersion().isFlink110();
   }
 
+  public boolean isAfterFlink114() {
+    return flinkInterpreter.getFlinkVersion().isAfterFlink114();
+  }
+
   public org.apache.flink.api.java.ExecutionEnvironment getJavaExecutionEnvironment() {
     return flinkInterpreter.getExecutionEnvironment().getJavaEnv();
   }
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
index 46ad481..6c080fd 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
@@ -18,6 +18,7 @@
 package org.apache.zeppelin.flink;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
@@ -27,17 +28,12 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.delegation.Executor;
-import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.Planner;
-import org.apache.flink.table.delegation.PlannerFactory;
-import org.apache.flink.table.factories.ComponentFactoryService;
 import org.apache.flink.table.module.ModuleManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.util.Map;
 
 /**
  * Factory class for creating flink table env for different purpose:
@@ -52,7 +48,6 @@ public class TableEnvFactory {
 
   private FlinkVersion flinkVersion;
   private FlinkShims flinkShims;
-  private Executor executor;
   private org.apache.flink.api.scala.ExecutionEnvironment benv;
   private org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv;
 
@@ -133,17 +128,11 @@ public class TableEnvFactory {
 
   public TableEnvironment createScalaFlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
     try {
-      Map<String, String> executorProperties = settings.toExecutorProperties();
-      Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
-
-      Map<String, String> plannerProperties = settings.toPlannerProperties();
-      Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
-              .create(
-                      plannerProperties,
-                      executor,
-                      streamTableConfig,
-                      oldPlannerFunctionCatalog,
-                      catalogManager);
+      ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
+              classLoader, settings, senv.getJavaEnv(),
+              oldPlannerStreamTableConfig, functionCatalog, catalogManager);
+      Planner planner = (Planner) pair.left;
+      Executor executor = (Executor) pair.right;
 
       Class clazz = null;
       if (flinkVersion.isFlink110()) {
@@ -231,15 +220,14 @@ public class TableEnvFactory {
     }
   }
 
-  public TableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
-
+  public TableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentSettings settings,
+                                                                ClassLoader classLoader) {
     try {
-      Map<String, String> executorProperties = settings.toExecutorProperties();
-      Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
-
-      Map<String, String> plannerProperties = settings.toPlannerProperties();
-      Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
-              .create(plannerProperties, executor, streamTableConfig, oldPlannerFunctionCatalog, catalogManager);
+      ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
+              classLoader, settings, senv.getJavaEnv(),
+              oldPlannerBatchTableConfig, functionCatalog, catalogManager);
+      Planner planner = (Planner) pair.left;
+      Executor executor = (Executor) pair.right;
 
       Class clazz = null;
       if (flinkVersion.isFlink110()) {
@@ -303,18 +291,11 @@ public class TableEnvFactory {
   public TableEnvironment createScalaBlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
 
     try {
-      Map<String, String> executorProperties = settings.toExecutorProperties();
-      Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
-
-      Map<String, String> plannerProperties = settings.toPlannerProperties();
-      Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
-              .create(
-                      plannerProperties,
-                      executor,
-                      streamTableConfig,
-                      functionCatalog,
-                      catalogManager);
-
+      ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
+              classLoader, settings, senv.getJavaEnv(),
+              streamTableConfig, functionCatalog, catalogManager);
+      Planner planner = (Planner) pair.left;
+      Executor executor = (Executor) pair.right;
 
       Class clazz = null;
       if (flinkVersion.isFlink110()) {
@@ -372,14 +353,12 @@ public class TableEnvFactory {
   }
 
   public TableEnvironment createJavaBlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
-
     try {
-      Map<String, String> executorProperties = settings.toExecutorProperties();
-      Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
-
-      Map<String, String> plannerProperties = settings.toPlannerProperties();
-      Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
-              .create(plannerProperties, executor, streamTableConfig, functionCatalog, catalogManager);
+      ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
+              classLoader, settings, senv.getJavaEnv(),
+              streamTableConfig, functionCatalog, catalogManager);
+      Planner planner = (Planner) pair.left;
+      Executor executor = (Executor) pair.right;
 
       Class clazz = null;
       if (flinkVersion.isFlink110()) {
@@ -439,11 +418,11 @@ public class TableEnvFactory {
   public TableEnvironment createJavaBlinkBatchTableEnvironment(
           EnvironmentSettings settings, ClassLoader classLoader) {
     try {
-      final Map<String, String> executorProperties = settings.toExecutorProperties();
-      executor = lookupExecutor(executorProperties, senv.getJavaEnv());
-      final Map<String, String> plannerProperties = settings.toPlannerProperties();
-      final Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
-              .create(plannerProperties, executor, batchTableConfig, functionCatalog, catalogManager);
+      ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
+              classLoader, settings, senv.getJavaEnv(),
+              batchTableConfig, functionCatalog, catalogManager);
+      Planner planner = (Planner) pair.left;
+      Executor executor = (Executor) pair.right;
 
       Class clazz = null;
       if (flinkVersion.isFlink110()) {
@@ -501,38 +480,11 @@ public class TableEnvFactory {
     }
   }
 
-
   public void createStreamPlanner(EnvironmentSettings settings) {
-    Map<String, String> executorProperties = settings.toExecutorProperties();
-    Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
-
-    Map<String, String> plannerProperties = settings.toPlannerProperties();
-    Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
-            .create(
-                    plannerProperties,
-                    executor,
-                    streamTableConfig,
-                    functionCatalog,
-                    catalogManager);
+    ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
+            Thread.currentThread().getContextClassLoader(), settings, senv.getJavaEnv(),
+            streamTableConfig, functionCatalog, catalogManager);
+    Planner planner = (Planner) pair.left;
     this.flinkShims.setCatalogManagerSchemaResolver(catalogManager, planner.getParser(), settings);
   }
-
-  private static Executor lookupExecutor(
-          Map<String, String> executorProperties,
-          StreamExecutionEnvironment executionEnvironment) {
-    try {
-      ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
-      Method createMethod = executorFactory.getClass()
-              .getMethod("create", Map.class, StreamExecutionEnvironment.class);
-
-      return (Executor) createMethod.invoke(
-              executorFactory,
-              executorProperties,
-              executionEnvironment);
-    } catch (Exception e) {
-      throw new TableException(
-              "Could not instantiate the executor. Make sure a planner module is on the classpath",
-              e);
-    }
-  }
 }
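
The refactor above funnels all planner/executor construction through the
version-specific shims. A minimal sketch of the new call shape (a fragment, not the
full factory; the class name and parameter list here are illustrative):

    import org.apache.commons.lang3.tuple.ImmutablePair;
    import org.apache.flink.table.delegation.Executor;
    import org.apache.flink.table.delegation.Planner;

    final class PlannerExecutorSketch {
      // Each createXXXTableEnvironment method now asks the shim to build the
      // planner/executor pair instead of calling ComponentFactoryService and
      // PlannerFactory directly (both changed/removed in Flink 1.14).
      static void build(FlinkShims flinkShims, ClassLoader classLoader, Object settings,
                        Object javaStreamEnv, Object tableConfig,
                        Object functionCatalog, Object catalogManager) {
        ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
                classLoader, settings, javaStreamEnv,
                tableConfig, functionCatalog, catalogManager);
        Planner planner = (Planner) pair.left;     // version-specific planner
        Executor executor = (Executor) pair.right; // version-specific executor
        // ... both are then passed to the reflective TableEnvironment constructor.
      }
    }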
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java
index ca9089f..7f2fc92 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.lang.reflect.Field;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.ArrayList;
@@ -72,7 +73,7 @@ public class YarnApplicationStreamEnvironment extends StreamExecutionEnvironment
   }
 
   private void updateDependencies() throws Exception {
-    final Configuration configuration = getConfiguration();
+    final Configuration configuration = (Configuration) getFlinkConfiguration();
     checkState(
             configuration.getBoolean(DeploymentOptions.ATTACHED),
             "Only ATTACHED mode is supported by the scala shell.");
@@ -82,6 +83,22 @@ public class YarnApplicationStreamEnvironment extends StreamExecutionEnvironment
             configuration, PipelineOptions.JARS, updatedJarFiles, URL::toString);
   }
 
+  public Object getFlinkConfiguration() {
+    if (flinkScalaInterpreter.getFlinkVersion().isAfterFlink114()) {
+      // Starting from Flink 1.14, getConfiguration() returns a read-only copy of the internal
+      // configuration, so we need to access the internal configuration object via reflection.
+      try {
+        Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");
+        configurationField.setAccessible(true);
+        return configurationField.get(this);
+      } catch (Exception e) {
+        throw new RuntimeException("Fail to get configuration from StreamExecutionEnvironment", e);
+      }
+    } else {
+      return super.getConfiguration();
+    }
+  }
+
   private List<URL> getUpdatedJarFiles() throws MalformedURLException {
     final URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
     final List<URL> allJarFiles = new ArrayList<>();
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java
index 448c34b..6bba854 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java
@@ -19,24 +19,22 @@
 package org.apache.zeppelin.flink.internal;
 
 
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.util.JarUtils;
+import org.apache.zeppelin.flink.FlinkVersion;
 
+import java.lang.reflect.Field;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * This class is copied from flink project, the reason is that flink scala shell only supports
@@ -54,12 +52,17 @@ public class ScalaShellStreamEnvironment extends StreamExecutionEnvironment {
    */
   private final FlinkILoop flinkILoop;
 
+  private final FlinkVersion flinkVersion;
+
+
   public ScalaShellStreamEnvironment(
           final Configuration configuration,
           final FlinkILoop flinkILoop,
+          final FlinkVersion flinkVersion,
           final String... jarFiles) {
     super(configuration);
     this.flinkILoop = checkNotNull(flinkILoop);
+    this.flinkVersion = checkNotNull(flinkVersion);
     this.jarFiles = checkNotNull(JarUtils.getJarFiles(jarFiles));
   }
 
@@ -70,16 +73,28 @@ public class ScalaShellStreamEnvironment extends StreamExecutionEnvironment {
   }
 
   private void updateDependencies() throws Exception {
-    final Configuration configuration = getConfiguration();
     final List<URL> updatedJarFiles = getUpdatedJarFiles();
     ConfigUtils.encodeCollectionToConfig(
-            configuration, PipelineOptions.JARS, updatedJarFiles, URL::toString);
+            (Configuration) getFlinkConfiguration(), PipelineOptions.JARS, updatedJarFiles, URL::toString);
   }
 
-  public Configuration getClientConfiguration() {
-    return getConfiguration();
+  public Object getFlinkConfiguration() {
+    if (flinkVersion.isAfterFlink114()) {
+      // Starting from Flink 1.14, getConfiguration() returns a read-only copy of the internal
+      // configuration, so we need to access the internal configuration object via reflection.
+      try {
+        Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");
+        configurationField.setAccessible(true);
+        return configurationField.get(this);
+      } catch (Exception e) {
+        throw new RuntimeException("Fail to get configuration from StreamExecutionEnvironment", e);
+      }
+    } else {
+      return super.getConfiguration();
+    }
   }
 
+
   private List<URL> getUpdatedJarFiles() throws MalformedURLException {
     final URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
     final List<URL> allJarFiles = new ArrayList<>(jarFiles);
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
index 1fb7832..8288615 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
@@ -29,7 +29,6 @@ import org.apache.flink.streaming.experimental.SocketStreamIterator;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.calcite.FlinkTypeFactory;
 import org.apache.flink.table.sinks.RetractStreamTableSink;
 import org.apache.flink.types.Row;
 import org.apache.zeppelin.flink.FlinkShims;
@@ -86,12 +85,12 @@ public abstract class AbstractStreamSqlJob {
     this.flinkShims = flinkShims;
   }
 
-  private static TableSchema removeTimeAttributes(TableSchema schema) {
+  private static TableSchema removeTimeAttributes(FlinkShims flinkShims, TableSchema schema) {
     final TableSchema.Builder builder = TableSchema.builder();
     for (int i = 0; i < schema.getFieldCount(); i++) {
       final TypeInformation<?> type = schema.getFieldTypes()[i];
       final TypeInformation<?> convertedType;
-      if (FlinkTypeFactory.isTimeIndicatorType(type)) {
+      if (flinkShims.isTimeIndicatorType(type)) {
         convertedType = Types.SQL_TIMESTAMP;
       } else {
         convertedType = type;
@@ -115,7 +114,7 @@ public abstract class AbstractStreamSqlJob {
       this.table = table;
       int parallelism = Integer.parseInt(context.getLocalProperties()
               .getOrDefault("parallelism", defaultParallelism + ""));
-      this.schema = removeTimeAttributes(table.getSchema());
+      this.schema = removeTimeAttributes(flinkShims, table.getSchema());
       checkTableSchema(schema);
 
       LOGGER.info("ResultTable Schema: " + this.schema);
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
index 7f636f3..c6791ea 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
@@ -31,6 +31,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.sql.Timestamp;
+import java.time.temporal.TemporalField;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -98,16 +100,27 @@ public class AppendStreamSqlJob extends AbstractStreamSqlJob {
     });
 
     if (materializedTable.size() != 0) {
-      long maxTimestamp =
-              ((java.sql.Timestamp) materializedTable.get(materializedTable.size() - 1)
-                      .getField(0)).getTime();
-
-      materializedTable = materializedTable.stream()
-              .filter(row -> ((java.sql.Timestamp) row.getField(0)).getTime() >
-                      maxTimestamp - tsWindowThreshold)
-              .collect(Collectors.toList());
-
-      builder.append(tableToString(materializedTable));
+      // The event-time column type changed in Flink 1.14 (java.sql.Timestamp -> java.time.LocalDateTime).
+      if (flinkShims.getFlinkVersion().isAfterFlink114()) {
+        java.time.LocalDateTime ldt = ((java.time.LocalDateTime) materializedTable
+                .get(materializedTable.size() - 1)
+                .getField(0));
+        final long maxTimestamp = Timestamp.valueOf(ldt).getTime();
+        materializedTable = materializedTable.stream()
+                .filter(row -> Timestamp.valueOf(((java.time.LocalDateTime) row.getField(0))).getTime() >
+                        maxTimestamp - tsWindowThreshold)
+                .collect(Collectors.toList());
+        builder.append(tableToString(materializedTable));
+      } else {
+        final long maxTimestamp =
+                ((java.sql.Timestamp) materializedTable.get(materializedTable.size() - 1)
+                        .getField(0)).getTime();
+        materializedTable = materializedTable.stream()
+                .filter(row -> ((java.sql.Timestamp) row.getField(0)).getTime() >
+                        maxTimestamp - tsWindowThreshold)
+                .collect(Collectors.toList());
+        builder.append(tableToString(materializedTable));
+      }
     }
     builder.append("\n%text ");
     return builder.toString();
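
The version branch above boils down to one conversion. A minimal sketch (the helper
name is illustrative, not part of the commit):

    import java.sql.Timestamp;
    import java.time.LocalDateTime;

    final class TsCompat {
      // Normalize the event-time column to epoch millis regardless of version:
      // before 1.14 the value is a java.sql.Timestamp; from 1.14 on, a LocalDateTime.
      static long toEpochMillis(Object field, boolean isAfterFlink114) {
        return isAfterFlink114
            ? Timestamp.valueOf((LocalDateTime) field).getTime()
            : ((Timestamp) field).getTime();
      }
    }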
diff --git a/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py b/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py
index 50efbea..9249453 100644
--- a/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py
+++ b/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py
@@ -19,7 +19,6 @@
 from py4j.java_gateway import java_import, JavaGateway, GatewayClient
 
 from pyflink.common import *
-from pyflink.dataset import *
 from pyflink.datastream import *
 from pyflink.table import *
 from pyflink.table.catalog import *
@@ -45,19 +44,24 @@ pyflink.java_gateway._gateway = gateway
 pyflink.java_gateway.import_flink_view(gateway)
 pyflink.java_gateway.install_exception_handler()
 
-b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
 s_env = StreamExecutionEnvironment(intp.getJavaStreamExecutionEnvironment())
 
 if intp.isFlink110():
+    from pyflink.dataset import *
+    b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
     bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"), True)
     bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"), False)
     st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"), True)
     st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"), False)
-else:
+elif not intp.isAfterFlink114():
+    from pyflink.dataset import *
+    b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
     bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"))
-    bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"))
     st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"))
+    bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"))
     st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"))
+else:
+    st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"))
 
 class IPyFlinkZeppelinContext(PyZeppelinContext):
 
diff --git a/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py b/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py
index 542ab8f..06b99c9 100644
--- a/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py
+++ b/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py
@@ -16,7 +16,6 @@
 #
 
 from pyflink.common import *
-from pyflink.dataset import *
 from pyflink.datastream import *
 from pyflink.table import *
 from pyflink.table.catalog import *
@@ -34,19 +33,25 @@ pyflink.java_gateway._gateway = gateway
 pyflink.java_gateway.import_flink_view(gateway)
 pyflink.java_gateway.install_exception_handler()
 
-b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
 s_env = StreamExecutionEnvironment(intp.getJavaStreamExecutionEnvironment())
 
 if intp.isFlink110():
+  from pyflink.dataset import *
+  b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
   bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"), True)
   bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"), False)
   st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"), True)
   st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"), False)
-else:
+elif not intp.isAfterFlink114():
+  from pyflink.dataset import *
+  b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
   bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"))
-  bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"))
   st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"))
+  bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"))
   st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"))
+else:
+  st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"))
+
 
 from zeppelin_context import PyZeppelinContext
 
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 1545bcd..0918084 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -447,17 +447,19 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
       flinkILoop.bind("stenv", stenv.getClass().getCanonicalName(), stenv, List("@transient"))
       this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader)
 
-      // flink planner
-      this.btenv_2 = tblEnvFactory.createScalaFlinkBatchTableEnvironment()
-      flinkILoop.bind("btenv_2", btenv_2.getClass().getCanonicalName(), btenv_2, List("@transient"))
-      stEnvSetting =
-        EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build()
-      this.stenv_2 = tblEnvFactory.createScalaFlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader)
-      flinkILoop.bind("stenv_2", stenv_2.getClass().getCanonicalName(), stenv_2, List("@transient"))
-
-      this.java_btenv_2 = tblEnvFactory.createJavaFlinkBatchTableEnvironment()
-      btEnvSetting = EnvironmentSettings.newInstance.useOldPlanner.inStreamingMode.build
-      this.java_stenv_2 = tblEnvFactory.createJavaFlinkStreamTableEnvironment(btEnvSetting, getFlinkClassLoader)
+      if (!flinkVersion.isAfterFlink114()) {
+        // the legacy flink planner is no longer supported from Flink 1.14 on
+        this.btenv_2 = tblEnvFactory.createScalaFlinkBatchTableEnvironment()
+        flinkILoop.bind("btenv_2", btenv_2.getClass().getCanonicalName(), btenv_2, List("@transient"))
+        stEnvSetting =
+          EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build()
+        this.stenv_2 = tblEnvFactory.createScalaFlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader)
+        flinkILoop.bind("stenv_2", stenv_2.getClass().getCanonicalName(), stenv_2, List("@transient"))
+
+        this.java_btenv_2 = tblEnvFactory.createJavaFlinkBatchTableEnvironment()
+        btEnvSetting = EnvironmentSettings.newInstance.useOldPlanner.inStreamingMode.build
+        this.java_stenv_2 = tblEnvFactory.createJavaFlinkStreamTableEnvironment(btEnvSetting, getFlinkClassLoader)
+      }
     } finally {
       Thread.currentThread().setContextClassLoader(originalClassLoader)
     }
@@ -919,8 +921,8 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
   def completion(buf: String, cursor: Int, context: InterpreterContext): java.util.List[InterpreterCompletion]
 
   private def getConfigurationOfStreamExecutionEnv(): Configuration = {
-    val getConfigurationMethod = classOf[JStreamExecutionEnvironment].getDeclaredMethod("getConfiguration")
-    getConfigurationMethod.setAccessible(true)
-    getConfigurationMethod.invoke(this.senv.getJavaEnv).asInstanceOf[Configuration]
+    val configurationField = classOf[JStreamExecutionEnvironment].getDeclaredField("configuration")
+    configurationField.setAccessible(true)
+    configurationField.get(this.senv.getJavaEnv).asInstanceOf[Configuration]
   }
 }
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
index 517c67a..b5dba22 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
@@ -96,11 +96,15 @@ class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter,
   override def showData(obj: Any, maxResult: Int): String = {
     if (obj.isInstanceOf[DataSet[_]]) {
       val ds = obj.asInstanceOf[DataSet[_]]
-      val btenv = flinkInterpreter.getBatchTableEnvironment("flink")
-      val table =  flinkInterpreter.getFlinkShims.fromDataSet(btenv, ds).asInstanceOf[Table]
-      val columnNames: Array[String] = table.getSchema.getFieldNames
-      val dsRows: DataSet[Row] = flinkInterpreter.getFlinkShims.toDataSet(btenv, table).asInstanceOf[DataSet[Row]]
-      showTable(columnNames, dsRows.first(maxResult + 1).collect())
+      if (flinkInterpreter.getFlinkVersion.isAfterFlink114) {
+        "z.show(DataSet) is not supported after Flink 1.14"
+      } else {
+        val btenv = flinkInterpreter.getBatchTableEnvironment("flink")
+        val table = flinkInterpreter.getFlinkShims.fromDataSet(btenv, ds).asInstanceOf[Table]
+        val columnNames: Array[String] = table.getSchema.getFieldNames
+        val dsRows: DataSet[Row] = flinkInterpreter.getFlinkShims.toDataSet(btenv, table).asInstanceOf[DataSet[Row]]
+        showTable(columnNames, dsRows.first(maxResult + 1).collect())
+      }
     } else if (obj.isInstanceOf[Table]) {
       val rows = JavaConversions.asScalaBuffer(
         flinkInterpreter.getFlinkShims.collectToList(obj.asInstanceOf[TableImpl]).asInstanceOf[java.util.List[Row]]).toSeq
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
index 27ab381..86abeb5 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
@@ -60,6 +60,7 @@ class FlinkILoop(
     val remoteSenv = new ScalaShellStreamEnvironment(
       flinkConfig,
       this,
+      flinkScalaInterpreter.getFlinkVersion,
       getExternalJars(): _*)
 
     (remoteBenv,remoteSenv)
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
index e09bd1b..875756f 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -71,14 +71,16 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
     assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData());
 
     // z.show
-    context = getInterpreterContext();
-    result =
-            flinkInterpreter.interpret("z.show(btenv.sqlQuery(\"select * from source_table\"))", context);
-    resultMessages = context.out.toInterpreterResultMessage();
-    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
-    assertEquals(1, resultMessages.size());
-    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
-    assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData());
+    if (!flinkInterpreter.getFlinkVersion().isAfterFlink114()) {
+      context = getInterpreterContext();
+      result =
+              flinkInterpreter.interpret("z.show(btenv.sqlQuery(\"select * from source_table\"))", context);
+      resultMessages = context.out.toInterpreterResultMessage();
+      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(1, resultMessages.size());
+      assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+      assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData());
+    }
 
     // define scala udf
     result = flinkInterpreter.interpret(
@@ -87,7 +89,7 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
                     "}", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
-    result = flinkInterpreter.interpret("btenv.registerFunction(\"addOne\", new AddOne())",
+    result = flinkInterpreter.interpret("stenv.registerFunction(\"addOne\", new AddOne())",
             getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
@@ -108,7 +110,7 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
     context = getInterpreterContext();
-    result = pyFlinkInterpreter.interpret("bt_env.register_function(\"python_upper\", " +
+    result = pyFlinkInterpreter.interpret("st_env.register_function(\"python_upper\", " +
                     "udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))",
             context);
     assertEquals(result.toString(), InterpreterResult.Code.SUCCESS, result.code());
@@ -133,7 +135,7 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
                     "    return s.upper()", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
-    result = iPyFlinkInterpreter.interpret("bt_env.register_function(\"ipython_upper\", " +
+    result = iPyFlinkInterpreter.interpret("st_env.register_function(\"ipython_upper\", " +
                     "udf(IPythonUpper(), DataTypes.STRING(), DataTypes.STRING()))",
             getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index 8649138..c3939db 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -231,8 +231,13 @@ public class FlinkInterpreterTest {
     result = interpreter.interpret("z.show(data)", context);
     assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
     List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
-    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
-    assertEquals("_1\t_2\n1\tjeff\n2\tandy\n3\tjames\n", resultMessages.get(0).getData());
+    if (interpreter.getFlinkVersion().isAfterFlink114()) {
+      assertEquals(InterpreterResult.Type.TEXT, resultMessages.get(0).getType());
+      assertEquals("z.show(DataSet) is not supported after Flink 1.14", resultMessages.get(0).getData());
+    } else {
+      assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+      assertEquals("_1\t_2\n1\tjeff\n2\tandy\n3\tjames\n", resultMessages.get(0).getData());
+    }
   }
 
   @Test
@@ -263,7 +268,7 @@ public class FlinkInterpreterTest {
             "  .groupBy(0)\n" +
             "  .sum(1)\n" +
             "  .print()", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
 
     String[] expectedCounts = {"(hello,3)", "(world,1)", "(flink,1)", "(hadoop,1)"};
     Arrays.sort(expectedCounts);
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
index ac8d65b..3f06453 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
@@ -60,7 +60,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
   private RemoteInterpreterEventClient mockIntpEventClient =
           mock(RemoteInterpreterEventClient.class);
   private LazyOpenInterpreter flinkScalaInterpreter;
-
+  private FlinkInterpreter flinkInnerInterpreter;
 
   public IPyFlinkInterpreterTest() {
     super();
@@ -85,8 +85,8 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
     context.setIntpEventClient(mockIntpEventClient);
     InterpreterContext.set(context);
 
-    this.flinkScalaInterpreter = new LazyOpenInterpreter(
-        new FlinkInterpreter(properties));
+    this.flinkInnerInterpreter = new FlinkInterpreter(properties);
+    this.flinkScalaInterpreter = new LazyOpenInterpreter(flinkInnerInterpreter);
     intpGroup = new InterpreterGroup();
     intpGroup.put("session_1", new ArrayList<Interpreter>());
     intpGroup.get("session_1").add(flinkScalaInterpreter);
@@ -119,12 +119,16 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
 
   @Test
   public void testBatchIPyFlink() throws InterpreterException, IOException {
-    testBatchPyFlink(interpreter, flinkScalaInterpreter);
+    if (!flinkInnerInterpreter.getFlinkVersion().isAfterFlink114()) {
+      testBatchPyFlink(interpreter, flinkScalaInterpreter);
+    }
   }
 
   @Test
   public void testStreamIPyFlink() throws InterpreterException, IOException {
-    testStreamPyFlink(interpreter, flinkScalaInterpreter);
+    if (!flinkInnerInterpreter.getFlinkVersion().isAfterFlink114()) {
+      testStreamPyFlink(interpreter, flinkScalaInterpreter);
+    }
   }
 
   @Test
@@ -154,7 +158,8 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
     testResumeStreamSqlFromSavePoint(interpreter, flinkScalaInterpreter);
   }
 
-  public static void testBatchPyFlink(Interpreter pyflinkInterpreter, Interpreter flinkScalaInterpreter) throws InterpreterException, IOException {
+  public static void testBatchPyFlink(Interpreter pyflinkInterpreter,
+                                      LazyOpenInterpreter flinkScalaInterpreter) throws InterpreterException, IOException {
     InterpreterContext context = createInterpreterContext();
     InterpreterResult result = pyflinkInterpreter.interpret(
         "import tempfile\n" +
@@ -273,9 +278,15 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
             , context);
     assertEquals(result.toString(),InterpreterResult.Code.SUCCESS, result.code());
     List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
-    assertEquals(context.out.toString(), 1, resultMessages.size());
-    assertEquals(context.out.toString(), InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
-    assertEquals(context.out.toString(), "a\tb\tc\n1\thi\thello\n2\thi\thello\n", resultMessages.get(0).getData());
+    FlinkVersion flinkVersion = ((FlinkInterpreter) flinkScalaInterpreter.getInnerInterpreter()).getFlinkVersion();
+    if (flinkVersion.isAfterFlink114()) {
+      assertEquals(InterpreterResult.Type.TEXT, resultMessages.get(0).getType());
+      assertEquals("z.show(DataSet) is not supported after Flink 1.14", resultMessages.get(0).getData());
+    } else {
+      assertEquals(context.out.toString(), 1, resultMessages.size());
+      assertEquals(context.out.toString(), InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+      assertEquals(context.out.toString(), "a\tb\tc\n1\thi\thello\n2\thi\thello\n", resultMessages.get(0).getData());
+    }
   }
 
   @Override
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
index c891ec1..0ecccfd 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
@@ -40,9 +40,10 @@ import static org.mockito.Mockito.mock;
 
 public class PyFlinkInterpreterTest extends PythonInterpreterTest {
 
-  private Interpreter flinkScalaInterpreter;
-  private Interpreter streamSqlInterpreter;
-  private Interpreter batchSqlInterpreter;
+  private FlinkInterpreter flinkInnerInterpreter;
+  private LazyOpenInterpreter flinkScalaInterpreter;
+  private LazyOpenInterpreter streamSqlInterpreter;
+  private LazyOpenInterpreter batchSqlInterpreter;
 
 
   @Override
@@ -63,7 +64,8 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest {
     IPyFlinkInterpreterTest.angularObjectRegistry = new AngularObjectRegistry("flink", null);
     InterpreterContext context = getInterpreterContext();
     InterpreterContext.set(context);
-    flinkScalaInterpreter = new LazyOpenInterpreter(new FlinkInterpreter(properties));
+    this.flinkInnerInterpreter = new FlinkInterpreter(properties);
+    flinkScalaInterpreter = new LazyOpenInterpreter(flinkInnerInterpreter);
     intpGroup.get("session_1").add(flinkScalaInterpreter);
     flinkScalaInterpreter.setInterpreterGroup(intpGroup);
 
@@ -95,12 +97,16 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest {
 
   @Test
   public void testBatchPyFlink() throws InterpreterException, IOException {
-    IPyFlinkInterpreterTest.testBatchPyFlink(interpreter, flinkScalaInterpreter);
+    if (!flinkInnerInterpreter.getFlinkVersion().isAfterFlink114()) {
+      IPyFlinkInterpreterTest.testBatchPyFlink(interpreter, flinkScalaInterpreter);
+    }
   }
 
   @Test
   public void testStreamIPyFlink() throws InterpreterException, IOException {
-    IPyFlinkInterpreterTest.testStreamPyFlink(interpreter, flinkScalaInterpreter);
+    if (!flinkInnerInterpreter.getFlinkVersion().isAfterFlink114()) {
+      IPyFlinkInterpreterTest.testStreamPyFlink(interpreter, flinkScalaInterpreter);
+    }
   }
 
   @Test
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index c41488f..ba25ec9 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -18,6 +18,7 @@
 package org.apache.zeppelin.flink;
 
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.zeppelin.flink.sql.SqlCommandParser;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.jline.utils.AttributedString;
@@ -45,8 +46,10 @@ public abstract class FlinkShims {
   private static FlinkShims flinkShims;
 
   protected Properties properties;
+  protected FlinkVersion flinkVersion;
 
-  public FlinkShims(Properties properties) {
+  public FlinkShims(FlinkVersion flinkVersion, Properties properties) {
+    this.flinkVersion = flinkVersion;
     this.properties = properties;
   }
 
@@ -65,12 +68,15 @@ public abstract class FlinkShims {
     } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 13) {
       LOGGER.info("Initializing shims for Flink 1.13");
       flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink113Shims");
+    } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 14) {
+      LOGGER.info("Initializing shims for Flink 1.14");
+      flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink114Shims");
     } else {
       throw new Exception("Flink version: '" + flinkVersion + "' is not supported yet");
     }
 
-    Constructor c = flinkShimsClass.getConstructor(Properties.class);
-    return (FlinkShims) c.newInstance(properties);
+    Constructor c = flinkShimsClass.getConstructor(FlinkVersion.class, Properties.class);
+    return (FlinkShims) c.newInstance(flinkVersion, properties);
   }
 
   /**
@@ -98,6 +104,10 @@ public abstract class FlinkShims {
             .toAttributedString();
   }
 
+  public FlinkVersion getFlinkVersion() {
+    return flinkVersion;
+  }
+
   public abstract void disableSysoutLogging(Object batchConfig, Object streamConfig);
 
   public abstract Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment);
@@ -159,4 +169,10 @@ public abstract class FlinkShims {
   }
 
   public abstract String[] rowToString(Object row, Object table, Object tableConfig);
+
+  public abstract boolean isTimeIndicatorType(Object type);
+
+  public abstract ImmutablePair<Object, Object> createPlannerAndExecutor(
+          ClassLoader classLoader, Object environmentSettings, Object sEnv,
+          Object tableConfig, Object functionCatalog, Object catalogManager);
 }
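The dispatch above follows the usual shim-loader pattern: pick an implementation class by runtime version and instantiate it reflectively, so only the shim matching the installed Flink has to be loadable. A condensed sketch of the idea (the version-to-class mapping below is illustrative; the real code enumerates each supported minor version and fails fast on unknown ones):

    import java.lang.reflect.Constructor;
    import java.util.Properties;

    static FlinkShims loadShims(FlinkVersion version, Properties properties) throws Exception {
      // e.g. minor version 14 -> org.apache.zeppelin.flink.Flink114Shims
      String className = String.format("org.apache.zeppelin.flink.Flink1%dShims",
              version.getMinorVersion());
      Constructor<?> c = Class.forName(className)
              .getConstructor(FlinkVersion.class, Properties.class);
      return (FlinkShims) c.newInstance(version, properties);
    }
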
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
index a042162..2e1f47e 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
@@ -82,4 +82,8 @@ public class FlinkVersion {
   public boolean isFlink110() {
     return this.majorVersion == 1 && minorVersion == 10;
   }
+
+  public boolean isAfterFlink114() {
+    return newerThanOrEqual(FlinkVersion.fromVersionString("1.14.0"));
+  }
 }
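Despite its name, isAfterFlink114() is inclusive: it delegates to newerThanOrEqual, so it already returns true for 1.14.0 itself. For illustration, assuming fromVersionString parses plain "major.minor.patch" strings as used above:

    FlinkVersion.fromVersionString("1.14.0").isAfterFlink114(); // true (inclusive)
    FlinkVersion.fromVersionString("1.13.2").isAfterFlink114(); // false
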
diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
index 3faa52c..5711884 100644
--- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
+++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
@@ -19,27 +19,38 @@
 package org.apache.zeppelin.flink;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.scala.DataSet;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.python.PythonOptions;
 import org.apache.flink.python.util.ResourceUtil;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableUtils;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.api.scala.BatchTableEnvironment;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
 import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.factories.ComponentFactoryService;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
@@ -103,8 +114,8 @@ public class Flink110Shims extends FlinkShims {
           .append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.")
           .toAttributedString();
 
-  public Flink110Shims(Properties properties) {
-    super(properties);
+  public Flink110Shims(FlinkVersion flinkVersion, Properties properties) {
+    super(flinkVersion, properties);
   }
 
   @Override
@@ -337,4 +348,42 @@ public class Flink110Shims extends FlinkShims {
     }
     return fields;
   }
+
+  public boolean isTimeIndicatorType(Object type) {
+    return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type);
+  }
+
+  private Object lookupExecutor(ClassLoader classLoader,
+                               Object settings,
+                               Object sEnv) {
+    try {
+      Map<String, String> executorProperties = ((EnvironmentSettings) settings).toExecutorProperties();
+      ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
+      Method createMethod = executorFactory.getClass()
+              .getMethod("create", Map.class, StreamExecutionEnvironment.class);
+
+      return (Executor) createMethod.invoke(
+              executorFactory,
+              executorProperties,
+              (StreamExecutionEnvironment) sEnv);
+    } catch (Exception e) {
+      throw new TableException(
+              "Could not instantiate the executor. Make sure a planner module is on the classpath",
+              e);
+    }
+  }
+
+  @Override
+  public ImmutablePair<Object, Object> createPlannerAndExecutor(
+          ClassLoader classLoader, Object environmentSettings, Object sEnv,
+          Object tableConfig, Object functionCatalog, Object catalogManager) {
+    EnvironmentSettings settings = (EnvironmentSettings) environmentSettings;
+    Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv);
+    Map<String, String> plannerProperties = settings.toPlannerProperties();
+    Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+            .create(plannerProperties, executor, (TableConfig) tableConfig,
+                    (FunctionCatalog) functionCatalog,
+                    (CatalogManager) catalogManager);
+    return ImmutablePair.of(planner, executor);
+  }
 }
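The same lookupExecutor/createPlannerAndExecutor pair recurs in the 1.11, 1.12 and 1.13 shims below. In these Flink versions, discovery is keyed on property maps via ComponentFactoryService, and the two-argument create(Map, StreamExecutionEnvironment) has to be invoked reflectively because that overload lives on the concrete stream executor factory rather than on the ExecutorFactory interface; this mirrors the reflective lookup that Flink's own StreamTableEnvironmentImpl performs in the same versions.
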
diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
index c8db525..64979fd 100644
--- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
+++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
@@ -20,8 +20,10 @@ package org.apache.zeppelin.flink;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.scala.DataSet;
@@ -36,7 +38,9 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFact
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
@@ -46,8 +50,14 @@ import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
 import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.factories.ComponentFactoryService;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
@@ -78,6 +88,7 @@ import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
 import org.apache.flink.table.operations.ddl.DropTableOperation;
 import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropViewOperation;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.utils.PrintUtils;
 import org.apache.flink.types.Row;
@@ -98,6 +109,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -141,10 +153,9 @@ public class Flink111Shims extends FlinkShims {
 
   private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
 
-  public Flink111Shims(Properties properties) {
-    super(properties);
+  public Flink111Shims(FlinkVersion flinkVersion, Properties properties) {
+    super(flinkVersion, properties);
   }
-
   @Override
   public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
     ((ExecutionConfig) batchConfig).disableSysoutLogging();
@@ -473,4 +484,42 @@
   public String[] rowToString(Object row, Object table, Object tableConfig) {
     return PrintUtils.rowToString((Row) row);
   }
+
+  public boolean isTimeIndicatorType(Object type) {
+    return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type);
+  }
+
+  private Object lookupExecutor(ClassLoader classLoader,
+                                Object settings,
+                                Object sEnv) {
+    try {
+      Map<String, String> executorProperties = ((EnvironmentSettings) settings).toExecutorProperties();
+      ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
+      Method createMethod = executorFactory.getClass()
+              .getMethod("create", Map.class, StreamExecutionEnvironment.class);
+
+      return (Executor) createMethod.invoke(
+              executorFactory,
+              executorProperties,
+              (StreamExecutionEnvironment) sEnv);
+    } catch (Exception e) {
+      throw new TableException(
+              "Could not instantiate the executor. Make sure a planner module is on the classpath",
+              e);
+    }
+  }
+
+  @Override
+  public ImmutablePair<Object, Object> createPlannerAndExecutor(
+          ClassLoader classLoader, Object environmentSettings, Object sEnv,
+          Object tableConfig, Object functionCatalog, Object catalogManager) {
+    EnvironmentSettings settings = (EnvironmentSettings) environmentSettings;
+    Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv);
+    Map<String, String> plannerProperties = settings.toPlannerProperties();
+    Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+            .create(plannerProperties, executor, (TableConfig) tableConfig,
+                    (FunctionCatalog) functionCatalog,
+                    (CatalogManager) catalogManager);
+    return ImmutablePair.of(planner, executor);
+  }
 }
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
index 648826a..a713d1c 100644
--- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
@@ -21,7 +21,9 @@ package org.apache.zeppelin.flink;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.compress.utils.Lists;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.scala.DataSet;
@@ -37,7 +39,9 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFact
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
@@ -47,8 +51,14 @@ import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
 import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.factories.ComponentFactoryService;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
@@ -79,6 +89,7 @@ import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
 import org.apache.flink.table.operations.ddl.DropTableOperation;
 import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropViewOperation;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.utils.PrintUtils;
 import org.apache.flink.types.Row;
@@ -99,6 +110,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -142,10 +154,9 @@ public class Flink112Shims extends FlinkShims {
 
   private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
 
-  public Flink112Shims(Properties properties) {
-    super(properties);
+  public Flink112Shims(FlinkVersion flinkVersion, Properties properties) {
+    super(flinkVersion, properties);
   }
-
   @Override
   public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
     // do nothing
@@ -486,4 +497,42 @@ public class Flink112Shims extends FlinkShims {
   public String[] rowToString(Object row, Object table, Object tableConfig) {
     return PrintUtils.rowToString((Row) row);
   }
+
+  public boolean isTimeIndicatorType(Object type) {
+    return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type);
+  }
+
+  private Object lookupExecutor(ClassLoader classLoader,
+                                Object settings,
+                                Object sEnv) {
+    try {
+      Map<String, String> executorProperties = ((EnvironmentSettings) settings).toExecutorProperties();
+      ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
+      Method createMethod = executorFactory.getClass()
+              .getMethod("create", Map.class, StreamExecutionEnvironment.class);
+
+      return (Executor) createMethod.invoke(
+              executorFactory,
+              executorProperties,
+              (StreamExecutionEnvironment) sEnv);
+    } catch (Exception e) {
+      throw new TableException(
+              "Could not instantiate the executor. Make sure a planner module is on the classpath",
+              e);
+    }
+  }
+
+  @Override
+  public ImmutablePair<Object, Object> createPlannerAndExecutor(
+          ClassLoader classLoader, Object environmentSettings, Object sEnv,
+          Object tableConfig, Object functionCatalog, Object catalogManager) {
+    EnvironmentSettings settings = (EnvironmentSettings) environmentSettings;
+    Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv);
+    Map<String, String> plannerProperties = settings.toPlannerProperties();
+    Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+            .create(plannerProperties, executor, (TableConfig) tableConfig,
+                    (FunctionCatalog) functionCatalog,
+                    (CatalogManager) catalogManager);
+    return ImmutablePair.of(planner, executor);
+  }
 }
diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
index 054f07f..481bfee 100644
--- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
+++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
@@ -21,8 +21,10 @@ package org.apache.zeppelin.flink;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.compress.utils.Lists;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.scala.DataSet;
@@ -36,11 +38,13 @@ import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.python.PythonOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.PlannerType;
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
@@ -50,9 +54,15 @@ import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.factories.ComponentFactoryService;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
@@ -83,6 +93,7 @@ import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
 import org.apache.flink.table.operations.ddl.DropTableOperation;
 import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropViewOperation;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.utils.PrintUtils;
 import org.apache.flink.types.Row;
@@ -103,6 +114,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.time.ZoneId;
 import java.util.Arrays;
@@ -147,8 +159,8 @@ public class Flink113Shims extends FlinkShims {
 
   private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
 
-  public Flink113Shims(Properties properties) {
-    super(properties);
+  public Flink113Shims(FlinkVersion flinkVersion, Properties properties) {
+    super(flinkVersion, properties);
   }
 
   @Override
@@ -508,4 +520,42 @@ public class Flink113Shims extends FlinkShims {
     ResolvedSchema resolvedSchema = ((Table) table).getResolvedSchema();
     return PrintUtils.rowToString((Row) row, resolvedSchema, zoneId);
   }
+
+  public boolean isTimeIndicatorType(Object type) {
+    return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type);
+  }
+
+  private Object lookupExecutor(ClassLoader classLoader,
+                                Object settings,
+                                Object sEnv) {
+    try {
+      Map<String, String> executorProperties = ((EnvironmentSettings) settings).toExecutorProperties();
+      ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
+      Method createMethod = executorFactory.getClass()
+              .getMethod("create", Map.class, StreamExecutionEnvironment.class);
+
+      return createMethod.invoke(
+              executorFactory,
+              executorProperties,
+              sEnv);
+    } catch (Exception e) {
+      throw new TableException(
+              "Could not instantiate the executor. Make sure a planner module is on the classpath",
+              e);
+    }
+  }
+
+  @Override
+  public ImmutablePair<Object, Object> createPlannerAndExecutor(
+          ClassLoader classLoader, Object environmentSettings, Object sEnv,
+          Object tableConfig, Object functionCatalog, Object catalogManager) {
+    EnvironmentSettings settings = (EnvironmentSettings) environmentSettings;
+    Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv);
+    Map<String, String> plannerProperties = settings.toPlannerProperties();
+    Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+            .create(plannerProperties, executor, (TableConfig) tableConfig,
+                    (FunctionCatalog) functionCatalog,
+                    (CatalogManager) catalogManager);
+    return ImmutablePair.of(planner, executor);
+  }
 }
diff --git a/flink/flink1.14-shims/pom.xml b/flink/flink1.14-shims/pom.xml
new file mode 100644
index 0000000..c160548
--- /dev/null
+++ b/flink/flink1.14-shims/pom.xml
@@ -0,0 +1,199 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-parent</artifactId>
+        <groupId>org.apache.zeppelin</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.zeppelin</groupId>
+    <artifactId>flink1.14-shims</artifactId>
+    <version>0.11.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+    <name>Zeppelin: Flink1.14 Shims</name>
+
+    <properties>
+        <flink.version>${flink1.14.version}</flink.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.zeppelin</groupId>
+            <artifactId>flink-shims</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_${flink.scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_${flink.scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>eclipse-add-source</id>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile-first</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <scalaVersion>${flink.scala.version}</scalaVersion>
+                    <args>
+                        <arg>-unchecked</arg>
+                        <arg>-deprecation</arg>
+                        <arg>-feature</arg>
+                        <arg>-target:jvm-1.8</arg>
+                    </args>
+                    <jvmArgs>
+                        <jvmArg>-Xms1024m</jvmArg>
+                        <jvmArg>-Xmx1024m</jvmArg>
+                        <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg>
+                    </jvmArgs>
+                    <javacArgs>
+                        <javacArg>-source</javacArg>
+                        <javacArg>${java.version}</javacArg>
+                        <javacArg>-target</javacArg>
+                        <javacArg>${java.version}</javacArg>
+                        <javacArg>-Xlint:all,-serial,-path,-options</javacArg>
+                    </javacArgs>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-interpreter-setting</id>
+                        <phase>none</phase>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
similarity index 88%
copy from flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
copy to flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
index 054f07f..56fdc7e 100644
--- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
+++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
@@ -21,11 +21,12 @@ package org.apache.zeppelin.flink;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.compress.utils.Lists;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.scala.DataSet;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigOption;
@@ -36,23 +37,29 @@ import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.python.PythonOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.PlannerType;
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
-import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.PlannerFactoryUtil;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
@@ -83,13 +90,13 @@ import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
 import org.apache.flink.table.operations.ddl.DropTableOperation;
 import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropViewOperation;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.utils.PrintUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.FlinkException;
-import org.apache.zeppelin.flink.shims113.CollectStreamTableSink;
-import org.apache.zeppelin.flink.shims113.Flink113ScalaShims;
+import org.apache.zeppelin.flink.shims114.CollectStreamTableSink;
 import org.apache.zeppelin.flink.sql.SqlCommandParser;
 import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
 import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommandCall;
@@ -103,6 +110,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.time.ZoneId;
 import java.util.Arrays;
@@ -116,11 +124,11 @@ import java.util.regex.Matcher;
 
 
 /**
- * Shims for flink 1.13
+ * Shims for flink 1.14
  */
-public class Flink113Shims extends FlinkShims {
+public class Flink114Shims extends FlinkShims {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(Flink113Shims.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(Flink114Shims.class);
   public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder()
           .append("The following commands are available:\n\n")
           .append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database."))
@@ -147,8 +155,8 @@ public class Flink113Shims extends FlinkShims {
 
   private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
 
-  public Flink113Shims(Properties properties) {
-    super(properties);
+  public Flink114Shims(FlinkVersion flinkVersion, Properties properties) {
+    super(flinkVersion, properties);
   }
 
   @Override
@@ -251,12 +259,14 @@ public class Flink113Shims extends FlinkShims {
 
   @Override
   public Object fromDataSet(Object btenv, Object ds) {
-    return Flink113ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds);
+    return null;
+    //return Flink114ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds);
   }
 
   @Override
   public Object toDataSet(Object btenv, Object table) {
-    return Flink113ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table);
+    return null;
+    //return Flink114ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table);
   }
 
   @Override
@@ -508,4 +518,42 @@ public class Flink113Shims extends FlinkShims {
     ResolvedSchema resolvedSchema = ((Table) table).getResolvedSchema();
     return PrintUtils.rowToString((Row) row, resolvedSchema, zoneId);
   }
+
+  @Override
+  public boolean isTimeIndicatorType(Object type) {
+    return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type);
+  }
+
+  private Object lookupExecutor(ClassLoader classLoader,
+                               Object settings,
+                               Object sEnv) {
+    try {
+      final ExecutorFactory executorFactory =
+              FactoryUtil.discoverFactory(
+                      classLoader, ExecutorFactory.class, ((EnvironmentSettings) settings).getExecutor());
+      final Method createMethod =
+              executorFactory
+                      .getClass()
+                      .getMethod("create", StreamExecutionEnvironment.class);
+
+      return createMethod.invoke(executorFactory, sEnv);
+    } catch (Exception e) {
+      throw new TableException(
+              "Could not instantiate the executor. Make sure a planner module is on the classpath",
+              e);
+    }
+  }
+
+  @Override
+  public ImmutablePair<Object, Object> createPlannerAndExecutor(
+          ClassLoader classLoader, Object environmentSettings, Object sEnv,
+          Object tableConfig, Object functionCatalog, Object catalogManager) {
+    EnvironmentSettings settings = (EnvironmentSettings) environmentSettings;
+    Executor executor = (Executor) lookupExecutor(classLoader, environmentSettings, sEnv);
+    Planner planner = PlannerFactoryUtil.createPlanner(settings.getPlanner(), executor,
+            (TableConfig) tableConfig,
+            (CatalogManager) catalogManager,
+            (FunctionCatalog) functionCatalog);
+    return ImmutablePair.of(planner, executor);
+  }
 }
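Two 1.14-specific changes are worth noting here. First, factory discovery no longer goes through property maps: the executor factory is located with FactoryUtil.discoverFactory using the identifier from EnvironmentSettings.getExecutor(), and the planner is built via PlannerFactoryUtil.createPlanner, as in the hunk above. Second, fromDataSet/toDataSet now return null because the DataSet bridge no longer exists in Flink 1.14; callers are expected to guard on the version first (see the z.show(DataSet) handling in the tests). A hypothetical, more defensive sketch, if one preferred failing loudly over returning null:

    @Override
    public Object fromDataSet(Object btenv, Object ds) {
      // Alternative sketch only: the DataSet <-> Table bridge was removed in
      // Flink 1.14, so refuse the conversion explicitly instead of returning null.
      throw new UnsupportedOperationException(
              "DataSet <-> Table conversion is not supported since Flink 1.14");
    }
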
diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/shims114/CollectStreamTableSink.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/shims114/CollectStreamTableSink.java
new file mode 100644
index 0000000..7a224e1
--- /dev/null
+++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/shims114/CollectStreamTableSink.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.shims114;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.experimental.CollectSink;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.UUID;
+
+/**
+ * Table sink for collecting the results locally using sockets.
+ */
+public class CollectStreamTableSink implements RetractStreamTableSink<Row> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class);
+
+  private final InetAddress targetAddress;
+  private final int targetPort;
+  private final TypeSerializer<Tuple2<Boolean, Row>> serializer;
+
+  private String[] fieldNames;
+  private TypeInformation<?>[] fieldTypes;
+
+  public CollectStreamTableSink(InetAddress targetAddress,
+                                int targetPort,
+                                TypeSerializer<Tuple2<Boolean, Row>> serializer) {
+    LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort);
+    this.targetAddress = targetAddress;
+    this.targetPort = targetPort;
+    this.serializer = serializer;
+  }
+
+  @Override
+  public String[] getFieldNames() {
+    return fieldNames;
+  }
+
+  @Override
+  public TypeInformation<?>[] getFieldTypes() {
+    return fieldTypes;
+  }
+
+  @Override
+  public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+    final CollectStreamTableSink copy =
+            new CollectStreamTableSink(targetAddress, targetPort, serializer);
+    copy.fieldNames = fieldNames;
+    copy.fieldTypes = fieldTypes;
+    return copy;
+  }
+
+  @Override
+  public TypeInformation<Row> getRecordType() {
+    return Types.ROW_NAMED(fieldNames, fieldTypes);
+  }
+
+  @Override
+  public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
+    // add sink
+    return stream
+            .addSink(new CollectSink<>(targetAddress, targetPort, serializer))
+            .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID())
+            .setParallelism(1);
+  }
+
+  @Override
+  public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
+    return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
+  }
+}
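For readers new to retract streams: each element this sink consumes is a Tuple2<Boolean, Row>, where the flag distinguishes additions from retractions:

    // (true,  row) -> row is added (insert / accumulate)
    // (false, row) -> row is retracted, e.g. an earlier aggregate was updated
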
diff --git a/flink/pom.xml b/flink/pom.xml
index 0fcb776..329f79e 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -41,6 +41,7 @@
         <module>flink1.11-shims</module>
         <module>flink1.12-shims</module>
         <module>flink1.13-shims</module>
+        <module>flink1.14-shims</module>
     </modules>
 
     <properties>
@@ -48,6 +49,7 @@
         <flink1.11.version>1.11.3</flink1.11.version>
         <flink1.12.version>1.12.4</flink1.12.version>
         <flink1.13.version>1.13.2</flink1.13.version>
+        <flink1.14.version>1.14.0</flink1.14.version>
 
         <flink.scala.version>2.11.12</flink.scala.version>
         <flink.scala.binary.version>2.11</flink.scala.binary.version>
diff --git a/testing/env_python_3_with_flink_114.yml b/testing/env_python_3_with_flink_114.yml
new file mode 100644
index 0000000..37c6fa8
--- /dev/null
+++ b/testing/env_python_3_with_flink_114.yml
@@ -0,0 +1,27 @@
+name: python_3_with_flink
+channels:
+  - conda-forge
+  - defaults
+dependencies:
+  - pycodestyle
+  - scipy
+  - numpy=1.19.5
+  - grpcio
+  - protobuf
+  - pandasql
+  - ipython
+  - ipykernel
+  - jupyter_client=5
+  - hvplot
+  - plotnine
+  - seaborn
+  - intake
+  - intake-parquet
+  - intake-xarray
+  - altair
+  - vega_datasets
+  - plotly
+  - pip
+  - pip:
+      - apache-flink==1.14.0
+
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest114.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest114.java
new file mode 100644
index 0000000..68f6810
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest114.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(value = Parameterized.class)
+public class FlinkIntegrationTest114 extends FlinkIntegrationTest {
+
+  @Parameterized.Parameters
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+            {"1.14.0", "2.11"},
+            {"1.14.0", "2.12"}
+    });
+  }
+
+  public FlinkIntegrationTest114(String flinkVersion, String scalaVersion) {
+    super(flinkVersion, scalaVersion);
+  }
+}