You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/12/17 03:17:09 UTC
[zeppelin] branch master updated: [ZEPPELIN-5156]. Support flink
1.12
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 89f4ee7 [ZEPPELIN-5156]. Support flink 1.12
89f4ee7 is described below
commit 89f4ee77464030bf0aae6f07af6aa4ba0250041e
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Dec 9 14:00:14 2020 +0800
[ZEPPELIN-5156]. Support flink 1.12
### What is this PR for?
Flink 1.12 is almost the same as Flink 1.11 from an API perspective; the only change is that ExecutionConfig#disableSysoutLogging has been removed. This PR fixes the issue by not calling this method if the Flink version is newer than or equal to 1.12.
### What type of PR is it?
[ Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5156
### How should this be tested?
* Manually tested on flink 1.12
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3990 from zjffdu/ZEPPELIN-5156 and squashes the following commits:
f5f64cd4d [Jeff Zhang] [ZEPPELIN-5156]. Support flink 1.12
---
.github/workflows/core.yml | 9 +-
.../java/org/apache/zeppelin/flink/FlinkShims.java | 11 +-
.../org/apache/zeppelin/flink/FlinkVersion.java | 7 +
.../org/apache/zeppelin/flink/Flink110Shims.java | 34 +++-
.../org/apache/zeppelin/flink/Flink111Shims.java | 30 ++-
flink/flink1.12-shims/pom.xml | 221 +++++++++++++++++++++
.../org/apache/zeppelin/flink/Flink112Shims.java} | 46 ++++-
.../flink/shims112/CollectStreamTableSink.java | 97 +++++++++
.../flink/shims112/Flink112ScalaShims.scala | 36 ++++
flink/interpreter/pom.xml | 21 +-
.../zeppelin/flink/FlinkScalaInterpreter.scala | 7 +-
.../org/apache/zeppelin/flink/FlinkShell.scala | 26 +--
.../flink/FlinkStreamSqlInterpreterTest.java | 9 +-
flink/pom.xml | 2 +
...nk_1_10.yml => env_python_3_with_flink_110.yml} | 0
...nk_1_11.yml => env_python_3_with_flink_111.yml} | 0
...nk_1_11.yml => env_python_3_with_flink_112.yml} | 9 +-
.../integration/FlinkIntegrationTest112.java | 39 ++++
.../integration/ZeppelinFlinkClusterTest112.java | 38 ++++
19 files changed, 595 insertions(+), 47 deletions(-)
diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index 4faf67b..9e680de 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -126,8 +126,9 @@ jobs:
test-flink-and-flink-integration-test:
runs-on: ubuntu-18.04
strategy:
+ fail-fast: false
matrix:
- flink: [ flink_1_10, flink_1_11]
+ flink: [ 110, 111, 112]
steps:
- name: Checkout
uses: actions/checkout@v2
@@ -148,15 +149,15 @@ jobs:
uses: conda-incubator/setup-miniconda@v2
with:
activate-environment: python_3_with_flink
- environment-file: testing/env_python_3_with_${{ matrix.flink }}.yml
+ environment-file: testing/env_python_3_with_flink_${{ matrix.flink }}.yml
python-version: 3.7
auto-activate-base: false
- name: install environment
run: |
- mvn install -DskipTests -DskipRat -am -pl flink/interpreter,zeppelin-interpreter-integration -Pflink-1.10 -Pintegration -B
+ mvn install -DskipTests -DskipRat -am -pl flink/interpreter,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -B
mvn clean package -T 2C -pl zeppelin-plugins -amd -DskipTests -B
- name: run tests
- run: mvn test -DskipRat -pl flink/interpreter,zeppelin-interpreter-integration -Pflink-1.10 -Pintegration -B -Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest110,ZeppelinFlinkClusterTest110
+ run: mvn test -DskipRat -pl flink/interpreter,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -B -Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest${{ matrix.flink }},ZeppelinFlinkClusterTest${{ matrix.flink }}
run-spark-intergration-test:
runs-on: ubuntu-18.04
steps:
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index f7a7514..717ef3d 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -56,9 +56,12 @@ public abstract class FlinkShims {
if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 10) {
LOGGER.info("Initializing shims for Flink 1.10");
flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink110Shims");
- } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() >= 11) {
+ } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 11) {
LOGGER.info("Initializing shims for Flink 1.11");
flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink111Shims");
+ } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 12) {
+ LOGGER.info("Initializing shims for Flink 1.12");
+ flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink112Shims");
} else {
throw new Exception("Flink version: '" + flinkVersion + "' is not supported yet");
}
@@ -92,6 +95,10 @@ public abstract class FlinkShims {
.toAttributedString();
}
+ public abstract void disableSysoutLogging(Object batchConfig, Object streamConfig);
+
+ public abstract Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment);
+
public abstract Object createCatalogManager(Object config);
public abstract String getPyFlinkPythonPath(Properties properties) throws IOException;
@@ -134,7 +141,7 @@ public abstract class FlinkShims {
Object parser,
Object environmentSetting);
- public abstract Object getCustomCli(Object cliFrontend, Object commandLine);
+ public abstract Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object executorConfig);
public abstract Map extractTableConfigOptions();
}
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
index d593746..dcf32d0 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
@@ -26,6 +26,7 @@ public class FlinkVersion {
private int majorVersion;
private int minorVersion;
private int patchVersion;
+ private int version;
private String versionString;
FlinkVersion(String versionString) {
@@ -46,6 +47,8 @@ public class FlinkVersion {
this.patchVersion = Integer.parseInt(versions[2]);
}
+ this.version = Integer.parseInt(String.format("%d%02d%02d",
+ majorVersion, minorVersion, patchVersion));
} catch (Exception e) {
logger.error("Can not recognize Flink version " + versionString +
". Assume it's a future release", e);
@@ -56,6 +59,10 @@ public class FlinkVersion {
return majorVersion;
}
+ public boolean olderThan(FlinkVersion versionToCompare) {
+ return this.version < versionToCompare.version;
+ }
+
public int getMinorVersion() {
return minorVersion;
}
diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
index e917b3c..ee551ea 100644
--- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
+++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
@@ -19,13 +19,18 @@
package org.apache.zeppelin.flink;
import org.apache.commons.cli.CommandLine;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.scala.DataSet;
import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.python.util.ResourceUtil;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableUtils;
@@ -40,6 +45,7 @@ import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkException;
import org.apache.zeppelin.flink.shims110.CollectStreamTableSink;
import org.apache.zeppelin.flink.shims110.Flink110ScalaShims;
import org.apache.zeppelin.flink.sql.SqlCommandParser;
@@ -100,6 +106,22 @@ public class Flink110Shims extends FlinkShims {
}
@Override
+ public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
+ ((ExecutionConfig) batchConfig).disableSysoutLogging();
+ ((ExecutionConfig) streamConfig).disableSysoutLogging();
+ }
+
+ @Override
+ public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) {
+ return new StreamExecutionEnvironmentFactory() {
+ @Override
+ public StreamExecutionEnvironment createExecutionEnvironment() {
+ return (StreamExecutionEnvironment) streamExecutionEnvironment;
+ }
+ };
+ }
+
+ @Override
public Object createCatalogManager(Object config) {
return new CatalogManager("default_catalog",
new GenericInMemoryCatalog("default_catalog", "default_database"));
@@ -242,18 +264,24 @@ public class Flink110Shims extends FlinkShims {
}
@Override
- public Object getCustomCli(Object cliFrontend, Object commandLine) {
+ public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) {
+ CustomCommandLine customCommandLine = null;
try {
- return ((CliFrontend) cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
+ customCommandLine = ((CliFrontend) cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
} catch (NoSuchMethodError e) {
try {
Method method = CliFrontend.class.getMethod("getActiveCustomCommandLine", CommandLine.class);
- return method.invoke((CliFrontend) cliFrontend, commandLine);
+ customCommandLine = (CustomCommandLine) method.invoke((CliFrontend) cliFrontend, commandLine);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ex) {
LOGGER.error("Fail to call getCustomCli", ex);
throw new RuntimeException("Fail to call getCustomCli", ex);
}
}
+ try {
+ return customCommandLine.applyCommandLineOptionsToConfiguration((CommandLine) commandLine);
+ } catch (FlinkException e) {
+ throw new RuntimeException("Fail to call applyCommandLineOptionsToConfiguration", e);
+ }
}
@Override
diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
index 8fad5d4..9c26346 100644
--- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
+++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
@@ -20,15 +20,19 @@ package org.apache.zeppelin.flink;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.compress.utils.Lists;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.scala.DataSet;
import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.python.PythonOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
@@ -76,6 +80,7 @@ import org.apache.flink.table.operations.ddl.DropViewOperation;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
+import org.apache.flink.util.FlinkException;
import org.apache.zeppelin.flink.shims111.CollectStreamTableSink;
import org.apache.zeppelin.flink.shims111.Flink111ScalaShims;
import org.apache.zeppelin.flink.sql.SqlCommandParser;
@@ -139,6 +144,22 @@ public class Flink111Shims extends FlinkShims {
}
@Override
+ public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
+ ((ExecutionConfig) batchConfig).disableSysoutLogging();
+ ((ExecutionConfig) streamConfig).disableSysoutLogging();
+ }
+
+ @Override
+ public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) {
+ return new StreamExecutionEnvironmentFactory() {
+ @Override
+ public StreamExecutionEnvironment createExecutionEnvironment() {
+ return (StreamExecutionEnvironment) streamExecutionEnvironment;
+ }
+ };
+ }
+
+ @Override
public Object createCatalogManager(Object config) {
return CatalogManager.newBuilder()
.classLoader(Thread.currentThread().getContextClassLoader())
@@ -402,8 +423,13 @@ public class Flink111Shims extends FlinkShims {
}
@Override
- public Object getCustomCli(Object cliFrontend, Object commandLine) {
- return ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
+ public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) {
+ CustomCommandLine customCommandLine = ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
+ try {
+ return customCommandLine.applyCommandLineOptionsToConfiguration((CommandLine) commandLine);
+ } catch (FlinkException e) {
+ throw new RuntimeException("Fail to call applyCommandLineOptionsToConfiguration", e);
+ }
}
@Override
diff --git a/flink/flink1.12-shims/pom.xml b/flink/flink1.12-shims/pom.xml
new file mode 100644
index 0000000..e234b5c
--- /dev/null
+++ b/flink/flink1.12-shims/pom.xml
@@ -0,0 +1,221 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-parent</artifactId>
+ <groupId>org.apache.zeppelin</groupId>
+ <version>0.9.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>flink1.12-shims</artifactId>
+ <version>0.9.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+ <name>Zeppelin: Flink1.12 Shims</name>
+
+ <properties>
+ <flink.version>${flink1.12.version}</flink.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.12</scala.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>flink-shims</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-scala_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-scala_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-python_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala-shell_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>eclipse-add-source</id>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-test-compile-first</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <scalaVersion>${scala.version}</scalaVersion>
+ <args>
+ <arg>-unchecked</arg>
+ <arg>-deprecation</arg>
+ <arg>-feature</arg>
+ <arg>-target:jvm-1.8</arg>
+ </args>
+ <jvmArgs>
+ <jvmArg>-Xms1024m</jvmArg>
+ <jvmArg>-Xmx1024m</jvmArg>
+ <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg>
+ </jvmArgs>
+ <javacArgs>
+ <javacArg>-source</javacArg>
+ <javacArg>${java.version}</javacArg>
+ <javacArg>-target</javacArg>
+ <javacArg>${java.version}</javacArg>
+ <javacArg>-Xlint:all,-serial,-path,-options</javacArg>
+ </javacArgs>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-interpreter-setting</id>
+ <phase>none</phase>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
similarity index 92%
copy from flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
copy to flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
index 8fad5d4..dc5a4db 100644
--- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
@@ -20,15 +20,20 @@ package org.apache.zeppelin.flink;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.compress.utils.Lists;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.scala.DataSet;
import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.python.PythonOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
@@ -76,8 +81,9 @@ import org.apache.flink.table.operations.ddl.DropViewOperation;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
+import org.apache.flink.util.FlinkException;
import org.apache.zeppelin.flink.shims111.CollectStreamTableSink;
-import org.apache.zeppelin.flink.shims111.Flink111ScalaShims;
+import org.apache.zeppelin.flink.shims112.Flink112ScalaShims;
import org.apache.zeppelin.flink.sql.SqlCommandParser;
import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommandCall;
@@ -103,11 +109,11 @@ import java.util.regex.Matcher;
/**
- * Shims for flink 1.11
+ * Shims for flink 1.12
*/
-public class Flink111Shims extends FlinkShims {
+public class Flink112Shims extends FlinkShims {
- private static final Logger LOGGER = LoggerFactory.getLogger(Flink111Shims.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(Flink112Shims.class);
public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder()
.append("The following commands are available:\n\n")
.append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database."))
@@ -134,11 +140,27 @@ public class Flink111Shims extends FlinkShims {
private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
- public Flink111Shims(Properties properties) {
+ public Flink112Shims(Properties properties) {
super(properties);
}
@Override
+ public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
+ // do nothing
+ }
+
+
+ @Override
+ public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) {
+ return new StreamExecutionEnvironmentFactory() {
+ @Override
+ public StreamExecutionEnvironment createExecutionEnvironment(Configuration configuration) {
+ return (StreamExecutionEnvironment) streamExecutionEnvironment;
+ }
+ };
+ }
+
+ @Override
public Object createCatalogManager(Object config) {
return CatalogManager.newBuilder()
.classLoader(Thread.currentThread().getContextClassLoader())
@@ -211,12 +233,12 @@ public class Flink111Shims extends FlinkShims {
@Override
public Object fromDataSet(Object btenv, Object ds) {
- return Flink111ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds);
+ return Flink112ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds);
}
@Override
public Object toDataSet(Object btenv, Object table) {
- return Flink111ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table);
+ return Flink112ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table);
}
@Override
@@ -402,8 +424,14 @@ public class Flink111Shims extends FlinkShims {
}
@Override
- public Object getCustomCli(Object cliFrontend, Object commandLine) {
- return ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
+ public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) {
+ CustomCommandLine customCommandLine = ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
+ try {
+ ((Configuration) effectiveConfig).addAll(customCommandLine.toConfiguration((CommandLine) commandLine));
+ return effectiveConfig;
+ } catch (FlinkException e) {
+ throw new RuntimeException("Fail to call addAll", e);
+ }
}
@Override
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/CollectStreamTableSink.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/CollectStreamTableSink.java
new file mode 100644
index 0000000..b98f406
--- /dev/null
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/CollectStreamTableSink.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.shims111;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.experimental.CollectSink;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.UUID;
+
+/**
+ * Table sink for collecting the results locally using sockets.
+ */
+public class CollectStreamTableSink implements RetractStreamTableSink<Row> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class);
+
+ private final InetAddress targetAddress;
+ private final int targetPort;
+ private final TypeSerializer<Tuple2<Boolean, Row>> serializer;
+
+ private String[] fieldNames;
+ private TypeInformation<?>[] fieldTypes;
+
+ public CollectStreamTableSink(InetAddress targetAddress,
+ int targetPort,
+ TypeSerializer<Tuple2<Boolean, Row>> serializer) {
+ LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort);
+ this.targetAddress = targetAddress;
+ this.targetPort = targetPort;
+ this.serializer = serializer;
+ }
+
+ @Override
+ public String[] getFieldNames() {
+ return fieldNames;
+ }
+
+ @Override
+ public TypeInformation<?>[] getFieldTypes() {
+ return fieldTypes;
+ }
+
+ @Override
+ public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+ final CollectStreamTableSink copy =
+ new CollectStreamTableSink(targetAddress, targetPort, serializer);
+ copy.fieldNames = fieldNames;
+ copy.fieldTypes = fieldTypes;
+ return copy;
+ }
+
+ @Override
+ public TypeInformation<Row> getRecordType() {
+ return Types.ROW_NAMED(fieldNames, fieldTypes);
+ }
+
+ @Override
+ public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
+ // add sink
+ return stream
+ .addSink(new CollectSink<>(targetAddress, targetPort, serializer))
+ .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID())
+ .setParallelism(1);
+ }
+
+ @Override
+ public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
+ return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
+ }
+}
diff --git a/flink/flink1.12-shims/src/main/scala/org/apache/zeppelin/flink/shims112/Flink112ScalaShims.scala b/flink/flink1.12-shims/src/main/scala/org/apache/zeppelin/flink/shims112/Flink112ScalaShims.scala
new file mode 100644
index 0000000..988ad4e
--- /dev/null
+++ b/flink/flink1.12-shims/src/main/scala/org/apache/zeppelin/flink/shims112/Flink112ScalaShims.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.shims112
+
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
+import org.apache.flink.types.Row
+
+object Flink112ScalaShims {
+
+ def fromDataSet(btenv: BatchTableEnvironment, ds: DataSet[_]): Table = {
+ btenv.fromDataSet(ds)
+ }
+
+ def toDataSet(btenv: BatchTableEnvironment, table: Table): DataSet[Row] = {
+ btenv.toDataSet[Row](table)
+ }
+}
diff --git a/flink/interpreter/pom.xml b/flink/interpreter/pom.xml
index df56a42..f1f720c 100644
--- a/flink/interpreter/pom.xml
+++ b/flink/interpreter/pom.xml
@@ -77,6 +77,12 @@
<dependency>
<groupId>org.apache.zeppelin</groupId>
+ <artifactId>flink1.12-shims</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-python</artifactId>
<version>${project.version}</version>
<exclusions>
@@ -689,9 +695,9 @@
<!-- set sun.zip.disableMemoryMapping=true because of
https://blogs.oracle.com/poonam/crashes-in-zipgetentry
https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8191484 -->
- <argLine>-Xmx4096m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
+ <argLine>-Xmx5120m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
<!-- <argLine>-Xmx4096m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=6006</argLine>-->
-
+
<environmentVariables>
<!-- <FLINK_HOME>/Users/jzhang/github/flink/build-target</FLINK_HOME>-->
<FLINK_HOME>${project.build.directory}/flink-${flink.version}</FLINK_HOME>
@@ -853,20 +859,27 @@
<profiles>
<profile>
- <id>flink-1.10</id>
+ <id>flink-110</id>
<properties>
<flink.version>${flink1.10.version}</flink.version>
</properties>
</profile>
<profile>
- <id>flink-1.11</id>
+ <id>flink-111</id>
<properties>
<flink.version>${flink1.11.version}</flink.version>
</properties>
</profile>
<profile>
+ <id>flink-112</id>
+ <properties>
+ <flink.version>${flink1.12.version}</flink.version>
+ </properties>
+ </profile>
+
+ <profile>
<id>hive2</id>
<activation>
<activeByDefault>true</activeByDefault>
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 9865145..572426b 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -445,8 +445,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
if (java.lang.Boolean.parseBoolean(
properties.getProperty("zeppelin.flink.disableSysoutLogging", "true"))) {
- this.benv.getConfig.disableSysoutLogging()
- this.senv.getConfig.disableSysoutLogging()
+ flinkShims.disableSysoutLogging(this.benv.getConfig, this.senv.getConfig);
}
}
@@ -511,9 +510,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
}
private def setAsContext(): Unit = {
- val streamFactory = new StreamExecutionEnvironmentFactory() {
- override def createExecutionEnvironment = senv.getJavaEnv
- }
+ val streamFactory = flinkShims.createStreamExecutionEnvironmentFactory(this.senv.getJavaEnv)
//StreamExecutionEnvironment
var method = classOf[JStreamExecutionEnvironment].getDeclaredMethod("initializeContextEnvironment",
classOf[StreamExecutionEnvironmentFactory])
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkShell.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkShell.scala
index 3f814b9..e4bcf1b 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkShell.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkShell.scala
@@ -109,7 +109,7 @@ object FlinkShell {
}
private def deployNewYarnCluster(config: Config, flinkConfig: Configuration, flinkShims: FlinkShims) = {
- val effectiveConfig = new Configuration(flinkConfig)
+ var effectiveConfig = new Configuration(flinkConfig)
val args = parseArgList(config, "yarn-cluster")
val configurationDirectory = getConfigDir(config)
@@ -123,24 +123,25 @@ object FlinkShell {
frontend.getCustomCommandLineOptions)
val commandLine = CliFrontendParser.parse(commandLineOptions, args, true)
- val customCLI = flinkShims.getCustomCli(frontend, commandLine).asInstanceOf[CustomCommandLine]
- val executorConfig = customCLI.applyCommandLineOptionsToConfiguration(commandLine)
+ effectiveConfig = flinkShims
+ .updateEffectiveConfig(frontend, commandLine, effectiveConfig)
+ .asInstanceOf[Configuration]
val serviceLoader = new DefaultClusterClientServiceLoader
- val clientFactory = serviceLoader.getClusterClientFactory(executorConfig)
- val clusterDescriptor = clientFactory.createClusterDescriptor(executorConfig)
- val clusterSpecification = clientFactory.getClusterSpecification(executorConfig)
+ val clientFactory = serviceLoader.getClusterClientFactory(effectiveConfig)
+ val clusterDescriptor = clientFactory.createClusterDescriptor(effectiveConfig)
+ val clusterSpecification = clientFactory.getClusterSpecification(effectiveConfig)
val clusterClient = try {
clusterDescriptor
.deploySessionCluster(clusterSpecification)
.getClusterClient
} finally {
- executorConfig.set(DeploymentOptions.TARGET, "yarn-session")
+ effectiveConfig.set(DeploymentOptions.TARGET, "yarn-session")
clusterDescriptor.close()
}
- (executorConfig, Some(clusterClient))
+ (effectiveConfig, Some(clusterClient))
}
private def fetchDeployedYarnClusterInfo(
@@ -149,7 +150,7 @@ object FlinkShell {
mode: String,
flinkShims: FlinkShims) = {
- val effectiveConfig = new Configuration(flinkConfig)
+ var effectiveConfig = new Configuration(flinkConfig)
val args = parseArgList(config, mode)
val configurationDirectory = getConfigDir(config)
@@ -163,10 +164,11 @@ object FlinkShell {
frontend.getCustomCommandLineOptions)
val commandLine = CliFrontendParser.parse(commandLineOptions, args, true)
- val customCLI = flinkShims.getCustomCli(frontend, commandLine).asInstanceOf[CustomCommandLine]
- val executorConfig = customCLI.applyCommandLineOptionsToConfiguration(commandLine);
+ effectiveConfig = flinkShims
+ .updateEffectiveConfig(frontend, commandLine, effectiveConfig)
+ .asInstanceOf[Configuration]
- (executorConfig, None)
+ (effectiveConfig, None)
}
def parseArgList(config: Config, mode: String): Array[String] = {
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index 16e7ee1..d27f422 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -331,8 +331,13 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
result = sqlInterpreter.interpret("select url, count(1) as pv from " +
"log group by url", context);
- assertEquals(InterpreterResult.Code.ERROR, result.code());
- assertTrue(context.out.toString(), context.out.toString().contains("Cannot find checkpoint or savepoint"));
+ if (flinkInterpreter.getFlinkVersion().olderThan(FlinkVersion.fromVersionString("1.12.0"))) {
+ assertEquals(InterpreterResult.Code.ERROR, result.code());
+ assertTrue(context.out.toString(), context.out.toString().contains("Cannot find checkpoint or savepoint"));
+ } else {
+ // Flink 1.12 starts from scratch if the savepoint is not found.
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ }
}
@Test
diff --git a/flink/pom.xml b/flink/pom.xml
index abbb03d..a8ede36 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -39,11 +39,13 @@
<module>flink-shims</module>
<module>flink1.10-shims</module>
<module>flink1.11-shims</module>
+ <module>flink1.12-shims</module>
</modules>
<properties>
<flink1.10.version>1.10.2</flink1.10.version>
<flink1.11.version>1.11.2</flink1.11.version>
+ <flink1.12.version>1.12.0</flink1.12.version>
</properties>
<dependencies>
diff --git a/testing/env_python_3 with_flink_1_10.yml b/testing/env_python_3_with_flink_110.yml
similarity index 100%
rename from testing/env_python_3 with_flink_1_10.yml
rename to testing/env_python_3_with_flink_110.yml
diff --git a/testing/env_python_3 with_flink_1_11.yml b/testing/env_python_3_with_flink_111.yml
similarity index 100%
copy from testing/env_python_3 with_flink_1_11.yml
copy to testing/env_python_3_with_flink_111.yml
diff --git a/testing/env_python_3 with_flink_1_11.yml b/testing/env_python_3_with_flink_112.yml
similarity index 86%
rename from testing/env_python_3 with_flink_1_11.yml
rename to testing/env_python_3_with_flink_112.yml
index e23bedb..8af5119 100644
--- a/testing/env_python_3 with_flink_1_11.yml
+++ b/testing/env_python_3_with_flink_112.yml
@@ -3,6 +3,10 @@ channels:
- conda-forge
- defaults
dependencies:
+ - pip
+ - pip:
+ - bkzep==0.6.1
+ - apache-flink==1.12.0
- pycodestyle
- numpy=1
- pandas=0.25
@@ -19,7 +23,4 @@ dependencies:
- panel
- holoviews
- pyyaml=3
- - pip
- - pip:
- - bkzep==0.6.1
- - apache-flink==1.11.1
+
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest112.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest112.java
new file mode 100644
index 0000000..56b318b
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest112.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(value = Parameterized.class)
+public class FlinkIntegrationTest112 extends FlinkIntegrationTest { // FlinkIntegrationTest subclass parameterized with Flink version 1.12.0
+
+ @Parameterized.Parameters
+ public static List<Object[]> data() { // parameter sets for the Parameterized runner: a single Flink version string
+ return Arrays.asList(new Object[][]{
+ {"1.12.0"}
+ });
+ }
+
+ public FlinkIntegrationTest112(String flinkVersion) { // forwards the version string to the FlinkIntegrationTest constructor
+ super(flinkVersion);
+ }
+}
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java
new file mode 100644
index 0000000..443b254
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+//@RunWith(value = Parameterized.class) -- NOTE(review): runner annotation is commented out here; confirm this is intentional
+public class ZeppelinFlinkClusterTest112 extends ZeppelinFlinkClusterTest { // ZeppelinFlinkClusterTest subclass declared for Flink version 1.12.0
+
+ @Parameterized.Parameters
+ public static List<Object[]> data() { // declares a single parameter set: the Flink version string
+ return Arrays.asList(new Object[][]{
+ {"1.12.0"}
+ });
+ }
+
+ public ZeppelinFlinkClusterTest112(String flinkVersion) throws Exception { // forwards the version string to the ZeppelinFlinkClusterTest constructor
+ super(flinkVersion);
+ }
+}