You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/05/13 05:44:44 UTC
[zeppelin] branch master updated: [ZEPPELIN-5356] Support flink 1.13
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 369e80b [ZEPPELIN-5356] Support flink 1.13
369e80b is described below
commit 369e80b89e5a9c83a66f90f8021a925c90bdd333
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon May 10 22:01:38 2021 +0800
[ZEPPELIN-5356] Support flink 1.13
### What is this PR for?
Add new module `flink1.13-shims` to support Flink 1.13. The main change is due to an API change in Flink 1.13: we have to use a different `TableConfig` for each `TableEnvironment` instance, otherwise it will cause conflicts.
### What type of PR is it?
[ Feature ]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5356
### How should this be tested?
* CI passes, and manually tested with Flink 1.13.0
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #4111 from zjffdu/ZEPPELIN-5356 and squashes the following commits:
6161a67b9 [Jeff Zhang] address comment
964b4ab16 [Jeff Zhang] fix pyflink issue
ac8c1fc52 [Jeff Zhang] [ZEPPELIN-5356] Support flink 1.13
dbadae867 [Jeff Zhang] save
---
.github/workflows/core.yml | 2 +-
.../java/org/apache/zeppelin/flink/FlinkShims.java | 11 +
.../org/apache/zeppelin/flink/Flink112Shims.java | 1 -
flink/flink1.13-shims/pom.xml | 221 +++++++++++++++++++++
.../org/apache/zeppelin/flink/Flink113Shims.java} | 39 ++--
.../flink/shims113/CollectStreamTableSink.java | 97 +++++++++
.../flink/shims113/Flink113ScalaShims.scala | 36 ++++
flink/interpreter/pom.xml | 16 +-
.../org/apache/zeppelin/flink/TableEnvFactory.java | 133 ++++++++-----
.../zeppelin/flink/FlinkScalaInterpreter.scala | 21 +-
.../zeppelin/flink/FlinkZeppelinContext.scala | 1 -
.../flink/FlinkStreamSqlInterpreterTest.java | 2 +-
.../zeppelin/flink/IPyFlinkInterpreterTest.java | 2 +-
.../apache/zeppelin/flink/SqlInterpreterTest.java | 15 +-
flink/pom.xml | 2 +
testing/env_python_3_with_flink_113.yml | 26 +++
.../integration/FlinkIntegrationTest113.java | 39 ++++
17 files changed, 571 insertions(+), 93 deletions(-)
diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index f7b6562..475d569 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -144,7 +144,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- flink: [ 110, 111, 112]
+ flink: [ 110, 111, 112, 113]
steps:
- name: Checkout
uses: actions/checkout@v2
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index bbe718c..acdb4a9 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -62,6 +62,9 @@ public abstract class FlinkShims {
} else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 12) {
LOGGER.info("Initializing shims for Flink 1.12");
flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink112Shims");
+ } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 13) {
+ LOGGER.info("Initializing shims for Flink 1.13");
+ flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink113Shims");
} else {
throw new Exception("Flink version: '" + flinkVersion + "' is not supported yet");
}
@@ -146,4 +149,12 @@ public abstract class FlinkShims {
public abstract Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object executorConfig);
public abstract Map extractTableConfigOptions();
+
+ public void setBatchRuntimeMode(Object tableConfig) {
+ // only needed after flink 1.13
+ }
+
+ public void setOldPlanner(Object tableConfig) {
+ // only needed after flink 1.13
+ }
}
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
index 4f6b698..b3f6ccc 100644
--- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
@@ -21,7 +21,6 @@ package org.apache.zeppelin.flink;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
diff --git a/flink/flink1.13-shims/pom.xml b/flink/flink1.13-shims/pom.xml
new file mode 100644
index 0000000..e7ef24e
--- /dev/null
+++ b/flink/flink1.13-shims/pom.xml
@@ -0,0 +1,221 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-parent</artifactId>
+ <groupId>org.apache.zeppelin</groupId>
+ <version>0.10.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>flink1.13-shims</artifactId>
+ <version>0.10.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+ <name>Zeppelin: Flink1.13 Shims</name>
+
+ <properties>
+ <flink.version>${flink1.13.version}</flink.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.12</scala.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>flink-shims</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-scala_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-scala_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-python_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala-shell_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>eclipse-add-source</id>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-test-compile-first</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <scalaVersion>${scala.version}</scalaVersion>
+ <args>
+ <arg>-unchecked</arg>
+ <arg>-deprecation</arg>
+ <arg>-feature</arg>
+ <arg>-target:jvm-1.8</arg>
+ </args>
+ <jvmArgs>
+ <jvmArg>-Xms1024m</jvmArg>
+ <jvmArg>-Xmx1024m</jvmArg>
+ <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg>
+ </jvmArgs>
+ <javacArgs>
+ <javacArg>-source</javacArg>
+ <javacArg>${java.version}</javacArg>
+ <javacArg>-target</javacArg>
+ <javacArg>${java.version}</javacArg>
+ <javacArg>-Xlint:all,-serial,-path,-options</javacArg>
+ </javacArgs>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-interpreter-setting</id>
+ <phase>none</phase>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
similarity index 95%
copy from flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
copy to flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
index 4f6b698..8f6f813 100644
--- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
+++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
@@ -21,8 +21,8 @@ package org.apache.zeppelin.flink;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.scala.DataSet;
@@ -30,14 +30,16 @@ import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
-import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.PlannerType;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
@@ -46,7 +48,6 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
-import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Parser;
@@ -84,8 +85,8 @@ import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.FlinkException;
-import org.apache.zeppelin.flink.shims112.CollectStreamTableSink;
-import org.apache.zeppelin.flink.shims112.Flink112ScalaShims;
+import org.apache.zeppelin.flink.shims113.CollectStreamTableSink;
+import org.apache.zeppelin.flink.shims113.Flink113ScalaShims;
import org.apache.zeppelin.flink.sql.SqlCommandParser;
import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommandCall;
@@ -111,11 +112,11 @@ import java.util.regex.Matcher;
/**
- * Shims for flink 1.12
+ * Shims for flink 1.13
*/
-public class Flink112Shims extends FlinkShims {
+public class Flink113Shims extends FlinkShims {
- private static final Logger LOGGER = LoggerFactory.getLogger(Flink112Shims.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(Flink113Shims.class);
public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder()
.append("The following commands are available:\n\n")
.append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database."))
@@ -142,7 +143,7 @@ public class Flink112Shims extends FlinkShims {
private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
- public Flink112Shims(Properties properties) {
+ public Flink113Shims(Properties properties) {
super(properties);
}
@@ -246,12 +247,12 @@ public class Flink112Shims extends FlinkShims {
@Override
public Object fromDataSet(Object btenv, Object ds) {
- return Flink112ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds);
+ return Flink113ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds);
}
@Override
public Object toDataSet(Object btenv, Object table) {
- return Flink112ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table);
+ return Flink113ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table);
}
@Override
@@ -436,9 +437,7 @@ public class Flink112Shims extends FlinkShims {
public void setCatalogManagerSchemaResolver(Object catalogManager,
Object parserObject,
Object environmentSetting) {
- ((CatalogManager) catalogManager).setCatalogTableSchemaResolver(
- new CatalogTableSchemaResolver((Parser)parserObject,
- ((EnvironmentSettings)environmentSetting).isStreamingMode()));
+
}
@Override
@@ -481,4 +480,16 @@ public class Flink112Shims extends FlinkShims {
}
return configOptions;
}
+
+ @Override
+ public void setBatchRuntimeMode(Object tableConfig) {
+ ((TableConfig) tableConfig).getConfiguration()
+ .set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+ }
+
+ @Override
+ public void setOldPlanner(Object tableConfig) {
+ ((TableConfig) tableConfig).getConfiguration()
+ .set(TableConfigOptions.TABLE_PLANNER, PlannerType.OLD);
+ }
}
diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/shims113/CollectStreamTableSink.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/shims113/CollectStreamTableSink.java
new file mode 100644
index 0000000..3cbcc75
--- /dev/null
+++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/shims113/CollectStreamTableSink.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.shims113;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.experimental.CollectSink;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.UUID;
+
+/**
+ * Table sink for collecting the results locally using sockets.
+ */
+public class CollectStreamTableSink implements RetractStreamTableSink<Row> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class);
+
+ private final InetAddress targetAddress;
+ private final int targetPort;
+ private final TypeSerializer<Tuple2<Boolean, Row>> serializer;
+
+ private String[] fieldNames;
+ private TypeInformation<?>[] fieldTypes;
+
+ public CollectStreamTableSink(InetAddress targetAddress,
+ int targetPort,
+ TypeSerializer<Tuple2<Boolean, Row>> serializer) {
+ LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort);
+ this.targetAddress = targetAddress;
+ this.targetPort = targetPort;
+ this.serializer = serializer;
+ }
+
+ @Override
+ public String[] getFieldNames() {
+ return fieldNames;
+ }
+
+ @Override
+ public TypeInformation<?>[] getFieldTypes() {
+ return fieldTypes;
+ }
+
+ @Override
+ public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+ final CollectStreamTableSink copy =
+ new CollectStreamTableSink(targetAddress, targetPort, serializer);
+ copy.fieldNames = fieldNames;
+ copy.fieldTypes = fieldTypes;
+ return copy;
+ }
+
+ @Override
+ public TypeInformation<Row> getRecordType() {
+ return Types.ROW_NAMED(fieldNames, fieldTypes);
+ }
+
+ @Override
+ public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
+ // add sink
+ return stream
+ .addSink(new CollectSink<>(targetAddress, targetPort, serializer))
+ .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID())
+ .setParallelism(1);
+ }
+
+ @Override
+ public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
+ return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
+ }
+}
diff --git a/flink/flink1.13-shims/src/main/scala/org/apache/zeppelin/flink/shims113/Flink113ScalaShims.scala b/flink/flink1.13-shims/src/main/scala/org/apache/zeppelin/flink/shims113/Flink113ScalaShims.scala
new file mode 100644
index 0000000..10250b0
--- /dev/null
+++ b/flink/flink1.13-shims/src/main/scala/org/apache/zeppelin/flink/shims113/Flink113ScalaShims.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.shims113
+
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
+import org.apache.flink.types.Row
+
+object Flink113ScalaShims {
+
+ def fromDataSet(btenv: BatchTableEnvironment, ds: DataSet[_]): Table = {
+ btenv.fromDataSet(ds)
+ }
+
+ def toDataSet(btenv: BatchTableEnvironment, table: Table): DataSet[Row] = {
+ btenv.toDataSet[Row](table)
+ }
+}
diff --git a/flink/interpreter/pom.xml b/flink/interpreter/pom.xml
index ef9f8f1..bd5d5f1 100644
--- a/flink/interpreter/pom.xml
+++ b/flink/interpreter/pom.xml
@@ -91,6 +91,12 @@
<dependency>
<groupId>org.apache.zeppelin</groupId>
+ <artifactId>flink1.13-shims</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-python</artifactId>
<version>${project.version}</version>
<exclusions>
@@ -705,9 +711,8 @@
https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8191484 -->
<argLine>-Xmx5120m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
<!-- <argLine>-Xmx4096m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=6006</argLine>-->
-o
+
<environmentVariables>
-<!-- <FLINK_HOME>/Users/jzhang/github/flink/build-target</FLINK_HOME>-->
<FLINK_HOME>${project.build.directory}/flink-${flink.version}</FLINK_HOME>
<FLINK_CONF_DIR>${project.build.directory}/test-classes</FLINK_CONF_DIR>
</environmentVariables>
@@ -888,6 +893,13 @@ o
</profile>
<profile>
+ <id>flink-113</id>
+ <properties>
+ <flink.version>${flink1.13.version}</flink.version>
+ </properties>
+ </profile>
+
+ <profile>
<id>hive2</id>
<activation>
<activeByDefault>true</activeByDefault>
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
index 9fb6efd..46ad481 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
@@ -55,30 +55,56 @@ public class TableEnvFactory {
private Executor executor;
private org.apache.flink.api.scala.ExecutionEnvironment benv;
private org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv;
- private TableConfig tblConfig;
+
+ /***********************************************************************
+ Should use different TableConfig for different kinds of table_env
+ otherwise it will cause conflicts after flink 1.13
+ ***********************************************************************/
+ // tableConfig used for StreamTableEnvironment.
+ private TableConfig streamTableConfig;
+ // tableConfig used for BatchTableEnvironment.
+ private TableConfig batchTableConfig;
+ // tableConfig for old planner
+ private TableConfig oldPlannerStreamTableConfig;
+ private TableConfig oldPlannerBatchTableConfig;
+
private CatalogManager catalogManager;
+ private CatalogManager oldPlannerCatalogManager;
private ModuleManager moduleManager;
- private FunctionCatalog flinkFunctionCatalog;
- private FunctionCatalog blinkFunctionCatalog;
+ private FunctionCatalog functionCatalog;
+ private FunctionCatalog oldPlannerFunctionCatalog;
+
public TableEnvFactory(FlinkVersion flinkVersion,
FlinkShims flinkShims,
org.apache.flink.api.scala.ExecutionEnvironment env,
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv,
- TableConfig tblConfig,
- CatalogManager catalogManager,
- ModuleManager moduleManager,
- FunctionCatalog flinkFunctionCatalog,
- FunctionCatalog blinkFunctionCatalog) {
+ TableConfig streamTableConfig) {
+
this.flinkVersion = flinkVersion;
this.flinkShims = flinkShims;
this.benv = env;
this.senv = senv;
- this.tblConfig = tblConfig;
- this.catalogManager = catalogManager;
- this.moduleManager = moduleManager;
- this.flinkFunctionCatalog = flinkFunctionCatalog;
- this.blinkFunctionCatalog = blinkFunctionCatalog;
+ this.streamTableConfig = streamTableConfig;
+ this.batchTableConfig = new TableConfig();
+ this.batchTableConfig.getConfiguration().addAll(streamTableConfig.getConfiguration());
+ flinkShims.setBatchRuntimeMode(this.batchTableConfig);
+ this.oldPlannerBatchTableConfig = new TableConfig();
+ this.oldPlannerBatchTableConfig.getConfiguration().addAll(streamTableConfig.getConfiguration());
+ flinkShims.setOldPlanner(this.oldPlannerBatchTableConfig);
+ this.oldPlannerStreamTableConfig = new TableConfig();
+ this.oldPlannerStreamTableConfig.getConfiguration().addAll(streamTableConfig.getConfiguration());
+ flinkShims.setOldPlanner(this.oldPlannerStreamTableConfig);
+
+ this.catalogManager = (CatalogManager) flinkShims.createCatalogManager(streamTableConfig.getConfiguration());
+ this.oldPlannerCatalogManager = (CatalogManager) flinkShims.createCatalogManager(
+ this.oldPlannerStreamTableConfig.getConfiguration());
+
+ this.moduleManager = new ModuleManager();
+
+ this.functionCatalog = new FunctionCatalog(streamTableConfig, catalogManager, moduleManager);
+ this.oldPlannerFunctionCatalog = new FunctionCatalog(
+ this.oldPlannerStreamTableConfig, this.oldPlannerCatalogManager, moduleManager);
}
public TableEnvironment createScalaFlinkBatchTableEnvironment() {
@@ -99,7 +125,7 @@ public class TableEnvFactory {
ModuleManager.class);
return (TableEnvironment)
- constructor.newInstance(benv, tblConfig, catalogManager, moduleManager);
+ constructor.newInstance(benv, oldPlannerBatchTableConfig, oldPlannerCatalogManager, moduleManager);
} catch (Exception e) {
throw new TableException("Fail to createScalaFlinkBatchTableEnvironment", e);
}
@@ -115,8 +141,8 @@ public class TableEnvFactory {
.create(
plannerProperties,
executor,
- tblConfig,
- flinkFunctionCatalog,
+ streamTableConfig,
+ oldPlannerFunctionCatalog,
catalogManager);
Class clazz = null;
@@ -127,6 +153,7 @@ public class TableEnvFactory {
clazz = Class
.forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl");
}
+
try {
Constructor constructor = clazz
.getConstructor(
@@ -138,10 +165,11 @@ public class TableEnvFactory {
Planner.class,
Executor.class,
boolean.class);
- return (TableEnvironment) constructor.newInstance(catalogManager,
+ return (TableEnvironment) constructor.newInstance(
+ oldPlannerCatalogManager,
moduleManager,
- flinkFunctionCatalog,
- tblConfig,
+ oldPlannerFunctionCatalog,
+ oldPlannerStreamTableConfig,
senv,
planner,
executor,
@@ -159,10 +187,11 @@ public class TableEnvFactory {
Executor.class,
boolean.class,
ClassLoader.class);
- return (TableEnvironment) constructor.newInstance(catalogManager,
+ return (TableEnvironment) constructor.newInstance(
+ oldPlannerCatalogManager,
moduleManager,
- flinkFunctionCatalog,
- tblConfig,
+ oldPlannerFunctionCatalog,
+ oldPlannerStreamTableConfig,
senv,
planner,
executor,
@@ -191,10 +220,11 @@ public class TableEnvFactory {
TableConfig.class,
CatalogManager.class,
ModuleManager.class);
+
return (TableEnvironment) con.newInstance(
benv.getJavaEnv(),
- tblConfig,
- catalogManager,
+ oldPlannerBatchTableConfig,
+ oldPlannerCatalogManager,
moduleManager);
} catch (Throwable t) {
throw new TableException("Create BatchTableEnvironment failed.", t);
@@ -209,7 +239,7 @@ public class TableEnvFactory {
Map<String, String> plannerProperties = settings.toPlannerProperties();
Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
- .create(plannerProperties, executor, tblConfig, flinkFunctionCatalog, catalogManager);
+ .create(plannerProperties, executor, streamTableConfig, oldPlannerFunctionCatalog, catalogManager);
Class clazz = null;
if (flinkVersion.isFlink110()) {
@@ -219,6 +249,7 @@ public class TableEnvFactory {
clazz = Class
.forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
}
+
try {
Constructor constructor = clazz
.getConstructor(
@@ -230,10 +261,11 @@ public class TableEnvFactory {
Planner.class,
Executor.class,
boolean.class);
- return (TableEnvironment) constructor.newInstance(catalogManager,
+ return (TableEnvironment) constructor.newInstance(
+ oldPlannerCatalogManager,
moduleManager,
- flinkFunctionCatalog,
- tblConfig,
+ oldPlannerFunctionCatalog,
+ oldPlannerStreamTableConfig,
senv.getJavaEnv(),
planner,
executor,
@@ -251,10 +283,11 @@ public class TableEnvFactory {
Executor.class,
boolean.class,
ClassLoader.class);
- return (TableEnvironment) constructor.newInstance(catalogManager,
+ return (TableEnvironment) constructor.newInstance(
+ oldPlannerCatalogManager,
moduleManager,
- flinkFunctionCatalog,
- tblConfig,
+ oldPlannerFunctionCatalog,
+ oldPlannerStreamTableConfig,
senv.getJavaEnv(),
planner,
executor,
@@ -278,8 +311,8 @@ public class TableEnvFactory {
.create(
plannerProperties,
executor,
- tblConfig,
- blinkFunctionCatalog,
+ streamTableConfig,
+ functionCatalog,
catalogManager);
@@ -304,8 +337,8 @@ public class TableEnvFactory {
boolean.class);
return (TableEnvironment) constructor.newInstance(catalogManager,
moduleManager,
- blinkFunctionCatalog,
- tblConfig,
+ functionCatalog,
+ streamTableConfig,
senv,
planner,
executor,
@@ -325,8 +358,8 @@ public class TableEnvFactory {
ClassLoader.class);
return (TableEnvironment) constructor.newInstance(catalogManager,
moduleManager,
- blinkFunctionCatalog,
- tblConfig,
+ functionCatalog,
+ streamTableConfig,
senv,
planner,
executor,
@@ -346,7 +379,7 @@ public class TableEnvFactory {
Map<String, String> plannerProperties = settings.toPlannerProperties();
Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
- .create(plannerProperties, executor, tblConfig, blinkFunctionCatalog, catalogManager);
+ .create(plannerProperties, executor, streamTableConfig, functionCatalog, catalogManager);
Class clazz = null;
if (flinkVersion.isFlink110()) {
@@ -369,8 +402,8 @@ public class TableEnvFactory {
boolean.class);
return (TableEnvironment) constructor.newInstance(catalogManager,
moduleManager,
- blinkFunctionCatalog,
- tblConfig,
+ functionCatalog,
+ streamTableConfig,
senv.getJavaEnv(),
planner,
executor,
@@ -390,8 +423,8 @@ public class TableEnvFactory {
ClassLoader.class);
return (TableEnvironment) constructor.newInstance(catalogManager,
moduleManager,
- blinkFunctionCatalog,
- tblConfig,
+ functionCatalog,
+ streamTableConfig,
senv.getJavaEnv(),
planner,
executor,
@@ -410,7 +443,7 @@ public class TableEnvFactory {
executor = lookupExecutor(executorProperties, senv.getJavaEnv());
final Map<String, String> plannerProperties = settings.toPlannerProperties();
final Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
- .create(plannerProperties, executor, tblConfig, blinkFunctionCatalog, catalogManager);
+ .create(plannerProperties, executor, batchTableConfig, functionCatalog, catalogManager);
Class clazz = null;
if (flinkVersion.isFlink110()) {
@@ -433,8 +466,8 @@ public class TableEnvFactory {
return (TableEnvironment) constructor.newInstance(
catalogManager,
moduleManager,
- blinkFunctionCatalog,
- tblConfig,
+ functionCatalog,
+ batchTableConfig,
senv.getJavaEnv(),
planner,
executor,
@@ -454,8 +487,8 @@ public class TableEnvFactory {
return (TableEnvironment) constructor.newInstance(
catalogManager,
moduleManager,
- blinkFunctionCatalog,
- tblConfig,
+ functionCatalog,
+ batchTableConfig,
senv.getJavaEnv(),
planner,
executor,
@@ -469,7 +502,7 @@ public class TableEnvFactory {
}
- public void createPlanner(EnvironmentSettings settings) {
+ public void createStreamPlanner(EnvironmentSettings settings) {
Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
@@ -478,8 +511,8 @@ public class TableEnvFactory {
.create(
plannerProperties,
executor,
- tblConfig,
- blinkFunctionCatalog,
+ streamTableConfig,
+ functionCatalog,
catalogManager);
this.flinkShims.setCatalogManagerSchemaResolver(catalogManager, planner.getParser(), settings);
}
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 043f3d4..1214009 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -40,7 +40,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableEnvironment}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
+import org.apache.flink.table.catalog.CatalogManager
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableAggregateFunction, TableFunction}
import org.apache.flink.table.module.ModuleManager
@@ -435,18 +435,11 @@ class FlinkScalaInterpreter(val properties: Properties) {
val originalClassLoader = Thread.currentThread().getContextClassLoader
try {
Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
- val tblConfig = new TableConfig
- tblConfig.getConfiguration.addAll(configuration)
- // Step 1.1 Initialize the CatalogManager if required.
- val catalogManager = flinkShims.createCatalogManager(tblConfig.getConfiguration).asInstanceOf[CatalogManager]
- // Step 1.2 Initialize the ModuleManager if required.
- val moduleManager = new ModuleManager();
- // Step 1.3 Initialize the FunctionCatalog if required.
- val flinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager);
- val blinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager);
-
- this.tblEnvFactory = new TableEnvFactory(this.flinkVersion, this.flinkShims, this.benv, this.senv, tblConfig,
- catalogManager, moduleManager, flinkFunctionCatalog, blinkFunctionCatalog)
+ val tableConfig = new TableConfig
+ tableConfig.getConfiguration.addAll(configuration)
+
+ this.tblEnvFactory = new TableEnvFactory(this.flinkVersion, this.flinkShims,
+ this.benv, this.senv, tableConfig)
val modifiers = new java.util.ArrayList[String]()
modifiers.add("@transient")
@@ -635,7 +628,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
val stEnvSetting =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
- this.tblEnvFactory.createPlanner(stEnvSetting)
+ this.tblEnvFactory.createStreamPlanner(stEnvSetting)
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader)
}
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
index b3a964a..69bfa33 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
@@ -135,7 +135,6 @@ class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter,
}
def show(table: Table, streamType: String, configs: Map[String, String] = Map.empty): Unit = {
- val stenv = flinkInterpreter.getJavaStreamTableEnvironment("blink")
val context = InterpreterContext.get()
configs.foreach(e => context.getLocalProperties.put(e._1, e._2))
val tableName = "UnnamedTable_" + context.getParagraphId.replace("-", "_") + "_" + SQL_INDEX.getAndIncrement()
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index e392816..382a9b9 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -470,7 +470,7 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
" 'update-mode' = 'append'\n" +
")\n",
context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
assertEquals(1, resultMessages.size());
assertEquals(InterpreterResult.Type.TEXT, resultMessages.get(0).getType());
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
index 84f391b..6a8d71f 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
@@ -176,7 +176,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
"t.select(\"a + 1, b, c\").insert_into(\"batch_sink\")\n" +
"bt_env.execute(\"batch_job\")"
, context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(result.toString(), InterpreterResult.Code.SUCCESS, result.code());
// use group by
context = createInterpreterContext();
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
index 5fd7e52..1d49686 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
@@ -72,11 +72,6 @@ import static org.mockito.Mockito.mock;
public abstract class SqlInterpreterTest {
private static final Logger LOGGER = LoggerFactory.getLogger(SqlInterpreterTest.class);
- protected static final String[][] INPUT_DATA = {
- {"1", "1.1", "hello world", "true"},
- {"2", "2.3", "hello flink", "true"},
- {"3", "3.2", "hello hadoop", "false"},
- };
protected FlinkInterpreter flinkInterpreter;
@@ -189,7 +184,7 @@ public abstract class SqlInterpreterTest {
assertEquals("table\n", resultMessages.get(0).getData());
context = getInterpreterContext();
- result = sqlInterpreter.interpret("CREATE TABLE source (msg INT)", context);
+ result = sqlInterpreter.interpret("CREATE TABLE source (msg INT) with ('connector'='print')", context);
assertEquals(Code.SUCCESS, result.code());
context = getInterpreterContext();
@@ -242,7 +237,7 @@ public abstract class SqlInterpreterTest {
InterpreterContext context = getInterpreterContext();
InterpreterResult result = sqlInterpreter.interpret(
"CREATE TABLE source_table (int_col INT, double_col double, " +
- "varchar_col varchar, bool_col boolean)",
+ "varchar_col varchar, bool_col boolean) with ('connector'='print')",
context);
assertEquals(Code.SUCCESS, result.code());
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
@@ -403,7 +398,11 @@ public abstract class SqlInterpreterTest {
resultMessages = context.out.toInterpreterResultMessage();
assertEquals(1, resultMessages.size());
assertEquals(Type.TEXT, resultMessages.get(0).getType());
- assertTrue(resultMessages.get(0).getData(), resultMessages.get(0).getData().contains("Physical Execution Plan"));
+ if (flinkInterpreter.getFlinkVersion().olderThan(FlinkVersion.fromVersionString("1.13.0"))) {
+ assertTrue(resultMessages.get(0).getData(), resultMessages.get(0).getData().contains("Physical Execution Plan"));
+ } else {
+ assertTrue(resultMessages.get(0).getData(), resultMessages.get(0).getData().contains("Optimized Execution Plan"));
+ }
}
@Test
diff --git a/flink/pom.xml b/flink/pom.xml
index 3b5d1cd..2ee34e7 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -40,12 +40,14 @@
<module>flink1.10-shims</module>
<module>flink1.11-shims</module>
<module>flink1.12-shims</module>
+ <module>flink1.13-shims</module>
</modules>
<properties>
<flink1.10.version>1.10.3</flink1.10.version>
<flink1.11.version>1.11.3</flink1.11.version>
<flink1.12.version>1.12.0</flink1.12.version>
+ <flink1.13.version>1.13.0</flink1.13.version>
</properties>
<dependencies>
diff --git a/testing/env_python_3_with_flink_113.yml b/testing/env_python_3_with_flink_113.yml
new file mode 100644
index 0000000..796afff
--- /dev/null
+++ b/testing/env_python_3_with_flink_113.yml
@@ -0,0 +1,26 @@
+name: python_3_with_flink # environment name; file follows the conda environment layout (name / channels / dependencies)
+channels:
+ - conda-forge
+ - defaults
+dependencies:
+ - pip
+ - pip:
+ - bkzep==0.6.1
+ - apache-flink==1.13.0 # PyFlink pinned to the same 1.13.0 release as flink1.13.version in flink/pom.xml
+ - pycodestyle
+ - numpy=1
+ - pandas=0.25
+ - scipy=1
+ - grpcio
+ - hvplot
+ - protobuf=3
+ - pandasql=0.7.3
+ - ipython=7
+ - matplotlib=3
+ - ipykernel=5
+ - jupyter_client=5
+ - bokeh=1.3.4
+ - panel
+ - holoviews
+ - pyyaml=3
+
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest113.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest113.java
new file mode 100644
index 0000000..9d48949
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest113.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(value = Parameterized.class) // JUnit Parameterized runner: the class is instantiated once per row returned by data()
+public class FlinkIntegrationTest113 extends FlinkIntegrationTest { // declares no tests of its own; only supplies the version parameter to the parent class
+
+ @Parameterized.Parameters
+ public static List<Object[]> data() { // each Object[] row becomes the constructor argument list
+ return Arrays.asList(new Object[][]{
+ {"1.13.0"} // single row: the only Flink version this class runs with
+ });
+ }
+
+ public FlinkIntegrationTest113(String flinkVersion) { // flinkVersion is injected by the Parameterized runner from data()
+ super(flinkVersion); // how the version is used is defined in FlinkIntegrationTest (not visible in this diff)
+ }
+}