You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2021/10/05 23:52:11 UTC
[zeppelin] branch master updated: [ZEPPELIN-5469] Support Flink 1.14
This is an automated email from the ASF dual-hosted git repository.
jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 1e63b4f [ZEPPELIN-5469] Support Flink 1.14
1e63b4f is described below
commit 1e63b4ff8d3476391a6db5e28ba59b0fe460c8e2
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon Jul 26 18:29:32 2021 +0800
[ZEPPELIN-5469] Support Flink 1.14
### What is this PR for?
This PR is to add support of Flink 1.14. main changes:
* Add new module flink114-shims
* Add new profile flink-114. (flink 1.14 change its module, so we move some flink dependency into profile. e.g. there's no module `flink-runtime_${flink.scala.binary.version}` in flink-1.14, it changes to `flink-runtime`
* flink planner is not supported in flink-1.14, so there's no `bt_env2` and `st_env2` in flink-1.14
### What type of PR is it?
[ Feature ]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5469
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #4195 from zjffdu/ZEPPELIN-5469 and squashes the following commits:
465b8778b [Jeff Zhang] [ZEPPELIN-5469] Support Flink 1.14
---
.github/workflows/core.yml | 2 +-
flink/flink-scala-parent/pom.xml | 190 +++++++++++++++++---
.../apache/zeppelin/flink/IPyFlinkInterpreter.java | 4 +
.../apache/zeppelin/flink/PyFlinkInterpreter.java | 4 +
.../org/apache/zeppelin/flink/TableEnvFactory.java | 112 ++++--------
.../flink/YarnApplicationStreamEnvironment.java | 19 +-
.../internal/ScalaShellStreamEnvironment.java | 31 +++-
.../zeppelin/flink/sql/AbstractStreamSqlJob.java | 7 +-
.../zeppelin/flink/sql/AppendStreamSqlJob.java | 33 ++--
.../src/main/resources/python/zeppelin_ipyflink.py | 12 +-
.../src/main/resources/python/zeppelin_pyflink.py | 13 +-
.../zeppelin/flink/FlinkScalaInterpreter.scala | 30 ++--
.../zeppelin/flink/FlinkZeppelinContext.scala | 14 +-
.../zeppelin/flink/internal/FlinkILoop.scala | 1 +
.../flink/FlinkBatchSqlInterpreterTest.java | 24 +--
.../zeppelin/flink/FlinkInterpreterTest.java | 11 +-
.../zeppelin/flink/IPyFlinkInterpreterTest.java | 29 ++-
.../zeppelin/flink/PyFlinkInterpreterTest.java | 18 +-
.../java/org/apache/zeppelin/flink/FlinkShims.java | 22 ++-
.../org/apache/zeppelin/flink/FlinkVersion.java | 4 +
.../org/apache/zeppelin/flink/Flink110Shims.java | 55 +++++-
.../org/apache/zeppelin/flink/Flink111Shims.java | 56 +++++-
.../org/apache/zeppelin/flink/Flink112Shims.java | 55 +++++-
.../org/apache/zeppelin/flink/Flink113Shims.java | 54 +++++-
flink/flink1.14-shims/pom.xml | 199 +++++++++++++++++++++
.../org/apache/zeppelin/flink/Flink114Shims.java} | 72 ++++++--
.../flink/shims114/CollectStreamTableSink.java | 97 ++++++++++
flink/pom.xml | 2 +
testing/env_python_3_with_flink_114.yml | 27 +++
.../integration/FlinkIntegrationTest114.java | 40 +++++
30 files changed, 1023 insertions(+), 214 deletions(-)
diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index f65e832..1777c29 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -148,7 +148,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- flink: [110, 111, 112, 113]
+ flink: [110, 111, 112, 113, 114]
steps:
- name: Checkout
uses: actions/checkout@v2
diff --git a/flink/flink-scala-parent/pom.xml b/flink/flink-scala-parent/pom.xml
index d970540..b51ed70 100644
--- a/flink/flink-scala-parent/pom.xml
+++ b/flink/flink-scala-parent/pom.xml
@@ -80,6 +80,12 @@
<dependency>
<groupId>org.apache.zeppelin</groupId>
+ <artifactId>flink1.14-shims</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-python</artifactId>
<version>${project.version}</version>
<exclusions>
@@ -160,13 +166,6 @@
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_${flink.scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
@@ -206,6 +205,13 @@
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${flink.scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
@@ -321,26 +327,6 @@
</dependency>
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>org.reflections</groupId>
- <artifactId>reflections</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${flink.hadoop.version}</version>
@@ -624,7 +610,6 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
-
</dependencies>
<build>
@@ -950,6 +935,38 @@
<properties>
<flink.version>${flink1.10.version}</flink.version>
</properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
</profile>
<profile>
@@ -957,6 +974,38 @@
<properties>
<flink.version>${flink1.11.version}</flink.version>
</properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
</profile>
<profile>
@@ -964,6 +1013,38 @@
<properties>
<flink.version>${flink1.12.version}</flink.version>
</properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
</profile>
<profile>
@@ -971,6 +1052,59 @@
<properties>
<flink.version>${flink1.13.version}</flink.version>
</properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ </profile>
+
+ <profile>
+ <id>flink-114</id>
+ <properties>
+ <flink.version>${flink1.14.version}</flink.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
</profile>
<profile>
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
index 763795f..25ee255 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
@@ -61,6 +61,10 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
opened = true;
}
+ public boolean isAfterFlink114() {
+ return flinkInterpreter.getFlinkVersion().isAfterFlink114();
+ }
+
@Override
public ZeppelinContext buildZeppelinContext() {
return flinkInterpreter.getZeppelinContext();
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
index d27f0fa..4162cc3 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
@@ -186,6 +186,10 @@ public class PyFlinkInterpreter extends PythonInterpreter {
return flinkInterpreter.getFlinkVersion().isFlink110();
}
+ public boolean isAfterFlink114() {
+ return flinkInterpreter.getFlinkVersion().isAfterFlink114();
+ }
+
public org.apache.flink.api.java.ExecutionEnvironment getJavaExecutionEnvironment() {
return flinkInterpreter.getExecutionEnvironment().getJavaEnv();
}
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
index 46ad481..6c080fd 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
@@ -18,6 +18,7 @@
package org.apache.zeppelin.flink;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
@@ -27,17 +28,12 @@ import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.delegation.Executor;
-import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
-import org.apache.flink.table.delegation.PlannerFactory;
-import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.module.ModuleManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.util.Map;
/**
* Factory class for creating flink table env for different purpose:
@@ -52,7 +48,6 @@ public class TableEnvFactory {
private FlinkVersion flinkVersion;
private FlinkShims flinkShims;
- private Executor executor;
private org.apache.flink.api.scala.ExecutionEnvironment benv;
private org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv;
@@ -133,17 +128,11 @@ public class TableEnvFactory {
public TableEnvironment createScalaFlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
try {
- Map<String, String> executorProperties = settings.toExecutorProperties();
- Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
-
- Map<String, String> plannerProperties = settings.toPlannerProperties();
- Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
- .create(
- plannerProperties,
- executor,
- streamTableConfig,
- oldPlannerFunctionCatalog,
- catalogManager);
+ ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
+ classLoader, settings, senv.getJavaEnv(),
+ oldPlannerStreamTableConfig, functionCatalog, catalogManager);
+ Planner planner = (Planner) pair.left;
+ Executor executor = (Executor) pair.right;
Class clazz = null;
if (flinkVersion.isFlink110()) {
@@ -231,15 +220,14 @@ public class TableEnvFactory {
}
}
- public TableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
-
+ public TableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentSettings settings,
+ ClassLoader classLoader) {
try {
- Map<String, String> executorProperties = settings.toExecutorProperties();
- Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
-
- Map<String, String> plannerProperties = settings.toPlannerProperties();
- Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
- .create(plannerProperties, executor, streamTableConfig, oldPlannerFunctionCatalog, catalogManager);
+ ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
+ classLoader, settings, senv.getJavaEnv(),
+ oldPlannerBatchTableConfig, functionCatalog, catalogManager);
+ Planner planner = (Planner) pair.left;
+ Executor executor = (Executor) pair.right;
Class clazz = null;
if (flinkVersion.isFlink110()) {
@@ -303,18 +291,11 @@ public class TableEnvFactory {
public TableEnvironment createScalaBlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
try {
- Map<String, String> executorProperties = settings.toExecutorProperties();
- Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
-
- Map<String, String> plannerProperties = settings.toPlannerProperties();
- Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
- .create(
- plannerProperties,
- executor,
- streamTableConfig,
- functionCatalog,
- catalogManager);
-
+ ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
+ classLoader, settings, senv.getJavaEnv(),
+ streamTableConfig, functionCatalog, catalogManager);
+ Planner planner = (Planner) pair.left;
+ Executor executor = (Executor) pair.right;
Class clazz = null;
if (flinkVersion.isFlink110()) {
@@ -372,14 +353,12 @@ public class TableEnvFactory {
}
public TableEnvironment createJavaBlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
-
try {
- Map<String, String> executorProperties = settings.toExecutorProperties();
- Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
-
- Map<String, String> plannerProperties = settings.toPlannerProperties();
- Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
- .create(plannerProperties, executor, streamTableConfig, functionCatalog, catalogManager);
+ ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
+ classLoader, settings, senv.getJavaEnv(),
+ streamTableConfig, functionCatalog, catalogManager);
+ Planner planner = (Planner) pair.left;
+ Executor executor = (Executor) pair.right;
Class clazz = null;
if (flinkVersion.isFlink110()) {
@@ -439,11 +418,11 @@ public class TableEnvFactory {
public TableEnvironment createJavaBlinkBatchTableEnvironment(
EnvironmentSettings settings, ClassLoader classLoader) {
try {
- final Map<String, String> executorProperties = settings.toExecutorProperties();
- executor = lookupExecutor(executorProperties, senv.getJavaEnv());
- final Map<String, String> plannerProperties = settings.toPlannerProperties();
- final Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
- .create(plannerProperties, executor, batchTableConfig, functionCatalog, catalogManager);
+ ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
+ classLoader, settings, senv.getJavaEnv(),
+ batchTableConfig, functionCatalog, catalogManager);
+ Planner planner = (Planner) pair.left;
+ Executor executor = (Executor) pair.right;
Class clazz = null;
if (flinkVersion.isFlink110()) {
@@ -501,38 +480,11 @@ public class TableEnvFactory {
}
}
-
public void createStreamPlanner(EnvironmentSettings settings) {
- Map<String, String> executorProperties = settings.toExecutorProperties();
- Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
-
- Map<String, String> plannerProperties = settings.toPlannerProperties();
- Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
- .create(
- plannerProperties,
- executor,
- streamTableConfig,
- functionCatalog,
- catalogManager);
+ ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
+ Thread.currentThread().getContextClassLoader(), settings, senv.getJavaEnv(),
+ streamTableConfig, functionCatalog, catalogManager);
+ Planner planner = (Planner) pair.left;
this.flinkShims.setCatalogManagerSchemaResolver(catalogManager, planner.getParser(), settings);
}
-
- private static Executor lookupExecutor(
- Map<String, String> executorProperties,
- StreamExecutionEnvironment executionEnvironment) {
- try {
- ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
- Method createMethod = executorFactory.getClass()
- .getMethod("create", Map.class, StreamExecutionEnvironment.class);
-
- return (Executor) createMethod.invoke(
- executorFactory,
- executorProperties,
- executionEnvironment);
- } catch (Exception e) {
- throw new TableException(
- "Could not instantiate the executor. Make sure a planner module is on the classpath",
- e);
- }
- }
}
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java
index ca9089f..7f2fc92 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.lang.reflect.Field;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
@@ -72,7 +73,7 @@ public class YarnApplicationStreamEnvironment extends StreamExecutionEnvironment
}
private void updateDependencies() throws Exception {
- final Configuration configuration = getConfiguration();
+ final Configuration configuration = (Configuration) getFlinkConfiguration();
checkState(
configuration.getBoolean(DeploymentOptions.ATTACHED),
"Only ATTACHED mode is supported by the scala shell.");
@@ -82,6 +83,22 @@ public class YarnApplicationStreamEnvironment extends StreamExecutionEnvironment
configuration, PipelineOptions.JARS, updatedJarFiles, URL::toString);
}
+ public Object getFlinkConfiguration() {
+ if (flinkScalaInterpreter.getFlinkVersion().isAfterFlink114()) {
+ // starting from Flink 1.14, getConfiguration() return the readonly copy of internal
+ // configuration, so we need to get the internal configuration object via reflection.
+ try {
+ Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");
+ configurationField.setAccessible(true);
+ return configurationField.get(this);
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to get configuration from StreamExecutionEnvironment", e);
+ }
+ } else {
+ return super.getConfiguration();
+ }
+ }
+
private List<URL> getUpdatedJarFiles() throws MalformedURLException {
final URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
final List<URL> allJarFiles = new ArrayList<>();
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java
index 448c34b..6bba854 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java
@@ -19,24 +19,22 @@
package org.apache.zeppelin.flink.internal;
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.JarUtils;
+import org.apache.zeppelin.flink.FlinkVersion;
+import java.lang.reflect.Field;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
/**
* This class is copied from flink project, the reason is that flink scala shell only supports
@@ -54,12 +52,17 @@ public class ScalaShellStreamEnvironment extends StreamExecutionEnvironment {
*/
private final FlinkILoop flinkILoop;
+ private final FlinkVersion flinkVersion;
+
+
public ScalaShellStreamEnvironment(
final Configuration configuration,
final FlinkILoop flinkILoop,
+ final FlinkVersion flinkVersion,
final String... jarFiles) {
super(configuration);
this.flinkILoop = checkNotNull(flinkILoop);
+ this.flinkVersion = checkNotNull(flinkVersion);
this.jarFiles = checkNotNull(JarUtils.getJarFiles(jarFiles));
}
@@ -70,16 +73,28 @@ public class ScalaShellStreamEnvironment extends StreamExecutionEnvironment {
}
private void updateDependencies() throws Exception {
- final Configuration configuration = getConfiguration();
final List<URL> updatedJarFiles = getUpdatedJarFiles();
ConfigUtils.encodeCollectionToConfig(
- configuration, PipelineOptions.JARS, updatedJarFiles, URL::toString);
+ (Configuration) getFlinkConfiguration(), PipelineOptions.JARS, updatedJarFiles, URL::toString);
}
- public Configuration getClientConfiguration() {
- return getConfiguration();
+ public Object getFlinkConfiguration() {
+ if (flinkVersion.isAfterFlink114()) {
+ // starting from Flink 1.14, getConfiguration() return the readonly copy of internal
+ // configuration, so we need to get the internal configuration object via reflection.
+ try {
+ Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");
+ configurationField.setAccessible(true);
+ return configurationField.get(this);
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to get configuration from StreamExecutionEnvironment", e);
+ }
+ } else {
+ return super.getConfiguration();
+ }
}
+
private List<URL> getUpdatedJarFiles() throws MalformedURLException {
final URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
final List<URL> allJarFiles = new ArrayList<>(jarFiles);
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
index 1fb7832..8288615 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
@@ -29,7 +29,6 @@ import org.apache.flink.streaming.experimental.SocketStreamIterator;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.calcite.FlinkTypeFactory;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.types.Row;
import org.apache.zeppelin.flink.FlinkShims;
@@ -86,12 +85,12 @@ public abstract class AbstractStreamSqlJob {
this.flinkShims = flinkShims;
}
- private static TableSchema removeTimeAttributes(TableSchema schema) {
+ private static TableSchema removeTimeAttributes(FlinkShims flinkShims, TableSchema schema) {
final TableSchema.Builder builder = TableSchema.builder();
for (int i = 0; i < schema.getFieldCount(); i++) {
final TypeInformation<?> type = schema.getFieldTypes()[i];
final TypeInformation<?> convertedType;
- if (FlinkTypeFactory.isTimeIndicatorType(type)) {
+ if (flinkShims.isTimeIndicatorType(type)) {
convertedType = Types.SQL_TIMESTAMP;
} else {
convertedType = type;
@@ -115,7 +114,7 @@ public abstract class AbstractStreamSqlJob {
this.table = table;
int parallelism = Integer.parseInt(context.getLocalProperties()
.getOrDefault("parallelism", defaultParallelism + ""));
- this.schema = removeTimeAttributes(table.getSchema());
+ this.schema = removeTimeAttributes(flinkShims, table.getSchema());
checkTableSchema(schema);
LOGGER.info("ResultTable Schema: " + this.schema);
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
index 7f636f3..c6791ea 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
@@ -31,6 +31,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.sql.Timestamp;
+import java.time.temporal.TemporalField;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -98,16 +100,27 @@ public class AppendStreamSqlJob extends AbstractStreamSqlJob {
});
if (materializedTable.size() != 0) {
- long maxTimestamp =
- ((java.sql.Timestamp) materializedTable.get(materializedTable.size() - 1)
- .getField(0)).getTime();
-
- materializedTable = materializedTable.stream()
- .filter(row -> ((java.sql.Timestamp) row.getField(0)).getTime() >
- maxTimestamp - tsWindowThreshold)
- .collect(Collectors.toList());
-
- builder.append(tableToString(materializedTable));
+ // Timestamp type before/after Flink 1.14 has changed.
+ if (flinkShims.getFlinkVersion().isAfterFlink114()) {
+ java.time.LocalDateTime ldt = ((java.time.LocalDateTime) materializedTable
+ .get(materializedTable.size() - 1)
+ .getField(0));
+ final long maxTimestamp = Timestamp.valueOf(ldt).getTime();
+ materializedTable = materializedTable.stream()
+ .filter(row -> Timestamp.valueOf(((java.time.LocalDateTime) row.getField(0))).getTime() >
+ maxTimestamp - tsWindowThreshold)
+ .collect(Collectors.toList());
+ builder.append(tableToString(materializedTable));
+ } else {
+ final long maxTimestamp =
+ ((java.sql.Timestamp) materializedTable.get(materializedTable.size() - 1)
+ .getField(0)).getTime();
+ materializedTable = materializedTable.stream()
+ .filter(row -> ((java.sql.Timestamp) row.getField(0)).getTime() >
+ maxTimestamp - tsWindowThreshold)
+ .collect(Collectors.toList());
+ builder.append(tableToString(materializedTable));
+ }
}
builder.append("\n%text ");
return builder.toString();
diff --git a/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py b/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py
index 50efbea..9249453 100644
--- a/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py
+++ b/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py
@@ -19,7 +19,6 @@
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
from pyflink.common import *
-from pyflink.dataset import *
from pyflink.datastream import *
from pyflink.table import *
from pyflink.table.catalog import *
@@ -45,19 +44,24 @@ pyflink.java_gateway._gateway = gateway
pyflink.java_gateway.import_flink_view(gateway)
pyflink.java_gateway.install_exception_handler()
-b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
s_env = StreamExecutionEnvironment(intp.getJavaStreamExecutionEnvironment())
if intp.isFlink110():
+ from pyflink.dataset import *
+ b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"), True)
bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"), False)
st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"), True)
st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"), False)
-else:
+elif not intp.isAfterFlink114():
+ from pyflink.dataset import *
+ b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"))
- bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"))
st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"))
+ bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"))
st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"))
+else:
+ st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"))
class IPyFlinkZeppelinContext(PyZeppelinContext):
diff --git a/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py b/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py
index 542ab8f..06b99c9 100644
--- a/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py
+++ b/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py
@@ -16,7 +16,6 @@
#
from pyflink.common import *
-from pyflink.dataset import *
from pyflink.datastream import *
from pyflink.table import *
from pyflink.table.catalog import *
@@ -34,19 +33,25 @@ pyflink.java_gateway._gateway = gateway
pyflink.java_gateway.import_flink_view(gateway)
pyflink.java_gateway.install_exception_handler()
-b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
s_env = StreamExecutionEnvironment(intp.getJavaStreamExecutionEnvironment())
if intp.isFlink110():
+ from pyflink.dataset import *
+ b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"), True)
bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"), False)
st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"), True)
st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"), False)
-else:
+elif not intp.isAfterFlink114():
+ from pyflink.dataset import *
+ b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"))
- bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"))
st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"))
+ bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"))
st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"))
+else:
+ st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"))
+
from zeppelin_context import PyZeppelinContext
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 1545bcd..0918084 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -447,17 +447,19 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
flinkILoop.bind("stenv", stenv.getClass().getCanonicalName(), stenv, List("@transient"))
this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader)
- // flink planner
- this.btenv_2 = tblEnvFactory.createScalaFlinkBatchTableEnvironment()
- flinkILoop.bind("btenv_2", btenv_2.getClass().getCanonicalName(), btenv_2, List("@transient"))
- stEnvSetting =
- EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build()
- this.stenv_2 = tblEnvFactory.createScalaFlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader)
- flinkILoop.bind("stenv_2", stenv_2.getClass().getCanonicalName(), stenv_2, List("@transient"))
-
- this.java_btenv_2 = tblEnvFactory.createJavaFlinkBatchTableEnvironment()
- btEnvSetting = EnvironmentSettings.newInstance.useOldPlanner.inStreamingMode.build
- this.java_stenv_2 = tblEnvFactory.createJavaFlinkStreamTableEnvironment(btEnvSetting, getFlinkClassLoader)
+ if (!flinkVersion.isAfterFlink114()) {
+ // flink planner is not supported after flink 1.14
+ this.btenv_2 = tblEnvFactory.createScalaFlinkBatchTableEnvironment()
+ flinkILoop.bind("btenv_2", btenv_2.getClass().getCanonicalName(), btenv_2, List("@transient"))
+ stEnvSetting =
+ EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build()
+ this.stenv_2 = tblEnvFactory.createScalaFlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader)
+ flinkILoop.bind("stenv_2", stenv_2.getClass().getCanonicalName(), stenv_2, List("@transient"))
+
+ this.java_btenv_2 = tblEnvFactory.createJavaFlinkBatchTableEnvironment()
+ btEnvSetting = EnvironmentSettings.newInstance.useOldPlanner.inStreamingMode.build
+ this.java_stenv_2 = tblEnvFactory.createJavaFlinkStreamTableEnvironment(btEnvSetting, getFlinkClassLoader)
+ }
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader)
}
@@ -919,8 +921,8 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
def completion(buf: String, cursor: Int, context: InterpreterContext): java.util.List[InterpreterCompletion]
private def getConfigurationOfStreamExecutionEnv(): Configuration = {
- val getConfigurationMethod = classOf[JStreamExecutionEnvironment].getDeclaredMethod("getConfiguration")
- getConfigurationMethod.setAccessible(true)
- getConfigurationMethod.invoke(this.senv.getJavaEnv).asInstanceOf[Configuration]
+ val configurationField = classOf[JStreamExecutionEnvironment].getDeclaredField("configuration")
+ configurationField.setAccessible(true)
+ configurationField.get(this.senv.getJavaEnv).asInstanceOf[Configuration]
}
}
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
index 517c67a..b5dba22 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
@@ -96,11 +96,15 @@ class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter,
override def showData(obj: Any, maxResult: Int): String = {
if (obj.isInstanceOf[DataSet[_]]) {
val ds = obj.asInstanceOf[DataSet[_]]
- val btenv = flinkInterpreter.getBatchTableEnvironment("flink")
- val table = flinkInterpreter.getFlinkShims.fromDataSet(btenv, ds).asInstanceOf[Table]
- val columnNames: Array[String] = table.getSchema.getFieldNames
- val dsRows: DataSet[Row] = flinkInterpreter.getFlinkShims.toDataSet(btenv, table).asInstanceOf[DataSet[Row]]
- showTable(columnNames, dsRows.first(maxResult + 1).collect())
+ if (flinkInterpreter.getFlinkVersion.isAfterFlink114) {
+ "z.show(DataSet) is not supported after Flink 1.14"
+ } else {
+ val btenv = flinkInterpreter.getBatchTableEnvironment("flink")
+ val table = flinkInterpreter.getFlinkShims.fromDataSet(btenv, ds).asInstanceOf[Table]
+ val columnNames: Array[String] = table.getSchema.getFieldNames
+ val dsRows: DataSet[Row] = flinkInterpreter.getFlinkShims.toDataSet(btenv, table).asInstanceOf[DataSet[Row]]
+ showTable(columnNames, dsRows.first(maxResult + 1).collect())
+ }
} else if (obj.isInstanceOf[Table]) {
val rows = JavaConversions.asScalaBuffer(
flinkInterpreter.getFlinkShims.collectToList(obj.asInstanceOf[TableImpl]).asInstanceOf[java.util.List[Row]]).toSeq
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
index 27ab381..86abeb5 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
@@ -60,6 +60,7 @@ class FlinkILoop(
val remoteSenv = new ScalaShellStreamEnvironment(
flinkConfig,
this,
+ flinkScalaInterpreter.getFlinkVersion,
getExternalJars(): _*)
(remoteBenv,remoteSenv)
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
index e09bd1b..875756f 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -71,14 +71,16 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData());
// z.show
- context = getInterpreterContext();
- result =
- flinkInterpreter.interpret("z.show(btenv.sqlQuery(\"select * from source_table\"))", context);
- resultMessages = context.out.toInterpreterResultMessage();
- assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(1, resultMessages.size());
- assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
- assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData());
+ if (!flinkInterpreter.getFlinkVersion().isAfterFlink114()) {
+ context = getInterpreterContext();
+ result =
+ flinkInterpreter.interpret("z.show(btenv.sqlQuery(\"select * from source_table\"))", context);
+ resultMessages = context.out.toInterpreterResultMessage();
+ assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, resultMessages.size());
+ assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+ assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData());
+ }
// define scala udf
result = flinkInterpreter.interpret(
@@ -87,7 +89,7 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
"}", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- result = flinkInterpreter.interpret("btenv.registerFunction(\"addOne\", new AddOne())",
+ result = flinkInterpreter.interpret("stenv.registerFunction(\"addOne\", new AddOne())",
getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -108,7 +110,7 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
context = getInterpreterContext();
- result = pyFlinkInterpreter.interpret("bt_env.register_function(\"python_upper\", " +
+ result = pyFlinkInterpreter.interpret("st_env.register_function(\"python_upper\", " +
"udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))",
context);
assertEquals(result.toString(), InterpreterResult.Code.SUCCESS, result.code());
@@ -133,7 +135,7 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
" return s.upper()", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- result = iPyFlinkInterpreter.interpret("bt_env.register_function(\"ipython_upper\", " +
+ result = iPyFlinkInterpreter.interpret("st_env.register_function(\"ipython_upper\", " +
"udf(IPythonUpper(), DataTypes.STRING(), DataTypes.STRING()))",
getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index 8649138..c3939db 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -231,8 +231,13 @@ public class FlinkInterpreterTest {
result = interpreter.interpret("z.show(data)", context);
assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
- assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
- assertEquals("_1\t_2\n1\tjeff\n2\tandy\n3\tjames\n", resultMessages.get(0).getData());
+ if (interpreter.getFlinkVersion().isAfterFlink114()) {
+ assertEquals(InterpreterResult.Type.TEXT, resultMessages.get(0).getType());
+ assertEquals("z.show(DataSet) is not supported after Flink 1.14", resultMessages.get(0).getData());
+ } else {
+ assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+ assertEquals("_1\t_2\n1\tjeff\n2\tandy\n3\tjames\n", resultMessages.get(0).getData());
+ }
}
@Test
@@ -263,7 +268,7 @@ public class FlinkInterpreterTest {
" .groupBy(0)\n" +
" .sum(1)\n" +
" .print()", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
String[] expectedCounts = {"(hello,3)", "(world,1)", "(flink,1)", "(hadoop,1)"};
Arrays.sort(expectedCounts);
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
index ac8d65b..3f06453 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
@@ -60,7 +60,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
private RemoteInterpreterEventClient mockIntpEventClient =
mock(RemoteInterpreterEventClient.class);
private LazyOpenInterpreter flinkScalaInterpreter;
-
+ private FlinkInterpreter flinkInnerInterpreter;
public IPyFlinkInterpreterTest() {
super();
@@ -85,8 +85,8 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
context.setIntpEventClient(mockIntpEventClient);
InterpreterContext.set(context);
- this.flinkScalaInterpreter = new LazyOpenInterpreter(
- new FlinkInterpreter(properties));
+ this.flinkInnerInterpreter = new FlinkInterpreter(properties);
+ this.flinkScalaInterpreter = new LazyOpenInterpreter(flinkInnerInterpreter);
intpGroup = new InterpreterGroup();
intpGroup.put("session_1", new ArrayList<Interpreter>());
intpGroup.get("session_1").add(flinkScalaInterpreter);
@@ -119,12 +119,16 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
@Test
public void testBatchIPyFlink() throws InterpreterException, IOException {
- testBatchPyFlink(interpreter, flinkScalaInterpreter);
+ if (!flinkInnerInterpreter.getFlinkVersion().isAfterFlink114()) {
+ testBatchPyFlink(interpreter, flinkScalaInterpreter);
+ }
}
@Test
public void testStreamIPyFlink() throws InterpreterException, IOException {
- testStreamPyFlink(interpreter, flinkScalaInterpreter);
+ if (!flinkInnerInterpreter.getFlinkVersion().isAfterFlink114()) {
+ testStreamPyFlink(interpreter, flinkScalaInterpreter);
+ }
}
@Test
@@ -154,7 +158,8 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
testResumeStreamSqlFromSavePoint(interpreter, flinkScalaInterpreter);
}
- public static void testBatchPyFlink(Interpreter pyflinkInterpreter, Interpreter flinkScalaInterpreter) throws InterpreterException, IOException {
+ public static void testBatchPyFlink(Interpreter pyflinkInterpreter,
+ LazyOpenInterpreter flinkScalaInterpreter) throws InterpreterException, IOException {
InterpreterContext context = createInterpreterContext();
InterpreterResult result = pyflinkInterpreter.interpret(
"import tempfile\n" +
@@ -273,9 +278,15 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
, context);
assertEquals(result.toString(),InterpreterResult.Code.SUCCESS, result.code());
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
- assertEquals(context.out.toString(), 1, resultMessages.size());
- assertEquals(context.out.toString(), InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
- assertEquals(context.out.toString(), "a\tb\tc\n1\thi\thello\n2\thi\thello\n", resultMessages.get(0).getData());
+ FlinkVersion flinkVersion = ((FlinkInterpreter) flinkScalaInterpreter.getInnerInterpreter()).getFlinkVersion();
+ if (flinkVersion.isAfterFlink114()) {
+ assertEquals(InterpreterResult.Type.TEXT, resultMessages.get(0).getType());
+ assertEquals("z.show(DataSet) is not supported after Flink 1.14", resultMessages.get(0).getData());
+ } else {
+ assertEquals(context.out.toString(), 1, resultMessages.size());
+ assertEquals(context.out.toString(), InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+ assertEquals(context.out.toString(), "a\tb\tc\n1\thi\thello\n2\thi\thello\n", resultMessages.get(0).getData());
+ }
}
@Override
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
index c891ec1..0ecccfd 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
@@ -40,9 +40,10 @@ import static org.mockito.Mockito.mock;
public class PyFlinkInterpreterTest extends PythonInterpreterTest {
- private Interpreter flinkScalaInterpreter;
- private Interpreter streamSqlInterpreter;
- private Interpreter batchSqlInterpreter;
+ private FlinkInterpreter flinkInnerInterpreter;
+ private LazyOpenInterpreter flinkScalaInterpreter;
+ private LazyOpenInterpreter streamSqlInterpreter;
+ private LazyOpenInterpreter batchSqlInterpreter;
@Override
@@ -63,7 +64,8 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest {
IPyFlinkInterpreterTest.angularObjectRegistry = new AngularObjectRegistry("flink", null);
InterpreterContext context = getInterpreterContext();
InterpreterContext.set(context);
- flinkScalaInterpreter = new LazyOpenInterpreter(new FlinkInterpreter(properties));
+ this.flinkInnerInterpreter = new FlinkInterpreter(properties);
+ flinkScalaInterpreter = new LazyOpenInterpreter(flinkInnerInterpreter);
intpGroup.get("session_1").add(flinkScalaInterpreter);
flinkScalaInterpreter.setInterpreterGroup(intpGroup);
@@ -95,12 +97,16 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest {
@Test
public void testBatchPyFlink() throws InterpreterException, IOException {
- IPyFlinkInterpreterTest.testBatchPyFlink(interpreter, flinkScalaInterpreter);
+ if (!flinkInnerInterpreter.getFlinkVersion().isAfterFlink114()){
+ IPyFlinkInterpreterTest.testBatchPyFlink(interpreter, flinkScalaInterpreter);
+ }
}
@Test
public void testStreamIPyFlink() throws InterpreterException, IOException {
- IPyFlinkInterpreterTest.testStreamPyFlink(interpreter, flinkScalaInterpreter);
+ if (!flinkInnerInterpreter.getFlinkVersion().isAfterFlink114()) {
+ IPyFlinkInterpreterTest.testStreamPyFlink(interpreter, flinkScalaInterpreter);
+ }
}
@Test
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index c41488f..ba25ec9 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -18,6 +18,7 @@
package org.apache.zeppelin.flink;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.zeppelin.flink.sql.SqlCommandParser;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.jline.utils.AttributedString;
@@ -45,8 +46,10 @@ public abstract class FlinkShims {
private static FlinkShims flinkShims;
protected Properties properties;
+ protected FlinkVersion flinkVersion;
- public FlinkShims(Properties properties) {
+ public FlinkShims(FlinkVersion flinkVersion, Properties properties) {
+ this.flinkVersion = flinkVersion;
this.properties = properties;
}
@@ -65,12 +68,15 @@ public abstract class FlinkShims {
} else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 13) {
LOGGER.info("Initializing shims for Flink 1.13");
flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink113Shims");
+ } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 14) {
+ LOGGER.info("Initializing shims for Flink 1.14");
+ flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink114Shims");
} else {
throw new Exception("Flink version: '" + flinkVersion + "' is not supported yet");
}
- Constructor c = flinkShimsClass.getConstructor(Properties.class);
- return (FlinkShims) c.newInstance(properties);
+ Constructor c = flinkShimsClass.getConstructor(FlinkVersion.class, Properties.class);
+ return (FlinkShims) c.newInstance(flinkVersion, properties);
}
/**
@@ -98,6 +104,10 @@ public abstract class FlinkShims {
.toAttributedString();
}
+ public FlinkVersion getFlinkVersion() {
+ return flinkVersion;
+ }
+
public abstract void disableSysoutLogging(Object batchConfig, Object streamConfig);
public abstract Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment);
@@ -159,4 +169,10 @@ public abstract class FlinkShims {
}
public abstract String[] rowToString(Object row, Object table, Object tableConfig);
+
+ public abstract boolean isTimeIndicatorType(Object type);
+
+ public abstract ImmutablePair<Object, Object> createPlannerAndExecutor(
+ ClassLoader classLoader, Object environmentSettings, Object sEnv,
+ Object tableConfig, Object functionCatalog, Object catalogManager);
}
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
index a042162..2e1f47e 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
@@ -82,4 +82,8 @@ public class FlinkVersion {
public boolean isFlink110() {
return this.majorVersion == 1 && minorVersion == 10;
}
+
+ public boolean isAfterFlink114() {
+ return newerThanOrEqual(FlinkVersion.fromVersionString("1.14.0"));
+ }
}
diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
index 3faa52c..5711884 100644
--- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
+++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
@@ -19,27 +19,38 @@
package org.apache.zeppelin.flink;
import org.apache.commons.cli.CommandLine;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.scala.DataSet;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.python.util.ResourceUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
+import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableUtils;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.api.scala.BatchTableEnvironment;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
@@ -103,8 +114,8 @@ public class Flink110Shims extends FlinkShims {
.append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.")
.toAttributedString();
- public Flink110Shims(Properties properties) {
- super(properties);
+ public Flink110Shims(FlinkVersion flinkVersion, Properties properties) {
+ super(flinkVersion, properties);
}
@Override
@@ -337,4 +348,42 @@ public class Flink110Shims extends FlinkShims {
}
return fields;
}
+
+ public boolean isTimeIndicatorType(Object type) {
+ return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type);
+ }
+
+ private Object lookupExecutor(ClassLoader classLoader,
+ Object settings,
+ Object sEnv) {
+ try {
+ Map<String, String> executorProperties = ((EnvironmentSettings) settings).toExecutorProperties();
+ ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
+ Method createMethod = executorFactory.getClass()
+ .getMethod("create", Map.class, StreamExecutionEnvironment.class);
+
+ return (Executor) createMethod.invoke(
+ executorFactory,
+ executorProperties,
+ (StreamExecutionEnvironment) sEnv);
+ } catch (Exception e) {
+ throw new TableException(
+ "Could not instantiate the executor. Make sure a planner module is on the classpath",
+ e);
+ }
+ }
+
+ @Override
+ public ImmutablePair<Object, Object> createPlannerAndExecutor(
+ ClassLoader classLoader, Object environmentSettings, Object sEnv,
+ Object tableConfig, Object functionCatalog, Object catalogManager) {
+ EnvironmentSettings settings = (EnvironmentSettings) environmentSettings;
+ Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv);
+ Map<String, String> plannerProperties = settings.toPlannerProperties();
+ Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+ .create(plannerProperties, executor, (TableConfig) tableConfig,
+ (FunctionCatalog) functionCatalog,
+ (CatalogManager) catalogManager);
+ return ImmutablePair.of(planner, executor);
+ }
}
diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
index c8db525..64979fd 100644
--- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
+++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
@@ -20,8 +20,10 @@ package org.apache.zeppelin.flink;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.scala.DataSet;
@@ -36,7 +38,9 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFact
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
@@ -46,8 +50,14 @@ import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
@@ -78,6 +88,7 @@ import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.DropViewOperation;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row;
@@ -98,6 +109,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.HashMap;
@@ -141,10 +153,9 @@ public class Flink111Shims extends FlinkShims {
private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
- public Flink111Shims(Properties properties) {
- super(properties);
+ public Flink111Shims(FlinkVersion flinkVersion, Properties properties) {
+ super(flinkVersion, properties);
}
-
@Override
public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
((ExecutionConfig) batchConfig).disableSysoutLogging();
@@ -473,4 +484,43 @@ public class Flink111Shims extends FlinkShims {
public String[] rowToString(Object row, Object table, Object tableConfig) {
return PrintUtils.rowToString((Row) row);
}
+
+ public boolean isTimeIndicatorType(Object type) {
+ return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type);
+ }
+
+ private Object lookupExecutor(ClassLoader classLoader,
+ Object settings,
+ Object sEnv) {
+ try {
+ Map<String, String> executorProperties = ((EnvironmentSettings) settings).toExecutorProperties();
+ ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
+ Method createMethod = executorFactory.getClass()
+ .getMethod("create", Map.class, StreamExecutionEnvironment.class);
+
+ return (Executor) createMethod.invoke(
+ executorFactory,
+ executorProperties,
+ (StreamExecutionEnvironment) sEnv);
+ } catch (Exception e) {
+ throw new TableException(
+ "Could not instantiate the executor. Make sure a planner module is on the classpath",
+ e);
+ }
+ }
+
+ @Override
+ public ImmutablePair<Object, Object> createPlannerAndExecutor(
+ ClassLoader classLoader, Object environmentSettings, Object sEnv,
+ Object tableConfig, Object functionCatalog, Object catalogManager) {
+ EnvironmentSettings settings = (EnvironmentSettings) environmentSettings;
+ Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv);
+ Map<String, String> plannerProperties = settings.toPlannerProperties();
+ Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+ .create(plannerProperties, executor, (TableConfig) tableConfig,
+ (FunctionCatalog) functionCatalog,
+ (CatalogManager) catalogManager);
+ return ImmutablePair.of(planner, executor);
+
+ }
}
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
index 648826a..a713d1c 100644
--- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
@@ -21,7 +21,9 @@ package org.apache.zeppelin.flink;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.scala.DataSet;
@@ -37,7 +39,9 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFact
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
@@ -47,8 +51,14 @@ import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
@@ -79,6 +89,7 @@ import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.DropViewOperation;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row;
@@ -99,6 +110,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.HashMap;
@@ -142,10 +154,9 @@ public class Flink112Shims extends FlinkShims {
private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
- public Flink112Shims(Properties properties) {
- super(properties);
+ public Flink112Shims(FlinkVersion flinkVersion, Properties properties) {
+ super(flinkVersion, properties);
}
-
@Override
public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
// do nothing
@@ -486,4 +497,42 @@ public class Flink112Shims extends FlinkShims {
public String[] rowToString(Object row, Object table, Object tableConfig) {
return PrintUtils.rowToString((Row) row);
}
+
+ public boolean isTimeIndicatorType(Object type) {
+ return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type);
+ }
+
+ private Object lookupExecutor(ClassLoader classLoader,
+ Object settings,
+ Object sEnv) {
+ try {
+ Map<String, String> executorProperties = ((EnvironmentSettings) settings).toExecutorProperties();
+ ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
+ Method createMethod = executorFactory.getClass()
+ .getMethod("create", Map.class, StreamExecutionEnvironment.class);
+
+ return (Executor) createMethod.invoke(
+ executorFactory,
+ executorProperties,
+ (StreamExecutionEnvironment) sEnv);
+ } catch (Exception e) {
+ throw new TableException(
+ "Could not instantiate the executor. Make sure a planner module is on the classpath",
+ e);
+ }
+ }
+
+ @Override
+ public ImmutablePair<Object, Object> createPlannerAndExecutor(
+ ClassLoader classLoader, Object environmentSettings, Object sEnv,
+ Object tableConfig, Object functionCatalog, Object catalogManager) {
+ EnvironmentSettings settings = (EnvironmentSettings) environmentSettings;
+ Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv);
+ Map<String, String> plannerProperties = settings.toPlannerProperties();
+ Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+ .create(plannerProperties, executor, (TableConfig) tableConfig,
+ (FunctionCatalog) functionCatalog,
+ (CatalogManager) catalogManager);
+ return ImmutablePair.of(planner, executor);
+ }
}
diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
index 054f07f..481bfee 100644
--- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
+++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
@@ -21,8 +21,10 @@ package org.apache.zeppelin.flink;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.scala.DataSet;
@@ -36,11 +38,13 @@ import org.apache.flink.core.execution.JobClient;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
+import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlannerType;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
@@ -50,9 +54,15 @@ import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
@@ -83,6 +93,7 @@ import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.DropViewOperation;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row;
@@ -103,6 +114,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.net.InetAddress;
import java.time.ZoneId;
import java.util.Arrays;
@@ -147,8 +159,8 @@ public class Flink113Shims extends FlinkShims {
private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
- public Flink113Shims(Properties properties) {
- super(properties);
+ public Flink113Shims(FlinkVersion flinkVersion, Properties properties) {
+ super(flinkVersion, properties);
}
@Override
@@ -508,4 +520,42 @@ public class Flink113Shims extends FlinkShims {
ResolvedSchema resolvedSchema = ((Table) table).getResolvedSchema();
return PrintUtils.rowToString((Row) row, resolvedSchema, zoneId);
}
+
+ public boolean isTimeIndicatorType(Object type) {
+ return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type);
+ }
+
+ private Object lookupExecutor(ClassLoader classLoader,
+ Object settings,
+ Object sEnv) {
+ try {
+ Map<String, String> executorProperties = ((EnvironmentSettings) settings).toExecutorProperties();
+ ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
+ Method createMethod = executorFactory.getClass()
+ .getMethod("create", Map.class, StreamExecutionEnvironment.class);
+
+ return createMethod.invoke(
+ executorFactory,
+ executorProperties,
+ sEnv);
+ } catch (Exception e) {
+ throw new TableException(
+ "Could not instantiate the executor. Make sure a planner module is on the classpath",
+ e);
+ }
+ }
+
+ @Override
+ public ImmutablePair<Object, Object> createPlannerAndExecutor(
+ ClassLoader classLoader, Object environmentSettings, Object sEnv,
+ Object tableConfig, Object functionCatalog, Object catalogManager) {
+ EnvironmentSettings settings = (EnvironmentSettings) environmentSettings;
+ Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv);
+ Map<String, String> plannerProperties = settings.toPlannerProperties();
+ Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+ .create(plannerProperties, executor, (TableConfig) tableConfig,
+ (FunctionCatalog) functionCatalog,
+ (CatalogManager) catalogManager);
+ return ImmutablePair.of(planner, executor);
+ }
}
diff --git a/flink/flink1.14-shims/pom.xml b/flink/flink1.14-shims/pom.xml
new file mode 100644
index 0000000..c160548
--- /dev/null
+++ b/flink/flink1.14-shims/pom.xml
@@ -0,0 +1,199 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-parent</artifactId>
+ <groupId>org.apache.zeppelin</groupId>
+ <version>0.11.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>flink1.14-shims</artifactId>
+ <version>0.11.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+ <name>Zeppelin: Flink1.14 Shims</name>
+
+ <properties>
+ <flink.version>${flink1.14.version}</flink.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>flink-shims</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>eclipse-add-source</id>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-test-compile-first</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <scalaVersion>${flink.scala.version}</scalaVersion>
+ <args>
+ <arg>-unchecked</arg>
+ <arg>-deprecation</arg>
+ <arg>-feature</arg>
+ <arg>-target:jvm-1.8</arg>
+ </args>
+ <jvmArgs>
+ <jvmArg>-Xms1024m</jvmArg>
+ <jvmArg>-Xmx1024m</jvmArg>
+ <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg>
+ </jvmArgs>
+ <javacArgs>
+ <javacArg>-source</javacArg>
+ <javacArg>${java.version}</javacArg>
+ <javacArg>-target</javacArg>
+ <javacArg>${java.version}</javacArg>
+ <javacArg>-Xlint:all,-serial,-path,-options</javacArg>
+ </javacArgs>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-interpreter-setting</id>
+ <phase>none</phase>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
similarity index 88%
copy from flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
copy to flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
index 054f07f..56fdc7e 100644
--- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
+++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
@@ -21,11 +21,12 @@ package org.apache.zeppelin.flink;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.scala.DataSet;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.configuration.ConfigOption;
@@ -36,23 +37,29 @@ import org.apache.flink.core.execution.JobClient;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
+import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlannerType;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
-import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
@@ -83,13 +90,13 @@ import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.DropViewOperation;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.FlinkException;
-import org.apache.zeppelin.flink.shims113.CollectStreamTableSink;
-import org.apache.zeppelin.flink.shims113.Flink113ScalaShims;
+import org.apache.zeppelin.flink.shims114.CollectStreamTableSink;
import org.apache.zeppelin.flink.sql.SqlCommandParser;
import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommandCall;
@@ -103,6 +110,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.net.InetAddress;
import java.time.ZoneId;
import java.util.Arrays;
@@ -116,11 +124,11 @@ import java.util.regex.Matcher;
/**
- * Shims for flink 1.13
+ * Shims for flink 1.14
*/
-public class Flink113Shims extends FlinkShims {
+public class Flink114Shims extends FlinkShims {
- private static final Logger LOGGER = LoggerFactory.getLogger(Flink113Shims.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(Flink114Shims.class);
public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder()
.append("The following commands are available:\n\n")
.append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database."))
@@ -147,8 +155,8 @@ public class Flink113Shims extends FlinkShims {
private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
- public Flink113Shims(Properties properties) {
- super(properties);
+ public Flink114Shims(FlinkVersion flinkVersion, Properties properties) {
+ super(flinkVersion, properties);
}
@Override
@@ -251,12 +259,14 @@ public class Flink113Shims extends FlinkShims {
@Override
public Object fromDataSet(Object btenv, Object ds) {
- return Flink113ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds);
+ return null;
+ //return Flink114ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds);
}
@Override
public Object toDataSet(Object btenv, Object table) {
- return Flink113ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table);
+ return null;
+ //return Flink114ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table);
}
@Override
@@ -508,4 +518,42 @@ public class Flink113Shims extends FlinkShims {
ResolvedSchema resolvedSchema = ((Table) table).getResolvedSchema();
return PrintUtils.rowToString((Row) row, resolvedSchema, zoneId);
}
+
+ @Override
+ public boolean isTimeIndicatorType(Object type) {
+ return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type);
+ }
+
+ private Object lookupExecutor(ClassLoader classLoader,
+ Object settings,
+ Object sEnv) {
+ try {
+ final ExecutorFactory executorFactory =
+ FactoryUtil.discoverFactory(
+ classLoader, ExecutorFactory.class, ((EnvironmentSettings) settings).getExecutor());
+ final Method createMethod =
+ executorFactory
+ .getClass()
+ .getMethod("create", StreamExecutionEnvironment.class);
+
+ return createMethod.invoke(executorFactory, sEnv);
+ } catch (Exception e) {
+ throw new TableException(
+ "Could not instantiate the executor. Make sure a planner module is on the classpath",
+ e);
+ }
+ }
+
+ @Override
+ public ImmutablePair<Object, Object> createPlannerAndExecutor(
+ ClassLoader classLoader, Object environmentSettings, Object sEnv,
+ Object tableConfig, Object functionCatalog, Object catalogManager) {
+ EnvironmentSettings settings = (EnvironmentSettings) environmentSettings;
+ Executor executor = (Executor) lookupExecutor(classLoader, environmentSettings, sEnv);
+ Planner planner = PlannerFactoryUtil.createPlanner(settings.getPlanner(), executor,
+ (TableConfig) tableConfig,
+ (CatalogManager) catalogManager,
+ (FunctionCatalog) functionCatalog);
+ return ImmutablePair.of(planner, executor);
+ }
}
diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/shims114/CollectStreamTableSink.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/shims114/CollectStreamTableSink.java
new file mode 100644
index 0000000..7a224e1
--- /dev/null
+++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/shims114/CollectStreamTableSink.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.shims114;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.experimental.CollectSink;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.UUID;
+
+/**
+ * Table sink for collecting the results locally using sockets.
+ */
+public class CollectStreamTableSink implements RetractStreamTableSink<Row> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class);
+
+ private final InetAddress targetAddress;
+ private final int targetPort;
+ private final TypeSerializer<Tuple2<Boolean, Row>> serializer;
+
+ private String[] fieldNames;
+ private TypeInformation<?>[] fieldTypes;
+
+ public CollectStreamTableSink(InetAddress targetAddress,
+ int targetPort,
+ TypeSerializer<Tuple2<Boolean, Row>> serializer) {
+ LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort);
+ this.targetAddress = targetAddress;
+ this.targetPort = targetPort;
+ this.serializer = serializer;
+ }
+
+ @Override
+ public String[] getFieldNames() {
+ return fieldNames;
+ }
+
+ @Override
+ public TypeInformation<?>[] getFieldTypes() {
+ return fieldTypes;
+ }
+
+ @Override
+ public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+ final CollectStreamTableSink copy =
+ new CollectStreamTableSink(targetAddress, targetPort, serializer);
+ copy.fieldNames = fieldNames;
+ copy.fieldTypes = fieldTypes;
+ return copy;
+ }
+
+ @Override
+ public TypeInformation<Row> getRecordType() {
+ return Types.ROW_NAMED(fieldNames, fieldTypes);
+ }
+
+ @Override
+ public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
+ // add sink
+ return stream
+ .addSink(new CollectSink<>(targetAddress, targetPort, serializer))
+ .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID())
+ .setParallelism(1);
+ }
+
+ @Override
+ public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
+ return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
+ }
+}
diff --git a/flink/pom.xml b/flink/pom.xml
index 0fcb776..329f79e 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -41,6 +41,7 @@
<module>flink1.11-shims</module>
<module>flink1.12-shims</module>
<module>flink1.13-shims</module>
+ <module>flink1.14-shims</module>
</modules>
<properties>
@@ -48,6 +49,7 @@
<flink1.11.version>1.11.3</flink1.11.version>
<flink1.12.version>1.12.4</flink1.12.version>
<flink1.13.version>1.13.2</flink1.13.version>
+ <flink1.14.version>1.14.0</flink1.14.version>
<flink.scala.version>2.11.12</flink.scala.version>
<flink.scala.binary.version>2.11</flink.scala.binary.version>
diff --git a/testing/env_python_3_with_flink_114.yml b/testing/env_python_3_with_flink_114.yml
new file mode 100644
index 0000000..37c6fa8
--- /dev/null
+++ b/testing/env_python_3_with_flink_114.yml
@@ -0,0 +1,27 @@
+name: python_3_with_flink
+channels:
+ - conda-forge
+ - defaults
+dependencies:
+ - pycodestyle
+ - scipy
+ - numpy=1.19.5
+ - grpcio
+ - protobuf
+ - pandasql
+ - ipython
+ - ipykernel
+ - jupyter_client=5
+ - hvplot
+ - plotnine
+ - seaborn
+ - intake
+ - intake-parquet
+ - intake-xarray
+ - altair
+ - vega_datasets
+ - plotly
+ - pip
+ - pip:
+ - apache-flink==1.14.0
+
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest114.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest114.java
new file mode 100644
index 0000000..68f6810
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest114.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(value = Parameterized.class)
+public class FlinkIntegrationTest114 extends FlinkIntegrationTest {
+
+ @Parameterized.Parameters
+ public static List<Object[]> data() {
+ return Arrays.asList(new Object[][]{
+ {"1.14.0", "2.11"},
+ {"1.14.0", "2.12"}
+ });
+ }
+
+ public FlinkIntegrationTest114(String flinkVersion, String scalaVersion) {
+ super(flinkVersion, scalaVersion);
+ }
+}