Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/07/23 02:19:57 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4965]. Support flink 1.11.1
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new deddf75 [ZEPPELIN-4965]. Support flink 1.11.1
deddf75 is described below
commit deddf75edcaf1c58b96726939bd1d085c0e803cd
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Jul 22 20:28:28 2020 +0800
[ZEPPELIN-4965]. Support flink 1.11.1
### What is this PR for?
This PR adds support for Flink 1.11.1.
* Update Flink from 1.11.0 to 1.11.1
* Flink 1.11.1 introduces a new constructor signature for the `TableEnvironment` implementations (FLINK-18419), so this PR also updates `TableEnvFactory`; the fallback pattern is sketched below.
* Split `FlinkIntegrationTest` into `FlinkIntegrationTest110` and `FlinkIntegrationTest111`
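The `TableEnvFactory` change follows a try-the-old-constructor-then-fall-back pattern. A minimal, runnable sketch of that pattern is below; `Legacy` and its `String` argument are hypothetical stand-ins, not the real `StreamTableEnvironmentImpl` argument list shown in the diff:

```java
import java.lang.reflect.Constructor;

public class ReflectiveFallbackSketch {

  // Hypothetical stand-in for StreamTableEnvironmentImpl: only the
  // Flink 1.11.1-style constructor (trailing ClassLoader) exists here.
  public static class Legacy {
    public Legacy(String name, ClassLoader classLoader) {
      System.out.println("constructed " + name + " with " + classLoader);
    }
  }

  static Object create(Class<?> clazz, String name, ClassLoader classLoader) throws Exception {
    try {
      // Flink 1.11.0 and earlier shape: no ClassLoader parameter.
      Constructor<?> ctor = clazz.getConstructor(String.class);
      return ctor.newInstance(name);
    } catch (NoSuchMethodException e) {
      // Flink 1.11.1 changed the signature (FLINK-18419): append the ClassLoader.
      Constructor<?> ctor = clazz.getConstructor(String.class, ClassLoader.class);
      return ctor.newInstance(name, classLoader);
    }
  }

  public static void main(String[] args) throws Exception {
    create(Legacy.class, "stenv", Thread.currentThread().getContextClassLoader());
  }
}
```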
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4965
### How should this be tested?
* CI passes (a local equivalent is sketched below)
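The CI configuration in the `.travis.yml` diff below translates roughly to the following local commands; the profiles, modules, and test filter are copied from the diff, but treat this as a sketch rather than the exact CI invocation:

```sh
# Build the Flink interpreter and integration-test modules (flags mirror .travis.yml)
mvn install -DskipTests -DskipRat -am -pl flink/interpreter,zeppelin-interpreter-integration -Pflink-1.11 -Pintegration
# Run the Flink unit tests plus the 1.11 integration test
mvn test -DskipRat -pl flink/interpreter,zeppelin-interpreter-integration -Pflink-1.11 -Pintegration -Dtest="org.apache.zeppelin.flink.*,FlinkIntegrationTest111"
```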
### Screenshots (if appropriate)
### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3863 from zjffdu/ZEPPELIN-4965 and squashes the following commits:
64a625b06 [Jeff Zhang] [ZEPPELIN-4965]. Support flink 1.11.1
(cherry picked from commit b523fb9e9eb1c45184a2e1b45fb3ba18e2808c52)
Signed-off-by: Jeff Zhang <zj...@apache.org>
---
.travis.yml | 6 +-
.../org/apache/zeppelin/flink/TableEnvFactory.java | 310 ++++++++++++++-------
.../zeppelin/flink/FlinkScalaInterpreter.scala | 10 +-
flink/pom.xml | 2 +-
.../zeppelin/integration/FlinkIntegrationTest.java | 11 +-
.../integration/FlinkIntegrationTest110.java | 40 +++
.../integration/FlinkIntegrationTest111.java | 40 +++
7 files changed, 305 insertions(+), 114 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index a458a70..23c1179 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -95,15 +95,15 @@ jobs:
dist: xenial
env: PYTHON="3" R="true" SCALA_VER="2.10" TENSORFLOW="1.13.1" PROFILE="-Pscala-2.10" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl $(echo .,zeppelin-interpreter,zeppelin-interpreter-shaded,${INTERPRETERS} | sed 's/!//g')" TEST_PROJECTS=""
- # Test flink 1.10
+ # Test flink 1.10 & flink integration test
- jdk: "openjdk8"
dist: xenial
- env: PYTHON="3" FLINK="1.10.1" PROFILE="-Pflink-1.10" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl flink/interpreter" TEST_PROJECTS="-Dtest=org.apache.zeppelin.flink.*"
+ env: BUILD_PLUGINS="true" PYTHON="3" FLINK="1.10.1" PROFILE="-Pflink-1.10 -Pintegration" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl flink/interpreter,zeppelin-interpreter-integration" TEST_PROJECTS="-Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest110"
# Test flink 1.11 & flink integration test
- jdk: "openjdk8"
dist: xenial
- env: BUILD_PLUGINS="true" PYTHON="3" FLINK="1.11.0" PROFILE="-Pflink-1.11 -Pintegration" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl flink/interpreter,zeppelin-interpreter-integration" TEST_PROJECTS="-Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest"
+ env: BUILD_PLUGINS="true" PYTHON="3" FLINK="1.11.1" PROFILE="-Pflink-1.11 -Pintegration" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl flink/interpreter,zeppelin-interpreter-integration" TEST_PROJECTS="-Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest111"
# Run Spark integration test and unit test
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
index b514f49..9fb6efd 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
@@ -105,7 +105,7 @@ public class TableEnvFactory {
}
}
- public TableEnvironment createScalaFlinkStreamTableEnvironment(EnvironmentSettings settings) {
+ public TableEnvironment createScalaFlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
try {
Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
@@ -127,24 +127,48 @@ public class TableEnvFactory {
clazz = Class
.forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl");
}
- Constructor constructor = clazz
- .getConstructor(
- CatalogManager.class,
- ModuleManager.class,
- FunctionCatalog.class,
- TableConfig.class,
- org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class,
- Planner.class,
- Executor.class,
- boolean.class);
- return (TableEnvironment) constructor.newInstance(catalogManager,
- moduleManager,
- flinkFunctionCatalog,
- tblConfig,
- senv,
- planner,
- executor,
- settings.isStreamingMode());
+ try {
+ Constructor constructor = clazz
+ .getConstructor(
+ CatalogManager.class,
+ ModuleManager.class,
+ FunctionCatalog.class,
+ TableConfig.class,
+ org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class,
+ Planner.class,
+ Executor.class,
+ boolean.class);
+ return (TableEnvironment) constructor.newInstance(catalogManager,
+ moduleManager,
+ flinkFunctionCatalog,
+ tblConfig,
+ senv,
+ planner,
+ executor,
+ settings.isStreamingMode());
+ } catch (NoSuchMethodException e) {
+ // Flink 1.11.1 changed the constructor signature, see FLINK-18419
+ Constructor constructor = clazz
+ .getConstructor(
+ CatalogManager.class,
+ ModuleManager.class,
+ FunctionCatalog.class,
+ TableConfig.class,
+ org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class,
+ Planner.class,
+ Executor.class,
+ boolean.class,
+ ClassLoader.class);
+ return (TableEnvironment) constructor.newInstance(catalogManager,
+ moduleManager,
+ flinkFunctionCatalog,
+ tblConfig,
+ senv,
+ planner,
+ executor,
+ settings.isStreamingMode(),
+ classLoader);
+ }
} catch (Exception e) {
throw new TableException("Fail to createScalaFlinkStreamTableEnvironment", e);
@@ -177,7 +201,7 @@ public class TableEnvFactory {
}
}
- public TableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentSettings settings) {
+ public TableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
try {
Map<String, String> executorProperties = settings.toExecutorProperties();
@@ -195,31 +219,55 @@ public class TableEnvFactory {
clazz = Class
.forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
}
- Constructor constructor = clazz
- .getConstructor(
- CatalogManager.class,
- ModuleManager.class,
- FunctionCatalog.class,
- TableConfig.class,
- org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class,
- Planner.class,
- Executor.class,
- boolean.class);
- return (TableEnvironment) constructor.newInstance(catalogManager,
- moduleManager,
- flinkFunctionCatalog,
- tblConfig,
- senv.getJavaEnv(),
- planner,
- executor,
- settings.isStreamingMode());
+ try {
+ Constructor constructor = clazz
+ .getConstructor(
+ CatalogManager.class,
+ ModuleManager.class,
+ FunctionCatalog.class,
+ TableConfig.class,
+ org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class,
+ Planner.class,
+ Executor.class,
+ boolean.class);
+ return (TableEnvironment) constructor.newInstance(catalogManager,
+ moduleManager,
+ flinkFunctionCatalog,
+ tblConfig,
+ senv.getJavaEnv(),
+ planner,
+ executor,
+ settings.isStreamingMode());
+ } catch (NoSuchMethodException e) {
+ // Flink 1.11.1 changed the constructor signature, see FLINK-18419
+ Constructor constructor = clazz
+ .getConstructor(
+ CatalogManager.class,
+ ModuleManager.class,
+ FunctionCatalog.class,
+ TableConfig.class,
+ org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class,
+ Planner.class,
+ Executor.class,
+ boolean.class,
+ ClassLoader.class);
+ return (TableEnvironment) constructor.newInstance(catalogManager,
+ moduleManager,
+ flinkFunctionCatalog,
+ tblConfig,
+ senv.getJavaEnv(),
+ planner,
+ executor,
+ settings.isStreamingMode(),
+ classLoader);
+ }
} catch (Exception e) {
throw new TableException("Fail to createJavaFlinkStreamTableEnvironment", e);
}
}
- public TableEnvironment createScalaBlinkStreamTableEnvironment(EnvironmentSettings settings) {
+ public TableEnvironment createScalaBlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
try {
Map<String, String> executorProperties = settings.toExecutorProperties();
@@ -243,30 +291,54 @@ public class TableEnvFactory {
clazz = Class
.forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl");
}
- Constructor constructor = clazz
- .getConstructor(
- CatalogManager.class,
- ModuleManager.class,
- FunctionCatalog.class,
- TableConfig.class,
- org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class,
- Planner.class,
- Executor.class,
- boolean.class);
- return (TableEnvironment) constructor.newInstance(catalogManager,
- moduleManager,
- blinkFunctionCatalog,
- tblConfig,
- senv,
- planner,
- executor,
- settings.isStreamingMode());
+ try {
+ Constructor constructor = clazz
+ .getConstructor(
+ CatalogManager.class,
+ ModuleManager.class,
+ FunctionCatalog.class,
+ TableConfig.class,
+ org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class,
+ Planner.class,
+ Executor.class,
+ boolean.class);
+ return (TableEnvironment) constructor.newInstance(catalogManager,
+ moduleManager,
+ blinkFunctionCatalog,
+ tblConfig,
+ senv,
+ planner,
+ executor,
+ settings.isStreamingMode());
+ } catch (NoSuchMethodException e) {
+ // Flink 1.11.1 changed the constructor signature, see FLINK-18419
+ Constructor constructor = clazz
+ .getConstructor(
+ CatalogManager.class,
+ ModuleManager.class,
+ FunctionCatalog.class,
+ TableConfig.class,
+ org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class,
+ Planner.class,
+ Executor.class,
+ boolean.class,
+ ClassLoader.class);
+ return (TableEnvironment) constructor.newInstance(catalogManager,
+ moduleManager,
+ blinkFunctionCatalog,
+ tblConfig,
+ senv,
+ planner,
+ executor,
+ settings.isStreamingMode(),
+ classLoader);
+ }
} catch (Exception e) {
throw new TableException("Fail to createScalaBlinkStreamTableEnvironment", e);
}
}
- public TableEnvironment createJavaBlinkStreamTableEnvironment(EnvironmentSettings settings) {
+ public TableEnvironment createJavaBlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
try {
Map<String, String> executorProperties = settings.toExecutorProperties();
@@ -284,31 +356,55 @@ public class TableEnvFactory {
clazz = Class
.forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
}
- Constructor constructor = clazz
- .getConstructor(
- CatalogManager.class,
- ModuleManager.class,
- FunctionCatalog.class,
- TableConfig.class,
- org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class,
- Planner.class,
- Executor.class,
- boolean.class);
- return (TableEnvironment) constructor.newInstance(catalogManager,
- moduleManager,
- blinkFunctionCatalog,
- tblConfig,
- senv.getJavaEnv(),
- planner,
- executor,
- settings.isStreamingMode());
+ try {
+ Constructor constructor = clazz
+ .getConstructor(
+ CatalogManager.class,
+ ModuleManager.class,
+ FunctionCatalog.class,
+ TableConfig.class,
+ org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class,
+ Planner.class,
+ Executor.class,
+ boolean.class);
+ return (TableEnvironment) constructor.newInstance(catalogManager,
+ moduleManager,
+ blinkFunctionCatalog,
+ tblConfig,
+ senv.getJavaEnv(),
+ planner,
+ executor,
+ settings.isStreamingMode());
+ } catch (NoSuchMethodException e) {
+ // Flink 1.11.1 changed the constructor signature, see FLINK-18419
+ Constructor constructor = clazz
+ .getConstructor(
+ CatalogManager.class,
+ ModuleManager.class,
+ FunctionCatalog.class,
+ TableConfig.class,
+ org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class,
+ Planner.class,
+ Executor.class,
+ boolean.class,
+ ClassLoader.class);
+ return (TableEnvironment) constructor.newInstance(catalogManager,
+ moduleManager,
+ blinkFunctionCatalog,
+ tblConfig,
+ senv.getJavaEnv(),
+ planner,
+ executor,
+ settings.isStreamingMode(),
+ classLoader);
+ }
} catch (Exception e) {
throw new TableException("Fail to createJavaBlinkStreamTableEnvironment", e);
}
}
public TableEnvironment createJavaBlinkBatchTableEnvironment(
- EnvironmentSettings settings) {
+ EnvironmentSettings settings, ClassLoader classLoader) {
try {
final Map<String, String> executorProperties = settings.toExecutorProperties();
executor = lookupExecutor(executorProperties, senv.getJavaEnv());
@@ -324,24 +420,48 @@ public class TableEnvFactory {
clazz = Class
.forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
}
- Constructor constructor = clazz.getConstructor(
- CatalogManager.class,
- ModuleManager.class,
- FunctionCatalog.class,
- TableConfig.class,
- StreamExecutionEnvironment.class,
- Planner.class,
- Executor.class,
- boolean.class);
- return (TableEnvironment) constructor.newInstance(
- catalogManager,
- moduleManager,
- blinkFunctionCatalog,
- tblConfig,
- senv.getJavaEnv(),
- planner,
- executor,
- settings.isStreamingMode());
+ try {
+ Constructor constructor = clazz.getConstructor(
+ CatalogManager.class,
+ ModuleManager.class,
+ FunctionCatalog.class,
+ TableConfig.class,
+ StreamExecutionEnvironment.class,
+ Planner.class,
+ Executor.class,
+ boolean.class);
+ return (TableEnvironment) constructor.newInstance(
+ catalogManager,
+ moduleManager,
+ blinkFunctionCatalog,
+ tblConfig,
+ senv.getJavaEnv(),
+ planner,
+ executor,
+ settings.isStreamingMode());
+ } catch (NoSuchMethodException e) {
+ // Flink 1.11.1 changed the constructor signature, see FLINK-18419
+ Constructor constructor = clazz.getConstructor(
+ CatalogManager.class,
+ ModuleManager.class,
+ FunctionCatalog.class,
+ TableConfig.class,
+ StreamExecutionEnvironment.class,
+ Planner.class,
+ Executor.class,
+ boolean.class,
+ ClassLoader.class);
+ return (TableEnvironment) constructor.newInstance(
+ catalogManager,
+ moduleManager,
+ blinkFunctionCatalog,
+ tblConfig,
+ senv.getJavaEnv(),
+ planner,
+ executor,
+ settings.isStreamingMode(),
+ classLoader);
+ }
} catch (Exception e) {
LOGGER.info(ExceptionUtils.getStackTrace(e));
throw new TableException("Fail to createJavaBlinkBatchTableEnvironment", e);
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 30ec177..4fc4a18 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -395,27 +395,27 @@ class FlinkScalaInterpreter(val properties: Properties) {
// blink planner
var btEnvSetting = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build()
- this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting);
+ this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting, getFlinkClassLoader);
flinkILoop.bind("btenv", btenv.getClass().getCanonicalName(), btenv, List("@transient"))
this.java_btenv = this.btenv
var stEnvSetting =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
- this.stenv = tblEnvFactory.createScalaBlinkStreamTableEnvironment(stEnvSetting)
+ this.stenv = tblEnvFactory.createScalaBlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader)
flinkILoop.bind("stenv", stenv.getClass().getCanonicalName(), stenv, List("@transient"))
- this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting)
+ this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader)
// flink planner
this.btenv_2 = tblEnvFactory.createScalaFlinkBatchTableEnvironment()
flinkILoop.bind("btenv_2", btenv_2.getClass().getCanonicalName(), btenv_2, List("@transient"))
stEnvSetting =
EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build()
- this.stenv_2 = tblEnvFactory.createScalaFlinkStreamTableEnvironment(stEnvSetting)
+ this.stenv_2 = tblEnvFactory.createScalaFlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader)
flinkILoop.bind("stenv_2", stenv_2.getClass().getCanonicalName(), stenv_2, List("@transient"))
this.java_btenv_2 = tblEnvFactory.createJavaFlinkBatchTableEnvironment()
btEnvSetting = EnvironmentSettings.newInstance.useOldPlanner.inStreamingMode.build
- this.java_stenv_2 = tblEnvFactory.createJavaFlinkStreamTableEnvironment(btEnvSetting)
+ this.java_stenv_2 = tblEnvFactory.createJavaFlinkStreamTableEnvironment(btEnvSetting, getFlinkClassLoader)
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader)
}
diff --git a/flink/pom.xml b/flink/pom.xml
index d2bf16a..5e0ec61 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -43,7 +43,7 @@
<properties>
<flink1.10.version>1.10.1</flink1.10.version>
- <flink1.11.version>1.11.0</flink1.11.version>
+ <flink1.11.version>1.11.1</flink1.11.version>
</properties>
<dependencies>
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
index d873571..6040df1 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
@@ -48,8 +48,7 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-@RunWith(value = Parameterized.class)
-public class FlinkIntegrationTest {
+public abstract class FlinkIntegrationTest {
private static Logger LOGGER = LoggerFactory.getLogger(FlinkIntegrationTest.class);
private static MiniHadoopCluster hadoopCluster;
@@ -68,14 +67,6 @@ public class FlinkIntegrationTest {
this.hadoopHome = DownloadUtils.downloadHadoop("2.7.7");
}
- @Parameterized.Parameters
- public static List<Object[]> data() {
- return Arrays.asList(new Object[][]{
- {"1.10.1"},
- {"1.11.0"}
- });
- }
-
@BeforeClass
public static void setUp() throws IOException {
Configuration conf = new Configuration();
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
new file mode 100644
index 0000000..ca7e399
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(value = Parameterized.class)
+public class FlinkIntegrationTest110 extends FlinkIntegrationTest {
+
+ @Parameterized.Parameters
+ public static List<Object[]> data() {
+ return Arrays.asList(new Object[][]{
+ {"1.10.0"},
+ {"1.10.1"}
+ });
+ }
+
+ public FlinkIntegrationTest110(String flinkVersion) {
+ super(flinkVersion);
+ }
+}
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java
new file mode 100644
index 0000000..b495844
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(value = Parameterized.class)
+public class FlinkIntegrationTest111 extends FlinkIntegrationTest {
+
+ @Parameterized.Parameters
+ public static List<Object[]> data() {
+ return Arrays.asList(new Object[][]{
+ {"1.11.0"},
+ {"1.11.1"}
+ });
+ }
+
+ public FlinkIntegrationTest111(String flinkVersion) {
+ super(flinkVersion);
+ }
+}