Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/12/17 03:17:38 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5156]. Support flink 1.12

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 35e9d2b  [ZEPPELIN-5156]. Support flink 1.12
35e9d2b is described below

commit 35e9d2b461036de8b5f19a77edd56d2df168e260
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Dec 9 14:00:14 2020 +0800

    [ZEPPELIN-5156]. Support flink 1.12
    
    ### What is this PR for?
    
    Flink 1.12 is almost the same as Flink 1.11 from an API perspective; the only relevant change is that ExecutionConfig#disableSysoutLogging has been removed. This PR fixes the issue by not calling that method when the Flink version is 1.12 or newer.
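
    For illustration, a minimal sketch of the dispatch idea (simplified: the real code selects the shim class reflectively via Class.forName, as shown in FlinkShims.java below; benv/senv stand for the interpreter's batch and stream execution environments from FlinkScalaInterpreter):

    ```java
    // Pick a version-specific shim so that version-dependent calls are
    // isolated from the interpreter code. Flink112Shims#disableSysoutLogging
    // is a no-op because the method no longer exists in Flink 1.12.
    FlinkShims shims = flinkVersion.getMinorVersion() >= 12
            ? new Flink112Shims(properties)   // no-op in 1.12
            : new Flink111Shims(properties);  // calls ExecutionConfig#disableSysoutLogging
    shims.disableSysoutLogging(benv.getConfig(), senv.getConfig());
    ```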
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5156
    
    ### How should this be tested?
    * Manually tested on Flink 1.12
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3990 from zjffdu/ZEPPELIN-5156 and squashes the following commits:
    
    f5f64cd4d [Jeff Zhang] [ZEPPELIN-5156]. Support flink 1.12
    
    (cherry picked from commit 89f4ee77464030bf0aae6f07af6aa4ba0250041e)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .github/workflows/core.yml                         |   9 +-
 .../java/org/apache/zeppelin/flink/FlinkShims.java |  11 +-
 .../org/apache/zeppelin/flink/FlinkVersion.java    |   7 +
 .../org/apache/zeppelin/flink/Flink110Shims.java   |  34 +++-
 .../org/apache/zeppelin/flink/Flink111Shims.java   |  30 ++-
 flink/flink1.12-shims/pom.xml                      | 221 +++++++++++++++++++++
 .../org/apache/zeppelin/flink/Flink112Shims.java}  |  46 ++++-
 .../flink/shims112/CollectStreamTableSink.java     |  97 +++++++++
 .../flink/shims112/Flink112ScalaShims.scala        |  36 ++++
 flink/interpreter/pom.xml                          |  21 +-
 .../zeppelin/flink/FlinkScalaInterpreter.scala     |   7 +-
 .../org/apache/zeppelin/flink/FlinkShell.scala     |  26 +--
 .../flink/FlinkStreamSqlInterpreterTest.java       |   9 +-
 flink/pom.xml                                      |   2 +
 ...nk_1_10.yml => env_python_3_with_flink_110.yml} |   0
 ...nk_1_11.yml => env_python_3_with_flink_111.yml} |   0
 ...nk_1_11.yml => env_python_3_with_flink_112.yml} |   9 +-
 .../integration/FlinkIntegrationTest112.java       |  39 ++++
 .../integration/ZeppelinFlinkClusterTest112.java   |  38 ++++
 19 files changed, 595 insertions(+), 47 deletions(-)

diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index 4faf67b..9e680de 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -126,8 +126,9 @@ jobs:
   test-flink-and-flink-integration-test:
     runs-on: ubuntu-18.04
     strategy:
+      fail-fast: false
       matrix:
-        flink: [ flink_1_10, flink_1_11]
+        flink: [ 110, 111, 112]
     steps:
       - name: Checkout
         uses: actions/checkout@v2
@@ -148,15 +149,15 @@ jobs:
         uses: conda-incubator/setup-miniconda@v2
         with:
           activate-environment: python_3_with_flink
-          environment-file: testing/env_python_3 with_${{ matrix.flink }}.yml
+          environment-file: testing/env_python_3_with_flink_${{ matrix.flink }}.yml
           python-version: 3.7
           auto-activate-base: false
       - name: install environment
         run: |
-          mvn install -DskipTests -DskipRat -am -pl flink/interpreter,zeppelin-interpreter-integration -Pflink-1.10 -Pintegration -B
+          mvn install -DskipTests -DskipRat -am -pl flink/interpreter,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -B
           mvn clean package -T 2C -pl zeppelin-plugins -amd -DskipTests -B
       - name: run tests
-        run: mvn test -DskipRat -pl flink/interpreter,zeppelin-interpreter-integration -Pflink-1.10 -Pintegration -B -Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest110,ZeppelinFlinkClusterTest110
+        run: mvn test -DskipRat -pl flink/interpreter,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -B -Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest${{ matrix.flink }},ZeppelinFlinkClusterTest${{ matrix.flink }}
   run-spark-intergration-test:
     runs-on: ubuntu-18.04
     steps:
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index f7a7514..717ef3d 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -56,9 +56,12 @@ public abstract class FlinkShims {
     if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 10) {
       LOGGER.info("Initializing shims for Flink 1.10");
       flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink110Shims");
-    } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() >= 11) {
+    } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 11) {
       LOGGER.info("Initializing shims for Flink 1.11");
       flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink111Shims");
+    } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 12) {
+      LOGGER.info("Initializing shims for Flink 1.12");
+      flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink112Shims");
     } else {
       throw new Exception("Flink version: '" + flinkVersion + "' is not supported yet");
     }
@@ -92,6 +95,10 @@ public abstract class FlinkShims {
             .toAttributedString();
   }
 
+  public abstract void disableSysoutLogging(Object batchConfig, Object streamConfig);
+
+  public abstract Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment);
+
   public abstract Object createCatalogManager(Object config);
 
   public abstract String getPyFlinkPythonPath(Properties properties) throws IOException;
@@ -134,7 +141,7 @@ public abstract class FlinkShims {
                                                        Object parser,
                                                        Object environmentSetting);
 
-  public abstract Object getCustomCli(Object cliFrontend, Object commandLine);
+  public abstract Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig);
 
   public abstract Map extractTableConfigOptions();
 }
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
index d593746..dcf32d0 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
@@ -26,6 +26,7 @@ public class FlinkVersion {
   private int majorVersion;
   private int minorVersion;
   private int patchVersion;
+  private int version;
   private String versionString;
 
   FlinkVersion(String versionString) {
@@ -46,6 +47,8 @@ public class FlinkVersion {
         this.patchVersion = Integer.parseInt(versions[2]);
       }
 
+      this.version = Integer.parseInt(String.format("%d%02d%02d",
+              majorVersion, minorVersion, patchVersion));
     } catch (Exception e) {
       logger.error("Can not recognize Flink version " + versionString +
           ". Assume it's a future release", e);
@@ -56,6 +59,10 @@ public class FlinkVersion {
     return majorVersion;
   }
 
+  public boolean olderThan(FlinkVersion versionToCompare) {
+    return this.version < versionToCompare.version;
+  }
+
   public int getMinorVersion() {
     return minorVersion;
   }
diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
index e917b3c..ee551ea 100644
--- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
+++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
@@ -19,13 +19,18 @@
 package org.apache.zeppelin.flink;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.scala.DataSet;
 import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.python.PythonOptions;
 import org.apache.flink.python.util.ResourceUtil;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableUtils;
@@ -40,6 +45,7 @@ import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkException;
 import org.apache.zeppelin.flink.shims110.CollectStreamTableSink;
 import org.apache.zeppelin.flink.shims110.Flink110ScalaShims;
 import org.apache.zeppelin.flink.sql.SqlCommandParser;
@@ -100,6 +106,22 @@ public class Flink110Shims extends FlinkShims {
   }
 
   @Override
+  public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
+    ((ExecutionConfig) batchConfig).disableSysoutLogging();
+    ((ExecutionConfig) streamConfig).disableSysoutLogging();
+  }
+
+  @Override
+  public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) {
+    return new StreamExecutionEnvironmentFactory() {
+      @Override
+      public StreamExecutionEnvironment createExecutionEnvironment() {
+        return (StreamExecutionEnvironment) streamExecutionEnvironment;
+      }
+    };
+  }
+
+  @Override
   public Object createCatalogManager(Object config) {
     return new CatalogManager("default_catalog",
             new GenericInMemoryCatalog("default_catalog", "default_database"));
@@ -242,18 +264,24 @@ public class Flink110Shims extends FlinkShims {
   }
 
   @Override
-  public Object getCustomCli(Object cliFrontend, Object commandLine) {
+  public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) {
+    CustomCommandLine customCommandLine = null;
     try {
-      return ((CliFrontend) cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
+      customCommandLine = ((CliFrontend) cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
     } catch (NoSuchMethodError e) {
       try {
         Method method = CliFrontend.class.getMethod("getActiveCustomCommandLine", CommandLine.class);
-        return method.invoke((CliFrontend) cliFrontend, commandLine);
+        customCommandLine = (CustomCommandLine) method.invoke((CliFrontend) cliFrontend, commandLine);
       } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ex) {
         LOGGER.error("Fail to call getCustomCli", ex);
         throw new RuntimeException("Fail to call getCustomCli", ex);
       }
     }
+    try {
+      return customCommandLine.applyCommandLineOptionsToConfiguration((CommandLine) commandLine);
+    } catch (FlinkException e) {
+      throw new RuntimeException("Fail to call applyCommandLineOptionsToConfiguration", e);
+    }
   }
 
   @Override
diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
index 8fad5d4..9c26346 100644
--- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
+++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
@@ -20,15 +20,19 @@ package org.apache.zeppelin.flink;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.compress.utils.Lists;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.scala.DataSet;
 import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.python.PythonOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.Table;
@@ -76,6 +80,7 @@ import org.apache.flink.table.operations.ddl.DropViewOperation;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
+import org.apache.flink.util.FlinkException;
 import org.apache.zeppelin.flink.shims111.CollectStreamTableSink;
 import org.apache.zeppelin.flink.shims111.Flink111ScalaShims;
 import org.apache.zeppelin.flink.sql.SqlCommandParser;
@@ -139,6 +144,22 @@ public class Flink111Shims extends FlinkShims {
   }
 
   @Override
+  public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
+    ((ExecutionConfig) batchConfig).disableSysoutLogging();
+    ((ExecutionConfig) streamConfig).disableSysoutLogging();
+  }
+
+  @Override
+  public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) {
+    return new StreamExecutionEnvironmentFactory() {
+      @Override
+      public StreamExecutionEnvironment createExecutionEnvironment() {
+        return (StreamExecutionEnvironment) streamExecutionEnvironment;
+      }
+    };
+  }
+
+  @Override
   public Object createCatalogManager(Object config) {
     return CatalogManager.newBuilder()
             .classLoader(Thread.currentThread().getContextClassLoader())
@@ -402,8 +423,13 @@ public class Flink111Shims extends FlinkShims {
   }
 
   @Override
-  public Object getCustomCli(Object cliFrontend, Object commandLine) {
-    return ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
+  public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) {
+    CustomCommandLine customCommandLine = ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
+    try {
+      return customCommandLine.applyCommandLineOptionsToConfiguration((CommandLine) commandLine);
+    } catch (FlinkException e) {
+      throw new RuntimeException("Fail to call applyCommandLineOptionsToConfiguration", e);
+    }
   }
 
   @Override
diff --git a/flink/flink1.12-shims/pom.xml b/flink/flink1.12-shims/pom.xml
new file mode 100644
index 0000000..e234b5c
--- /dev/null
+++ b/flink/flink1.12-shims/pom.xml
@@ -0,0 +1,221 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-parent</artifactId>
+        <groupId>org.apache.zeppelin</groupId>
+        <version>0.9.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.zeppelin</groupId>
+    <artifactId>flink1.12-shims</artifactId>
+    <version>0.9.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+    <name>Zeppelin: Flink1.12 Shims</name>
+
+    <properties>
+        <flink.version>${flink1.12.version}</flink.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.12</scala.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.zeppelin</groupId>
+            <artifactId>flink-shims</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.reflections</groupId>
+                    <artifactId>reflections</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-python_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala-shell_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>eclipse-add-source</id>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile-first</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                    <args>
+                        <arg>-unchecked</arg>
+                        <arg>-deprecation</arg>
+                        <arg>-feature</arg>
+                        <arg>-target:jvm-1.8</arg>
+                    </args>
+                    <jvmArgs>
+                        <jvmArg>-Xms1024m</jvmArg>
+                        <jvmArg>-Xmx1024m</jvmArg>
+                        <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg>
+                    </jvmArgs>
+                    <javacArgs>
+                        <javacArg>-source</javacArg>
+                        <javacArg>${java.version}</javacArg>
+                        <javacArg>-target</javacArg>
+                        <javacArg>${java.version}</javacArg>
+                        <javacArg>-Xlint:all,-serial,-path,-options</javacArg>
+                    </javacArgs>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-interpreter-setting</id>
+                        <phase>none</phase>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
similarity index 92%
copy from flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
copy to flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
index 8fad5d4..dc5a4db 100644
--- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
@@ -20,15 +20,20 @@ package org.apache.zeppelin.flink;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.compress.utils.Lists;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.scala.DataSet;
 import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.python.PythonOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.Table;
@@ -76,8 +81,9 @@ import org.apache.flink.table.operations.ddl.DropViewOperation;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
+import org.apache.flink.util.FlinkException;
-import org.apache.zeppelin.flink.shims111.CollectStreamTableSink;
+import org.apache.zeppelin.flink.shims112.CollectStreamTableSink;
-import org.apache.zeppelin.flink.shims111.Flink111ScalaShims;
+import org.apache.zeppelin.flink.shims112.Flink112ScalaShims;
 import org.apache.zeppelin.flink.sql.SqlCommandParser;
 import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
 import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommandCall;
@@ -103,11 +109,11 @@ import java.util.regex.Matcher;
 
 
 /**
- * Shims for flink 1.11
+ * Shims for flink 1.12
  */
-public class Flink111Shims extends FlinkShims {
+public class Flink112Shims extends FlinkShims {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(Flink111Shims.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(Flink112Shims.class);
   public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder()
           .append("The following commands are available:\n\n")
           .append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database."))
@@ -134,11 +140,27 @@ public class Flink111Shims extends FlinkShims {
 
   private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
 
-  public Flink111Shims(Properties properties) {
+  public Flink112Shims(Properties properties) {
     super(properties);
   }
 
   @Override
+  public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
+    // do nothing
+  }
+
+
+  @Override
+  public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) {
+    return new StreamExecutionEnvironmentFactory() {
+      @Override
+      public StreamExecutionEnvironment createExecutionEnvironment(Configuration configuration) {
+        return (StreamExecutionEnvironment) streamExecutionEnvironment;
+      }
+    };
+  }
+
+  @Override
   public Object createCatalogManager(Object config) {
     return CatalogManager.newBuilder()
             .classLoader(Thread.currentThread().getContextClassLoader())
@@ -211,12 +233,12 @@ public class Flink111Shims extends FlinkShims {
 
   @Override
   public Object fromDataSet(Object btenv, Object ds) {
-    return Flink111ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds);
+    return Flink112ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds);
   }
 
   @Override
   public Object toDataSet(Object btenv, Object table) {
-    return Flink111ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table);
+    return Flink112ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table);
   }
 
   @Override
@@ -402,8 +424,14 @@ public class Flink111Shims extends FlinkShims {
   }
 
   @Override
-  public Object getCustomCli(Object cliFrontend, Object commandLine) {
-    return ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
+  public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) {
+    CustomCommandLine customCommandLine = ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
+    try {
+       ((Configuration) effectiveConfig).addAll(customCommandLine.toConfiguration((CommandLine) commandLine));
+       return effectiveConfig;
+    } catch (FlinkException e) {
+      throw new RuntimeException("Fail to call addAll", e);
+    }
   }
 
   @Override
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/CollectStreamTableSink.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/CollectStreamTableSink.java
new file mode 100644
index 0000000..b98f406
--- /dev/null
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/CollectStreamTableSink.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.shims112;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.experimental.CollectSink;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.UUID;
+
+/**
+ * Table sink for collecting the results locally using sockets.
+ */
+public class CollectStreamTableSink implements RetractStreamTableSink<Row> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class);
+
+  private final InetAddress targetAddress;
+  private final int targetPort;
+  private final TypeSerializer<Tuple2<Boolean, Row>> serializer;
+
+  private String[] fieldNames;
+  private TypeInformation<?>[] fieldTypes;
+
+  public CollectStreamTableSink(InetAddress targetAddress,
+                                int targetPort,
+                                TypeSerializer<Tuple2<Boolean, Row>> serializer) {
+    LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort);
+    this.targetAddress = targetAddress;
+    this.targetPort = targetPort;
+    this.serializer = serializer;
+  }
+
+  @Override
+  public String[] getFieldNames() {
+    return fieldNames;
+  }
+
+  @Override
+  public TypeInformation<?>[] getFieldTypes() {
+    return fieldTypes;
+  }
+
+  @Override
+  public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+    final CollectStreamTableSink copy =
+            new CollectStreamTableSink(targetAddress, targetPort, serializer);
+    copy.fieldNames = fieldNames;
+    copy.fieldTypes = fieldTypes;
+    return copy;
+  }
+
+  @Override
+  public TypeInformation<Row> getRecordType() {
+    return Types.ROW_NAMED(fieldNames, fieldTypes);
+  }
+
+  @Override
+  public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
+    // add sink
+    return stream
+            .addSink(new CollectSink<>(targetAddress, targetPort, serializer))
+            .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID())
+            .setParallelism(1);
+  }
+
+  @Override
+  public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
+    return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
+  }
+}
diff --git a/flink/flink1.12-shims/src/main/scala/org/apache/zeppelin/flink/shims112/Flink112ScalaShims.scala b/flink/flink1.12-shims/src/main/scala/org/apache/zeppelin/flink/shims112/Flink112ScalaShims.scala
new file mode 100644
index 0000000..988ad4e
--- /dev/null
+++ b/flink/flink1.12-shims/src/main/scala/org/apache/zeppelin/flink/shims112/Flink112ScalaShims.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.shims112
+
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
+import org.apache.flink.types.Row
+
+object Flink112ScalaShims {
+
+  def fromDataSet(btenv: BatchTableEnvironment, ds: DataSet[_]): Table = {
+    btenv.fromDataSet(ds)
+  }
+
+  def toDataSet(btenv: BatchTableEnvironment, table: Table): DataSet[Row] = {
+    btenv.toDataSet[Row](table)
+  }
+}
diff --git a/flink/interpreter/pom.xml b/flink/interpreter/pom.xml
index 8b34a19..2807552 100644
--- a/flink/interpreter/pom.xml
+++ b/flink/interpreter/pom.xml
@@ -77,6 +77,12 @@
 
     <dependency>
       <groupId>org.apache.zeppelin</groupId>
+      <artifactId>flink1.12-shims</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
       <artifactId>zeppelin-python</artifactId>
       <version>${project.version}</version>
       <exclusions>
@@ -689,9 +695,9 @@
           <!-- set sun.zip.disableMemoryMapping=true because of
           https://blogs.oracle.com/poonam/crashes-in-zipgetentry
           https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8191484 -->
-          <argLine>-Xmx4096m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
+          <argLine>-Xmx5120m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
 <!--          <argLine>-Xmx4096m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=6006</argLine>-->
 
           <environmentVariables>
 <!--            <FLINK_HOME>/Users/jzhang/github/flink/build-target</FLINK_HOME>-->
             <FLINK_HOME>${project.build.directory}/flink-${flink.version}</FLINK_HOME>
@@ -853,20 +859,27 @@
   <profiles>
 
     <profile>
-      <id>flink-1.10</id>
+      <id>flink-110</id>
       <properties>
         <flink.version>${flink1.10.version}</flink.version>
       </properties>
     </profile>
 
     <profile>
-      <id>flink-1.11</id>
+      <id>flink-111</id>
       <properties>
         <flink.version>${flink1.11.version}</flink.version>
       </properties>
     </profile>
 
     <profile>
+      <id>flink-112</id>
+      <properties>
+        <flink.version>${flink1.12.version}</flink.version>
+      </properties>
+    </profile>
+
+    <profile>
       <id>hive2</id>
       <activation>
         <activeByDefault>true</activeByDefault>
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 9865145..572426b 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -445,8 +445,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
 
     if (java.lang.Boolean.parseBoolean(
       properties.getProperty("zeppelin.flink.disableSysoutLogging", "true"))) {
-      this.benv.getConfig.disableSysoutLogging()
-      this.senv.getConfig.disableSysoutLogging()
+      flinkShims.disableSysoutLogging(this.benv.getConfig, this.senv.getConfig);
     }
   }
 
@@ -511,9 +510,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
   }
 
   private def setAsContext(): Unit = {
-    val streamFactory = new StreamExecutionEnvironmentFactory() {
-      override def createExecutionEnvironment = senv.getJavaEnv
-    }
+    val streamFactory = flinkShims.createStreamExecutionEnvironmentFactory(this.senv.getJavaEnv)
     //StreamExecutionEnvironment
     var method = classOf[JStreamExecutionEnvironment].getDeclaredMethod("initializeContextEnvironment",
       classOf[StreamExecutionEnvironmentFactory])
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkShell.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkShell.scala
index 3f814b9..e4bcf1b 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkShell.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkShell.scala
@@ -109,7 +109,7 @@ object FlinkShell {
   }
 
   private def deployNewYarnCluster(config: Config, flinkConfig: Configuration, flinkShims: FlinkShims) = {
-    val effectiveConfig = new Configuration(flinkConfig)
+    var effectiveConfig = new Configuration(flinkConfig)
     val args = parseArgList(config, "yarn-cluster")
 
     val configurationDirectory = getConfigDir(config)
@@ -123,24 +123,25 @@ object FlinkShell {
       frontend.getCustomCommandLineOptions)
     val commandLine = CliFrontendParser.parse(commandLineOptions, args, true)
 
-    val customCLI = flinkShims.getCustomCli(frontend, commandLine).asInstanceOf[CustomCommandLine]
-    val executorConfig = customCLI.applyCommandLineOptionsToConfiguration(commandLine)
+    effectiveConfig = flinkShims
+      .updateEffectiveConfig(frontend, commandLine, effectiveConfig)
+      .asInstanceOf[Configuration]
 
     val serviceLoader = new DefaultClusterClientServiceLoader
-    val clientFactory = serviceLoader.getClusterClientFactory(executorConfig)
-    val clusterDescriptor = clientFactory.createClusterDescriptor(executorConfig)
-    val clusterSpecification = clientFactory.getClusterSpecification(executorConfig)
+    val clientFactory = serviceLoader.getClusterClientFactory(effectiveConfig)
+    val clusterDescriptor = clientFactory.createClusterDescriptor(effectiveConfig)
+    val clusterSpecification = clientFactory.getClusterSpecification(effectiveConfig)
 
     val clusterClient = try {
       clusterDescriptor
         .deploySessionCluster(clusterSpecification)
         .getClusterClient
     } finally {
-      executorConfig.set(DeploymentOptions.TARGET, "yarn-session")
+      effectiveConfig.set(DeploymentOptions.TARGET, "yarn-session")
       clusterDescriptor.close()
     }
 
-    (executorConfig, Some(clusterClient))
+    (effectiveConfig, Some(clusterClient))
   }
 
   private def fetchDeployedYarnClusterInfo(
@@ -149,7 +150,7 @@ object FlinkShell {
       mode: String,
       flinkShims: FlinkShims) = {
 
-    val effectiveConfig = new Configuration(flinkConfig)
+    var effectiveConfig = new Configuration(flinkConfig)
     val args = parseArgList(config, mode)
 
     val configurationDirectory = getConfigDir(config)
@@ -163,10 +164,11 @@ object FlinkShell {
       frontend.getCustomCommandLineOptions)
     val commandLine = CliFrontendParser.parse(commandLineOptions, args, true)
 
-    val customCLI = flinkShims.getCustomCli(frontend, commandLine).asInstanceOf[CustomCommandLine]
-    val executorConfig = customCLI.applyCommandLineOptionsToConfiguration(commandLine);
+    effectiveConfig = flinkShims
+      .updateEffectiveConfig(frontend, commandLine, effectiveConfig)
+      .asInstanceOf[Configuration]
 
-    (executorConfig, None)
+    (effectiveConfig, None)
   }
 
   def parseArgList(config: Config, mode: String): Array[String] = {
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index 16e7ee1..d27f422 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -331,8 +331,13 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
     result = sqlInterpreter.interpret("select url, count(1) as pv from " +
             "log group by url", context);
 
-    assertEquals(InterpreterResult.Code.ERROR, result.code());
-    assertTrue(context.out.toString(), context.out.toString().contains("Cannot find checkpoint or savepoint"));
+    if (flinkInterpreter.getFlinkVersion().olderThan(FlinkVersion.fromVersionString("1.12.0"))) {
+      assertEquals(InterpreterResult.Code.ERROR, result.code());
+      assertTrue(context.out.toString(), context.out.toString().contains("Cannot find checkpoint or savepoint"));
+    } else {
+      // Flink 1.12 starts from scratch if the savepoint is not found.
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    }
   }
 
   @Test
diff --git a/flink/pom.xml b/flink/pom.xml
index 2ddcef3..c9e9b32 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -39,11 +39,13 @@
         <module>flink-shims</module>
         <module>flink1.10-shims</module>
         <module>flink1.11-shims</module>
+        <module>flink1.12-shims</module>
     </modules>
 
     <properties>
         <flink1.10.version>1.10.2</flink1.10.version>
         <flink1.11.version>1.11.2</flink1.11.version>
+        <flink1.12.version>1.12.0</flink1.12.version>
     </properties>
 
     <dependencies>
diff --git a/testing/env_python_3 with_flink_1_10.yml b/testing/env_python_3_with_flink_110.yml
similarity index 100%
rename from testing/env_python_3 with_flink_1_10.yml
rename to testing/env_python_3_with_flink_110.yml
diff --git a/testing/env_python_3 with_flink_1_11.yml b/testing/env_python_3_with_flink_111.yml
similarity index 100%
copy from testing/env_python_3 with_flink_1_11.yml
copy to testing/env_python_3_with_flink_111.yml
diff --git a/testing/env_python_3 with_flink_1_11.yml b/testing/env_python_3_with_flink_112.yml
similarity index 86%
rename from testing/env_python_3 with_flink_1_11.yml
rename to testing/env_python_3_with_flink_112.yml
index e23bedb..8af5119 100644
--- a/testing/env_python_3 with_flink_1_11.yml	
+++ b/testing/env_python_3_with_flink_112.yml
@@ -3,6 +3,10 @@ channels:
   - conda-forge
   - defaults
 dependencies:
+  - pip
+  - pip:
+      - bkzep==0.6.1
+      - apache-flink==1.12.0
   - pycodestyle
   - numpy=1
   - pandas=0.25
@@ -19,7 +23,4 @@ dependencies:
   - panel
   - holoviews
   - pyyaml=3
-  - pip
-  - pip:
-    - bkzep==0.6.1
-    - apache-flink==1.11.1
+
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest112.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest112.java
new file mode 100644
index 0000000..56b318b
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest112.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(value = Parameterized.class)
+public class FlinkIntegrationTest112 extends FlinkIntegrationTest {
+
+  @Parameterized.Parameters
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+            {"1.12.0"}
+    });
+  }
+
+  public FlinkIntegrationTest112(String flinkVersion) {
+    super(flinkVersion);
+  }
+}
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java
new file mode 100644
index 0000000..443b254
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+//@RunWith(value = Parameterized.class)
+public class ZeppelinFlinkClusterTest112 extends ZeppelinFlinkClusterTest {
+
+  @Parameterized.Parameters
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+            {"1.12.0"}
+    });
+  }
+
+  public ZeppelinFlinkClusterTest112(String flinkVersion) throws Exception {
+    super(flinkVersion);
+  }
+}