You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/07/23 02:19:35 UTC

[zeppelin] branch master updated: [ZEPPELIN-4965]. Support flink 1.11.1

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new b523fb9  [ZEPPELIN-4965]. Support flink 1.11.1
b523fb9 is described below

commit b523fb9e9eb1c45184a2e1b45fb3ba18e2808c52
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Jul 22 20:28:28 2020 +0800

    [ZEPPELIN-4965]. Support flink 1.11.1
    
    ### What is this PR for?
    
    This PR is to support flink 1.11.1.
    * Update flink 1.11.0 to 1.11.1
    * Flink 1.11.1 introduces a new API for TableEnvironment, so this PR also updates `TableEnvFactory`.
    * Split `FlinkIntegrationTest` into `FlinkIntegrationTest110` and `FlinkIntegrationTest111`.
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4965
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3863 from zjffdu/ZEPPELIN-4965 and squashes the following commits:
    
    64a625b06 [Jeff Zhang] [ZEPPELIN-4965]. Support flink 1.11.1
---
 .travis.yml                                        |   6 +-
 .../org/apache/zeppelin/flink/TableEnvFactory.java | 310 ++++++++++++++-------
 .../zeppelin/flink/FlinkScalaInterpreter.scala     |  10 +-
 flink/pom.xml                                      |   2 +-
 .../zeppelin/integration/FlinkIntegrationTest.java |  11 +-
 .../integration/FlinkIntegrationTest110.java       |  40 +++
 .../integration/FlinkIntegrationTest111.java       |  40 +++
 7 files changed, 305 insertions(+), 114 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index a458a70..23c1179 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -95,15 +95,15 @@ jobs:
       dist: xenial
       env: PYTHON="3" R="true" SCALA_VER="2.10" TENSORFLOW="1.13.1" PROFILE="-Pscala-2.10" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl $(echo .,zeppelin-interpreter,zeppelin-interpreter-shaded,${INTERPRETERS} | sed 's/!//g')" TEST_PROJECTS=""
 
-    # Test flink 1.10
+    # Test flink 1.10 & flink integration test
     - jdk: "openjdk8"
       dist: xenial
-      env: PYTHON="3" FLINK="1.10.1" PROFILE="-Pflink-1.10" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl flink/interpreter" TEST_PROJECTS="-Dtest=org.apache.zeppelin.flink.*"
+      env: BUILD_PLUGINS="true" PYTHON="3" FLINK="1.10.1" PROFILE="-Pflink-1.10 -Pintegration" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl flink/interpreter,zeppelin-interpreter-integration" TEST_PROJECTS="-Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest110"
 
     # Test flink 1.11 & flink integration test
     - jdk: "openjdk8"
       dist: xenial
-      env: BUILD_PLUGINS="true" PYTHON="3" FLINK="1.11.0" PROFILE="-Pflink-1.11 -Pintegration" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl flink/interpreter,zeppelin-interpreter-integration" TEST_PROJECTS="-Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest"
+      env: BUILD_PLUGINS="true" PYTHON="3" FLINK="1.11.1" PROFILE="-Pflink-1.11 -Pintegration" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl flink/interpreter,zeppelin-interpreter-integration" TEST_PROJECTS="-Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest111"
 
     # Run Spark integration test and unit test
 
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
index b514f49..9fb6efd 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
@@ -105,7 +105,7 @@ public class TableEnvFactory {
     }
   }
 
