Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/05/13 05:45:03 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5356] Support flink 1.13

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new ed67735  [ZEPPELIN-5356] Support flink 1.13
ed67735 is described below

commit ed677356ce1da43d252f47b388de6459c50de55b
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon May 10 22:01:38 2021 +0800

    [ZEPPELIN-5356] Support flink 1.13
    
    ### What is this PR for?
    
    Add a new module `flink1.13-shims` to support Flink 1.13. The main change is driven by an API change in Flink 1.13: we have to use a separate `TableConfig` for each `TableEnvironment` instance, otherwise the environments conflict with each other.
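    
    A minimal sketch (not part of this patch; the class name is hypothetical) of the per-environment config split, mirroring what `TableEnvFactory` and `Flink113Shims` below do:
    
    ```java
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.configuration.ExecutionOptions;
    import org.apache.flink.table.api.TableConfig;
    
    public class TableConfigSketch {
      public static void main(String[] args) {
        // Base config used for the stream TableEnvironment.
        TableConfig streamTableConfig = new TableConfig();
    
        // In Flink 1.13 the runtime mode is carried by the TableConfig itself,
        // so the batch TableEnvironment needs its own copy of the configuration
        // instead of sharing one instance with the stream environment.
        TableConfig batchTableConfig = new TableConfig();
        batchTableConfig.getConfiguration().addAll(streamTableConfig.getConfiguration());
        batchTableConfig.getConfiguration()
                .set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
      }
    }
    ```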
    
    ### What type of PR is it?
    [ Feature ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5356
    
    ### How should this be tested?
    * CI passes, and manually tested against Flink 1.13.0
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #4111 from zjffdu/ZEPPELIN-5356 and squashes the following commits:
    
    6161a67b9 [Jeff Zhang] address comment
    964b4ab16 [Jeff Zhang] fix pyflink issue
    ac8c1fc52 [Jeff Zhang] [ZEPPELIN-5356] Support flink 1.13
    dbadae867 [Jeff Zhang] save
    
    (cherry picked from commit 369e80b89e5a9c83a66f90f8021a925c90bdd333)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .github/workflows/core.yml                         |   2 +-
 .../java/org/apache/zeppelin/flink/FlinkShims.java |  11 +
 .../org/apache/zeppelin/flink/Flink112Shims.java   |   1 -
 flink/flink1.13-shims/pom.xml                      | 221 +++++++++++++++++++++
 .../org/apache/zeppelin/flink/Flink113Shims.java}  |  39 ++--
 .../flink/shims113/CollectStreamTableSink.java     |  97 +++++++++
 .../flink/shims113/Flink113ScalaShims.scala        |  36 ++++
 flink/interpreter/pom.xml                          |  16 +-
 .../org/apache/zeppelin/flink/TableEnvFactory.java | 133 ++++++++-----
 .../zeppelin/flink/FlinkScalaInterpreter.scala     |  21 +-
 .../zeppelin/flink/FlinkZeppelinContext.scala      |   1 -
 .../flink/FlinkStreamSqlInterpreterTest.java       |   2 +-
 .../zeppelin/flink/IPyFlinkInterpreterTest.java    |   2 +-
 .../apache/zeppelin/flink/SqlInterpreterTest.java  |  15 +-
 flink/pom.xml                                      |   2 +
 testing/env_python_3_with_flink_113.yml            |  26 +++
 .../integration/FlinkIntegrationTest113.java       |  39 ++++
 17 files changed, 571 insertions(+), 93 deletions(-)

diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index f7b6562..475d569 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -144,7 +144,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        flink: [ 110, 111, 112]
+        flink: [ 110, 111, 112, 113]
     steps:
       - name: Checkout
         uses: actions/checkout@v2
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index bbe718c..acdb4a9 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -62,6 +62,9 @@ public abstract class FlinkShims {
     } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 12) {
       LOGGER.info("Initializing shims for Flink 1.12");
       flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink112Shims");
+    } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 13) {
+      LOGGER.info("Initializing shims for Flink 1.13");
+      flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink113Shims");
     } else {
       throw new Exception("Flink version: '" + flinkVersion + "' is not supported yet");
     }
@@ -146,4 +149,12 @@ public abstract class FlinkShims {
   public abstract Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object executorConfig);
 
   public abstract Map extractTableConfigOptions();
+
+  public void setBatchRuntimeMode(Object tableConfig) {
+    // only needed after flink 1.13
+  }
+
+  public void setOldPlanner(Object tableConfig) {
+    // only needed after flink 1.13
+  }
 }
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
index 4f6b698..b3f6ccc 100644
--- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
@@ -21,7 +21,6 @@ package org.apache.zeppelin.flink;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.compress.utils.Lists;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
diff --git a/flink/flink1.13-shims/pom.xml b/flink/flink1.13-shims/pom.xml
new file mode 100644
index 0000000..e7ef24e
--- /dev/null
+++ b/flink/flink1.13-shims/pom.xml
@@ -0,0 +1,221 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-parent</artifactId>
+        <groupId>org.apache.zeppelin</groupId>
+        <version>0.10.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.zeppelin</groupId>
+    <artifactId>flink1.13-shims</artifactId>
+    <version>0.10.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+    <name>Zeppelin: Flink1.13 Shims</name>
+
+    <properties>
+        <flink.version>${flink1.13.version}</flink.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.12</scala.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.zeppelin</groupId>
+            <artifactId>flink-shims</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.reflections</groupId>
+                    <artifactId>reflections</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-python_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala-shell_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>eclipse-add-source</id>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile-first</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                    <args>
+                        <arg>-unchecked</arg>
+                        <arg>-deprecation</arg>
+                        <arg>-feature</arg>
+                        <arg>-target:jvm-1.8</arg>
+                    </args>
+                    <jvmArgs>
+                        <jvmArg>-Xms1024m</jvmArg>
+                        <jvmArg>-Xmx1024m</jvmArg>
+                        <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg>
+                    </jvmArgs>
+                    <javacArgs>
+                        <javacArg>-source</javacArg>
+                        <javacArg>${java.version}</javacArg>
+                        <javacArg>-target</javacArg>
+                        <javacArg>${java.version}</javacArg>
+                        <javacArg>-Xlint:all,-serial,-path,-options</javacArg>
+                    </javacArgs>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-interpreter-setting</id>
+                        <phase>none</phase>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
similarity index 95%
copy from flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
copy to flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
index 4f6b698..8f6f813 100644
--- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
+++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
@@ -21,8 +21,8 @@ package org.apache.zeppelin.flink;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.compress.utils.Lists;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.scala.DataSet;
@@ -30,14 +30,16 @@ import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.python.PythonOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
-import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.PlannerType;
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
@@ -46,7 +48,6 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
-import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.delegation.Parser;
@@ -84,8 +85,8 @@ import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.FlinkException;
-import org.apache.zeppelin.flink.shims112.CollectStreamTableSink;
-import org.apache.zeppelin.flink.shims112.Flink112ScalaShims;
+import org.apache.zeppelin.flink.shims113.CollectStreamTableSink;
+import org.apache.zeppelin.flink.shims113.Flink113ScalaShims;
 import org.apache.zeppelin.flink.sql.SqlCommandParser;
 import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
 import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommandCall;
@@ -111,11 +112,11 @@ import java.util.regex.Matcher;
 
 
 /**
- * Shims for flink 1.12
+ * Shims for flink 1.13
  */
-public class Flink112Shims extends FlinkShims {
+public class Flink113Shims extends FlinkShims {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(Flink112Shims.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(Flink113Shims.class);
   public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder()
           .append("The following commands are available:\n\n")
           .append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database."))
@@ -142,7 +143,7 @@ public class Flink112Shims extends FlinkShims {
 
   private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
 
-  public Flink112Shims(Properties properties) {
+  public Flink113Shims(Properties properties) {
     super(properties);
   }
 
@@ -246,12 +247,12 @@ public class Flink112Shims extends FlinkShims {
 
   @Override
   public Object fromDataSet(Object btenv, Object ds) {
-    return Flink112ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds);
+    return Flink113ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds);
   }
 
   @Override
   public Object toDataSet(Object btenv, Object table) {
-    return Flink112ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table);
+    return Flink113ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table);
   }
 
   @Override
@@ -436,9 +437,7 @@ public class Flink112Shims extends FlinkShims {
   public void setCatalogManagerSchemaResolver(Object catalogManager,
                                               Object parserObject,
                                               Object environmentSetting) {
-    ((CatalogManager) catalogManager).setCatalogTableSchemaResolver(
-            new CatalogTableSchemaResolver((Parser)parserObject,
-                    ((EnvironmentSettings)environmentSetting).isStreamingMode()));
+
   }
 
   @Override
@@ -481,4 +480,16 @@ public class Flink112Shims extends FlinkShims {
     }
     return configOptions;
   }
+
+  @Override
+  public void setBatchRuntimeMode(Object tableConfig) {
+    ((TableConfig) tableConfig).getConfiguration()
+            .set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+  }
+
+  @Override
+  public void setOldPlanner(Object tableConfig) {
+    ((TableConfig) tableConfig).getConfiguration()
+            .set(TableConfigOptions.TABLE_PLANNER, PlannerType.OLD);
+  }
 }
diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/shims113/CollectStreamTableSink.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/shims113/CollectStreamTableSink.java
new file mode 100644
index 0000000..3cbcc75
--- /dev/null
+++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/shims113/CollectStreamTableSink.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.shims113;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.experimental.CollectSink;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.UUID;
+
+/**
+ * Table sink for collecting the results locally using sockets.
+ */
+public class CollectStreamTableSink implements RetractStreamTableSink<Row> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class);
+
+  private final InetAddress targetAddress;
+  private final int targetPort;
+  private final TypeSerializer<Tuple2<Boolean, Row>> serializer;
+
+  private String[] fieldNames;
+  private TypeInformation<?>[] fieldTypes;
+
+  public CollectStreamTableSink(InetAddress targetAddress,
+                                int targetPort,
+                                TypeSerializer<Tuple2<Boolean, Row>> serializer) {
+    LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort);
+    this.targetAddress = targetAddress;
+    this.targetPort = targetPort;
+    this.serializer = serializer;
+  }
+
+  @Override
+  public String[] getFieldNames() {
+    return fieldNames;
+  }
+
+  @Override
+  public TypeInformation<?>[] getFieldTypes() {
+    return fieldTypes;
+  }
+
+  @Override
+  public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+    final CollectStreamTableSink copy =
+            new CollectStreamTableSink(targetAddress, targetPort, serializer);
+    copy.fieldNames = fieldNames;
+    copy.fieldTypes = fieldTypes;
+    return copy;
+  }
+
+  @Override
+  public TypeInformation<Row> getRecordType() {
+    return Types.ROW_NAMED(fieldNames, fieldTypes);
+  }
+
+  @Override
+  public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
+    // add sink
+    return stream
+            .addSink(new CollectSink<>(targetAddress, targetPort, serializer))
+            .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID())
+            .setParallelism(1);
+  }
+
+  @Override
+  public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
+    return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
+  }
+}
diff --git a/flink/flink1.13-shims/src/main/scala/org/apache/zeppelin/flink/shims113/Flink113ScalaShims.scala b/flink/flink1.13-shims/src/main/scala/org/apache/zeppelin/flink/shims113/Flink113ScalaShims.scala
new file mode 100644
index 0000000..10250b0
--- /dev/null
+++ b/flink/flink1.13-shims/src/main/scala/org/apache/zeppelin/flink/shims113/Flink113ScalaShims.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.shims113
+
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
+import org.apache.flink.types.Row
+
+object Flink113ScalaShims {
+
+  def fromDataSet(btenv: BatchTableEnvironment, ds: DataSet[_]): Table = {
+    btenv.fromDataSet(ds)
+  }
+
+  def toDataSet(btenv: BatchTableEnvironment, table: Table): DataSet[Row] = {
+    btenv.toDataSet[Row](table)
+  }
+}
diff --git a/flink/interpreter/pom.xml b/flink/interpreter/pom.xml
index 65c0279..c09c0b1 100644
--- a/flink/interpreter/pom.xml
+++ b/flink/interpreter/pom.xml
@@ -91,6 +91,12 @@
 
     <dependency>
       <groupId>org.apache.zeppelin</groupId>
+      <artifactId>flink1.13-shims</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
       <artifactId>zeppelin-python</artifactId>
       <version>${project.version}</version>
       <exclusions>
@@ -705,9 +711,8 @@
           https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8191484 -->
           <argLine>-Xmx5120m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
 <!--          <argLine>-Xmx4096m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=6006</argLine>-->
-o
+
           <environmentVariables>
-<!--            <FLINK_HOME>/Users/jzhang/github/flink/build-target</FLINK_HOME>-->
             <FLINK_HOME>${project.build.directory}/flink-${flink.version}</FLINK_HOME>
             <FLINK_CONF_DIR>${project.build.directory}/test-classes</FLINK_CONF_DIR>
           </environmentVariables>
@@ -888,6 +893,13 @@ o
     </profile>
 
     <profile>
+      <id>flink-113</id>
+      <properties>
+        <flink.version>${flink1.13.version}</flink.version>
+      </properties>
+    </profile>
+
+    <profile>
       <id>hive2</id>
       <activation>
         <activeByDefault>true</activeByDefault>
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
index 9fb6efd..46ad481 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
@@ -55,30 +55,56 @@ public class TableEnvFactory {
   private Executor executor;
   private org.apache.flink.api.scala.ExecutionEnvironment benv;
   private org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv;
-  private TableConfig tblConfig;
+
+  /***********************************************************************
+  Should use different TableConfig for different kinds of table_env
+  otherwise it will cause conflicts after flink 1.13
+   ***********************************************************************/
+  // tableConfig used for StreamTableEnvironment.
+  private TableConfig streamTableConfig;
+  // tableConfig used for BatchTableEnvironment.
+  private TableConfig batchTableConfig;
+  // tableConfig for old planner
+  private TableConfig oldPlannerStreamTableConfig;
+  private TableConfig oldPlannerBatchTableConfig;
+
   private CatalogManager catalogManager;
+  private CatalogManager oldPlannerCatalogManager;
   private ModuleManager moduleManager;
-  private FunctionCatalog flinkFunctionCatalog;
-  private FunctionCatalog blinkFunctionCatalog;
+  private FunctionCatalog functionCatalog;
+  private FunctionCatalog oldPlannerFunctionCatalog;
+
 
   public TableEnvFactory(FlinkVersion flinkVersion,
                          FlinkShims flinkShims,
                          org.apache.flink.api.scala.ExecutionEnvironment env,
                          org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv,
-                         TableConfig tblConfig,
-                         CatalogManager catalogManager,
-                         ModuleManager moduleManager,
-                         FunctionCatalog flinkFunctionCatalog,
-                         FunctionCatalog blinkFunctionCatalog) {
+                         TableConfig streamTableConfig) {
+
     this.flinkVersion = flinkVersion;
     this.flinkShims = flinkShims;
     this.benv = env;
     this.senv = senv;
-    this.tblConfig = tblConfig;
-    this.catalogManager = catalogManager;
-    this.moduleManager = moduleManager;
-    this.flinkFunctionCatalog = flinkFunctionCatalog;
-    this.blinkFunctionCatalog = blinkFunctionCatalog;
+    this.streamTableConfig = streamTableConfig;
+    this.batchTableConfig = new TableConfig();
+    this.batchTableConfig.getConfiguration().addAll(streamTableConfig.getConfiguration());
+    flinkShims.setBatchRuntimeMode(this.batchTableConfig);
+    this.oldPlannerBatchTableConfig = new TableConfig();
+    this.oldPlannerBatchTableConfig.getConfiguration().addAll(streamTableConfig.getConfiguration());
+    flinkShims.setOldPlanner(this.oldPlannerBatchTableConfig);
+    this.oldPlannerStreamTableConfig = new TableConfig();
+    this.oldPlannerStreamTableConfig.getConfiguration().addAll(streamTableConfig.getConfiguration());
+    flinkShims.setOldPlanner(this.oldPlannerStreamTableConfig);
+
+    this.catalogManager = (CatalogManager) flinkShims.createCatalogManager(streamTableConfig.getConfiguration());
+    this.oldPlannerCatalogManager = (CatalogManager) flinkShims.createCatalogManager(
+            this.oldPlannerStreamTableConfig.getConfiguration());
+
+    this.moduleManager = new ModuleManager();
+
+    this.functionCatalog = new FunctionCatalog(streamTableConfig, catalogManager, moduleManager);
+    this.oldPlannerFunctionCatalog = new FunctionCatalog(
+            this.oldPlannerStreamTableConfig, this.oldPlannerCatalogManager, moduleManager);
   }
 
   public TableEnvironment createScalaFlinkBatchTableEnvironment() {
@@ -99,7 +125,7 @@ public class TableEnvFactory {
                       ModuleManager.class);
 
       return (TableEnvironment)
-              constructor.newInstance(benv, tblConfig, catalogManager, moduleManager);
+              constructor.newInstance(benv, oldPlannerBatchTableConfig, oldPlannerCatalogManager, moduleManager);
     } catch (Exception e) {
       throw new TableException("Fail to createScalaFlinkBatchTableEnvironment", e);
     }
@@ -115,8 +141,8 @@ public class TableEnvFactory {
               .create(
                       plannerProperties,
                       executor,
-                      tblConfig,
-                      flinkFunctionCatalog,
+                      streamTableConfig,
+                      oldPlannerFunctionCatalog,
                       catalogManager);
 
       Class clazz = null;
@@ -127,6 +153,7 @@ public class TableEnvFactory {
         clazz = Class
                 .forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl");
       }
+
       try {
         Constructor constructor = clazz
                 .getConstructor(
@@ -138,10 +165,11 @@ public class TableEnvFactory {
                         Planner.class,
                         Executor.class,
                         boolean.class);
-        return (TableEnvironment) constructor.newInstance(catalogManager,
+        return (TableEnvironment) constructor.newInstance(
+                oldPlannerCatalogManager,
                 moduleManager,
-                flinkFunctionCatalog,
-                tblConfig,
+                oldPlannerFunctionCatalog,
+                oldPlannerStreamTableConfig,
                 senv,
                 planner,
                 executor,
@@ -159,10 +187,11 @@ public class TableEnvFactory {
                         Executor.class,
                         boolean.class,
                         ClassLoader.class);
-        return (TableEnvironment) constructor.newInstance(catalogManager,
+        return (TableEnvironment) constructor.newInstance(
+                oldPlannerCatalogManager,
                 moduleManager,
-                flinkFunctionCatalog,
-                tblConfig,
+                oldPlannerFunctionCatalog,
+                oldPlannerStreamTableConfig,
                 senv,
                 planner,
                 executor,
@@ -191,10 +220,11 @@ public class TableEnvFactory {
               TableConfig.class,
               CatalogManager.class,
               ModuleManager.class);
+
       return (TableEnvironment) con.newInstance(
               benv.getJavaEnv(),
-              tblConfig,
-              catalogManager,
+              oldPlannerBatchTableConfig,
+              oldPlannerCatalogManager,
               moduleManager);
     } catch (Throwable t) {
       throw new TableException("Create BatchTableEnvironment failed.", t);
@@ -209,7 +239,7 @@ public class TableEnvFactory {
 
       Map<String, String> plannerProperties = settings.toPlannerProperties();
       Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
-              .create(plannerProperties, executor, tblConfig, flinkFunctionCatalog, catalogManager);
+              .create(plannerProperties, executor, streamTableConfig, oldPlannerFunctionCatalog, catalogManager);
 
       Class clazz = null;
       if (flinkVersion.isFlink110()) {
@@ -219,6 +249,7 @@ public class TableEnvFactory {
         clazz = Class
                 .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
       }
+
       try {
         Constructor constructor = clazz
                 .getConstructor(
@@ -230,10 +261,11 @@ public class TableEnvFactory {
                         Planner.class,
                         Executor.class,
                         boolean.class);
-        return (TableEnvironment) constructor.newInstance(catalogManager,
+        return (TableEnvironment) constructor.newInstance(
+                oldPlannerCatalogManager,
                 moduleManager,
-                flinkFunctionCatalog,
-                tblConfig,
+                oldPlannerFunctionCatalog,
+                oldPlannerStreamTableConfig,
                 senv.getJavaEnv(),
                 planner,
                 executor,
@@ -251,10 +283,11 @@ public class TableEnvFactory {
                         Executor.class,
                         boolean.class,
                         ClassLoader.class);
-        return (TableEnvironment) constructor.newInstance(catalogManager,
+        return (TableEnvironment) constructor.newInstance(
+                oldPlannerCatalogManager,
                 moduleManager,
-                flinkFunctionCatalog,
-                tblConfig,
+                oldPlannerFunctionCatalog,
+                oldPlannerStreamTableConfig,
                 senv.getJavaEnv(),
                 planner,
                 executor,
@@ -278,8 +311,8 @@ public class TableEnvFactory {
               .create(
                       plannerProperties,
                       executor,
-                      tblConfig,
-                      blinkFunctionCatalog,
+                      streamTableConfig,
+                      functionCatalog,
                       catalogManager);
 
 
@@ -304,8 +337,8 @@ public class TableEnvFactory {
                         boolean.class);
         return (TableEnvironment) constructor.newInstance(catalogManager,
                 moduleManager,
-                blinkFunctionCatalog,
-                tblConfig,
+                functionCatalog,
+                streamTableConfig,
                 senv,
                 planner,
                 executor,
@@ -325,8 +358,8 @@ public class TableEnvFactory {
                         ClassLoader.class);
         return (TableEnvironment) constructor.newInstance(catalogManager,
                 moduleManager,
-                blinkFunctionCatalog,
-                tblConfig,
+                functionCatalog,
+                streamTableConfig,
                 senv,
                 planner,
                 executor,
@@ -346,7 +379,7 @@ public class TableEnvFactory {
 
       Map<String, String> plannerProperties = settings.toPlannerProperties();
       Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
-              .create(plannerProperties, executor, tblConfig, blinkFunctionCatalog, catalogManager);
+              .create(plannerProperties, executor, streamTableConfig, functionCatalog, catalogManager);
 
       Class clazz = null;
       if (flinkVersion.isFlink110()) {
@@ -369,8 +402,8 @@ public class TableEnvFactory {
                         boolean.class);
         return (TableEnvironment) constructor.newInstance(catalogManager,
                 moduleManager,
-                blinkFunctionCatalog,
-                tblConfig,
+                functionCatalog,
+                streamTableConfig,
                 senv.getJavaEnv(),
                 planner,
                 executor,
@@ -390,8 +423,8 @@ public class TableEnvFactory {
                         ClassLoader.class);
         return (TableEnvironment) constructor.newInstance(catalogManager,
                 moduleManager,
-                blinkFunctionCatalog,
-                tblConfig,
+                functionCatalog,
+                streamTableConfig,
                 senv.getJavaEnv(),
                 planner,
                 executor,
@@ -410,7 +443,7 @@ public class TableEnvFactory {
       executor = lookupExecutor(executorProperties, senv.getJavaEnv());
       final Map<String, String> plannerProperties = settings.toPlannerProperties();
       final Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
-              .create(plannerProperties, executor, tblConfig, blinkFunctionCatalog, catalogManager);
+              .create(plannerProperties, executor, batchTableConfig, functionCatalog, catalogManager);
 
       Class clazz = null;
       if (flinkVersion.isFlink110()) {
@@ -433,8 +466,8 @@ public class TableEnvFactory {
         return (TableEnvironment) constructor.newInstance(
                 catalogManager,
                 moduleManager,
-                blinkFunctionCatalog,
-                tblConfig,
+                functionCatalog,
+                batchTableConfig,
                 senv.getJavaEnv(),
                 planner,
                 executor,
@@ -454,8 +487,8 @@ public class TableEnvFactory {
         return (TableEnvironment) constructor.newInstance(
                 catalogManager,
                 moduleManager,
-                blinkFunctionCatalog,
-                tblConfig,
+                functionCatalog,
+                batchTableConfig,
                 senv.getJavaEnv(),
                 planner,
                 executor,
@@ -469,7 +502,7 @@ public class TableEnvFactory {
   }
 
 
-  public void createPlanner(EnvironmentSettings settings) {
+  public void createStreamPlanner(EnvironmentSettings settings) {
     Map<String, String> executorProperties = settings.toExecutorProperties();
     Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
 
@@ -478,8 +511,8 @@ public class TableEnvFactory {
             .create(
                     plannerProperties,
                     executor,
-                    tblConfig,
-                    blinkFunctionCatalog,
+                    streamTableConfig,
+                    functionCatalog,
                     catalogManager);
     this.flinkShims.setCatalogManagerSchemaResolver(catalogManager, planner.getParser(), settings);
   }
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 043f3d4..1214009 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -40,7 +40,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableEnvironment}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
+import org.apache.flink.table.catalog.CatalogManager
 import org.apache.flink.table.catalog.hive.HiveCatalog
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableAggregateFunction, TableFunction}
 import org.apache.flink.table.module.ModuleManager
@@ -435,18 +435,11 @@ class FlinkScalaInterpreter(val properties: Properties) {
     val originalClassLoader = Thread.currentThread().getContextClassLoader
     try {
       Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
-      val tblConfig = new TableConfig
-      tblConfig.getConfiguration.addAll(configuration)
-      // Step 1.1 Initialize the CatalogManager if required.
-      val catalogManager = flinkShims.createCatalogManager(tblConfig.getConfiguration).asInstanceOf[CatalogManager]
-      // Step 1.2 Initialize the ModuleManager if required.
-      val moduleManager = new ModuleManager();
-      // Step 1.3 Initialize the FunctionCatalog if required.
-      val flinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager);
-      val blinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager);
-
-      this.tblEnvFactory = new TableEnvFactory(this.flinkVersion, this.flinkShims, this.benv, this.senv, tblConfig,
-        catalogManager, moduleManager, flinkFunctionCatalog, blinkFunctionCatalog)
+      val tableConfig = new TableConfig
+      tableConfig.getConfiguration.addAll(configuration)
+
+      this.tblEnvFactory = new TableEnvFactory(this.flinkVersion, this.flinkShims,
+        this.benv, this.senv, tableConfig)
 
       val modifiers = new java.util.ArrayList[String]()
       modifiers.add("@transient")
@@ -635,7 +628,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
       Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
       val stEnvSetting =
         EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
-      this.tblEnvFactory.createPlanner(stEnvSetting)
+      this.tblEnvFactory.createStreamPlanner(stEnvSetting)
     } finally {
       Thread.currentThread().setContextClassLoader(originalClassLoader)
     }
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
index b3a964a..69bfa33 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
@@ -135,7 +135,6 @@ class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter,
   }
 
   def show(table: Table, streamType: String, configs: Map[String, String] = Map.empty): Unit = {
-    val stenv = flinkInterpreter.getJavaStreamTableEnvironment("blink")
     val context = InterpreterContext.get()
     configs.foreach(e => context.getLocalProperties.put(e._1, e._2))
     val tableName = "UnnamedTable_" + context.getParagraphId.replace("-", "_") + "_" + SQL_INDEX.getAndIncrement()
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index e392816..382a9b9 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -470,7 +470,7 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
                     "  'update-mode' = 'append'\n" +
                     ")\n",
             context);
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
     List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
     assertEquals(1, resultMessages.size());
     assertEquals(InterpreterResult.Type.TEXT, resultMessages.get(0).getType());
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
index 84f391b..6a8d71f 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
@@ -176,7 +176,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
         "t.select(\"a + 1, b, c\").insert_into(\"batch_sink\")\n" +
         "bt_env.execute(\"batch_job\")"
             , context);
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(result.toString(), InterpreterResult.Code.SUCCESS, result.code());
 
     // use group by
     context = createInterpreterContext();
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
index 5fd7e52..1d49686 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
@@ -72,11 +72,6 @@ import static org.mockito.Mockito.mock;
 public abstract class SqlInterpreterTest {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SqlInterpreterTest.class);
-  protected static final String[][] INPUT_DATA = {
-          {"1", "1.1", "hello world", "true"},
-          {"2", "2.3", "hello flink", "true"},
-          {"3", "3.2", "hello hadoop", "false"},
-  };
 
 
   protected FlinkInterpreter flinkInterpreter;
@@ -189,7 +184,7 @@ public abstract class SqlInterpreterTest {
     assertEquals("table\n", resultMessages.get(0).getData());
 
     context = getInterpreterContext();
-    result = sqlInterpreter.interpret("CREATE TABLE source (msg INT)", context);
+    result = sqlInterpreter.interpret("CREATE TABLE source (msg INT) with ('connector'='print')", context);
     assertEquals(Code.SUCCESS, result.code());
 
     context = getInterpreterContext();
@@ -242,7 +237,7 @@ public abstract class SqlInterpreterTest {
     InterpreterContext context = getInterpreterContext();
     InterpreterResult result = sqlInterpreter.interpret(
             "CREATE TABLE source_table (int_col INT, double_col double, " +
-                    "varchar_col varchar, bool_col boolean)",
+                    "varchar_col varchar, bool_col boolean) with ('connector'='print')",
             context);
     assertEquals(Code.SUCCESS, result.code());
     List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
@@ -403,7 +398,11 @@ public abstract class SqlInterpreterTest {
     resultMessages = context.out.toInterpreterResultMessage();
     assertEquals(1, resultMessages.size());
     assertEquals(Type.TEXT, resultMessages.get(0).getType());
-    assertTrue(resultMessages.get(0).getData(), resultMessages.get(0).getData().contains("Physical Execution Plan"));
+    if (flinkInterpreter.getFlinkVersion().olderThan(FlinkVersion.fromVersionString("1.13.0"))) {
+      assertTrue(resultMessages.get(0).getData(), resultMessages.get(0).getData().contains("Physical Execution Plan"));
+    } else {
+      assertTrue(resultMessages.get(0).getData(), resultMessages.get(0).getData().contains("Optimized Execution Plan"));
+    }
   }
 
   @Test
diff --git a/flink/pom.xml b/flink/pom.xml
index 32460fc..62b333b 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -40,12 +40,14 @@
         <module>flink1.10-shims</module>
         <module>flink1.11-shims</module>
         <module>flink1.12-shims</module>
+        <module>flink1.13-shims</module>
     </modules>
 
     <properties>
         <flink1.10.version>1.10.3</flink1.10.version>
         <flink1.11.version>1.11.3</flink1.11.version>
         <flink1.12.version>1.12.0</flink1.12.version>
+        <flink1.13.version>1.13.0</flink1.13.version>
     </properties>
 
     <dependencies>
diff --git a/testing/env_python_3_with_flink_113.yml b/testing/env_python_3_with_flink_113.yml
new file mode 100644
index 0000000..796afff
--- /dev/null
+++ b/testing/env_python_3_with_flink_113.yml
@@ -0,0 +1,26 @@
+name: python_3_with_flink
+channels:
+  - conda-forge
+  - defaults
+dependencies:
+  - pip
+  - pip:
+      - bkzep==0.6.1
+      - apache-flink==1.13.0
+  - pycodestyle
+  - numpy=1
+  - pandas=0.25
+  - scipy=1
+  - grpcio
+  - hvplot
+  - protobuf=3
+  - pandasql=0.7.3
+  - ipython=7
+  - matplotlib=3
+  - ipykernel=5
+  - jupyter_client=5
+  - bokeh=1.3.4
+  - panel
+  - holoviews
+  - pyyaml=3
+
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest113.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest113.java
new file mode 100644
index 0000000..9d48949
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest113.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(value = Parameterized.class)
+public class FlinkIntegrationTest113 extends FlinkIntegrationTest {
+
+  @Parameterized.Parameters
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+            {"1.13.0"}
+    });
+  }
+
+  public FlinkIntegrationTest113(String flinkVersion) {
+    super(flinkVersion);
+  }
+}