-  public TableEnvironment createScalaFlinkStreamTableEnvironment(EnvironmentSettings settings) {
+  public TableEnvironment createScalaFlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
     try {
       Map<String, String> executorProperties = settings.toExecutorProperties();
       Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
@@ -127,24 +127,48 @@ public class TableEnvFactory {
         clazz = Class
                 .forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl");
       }
-      Constructor constructor = clazz
-              .getConstructor(
-                      CatalogManager.class,
-                      ModuleManager.class,
-                      FunctionCatalog.class,
-                      TableConfig.class,
-                      org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class,
-                      Planner.class,
-                      Executor.class,
-                      boolean.class);
-      return (TableEnvironment) constructor.newInstance(catalogManager,
-              moduleManager,
-              flinkFunctionCatalog,
-              tblConfig,
-              senv,
-              planner,
-              executor,
-              settings.isStreamingMode());
+      try {
+        Constructor constructor = clazz
+                .getConstructor(
+                        CatalogManager.class,
+                        ModuleManager.class,
+                        FunctionCatalog.class,
+                        TableConfig.class,
+                        org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class,
+                        Planner.class,
+                        Executor.class,
+                        boolean.class);
+        return (TableEnvironment) constructor.newInstance(catalogManager,
+                moduleManager,
+                flinkFunctionCatalog,
+                tblConfig,
+                senv,
+                planner,
+                executor,
+                settings.isStreamingMode());
+      } catch (NoSuchMethodException e) {
+        // Flink 1.11.1 added a ClassLoader constructor parameter, see FLINK-18419
+        Constructor constructor = clazz
+                .getConstructor(
+                        CatalogManager.class,
+                        ModuleManager.class,
+                        FunctionCatalog.class,
+                        TableConfig.class,
+                        org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class,
+                        Planner.class,
+                        Executor.class,
+                        boolean.class,
+                        ClassLoader.class);
+        return (TableEnvironment) constructor.newInstance(catalogManager,
+                moduleManager,
+                flinkFunctionCatalog,
+                tblConfig,
+                senv,
+                planner,
+                executor,
+                settings.isStreamingMode(),
+                classLoader);
+      }
 
     } catch (Exception e) {
       throw new TableException("Fail to createScalaFlinkStreamTableEnvironment", e);
@@ -177,7 +201,7 @@ public class TableEnvFactory {
     }
   }
 
-  public TableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentSettings settings) {
+  public TableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
 
     try {
       Map<String, String> executorProperties = settings.toExecutorProperties();
@@ -195,31 +219,55 @@ public class TableEnvFactory {
         clazz = Class
                 .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
       }
-      Constructor constructor = clazz
-              .getConstructor(
-                      CatalogManager.class,
-                      ModuleManager.class,
-                      FunctionCatalog.class,
-                      TableConfig.class,
-                      org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class,
-                      Planner.class,
-                      Executor.class,
-                      boolean.class);
-      return (TableEnvironment) constructor.newInstance(catalogManager,
-              moduleManager,
-              flinkFunctionCatalog,
-              tblConfig,
-              senv.getJavaEnv(),
-              planner,
-              executor,
-              settings.isStreamingMode());
+      try {
+        Constructor constructor = clazz
+                .getConstructor(
+                        CatalogManager.class,
+                        ModuleManager.class,
+                        FunctionCatalog.class,
+                        TableConfig.class,
+                        org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class,
+                        Planner.class,
+                        Executor.class,
+                        boolean.class);
+        return (TableEnvironment) constructor.newInstance(catalogManager,
+                moduleManager,
+                flinkFunctionCatalog,
+                tblConfig,
+                senv.getJavaEnv(),
+                planner,
+                executor,
+                settings.isStreamingMode());
+      } catch (NoSuchMethodException e) {
+        // Flink 1.11.1 added a ClassLoader constructor parameter, see FLINK-18419
+        Constructor constructor = clazz
+                .getConstructor(
+                        CatalogManager.class,
+                        ModuleManager.class,
+                        FunctionCatalog.class,
+                        TableConfig.class,
+                        org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class,
+                        Planner.class,
+                        Executor.class,
+                        boolean.class,
+                        ClassLoader.class);
+        return (TableEnvironment) constructor.newInstance(catalogManager,
+                moduleManager,
+                flinkFunctionCatalog,
+                tblConfig,
+                senv.getJavaEnv(),
+                planner,
+                executor,
+                settings.isStreamingMode(),
+                classLoader);
+      }
 
     } catch (Exception e) {
       throw new TableException("Fail to createJavaFlinkStreamTableEnvironment", e);
     }
   }
 
-  public TableEnvironment createScalaBlinkStreamTableEnvironment(EnvironmentSettings settings) {
+  public TableEnvironment createScalaBlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
 
     try {
       Map<String, String> executorProperties = settings.toExecutorProperties();
@@ -243,30 +291,54 @@ public class TableEnvFactory {
         clazz = Class
                 .forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl");
       }
-      Constructor constructor = clazz
-              .getConstructor(
-                      CatalogManager.class,
-                      ModuleManager.class,
-                      FunctionCatalog.class,
-                      TableConfig.class,
-                      org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class,
-                      Planner.class,
-                      Executor.class,
-                      boolean.class);
-      return (TableEnvironment) constructor.newInstance(catalogManager,
-              moduleManager,
-              blinkFunctionCatalog,
-              tblConfig,
-              senv,
-              planner,
-              executor,
-              settings.isStreamingMode());
+      try {
+        Constructor constructor = clazz
+                .getConstructor(
+                        CatalogManager.class,
+                        ModuleManager.class,
+                        FunctionCatalog.class,
+                        TableConfig.class,
+                        org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class,
+                        Planner.class,
+                        Executor.class,
+                        boolean.class);
+        return (TableEnvironment) constructor.newInstance(catalogManager,
+                moduleManager,
+                blinkFunctionCatalog,
+                tblConfig,
+                senv,
+                planner,
+                executor,
+                settings.isStreamingMode());
+      } catch (NoSuchMethodException e) {
+        // Flink 1.11.1 added a ClassLoader constructor parameter, see FLINK-18419
+        Constructor constructor = clazz
+                .getConstructor(
+                        CatalogManager.class,
+                        ModuleManager.class,
+                        FunctionCatalog.class,
+                        TableConfig.class,
+                        org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class,
+                        Planner.class,
+                        Executor.class,
+                        boolean.class,
+                        ClassLoader.class);
+        return (TableEnvironment) constructor.newInstance(catalogManager,
+                moduleManager,
+                blinkFunctionCatalog,
+                tblConfig,
+                senv,
+                planner,
+                executor,
+                settings.isStreamingMode(),
+                classLoader);
+      }
     } catch (Exception e) {
       throw new TableException("Fail to createScalaBlinkStreamTableEnvironment", e);
     }
   }
 
-  public TableEnvironment createJavaBlinkStreamTableEnvironment(EnvironmentSettings settings) {
+  public TableEnvironment createJavaBlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
 
     try {
       Map<String, String> executorProperties = settings.toExecutorProperties();
@@ -284,31 +356,55 @@ public class TableEnvFactory {
         clazz = Class
                 .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
       }
-      Constructor constructor = clazz
-              .getConstructor(
-                      CatalogManager.class,
-                      ModuleManager.class,
-                      FunctionCatalog.class,
-                      TableConfig.class,
-                      org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class,
-                      Planner.class,
-                      Executor.class,
-                      boolean.class);
-      return (TableEnvironment) constructor.newInstance(catalogManager,
-              moduleManager,
-              blinkFunctionCatalog,
-              tblConfig,
-              senv.getJavaEnv(),
-              planner,
-              executor,
-              settings.isStreamingMode());
+      try {
+        Constructor constructor = clazz
+                .getConstructor(
+                        CatalogManager.class,
+                        ModuleManager.class,
+                        FunctionCatalog.class,
+                        TableConfig.class,
+                        org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class,
+                        Planner.class,
+                        Executor.class,
+                        boolean.class);
+        return (TableEnvironment) constructor.newInstance(catalogManager,
+                moduleManager,
+                blinkFunctionCatalog,
+                tblConfig,
+                senv.getJavaEnv(),
+                planner,
+                executor,
+                settings.isStreamingMode());
+      } catch (NoSuchMethodException e) {
+        // Flink 1.11.1 added a ClassLoader constructor parameter, see FLINK-18419
+        Constructor constructor = clazz
+                .getConstructor(
+                        CatalogManager.class,
+                        ModuleManager.class,
+                        FunctionCatalog.class,
+                        TableConfig.class,
+                        org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class,
+                        Planner.class,
+                        Executor.class,
+                        boolean.class,
+                        ClassLoader.class);
+        return (TableEnvironment) constructor.newInstance(catalogManager,
+                moduleManager,
+                blinkFunctionCatalog,
+                tblConfig,
+                senv.getJavaEnv(),
+                planner,
+                executor,
+                settings.isStreamingMode(),
+                classLoader);
+      }
     } catch (Exception e) {
       throw new TableException("Fail to createJavaBlinkStreamTableEnvironment", e);
     }
   }
 
   public TableEnvironment createJavaBlinkBatchTableEnvironment(
-          EnvironmentSettings settings) {
+          EnvironmentSettings settings, ClassLoader classLoader) {
     try {
       final Map<String, String> executorProperties = settings.toExecutorProperties();
       executor = lookupExecutor(executorProperties, senv.getJavaEnv());
@@ -324,24 +420,48 @@ public class TableEnvFactory {
         clazz = Class
                 .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
       }
-      Constructor constructor = clazz.getConstructor(
-                      CatalogManager.class,
-                      ModuleManager.class,
-                      FunctionCatalog.class,
-                      TableConfig.class,
-                      StreamExecutionEnvironment.class,
-                      Planner.class,
-                      Executor.class,
-                      boolean.class);
-      return (TableEnvironment) constructor.newInstance(
-              catalogManager,
-              moduleManager,
-              blinkFunctionCatalog,
-              tblConfig,
-              senv.getJavaEnv(),
-              planner,
-              executor,
-              settings.isStreamingMode());
+      try {
+        Constructor constructor = clazz.getConstructor(
+                CatalogManager.class,
+                ModuleManager.class,
+                FunctionCatalog.class,
+                TableConfig.class,
+                StreamExecutionEnvironment.class,
+                Planner.class,
+                Executor.class,
+                boolean.class);
+        return (TableEnvironment) constructor.newInstance(
+                catalogManager,
+                moduleManager,
+                blinkFunctionCatalog,
+                tblConfig,
+                senv.getJavaEnv(),
+                planner,
+                executor,
+                settings.isStreamingMode());
+      } catch (NoSuchMethodException e) {
+        // Flink 1.11.1 added a ClassLoader constructor parameter, see FLINK-18419
+        Constructor constructor = clazz.getConstructor(
+                CatalogManager.class,
+                ModuleManager.class,
+                FunctionCatalog.class,
+                TableConfig.class,
+                StreamExecutionEnvironment.class,
+                Planner.class,
+                Executor.class,
+                boolean.class,
+                ClassLoader.class);
+        return (TableEnvironment) constructor.newInstance(
+                catalogManager,
+                moduleManager,
+                blinkFunctionCatalog,
+                tblConfig,
+                senv.getJavaEnv(),
+                planner,
+                executor,
+                settings.isStreamingMode(),
+                classLoader);
+      }
     } catch (Exception e) {
       LOGGER.info(ExceptionUtils.getStackTrace(e));
       throw new TableException("Fail to createJavaBlinkBatchTableEnvironment", e);
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 30ec177..4fc4a18 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -395,27 +395,27 @@ class FlinkScalaInterpreter(val properties: Properties) {
 
       // blink planner
       var btEnvSetting = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build()
-      this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting);
+      this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting, getFlinkClassLoader);
       flinkILoop.bind("btenv", btenv.getClass().getCanonicalName(), btenv, List("@transient"))
       this.java_btenv = this.btenv
 
       var stEnvSetting =
         EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
-      this.stenv = tblEnvFactory.createScalaBlinkStreamTableEnvironment(stEnvSetting)
+      this.stenv = tblEnvFactory.createScalaBlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader)
       flinkILoop.bind("stenv", stenv.getClass().getCanonicalName(), stenv, List("@transient"))
-      this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting)
+      this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader)
 
       // flink planner
       this.btenv_2 = tblEnvFactory.createScalaFlinkBatchTableEnvironment()
       flinkILoop.bind("btenv_2", btenv_2.getClass().getCanonicalName(), btenv_2, List("@transient"))
       stEnvSetting =
         EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build()
-      this.stenv_2 = tblEnvFactory.createScalaFlinkStreamTableEnvironment(stEnvSetting)
+      this.stenv_2 = tblEnvFactory.createScalaFlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader)
       flinkILoop.bind("stenv_2", stenv_2.getClass().getCanonicalName(), stenv_2, List("@transient"))
 
       this.java_btenv_2 = tblEnvFactory.createJavaFlinkBatchTableEnvironment()
       btEnvSetting = EnvironmentSettings.newInstance.useOldPlanner.inStreamingMode.build
-      this.java_stenv_2 = tblEnvFactory.createJavaFlinkStreamTableEnvironment(btEnvSetting)
+      this.java_stenv_2 = tblEnvFactory.createJavaFlinkStreamTableEnvironment(btEnvSetting, getFlinkClassLoader)
     } finally {
       Thread.currentThread().setContextClassLoader(originalClassLoader)
     }
diff --git a/flink/pom.xml b/flink/pom.xml
index d2bf16a..5e0ec61 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -43,7 +43,7 @@
 
     <properties>
         <flink1.10.version>1.10.1</flink1.10.version>
-        <flink1.11.version>1.11.0</flink1.11.version>
+        <flink1.11.version>1.11.1</flink1.11.version>
     </properties>
 
     <dependencies>
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
index d873571..6040df1 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
@@ -48,8 +48,7 @@ import java.util.List;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-@RunWith(value = Parameterized.class)
-public class FlinkIntegrationTest {
+public abstract class FlinkIntegrationTest {
   private static Logger LOGGER = LoggerFactory.getLogger(FlinkIntegrationTest.class);
 
   private static MiniHadoopCluster hadoopCluster;
@@ -68,14 +67,6 @@ public class FlinkIntegrationTest {
     this.hadoopHome = DownloadUtils.downloadHadoop("2.7.7");
   }
 
-  @Parameterized.Parameters
-  public static List<Object[]> data() {
-    return Arrays.asList(new Object[][]{
-        {"1.10.1"},
-        {"1.11.0"}
-    });
-  }
-
   @BeforeClass
   public static void setUp() throws IOException {
     Configuration conf = new Configuration();
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
new file mode 100644
index 0000000..ca7e399
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(value = Parameterized.class)
+public class FlinkIntegrationTest110 extends FlinkIntegrationTest {
+
+  @Parameterized.Parameters
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+            {"1.10.0"},
+            {"1.10.1"}
+    });
+  }
+
+  public FlinkIntegrationTest110(String flinkVersion) {
+    super(flinkVersion);
+  }
+}
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java
new file mode 100644
index 0000000..b495844
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(value = Parameterized.class)
+public class FlinkIntegrationTest111 extends FlinkIntegrationTest {
+
+  @Parameterized.Parameters
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+            {"1.11.0"},
+            {"1.11.1"}
+    });
+  }
+
+  public FlinkIntegrationTest111(String flinkVersion) {
+    super(flinkVersion);
+  }
+}