You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2022/05/13 06:02:06 UTC
[zeppelin] branch master updated: [ZEPPELIN-5600] Support Flink 1.15 (#4335)
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new deaf3d3e9f [ZEPPELIN-5600] Support Flink 1.15 (#4335)
deaf3d3e9f is described below
commit deaf3d3e9faf125e6d255c6cb405e650293b8b95
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri May 13 14:01:55 2022 +0800
[ZEPPELIN-5600] Support Flink 1.15 (#4335)
---
.github/workflows/core.yml | 27 +-
docs/interpreter/flink.md | 7 +
flink/flink-scala-parent/pom.xml | 40 +-
.../org/apache/zeppelin/flink/TableEnvFactory.java | 138 +----
.../internal/ScalaShellStreamEnvironment.java | 21 +-
.../zeppelin/flink/sql/AbstractStreamSqlJob.java | 16 +-
.../zeppelin/flink/FlinkScalaInterpreter.scala | 17 +-
.../zeppelin/flink/internal/FlinkILoop.scala | 2 +-
.../java/org/apache/zeppelin/flink/FlinkShims.java | 11 +-
.../org/apache/zeppelin/flink/Flink112Shims.java | 19 +-
.../org/apache/zeppelin/flink/Flink113Shims.java | 19 +-
.../org/apache/zeppelin/flink/Flink114Shims.java | 18 +-
flink/flink1.15-shims/pom.xml | 200 +++++++
.../org/apache/zeppelin/flink/Flink115Shims.java} | 65 ++-
.../zeppelin/flink/Flink115SqlInterpreter.java | 590 +++++++++++++++++++++
.../java/org/apache/zeppelin/flink/PrintUtils.java | 318 +++++++++++
.../zeppelin/flink/TimestampStringUtils.java | 143 +++++
.../flink/shims115/CollectStreamTableSink.java | 97 ++++
flink/pom.xml | 44 +-
testing/env_python_3_with_flink_115.yml | 29 +
.../integration/ZeppelinFlinkClusterTest115.java | 38 ++
.../launcher/FlinkInterpreterLauncher.java | 21 +-
22 files changed, 1666 insertions(+), 214 deletions(-)
diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index 30b675b6e9..b9b3953fc8 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -198,7 +198,7 @@ jobs:
${{ runner.os }}-zeppelin-
- name: install environment
run: |
- ./mvnw install -DskipTests -DskipRat -Phadoop2 -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.11,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.11,flink/flink-scala-2.12,jdbc,shell -am
+ ./mvnw install -DskipTests -DskipRat -Phadoop2 -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.11,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.11,flink/flink-scala-2.12,jdbc,shell -am -Pflink-114
./mvnw package -DskipRat -pl zeppelin-plugins -amd -DskipTests -B
- name: Setup conda environment with python 3.7 and R
uses: conda-incubator/setup-miniconda@v2
@@ -216,13 +216,16 @@ jobs:
R -e "IRkernel::installspec()"
- name: run tests
run: ./mvnw test -DskipRat -pl zeppelin-interpreter-integration -Phadoop2 -Pintegration -DfailIfNoTests=false -Dtest=ZeppelinClientIntegrationTest,ZeppelinClientWithAuthIntegrationTest,ZSessionIntegrationTest,ShellIntegrationTest,JdbcIntegrationTest
-
+ - name: Print zeppelin logs
+ if: always()
+ run: if [ -d "logs" ]; then cat logs/*; fi
+
flink-test-and-flink-integration-test:
runs-on: ubuntu-20.04
strategy:
fail-fast: false
matrix:
- flink: [112, 113, 114]
+ flink: [112, 113, 114, 115]
steps:
- name: Checkout
uses: actions/checkout@v2
@@ -244,10 +247,16 @@ jobs:
key: ${{ runner.os }}-zeppelin-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-zeppelin-
- - name: install environment
+ - name: install environment for flink before 1.15 (exclusive)
+ if: matrix.flink != '115'
run: |
./mvnw install -DskipTests -DskipRat -am -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration -B
./mvnw clean package -pl zeppelin-plugins -amd -DskipTests -B
+ - name: install environment for flink after 1.15 (inclusive)
+ if: matrix.flink == '115'
+ run: |
+ ./mvnw install -DskipTests -DskipRat -am -pl flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration -B
+ ./mvnw clean package -pl zeppelin-plugins -amd -DskipTests -B
- name: Setup conda environment with python 3.7 and
uses: conda-incubator/setup-miniconda@v2
with:
@@ -259,8 +268,16 @@ jobs:
channel-priority: true
auto-activate-base: false
use-mamba: true
- - name: run tests
+ - name: run tests for flink before 1.15 (exclusive)
+ if: matrix.flink != '115'
run: ./mvnw test -DskipRat -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration -DfailIfNoTests=false -B -Dtest=org.apache.zeppelin.flink.*Test,FlinkIntegrationTest${{ matrix.flink }}
+ - name: run tests for flink before 1.15 (inclusive)
+ if: matrix.flink == '115'
+ run: ./mvnw test -DskipRat -pl flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration -DfailIfNoTests=false -B -Dtest=org.apache.zeppelin.flink.*Test,FlinkIntegrationTest${{ matrix.flink }}
+ - name: Print zeppelin logs
+ if: always()
+ run: if [ -d "logs" ]; then cat logs/*; fi
+
spark-integration-test:
runs-on: ubuntu-20.04
diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index fa86d8308e..0241ff9baf 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -144,6 +144,13 @@ docker run -u $(id -u) -p 8080:8080 --rm -v /mnt/disk1/flink-sql-cookbook-on-zep
Download Flink 1.10 or afterwards (Scala 2.11 & 2.12 are both supported)
+### Specific for Flink 1.15
+
+Flink 1.15 is scala free and has changed its binary distribution. If you would like to make Zeppelin work with Flink 1.15, you need to do the following extra steps.
+* Move FLINK_HOME/opt/flink-table-planner_2.12-1.15.0.jar to FLINK_HOME/lib
+* Move FLINK_HOME/lib/flink-table-planner-loader-1.15.0.jar to FLINK_HOME/opt
+* Download flink-table-api-scala-bridge_2.12-1.15.0.jar and flink-table-api-scala_2.12-1.15.0.jar to FLINK_HOME/lib
+
## Flink on Zeppelin Architecture
diff --git a/flink/flink-scala-parent/pom.xml b/flink/flink-scala-parent/pom.xml
index efebcedf8a..203773d6e2 100644
--- a/flink/flink-scala-parent/pom.xml
+++ b/flink/flink-scala-parent/pom.xml
@@ -41,6 +41,8 @@
<hive.version>2.3.4</hive.version>
<hiverunner.version>4.0.0</hiverunner.version>
<grpc.version>1.15.0</grpc.version>
+ <!-- Start from 1.15 flink is scala free (no scala version in its artifact) -->
+ <flink.library.scala.suffix>_${flink.scala.binary.version}</flink.library.scala.suffix>
<flink.bin.download.url>https://archive.apache.org/dist/flink/flink-${flink.version}/flink-${flink.version}-bin-scala_${flink.scala.binary.version}.tgz</flink.bin.download.url>
</properties>
@@ -72,6 +74,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>flink1.15-shims</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-python</artifactId>
@@ -147,14 +155,14 @@
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${flink.scala.binary.version}</artifactId>
+ <artifactId>flink-clients${flink.library.scala.suffix}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-yarn_${flink.scala.binary.version}</artifactId>
+ <artifactId>flink-yarn${flink.library.scala.suffix}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
@@ -193,7 +201,7 @@
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId>
+ <artifactId>flink-table-api-java-bridge${flink.library.scala.suffix}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
@@ -207,7 +215,7 @@
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId>
+ <artifactId>flink-streaming-java${flink.library.scala.suffix}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
@@ -1017,6 +1025,30 @@
</dependencies>
</profile>
+ <profile>
+ <id>flink-115</id>
+ <properties>
+ <flink.version>${flink1.15.version}</flink.version>
+ <flink.scala.version>2.12.7</flink.scala.version>
+ <flink.scala.binary.version>2.12</flink.scala.binary.version>
+ <flink.library.scala.suffix></flink.library.scala.suffix>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+
<profile>
<id>hive2</id>
<activation>
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
index 1456a9f307..711993578d 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
@@ -67,7 +67,6 @@ public class TableEnvFactory {
private CatalogManager oldPlannerCatalogManager;
private ModuleManager moduleManager;
private FunctionCatalog functionCatalog;
- private FunctionCatalog oldPlannerFunctionCatalog;
public TableEnvFactory(FlinkVersion flinkVersion,
@@ -94,12 +93,8 @@ public class TableEnvFactory {
this.catalogManager = (CatalogManager) flinkShims.createCatalogManager(streamTableConfig.getConfiguration());
this.oldPlannerCatalogManager = (CatalogManager) flinkShims.createCatalogManager(
this.oldPlannerStreamTableConfig.getConfiguration());
-
this.moduleManager = new ModuleManager();
-
- this.functionCatalog = new FunctionCatalog(streamTableConfig, catalogManager, moduleManager);
- this.oldPlannerFunctionCatalog = new FunctionCatalog(
- this.oldPlannerStreamTableConfig, this.oldPlannerCatalogManager, moduleManager);
+ this.functionCatalog = (FunctionCatalog) flinkShims.createFunctionCatalog(streamTableConfig, catalogManager, moduleManager);
}
public TableEnvironment createScalaFlinkBatchTableEnvironment() {
@@ -121,67 +116,6 @@ public class TableEnvFactory {
}
}
- public TableEnvironment createScalaFlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
- try {
- ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
- classLoader, settings, senv.getJavaEnv(),
- oldPlannerStreamTableConfig, functionCatalog, catalogManager);
- Planner planner = (Planner) pair.left;
- Executor executor = (Executor) pair.right;
-
- Class clazz = Class
- .forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl");
-
- try {
- Constructor constructor = clazz
- .getConstructor(
- CatalogManager.class,
- ModuleManager.class,
- FunctionCatalog.class,
- TableConfig.class,
- org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class,
- Planner.class,
- Executor.class,
- boolean.class);
- return (TableEnvironment) constructor.newInstance(
- oldPlannerCatalogManager,
- moduleManager,
- oldPlannerFunctionCatalog,
- oldPlannerStreamTableConfig,
- senv,
- planner,
- executor,
- settings.isStreamingMode());
- } catch (NoSuchMethodException e) {
- // Flink 1.11.1 change the constructor signature, FLINK-18419
- Constructor constructor = clazz
- .getConstructor(
- CatalogManager.class,
- ModuleManager.class,
- FunctionCatalog.class,
- TableConfig.class,
- org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class,
- Planner.class,
- Executor.class,
- boolean.class,
- ClassLoader.class);
- return (TableEnvironment) constructor.newInstance(
- oldPlannerCatalogManager,
- moduleManager,
- oldPlannerFunctionCatalog,
- oldPlannerStreamTableConfig,
- senv,
- planner,
- executor,
- settings.isStreamingMode(),
- classLoader);
- }
-
- } catch (Exception e) {
- throw new TableException("Fail to createScalaFlinkStreamTableEnvironment", e);
- }
- }
-
public TableEnvironment createJavaFlinkBatchTableEnvironment() {
try {
Class<?> clazz = Class
@@ -203,74 +137,12 @@ public class TableEnvFactory {
}
}
- public TableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentSettings settings,
- ClassLoader classLoader) {
- try {
- ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
- classLoader, settings, senv.getJavaEnv(),
- oldPlannerBatchTableConfig, functionCatalog, catalogManager);
- Planner planner = (Planner) pair.left;
- Executor executor = (Executor) pair.right;
-
- Class clazz = Class
- .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
-
- try {
- Constructor constructor = clazz
- .getConstructor(
- CatalogManager.class,
- ModuleManager.class,
- FunctionCatalog.class,
- TableConfig.class,
- org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class,
- Planner.class,
- Executor.class,
- boolean.class);
- return (TableEnvironment) constructor.newInstance(
- oldPlannerCatalogManager,
- moduleManager,
- oldPlannerFunctionCatalog,
- oldPlannerStreamTableConfig,
- senv.getJavaEnv(),
- planner,
- executor,
- settings.isStreamingMode());
- } catch (NoSuchMethodException e) {
- // Flink 1.11.1 change the constructor signature, FLINK-18419
- Constructor constructor = clazz
- .getConstructor(
- CatalogManager.class,
- ModuleManager.class,
- FunctionCatalog.class,
- TableConfig.class,
- org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class,
- Planner.class,
- Executor.class,
- boolean.class,
- ClassLoader.class);
- return (TableEnvironment) constructor.newInstance(
- oldPlannerCatalogManager,
- moduleManager,
- oldPlannerFunctionCatalog,
- oldPlannerStreamTableConfig,
- senv.getJavaEnv(),
- planner,
- executor,
- settings.isStreamingMode(),
- classLoader);
- }
-
- } catch (Exception e) {
- throw new TableException("Fail to createJavaFlinkStreamTableEnvironment", e);
- }
- }
-
public TableEnvironment createScalaBlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) {
try {
ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
classLoader, settings, senv.getJavaEnv(),
- streamTableConfig, functionCatalog, catalogManager);
+ streamTableConfig, moduleManager, functionCatalog, catalogManager);
Planner planner = (Planner) pair.left;
Executor executor = (Executor) pair.right;
@@ -327,7 +199,7 @@ public class TableEnvFactory {
try {
ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
classLoader, settings, senv.getJavaEnv(),
- streamTableConfig, functionCatalog, catalogManager);
+ streamTableConfig, moduleManager, functionCatalog, catalogManager);
Planner planner = (Planner) pair.left;
Executor executor = (Executor) pair.right;
@@ -386,7 +258,7 @@ public class TableEnvFactory {
try {
ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
classLoader, settings, senv.getJavaEnv(),
- batchTableConfig, functionCatalog, catalogManager);
+ batchTableConfig, moduleManager, functionCatalog, catalogManager);
Planner planner = (Planner) pair.left;
Executor executor = (Executor) pair.right;
@@ -443,7 +315,7 @@ public class TableEnvFactory {
public void createStreamPlanner(EnvironmentSettings settings) {
ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor(
Thread.currentThread().getContextClassLoader(), settings, senv.getJavaEnv(),
- streamTableConfig, functionCatalog, catalogManager);
+ streamTableConfig, moduleManager, functionCatalog, catalogManager);
Planner planner = (Planner) pair.left;
this.flinkShims.setCatalogManagerSchemaResolver(catalogManager, planner.getParser(), settings);
}
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java
index 6bba854ed2..bee40a0f69 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java
@@ -79,22 +79,17 @@ public class ScalaShellStreamEnvironment extends StreamExecutionEnvironment {
}
public Object getFlinkConfiguration() {
- if (flinkVersion.isAfterFlink114()) {
- // starting from Flink 1.14, getConfiguration() return the readonly copy of internal
- // configuration, so we need to get the internal configuration object via reflection.
- try {
- Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");
- configurationField.setAccessible(true);
- return configurationField.get(this);
- } catch (Exception e) {
- throw new RuntimeException("Fail to get configuration from StreamExecutionEnvironment", e);
- }
- } else {
- return super.getConfiguration();
+ // starting from Flink 1.14, getConfiguration() return the readonly copy of internal
+ // configuration, so we need to get the internal configuration object via reflection.
+ try {
+ Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");
+ configurationField.setAccessible(true);
+ return configurationField.get(this);
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to get configuration from StreamExecutionEnvironment", e);
}
}
-
private List<URL> getUpdatedJarFiles() throws MalformedURLException {
final URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
final List<URL> allJarFiles = new ArrayList<>(jarFiles);
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
index 8288615830..1ee26143cc 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
@@ -139,19 +139,7 @@ public abstract class AbstractStreamSqlJob {
// new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer);
collectTableSink = (RetractStreamTableSink) collectTableSink.configure(
outputType.getFieldNames(), outputType.getFieldTypes());
-
- // workaround, otherwise it won't find the sink properly
- String originalCatalog = stenv.getCurrentCatalog();
- String originalDatabase = stenv.getCurrentDatabase();
- try {
- stenv.useCatalog("default_catalog");
- stenv.useDatabase("default_database");
- flinkShims.registerTableSink(stenv, tableName, collectTableSink);
- table.insertInto(tableName);
- } finally {
- stenv.useCatalog(originalCatalog);
- stenv.useDatabase(originalDatabase);
- }
+ flinkShims.registerTableSink(stenv, tableName, collectTableSink);
long delay = 1000L;
long period = Long.parseLong(
@@ -163,7 +151,7 @@ public abstract class AbstractStreamSqlJob {
LOGGER.info("Run job: " + tableName + ", parallelism: " + parallelism);
String jobName = context.getStringLocalProperty("jobName", tableName);
- stenv.execute(jobName);
+ table.executeInsert(tableName).await();
LOGGER.info("Flink Job is finished, jobName: " + jobName);
// wait for retrieve thread consume all data
LOGGER.info("Waiting for retrieve thread to be done");
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 1e9b1dcabb..40adcffeae 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -432,13 +432,18 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
this.benv, this.senv, tableConfig)
// blink planner
- var btEnvSetting = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build()
+ var btEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder()
+ .asInstanceOf[EnvironmentSettings.Builder]
+ .inBatchMode()
+ .build()
this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting, getFlinkClassLoader);
flinkILoop.bind("btenv", btenv.getClass().getCanonicalName(), btenv, List("@transient"))
this.java_btenv = this.btenv
- var stEnvSetting =
- EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
+ var stEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder()
+ .asInstanceOf[EnvironmentSettings.Builder]
+ .inStreamingMode()
+ .build()
this.stenv = tblEnvFactory.createScalaBlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader)
flinkILoop.bind("stenv", stenv.getClass().getCanonicalName(), stenv, List("@transient"))
this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader)
@@ -586,8 +591,10 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
val originalClassLoader = Thread.currentThread().getContextClassLoader
try {
Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
- val stEnvSetting =
- EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
+ val stEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder()
+ .asInstanceOf[EnvironmentSettings.Builder]
+ .inStreamingMode()
+ .build()
this.tblEnvFactory.createStreamPlanner(stEnvSetting)
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader)
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
index eedcbe41e9..b133487749 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
@@ -30,7 +30,7 @@ import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.core.execution.PipelineExecutorServiceLoader
import org.apache.zeppelin.flink.{ApplicationModeExecutionEnvironment, ApplicationModeStreamEnvironment, FlinkScalaInterpreter}
import FlinkShell.ExecutionMode
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
import org.slf4j.{Logger, LoggerFactory}
import scala.tools.nsc.interpreter._
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index 2a3d82edef..a473a0f888 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -62,6 +62,9 @@ public abstract class FlinkShims {
} else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 14) {
LOGGER.info("Initializing shims for Flink 1.14");
flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink114Shims");
+ } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 15) {
+ LOGGER.info("Initializing shims for Flink 1.15");
+ flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink115Shims");
} else {
throw new Exception("Flink version: '" + flinkVersion + "' is not supported yet");
}
@@ -88,6 +91,8 @@ public abstract class FlinkShims {
return flinkVersion;
}
+ public abstract Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager);
+
public abstract void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext);
public abstract void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext);
@@ -142,7 +147,11 @@ public abstract class FlinkShims {
public abstract ImmutablePair<Object, Object> createPlannerAndExecutor(
ClassLoader classLoader, Object environmentSettings, Object sEnv,
- Object tableConfig, Object functionCatalog, Object catalogManager);
+ Object tableConfig, Object moduleManager, Object functionCatalog, Object catalogManager);
+
+ public abstract Object createBlinkPlannerEnvSettingBuilder();
+
+ public abstract Object createOldPlannerEnvSettingBuilder();
public abstract InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch);
}
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
index 757e7a487b..5088a7f623 100644
--- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
@@ -48,6 +48,7 @@ import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.PrintUtils;
@@ -94,6 +95,11 @@ public class Flink112Shims extends FlinkShims {
this.streamSqlInterpreter = new Flink112SqlInterpreter(flinkSqlContext, false);
}
+ @Override
+ public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager) {
+ return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager);
+ }
+
@Override
public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
// do nothing
@@ -265,7 +271,7 @@ public class Flink112Shims extends FlinkShims {
@Override
public ImmutablePair<Object, Object> createPlannerAndExecutor(
ClassLoader classLoader, Object environmentSettings, Object sEnv,
- Object tableConfig, Object functionCatalog, Object catalogManager) {
+ Object tableConfig, Object moduleManager, Object functionCatalog, Object catalogManager) {
EnvironmentSettings settings = (EnvironmentSettings) environmentSettings;
Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv);
Map<String, String> plannerProperties = settings.toPlannerProperties();
@@ -276,6 +282,17 @@ public class Flink112Shims extends FlinkShims {
return ImmutablePair.of(planner, executor);
}
+ @Override
+ public Object createBlinkPlannerEnvSettingBuilder() {
+ return EnvironmentSettings.newInstance().useBlinkPlanner();
+ }
+
+ @Override
+ public Object createOldPlannerEnvSettingBuilder() {
+ return EnvironmentSettings.newInstance().useOldPlanner();
+ }
+
+ @Override
public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) {
if (isBatch) {
return batchSqlInterpreter.runSqlList(st, context);
diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
index 792174ad94..1df3f92951 100644
--- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
+++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
@@ -51,6 +51,7 @@ import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.PrintUtils;
@@ -97,6 +98,11 @@ public class Flink113Shims extends FlinkShims {
this.streamSqlInterpreter = new Flink113SqlInterpreter(flinkSqlContext, false);
}
+ @Override
+ public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager) {
+ return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager);
+ }
+
@Override
public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
// do nothing
@@ -285,7 +291,7 @@ public class Flink113Shims extends FlinkShims {
@Override
public ImmutablePair<Object, Object> createPlannerAndExecutor(
ClassLoader classLoader, Object environmentSettings, Object sEnv,
- Object tableConfig, Object functionCatalog, Object catalogManager) {
+ Object tableConfig, Object moduleManager, Object functionCatalog, Object catalogManager) {
EnvironmentSettings settings = (EnvironmentSettings) environmentSettings;
Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv);
Map<String, String> plannerProperties = settings.toPlannerProperties();
@@ -296,6 +302,17 @@ public class Flink113Shims extends FlinkShims {
return ImmutablePair.of(planner, executor);
}
+ @Override
+ public Object createBlinkPlannerEnvSettingBuilder() {
+ return EnvironmentSettings.newInstance().useBlinkPlanner();
+ }
+
+ @Override
+ public Object createOldPlannerEnvSettingBuilder() {
+ return EnvironmentSettings.newInstance().useOldPlanner();
+ }
+
+ @Override
public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) {
if (isBatch) {
return batchSqlInterpreter.runSqlList(st, context);
diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
index 2e4e4b42ca..9660c9f469 100644
--- a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
+++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
@@ -49,6 +49,7 @@ import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.PrintUtils;
@@ -93,6 +94,11 @@ public class Flink114Shims extends FlinkShims {
this.streamSqlInterpreter = new Flink114SqlInterpreter(flinkSqlContext, false);
}
+ @Override
+ public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager) {
+ return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager);
+ }
+
@Override
public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
// do nothing
@@ -285,7 +291,7 @@ public class Flink114Shims extends FlinkShims {
@Override
public ImmutablePair<Object, Object> createPlannerAndExecutor(
ClassLoader classLoader, Object environmentSettings, Object sEnv,
- Object tableConfig, Object functionCatalog, Object catalogManager) {
+ Object tableConfig, Object moduleManager, Object functionCatalog, Object catalogManager) {
EnvironmentSettings settings = (EnvironmentSettings) environmentSettings;
Executor executor = (Executor) lookupExecutor(classLoader, environmentSettings, sEnv);
Planner planner = PlannerFactoryUtil.createPlanner(settings.getPlanner(), executor,
@@ -295,6 +301,16 @@ public class Flink114Shims extends FlinkShims {
return ImmutablePair.of(planner, executor);
}
+ @Override
+ public Object createBlinkPlannerEnvSettingBuilder() {
+ return EnvironmentSettings.newInstance().useBlinkPlanner();
+ }
+
+ @Override
+ public Object createOldPlannerEnvSettingBuilder() {
+ return EnvironmentSettings.newInstance().useOldPlanner();
+ }
+
public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) {
if (isBatch) {
return batchSqlInterpreter.runSqlList(st, context);
diff --git a/flink/flink1.15-shims/pom.xml b/flink/flink1.15-shims/pom.xml
new file mode 100644
index 0000000000..6921b06648
--- /dev/null
+++ b/flink/flink1.15-shims/pom.xml
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-parent</artifactId>
+ <groupId>org.apache.zeppelin</groupId>
+ <version>0.11.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>flink1.15-shims</artifactId>
+ <version>0.11.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+ <name>Zeppelin: Flink1.15 Shims</name>
+
+ <properties>
+ <flink.version>${flink1.15.version}</flink.version>
+ <flink.scala.binary.version>2.12</flink.scala.binary.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>flink-shims</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>eclipse-add-source</id>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-test-compile-first</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <scalaVersion>${flink.scala.version}</scalaVersion>
+ <args>
+ <arg>-unchecked</arg>
+ <arg>-deprecation</arg>
+ <arg>-feature</arg>
+ <arg>-target:jvm-1.8</arg>
+ </args>
+ <jvmArgs>
+ <jvmArg>-Xms1024m</jvmArg>
+ <jvmArg>-Xmx1024m</jvmArg>
+ <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg>
+ </jvmArgs>
+ <javacArgs>
+ <javacArg>-source</javacArg>
+ <javacArg>${java.version}</javacArg>
+ <javacArg>-target</javacArg>
+ <javacArg>${java.version}</javacArg>
+ <javacArg>-Xlint:all,-serial,-path,-options</javacArg>
+ </javacArgs>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-interpreter-setting</id>
+ <phase>none</phase>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java
similarity index 84%
copy from flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
copy to flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java
index 2e4e4b42ca..98c250773d 100644
--- a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
+++ b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java
@@ -49,13 +49,13 @@ import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.FlinkException;
-import org.apache.zeppelin.flink.shims114.CollectStreamTableSink;
+import org.apache.zeppelin.flink.shims115.CollectStreamTableSink;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.slf4j.Logger;
@@ -72,25 +72,30 @@ import java.util.Properties;
/**
- * Shims for flink 1.14
+ * Shims for flink 1.15
*/
-public class Flink114Shims extends FlinkShims {
+public class Flink115Shims extends FlinkShims {
- private static final Logger LOGGER = LoggerFactory.getLogger(Flink114Shims.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(Flink115Shims.class);
- private Flink114SqlInterpreter batchSqlInterpreter;
- private Flink114SqlInterpreter streamSqlInterpreter;
+ private Flink115SqlInterpreter batchSqlInterpreter;
+ private Flink115SqlInterpreter streamSqlInterpreter;
- public Flink114Shims(FlinkVersion flinkVersion, Properties properties) {
+ public Flink115Shims(FlinkVersion flinkVersion, Properties properties) {
super(flinkVersion, properties);
}
public void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext) {
- this.batchSqlInterpreter = new Flink114SqlInterpreter(flinkSqlContext, true);
+ this.batchSqlInterpreter = new Flink115SqlInterpreter(flinkSqlContext, true);
}
public void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext) {
- this.streamSqlInterpreter = new Flink114SqlInterpreter(flinkSqlContext, false);
+ this.streamSqlInterpreter = new Flink115SqlInterpreter(flinkSqlContext, false);
+ }
+
+ @Override
+ public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager) {
+ return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager);
}
@Override
@@ -98,7 +103,6 @@ public class Flink114Shims extends FlinkShims {
// do nothing
}
-
@Override
public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) {
return new StreamExecutionEnvironmentFactory() {
@@ -172,14 +176,12 @@ public class Flink114Shims extends FlinkShims {
@Override
public Object fromDataSet(Object btenv, Object ds) {
- return null;
- //return Flink114ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds);
+ throw new RuntimeException("Conversion from DataSet is not supported in Flink 1.15");
}
@Override
public Object toDataSet(Object btenv, Object table) {
- return null;
- //return Flink114ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table);
+ throw new RuntimeException("Conversion to DataSet is not supported in Flink 1.15");
}
@Override
@@ -190,7 +192,7 @@ public class Flink114Shims extends FlinkShims {
@Override
public void registerScalarFunction(Object btenv, String name, Object scalarFunction) {
- ((StreamTableEnvironmentImpl)(btenv)).createTemporarySystemFunction(name, (ScalarFunction) scalarFunction);
+ ((StreamTableEnvironmentImpl) (btenv)).createTemporarySystemFunction(name, (ScalarFunction) scalarFunction);
}
@Override
@@ -211,6 +213,7 @@ public class Flink114Shims extends FlinkShims {
/**
* Flink 1.11 bind CatalogManager with parser which make blink and flink could not share the same CatalogManager.
* This is a workaround which always reset CatalogTableSchemaResolver before running any flink code.
+ *
* @param catalogManager
* @param parserObject
* @param environmentSetting
@@ -224,10 +227,10 @@ public class Flink114Shims extends FlinkShims {
@Override
public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) {
- CustomCommandLine customCommandLine = ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
+ CustomCommandLine customCommandLine = ((CliFrontend) cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
try {
- ((Configuration) effectiveConfig).addAll(customCommandLine.toConfiguration((CommandLine) commandLine));
- return effectiveConfig;
+ ((Configuration) effectiveConfig).addAll(customCommandLine.toConfiguration((CommandLine) commandLine));
+ return effectiveConfig;
} catch (FlinkException e) {
throw new RuntimeException("Fail to call addAll", e);
}
@@ -241,8 +244,7 @@ public class Flink114Shims extends FlinkShims {
@Override
public void setOldPlanner(Object tableConfig) {
- ((TableConfig) tableConfig).getConfiguration()
- .set(TableConfigOptions.TABLE_PLANNER, PlannerType.OLD);
+
}
@Override
@@ -263,12 +265,12 @@ public class Flink114Shims extends FlinkShims {
}
private Object lookupExecutor(ClassLoader classLoader,
- Object settings,
- Object sEnv) {
+ Object settings,
+ Object sEnv) {
try {
final ExecutorFactory executorFactory =
FactoryUtil.discoverFactory(
- classLoader, ExecutorFactory.class, ((EnvironmentSettings) settings).getExecutor());
+ classLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER);
final Method createMethod =
executorFactory
.getClass()
@@ -285,16 +287,27 @@ public class Flink114Shims extends FlinkShims {
@Override
public ImmutablePair<Object, Object> createPlannerAndExecutor(
ClassLoader classLoader, Object environmentSettings, Object sEnv,
- Object tableConfig, Object functionCatalog, Object catalogManager) {
+ Object tableConfig, Object moduleManager, Object functionCatalog, Object catalogManager) {
EnvironmentSettings settings = (EnvironmentSettings) environmentSettings;
Executor executor = (Executor) lookupExecutor(classLoader, environmentSettings, sEnv);
- Planner planner = PlannerFactoryUtil.createPlanner(settings.getPlanner(), executor,
+ Planner planner = PlannerFactoryUtil.createPlanner(executor,
(TableConfig) tableConfig,
+ (ModuleManager) moduleManager,
(CatalogManager) catalogManager,
(FunctionCatalog) functionCatalog);
return ImmutablePair.of(planner, executor);
}
+ @Override
+ public Object createBlinkPlannerEnvSettingBuilder() {
+ return EnvironmentSettings.newInstance();
+ }
+
+ @Override
+ public Object createOldPlannerEnvSettingBuilder() {
+ return EnvironmentSettings.newInstance();
+ }
+
public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) {
if (isBatch) {
return batchSqlInterpreter.runSqlList(st, context);
diff --git a/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java
new file mode 100644
index 0000000000..6c0c67fb2b
--- /dev/null
+++ b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.SqlParserException;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.*;
+import org.apache.flink.table.operations.command.HelpOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.ddl.*;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.ZeppelinContext;
+import org.apache.zeppelin.interpreter.util.SqlSplitter;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+
+public class Flink115SqlInterpreter {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Flink115SqlInterpreter.class);
+ private static final String CMD_DESC_DELIMITER = "\t\t";
+
+ /**
+ * SQL Client HELP command helper class.
+ */
+ private static final class SQLCliCommandsDescriptions {
+ private int commandMaxLength;
+ private final Map<String, String> commandsDescriptions;
+
+ public SQLCliCommandsDescriptions() {
+ this.commandsDescriptions = new LinkedHashMap<>();
+ this.commandMaxLength = -1;
+ }
+
+ public SQLCliCommandsDescriptions commandDescription(String command, String description) {
+ Preconditions.checkState(
+ StringUtils.isNotBlank(command), "content of command must not be empty.");
+ Preconditions.checkState(
+ StringUtils.isNotBlank(description),
+ "content of command's description must not be empty.");
+ this.updateMaxCommandLength(command.length());
+ this.commandsDescriptions.put(command, description);
+ return this;
+ }
+
+ private void updateMaxCommandLength(int newLength) {
+ Preconditions.checkState(newLength > 0);
+ if (this.commandMaxLength < newLength) {
+ this.commandMaxLength = newLength;
+ }
+ }
+
+ public AttributedString build() {
+ AttributedStringBuilder attributedStringBuilder = new AttributedStringBuilder();
+ if (!this.commandsDescriptions.isEmpty()) {
+ this.commandsDescriptions.forEach(
+ (cmd, cmdDesc) -> {
+ attributedStringBuilder
+ .style(AttributedStyle.DEFAULT.bold())
+ .append(
+ String.format(
+ String.format("%%-%ds", commandMaxLength), cmd))
+ .append(CMD_DESC_DELIMITER)
+ .style(AttributedStyle.DEFAULT)
+ .append(cmdDesc)
+ .append('\n');
+ });
+ }
+ return attributedStringBuilder.toAttributedString();
+ }
+ }
+
+ private static final AttributedString SQL_CLI_COMMANDS_DESCRIPTIONS =
+ new SQLCliCommandsDescriptions()
+ .commandDescription("HELP", "Prints the available commands.")
+ .commandDescription(
+ "SET",
+ "Sets a session configuration property. Syntax: \"SET '<key>'='<value>';\". Use \"SET;\" for listing all properties.")
+ .commandDescription(
+ "RESET",
+ "Resets a session configuration property. Syntax: \"RESET '<key>';\". Use \"RESET;\" for reset all session properties.")
+ .commandDescription(
+ "INSERT INTO",
+ "Inserts the results of a SQL SELECT query into a declared table sink.")
+ .commandDescription(
+ "INSERT OVERWRITE",
+ "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.")
+ .commandDescription(
+ "SELECT", "Executes a SQL SELECT query on the Flink cluster.")
+ .commandDescription(
+ "EXPLAIN",
+ "Describes the execution plan of a query or table with the given name.")
+ .commandDescription(
+ "BEGIN STATEMENT SET",
+ "Begins a statement set. Syntax: \"BEGIN STATEMENT SET;\"")
+ .commandDescription("END", "Ends a statement set. Syntax: \"END;\"")
+ // (TODO) zjffdu, ADD/REMOVE/SHOW JAR
+ .build();
+
+ // --------------------------------------------------------------------------------------------
+
+ public static final AttributedString MESSAGE_HELP =
+ new AttributedStringBuilder()
+ .append("The following commands are available:\n\n")
+ .append(SQL_CLI_COMMANDS_DESCRIPTIONS)
+ .style(AttributedStyle.DEFAULT.underline())
+ .append("\nHint")
+ .style(AttributedStyle.DEFAULT)
+ .append(
+ ": Make sure that a statement ends with \";\" for finalizing (multi-line) statements.")
+ // About Documentation Link.
+ .style(AttributedStyle.DEFAULT)
+ .append(
+ "\nYou can also type any Flink SQL statement, please visit https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/overview/ for more details.")
+ .toAttributedString();
+
+ private static final String MESSAGE_NO_STATEMENT_IN_STATEMENT_SET = "No statement in the statement set, skip submit.";
+
+ private FlinkSqlContext flinkSqlContext;
+ private TableEnvironment tbenv;
+ private ZeppelinContext z;
+ private Parser sqlParser;
+ private SqlSplitter sqlSplitter;
+ // paragraphId -> list of ModifyOperation, used for statement set in 2 syntax:
+ // 1. runAsOne= true
+ // 2. begin statement set;
+ // ...
+ // end;
+ private Map<String, List<ModifyOperation>> statementOperationsMap = new HashMap<>();
+ private boolean isBatch;
+ private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
+
+
+ public Flink115SqlInterpreter(FlinkSqlContext flinkSqlContext, boolean isBatch) {
+ this.flinkSqlContext = flinkSqlContext;
+ this.isBatch = isBatch;
+ if (isBatch) {
+ this.tbenv = (TableEnvironment) flinkSqlContext.getBtenv();
+ } else {
+ this.tbenv = (TableEnvironment) flinkSqlContext.getStenv();
+ }
+ this.z = (ZeppelinContext) flinkSqlContext.getZeppelinContext();
+ this.sqlParser = ((TableEnvironmentInternal) tbenv).getParser();
+ this.sqlSplitter = new SqlSplitter();
+ JobListener jobListener = new JobListener() {
+ @Override
+ public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ LOGGER.info("UnLock JobSubmitLock");
+ }
+ }
+
+ @Override
+ public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
+
+ }
+ };
+
+ ((ExecutionEnvironment) flinkSqlContext.getBenv()).registerJobListener(jobListener);
+ ((StreamExecutionEnvironment) flinkSqlContext.getSenv()).registerJobListener(jobListener);
+ }
+
+ public InterpreterResult runSqlList(String st, InterpreterContext context) {
+ try {
+ boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
+ if (runAsOne) {
+ statementOperationsMap.put(context.getParagraphId(), new ArrayList<>());
+ }
+
+ String jobName = context.getLocalProperties().get("jobName");
+ if (StringUtils.isNotBlank(jobName)) {
+ tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName);
+ }
+
+ List<String> sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList());
+ for (String sql : sqls) {
+ List<Operation> operations = null;
+ try {
+ operations = sqlParser.parse(sql);
+ } catch (SqlParserException e) {
+ context.out.write("%text Invalid Sql statement: " + sql + "\n");
+ context.out.write(MESSAGE_HELP.toString());
+ return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString());
+ }
+
+ try {
+ callOperation(sql, operations.get(0), context);
+ context.out.flush();
+ } catch (Throwable e) {
+ LOGGER.error("Fail to run sql:" + sql, e);
+ try {
+ context.out.write("%text Fail to run sql command: " +
+ sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n");
+ } catch (IOException ex) {
+ LOGGER.warn("Unexpected exception:", ex);
+ return new InterpreterResult(InterpreterResult.Code.ERROR,
+ ExceptionUtils.getStackTrace(e));
+ }
+ return new InterpreterResult(InterpreterResult.Code.ERROR);
+ }
+ }
+
+ if (runAsOne && !statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()).isEmpty()) {
+ try {
+ lock.lock();
+ List<ModifyOperation> modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>());
+ if (!modifyOperations.isEmpty()) {
+ callInserts(modifyOperations, context);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Fail to execute sql as one job", e);
+ return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("Fail to execute sql", e);
+ return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
+ } finally {
+ statementOperationsMap.remove(context.getParagraphId());
+ }
+
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS);
+ }
+
+ private void callOperation(String sql, Operation operation, InterpreterContext context) throws IOException {
+ if (operation instanceof HelpOperation) {
+ // HELP
+ callHelp(context);
+ } else if (operation instanceof SetOperation) {
+ // SET
+ callSet((SetOperation) operation, context);
+ } else if (operation instanceof ModifyOperation) {
+ // INSERT INTO/OVERWRITE
+ callInsert((ModifyOperation) operation, context);
+ } else if (operation instanceof QueryOperation) {
+ // SELECT
+ callSelect(sql, (QueryOperation) operation, context);
+ } else if (operation instanceof ExplainOperation) {
+ // EXPLAIN
+ callExplain((ExplainOperation) operation, context);
+ } else if (operation instanceof BeginStatementSetOperation) {
+ // BEGIN STATEMENT SET
+ callBeginStatementSet(context);
+ } else if (operation instanceof EndStatementSetOperation) {
+ // END
+ callEndStatementSet(context);
+ } else if (operation instanceof ShowCreateTableOperation) {
+ // SHOW CREATE TABLE
+ callShowCreateTable((ShowCreateTableOperation) operation, context);
+ } else if (operation instanceof ShowCatalogsOperation) {
+ callShowCatalogs(context);
+ } else if (operation instanceof ShowCurrentCatalogOperation) {
+ callShowCurrentCatalog(context);
+ } else if (operation instanceof UseCatalogOperation) {
+ callUseCatalog(((UseCatalogOperation) operation).getCatalogName(), context);
+ } else if (operation instanceof CreateCatalogOperation) {
+ callDDL(sql, context, "Catalog has been created.");
+ } else if (operation instanceof DropCatalogOperation) {
+ callDDL(sql, context, "Catalog has been dropped.");
+ } else if (operation instanceof UseDatabaseOperation) {
+ UseDatabaseOperation useDBOperation = (UseDatabaseOperation) operation;
+ callUseDatabase(useDBOperation.getDatabaseName(), context);
+ } else if (operation instanceof CreateDatabaseOperation) {
+ callDDL(sql, context, "Database has been created.");
+ } else if (operation instanceof DropDatabaseOperation) {
+ callDDL(sql, context, "Database has been removed.");
+ } else if (operation instanceof AlterDatabaseOperation) {
+ callDDL(sql, context, "Alter database succeeded!");
+ } else if (operation instanceof ShowDatabasesOperation) {
+ callShowDatabases(context);
+ } else if (operation instanceof ShowCurrentDatabaseOperation) {
+ callShowCurrentDatabase(context);
+ } else if (operation instanceof CreateTableOperation || operation instanceof CreateTableASOperation) {
+ callDDL(sql, context, "Table has been created.");
+ } else if (operation instanceof AlterTableOperation) {
+ callDDL(sql, context, "Alter table succeeded!");
+ } else if (operation instanceof DropTableOperation) {
+ callDDL(sql, context, "Table has been dropped.");
+ } else if (operation instanceof DescribeTableOperation) {
+ DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation;
+ callDescribe(describeTableOperation.getSqlIdentifier().getObjectName(), context);
+ } else if (operation instanceof ShowTablesOperation) {
+ callShowTables(context);
+ } else if (operation instanceof CreateViewOperation) {
+ callDDL(sql, context, "View has been created.");
+ } else if (operation instanceof DropViewOperation) {
+ callDDL(sql, context, "View has been dropped.");
+ } else if (operation instanceof AlterViewOperation) {
+ callDDL(sql, context, "Alter view succeeded!");
+ } else if (operation instanceof CreateCatalogFunctionOperation || operation instanceof CreateTempSystemFunctionOperation) {
+ callDDL(sql, context, "Function has been created.");
+ } else if (operation instanceof DropCatalogFunctionOperation || operation instanceof DropTempSystemFunctionOperation) {
+ callDDL(sql, context, "Function has been removed.");
+ } else if (operation instanceof AlterCatalogFunctionOperation) {
+ callDDL(sql, context, "Alter function succeeded!");
+ } else if (operation instanceof ShowFunctionsOperation) {
+ callShowFunctions(context);
+ } else if (operation instanceof ShowModulesOperation) {
+ callShowModules(context);
+ } else if (operation instanceof ShowPartitionsOperation) {
+ ShowPartitionsOperation showPartitionsOperation = (ShowPartitionsOperation) operation;
+ callShowPartitions(showPartitionsOperation.asSummaryString(), context);
+ } else {
+ throw new IOException(operation.getClass().getName() + " is not supported");
+ }
+ }
+
+
+ private void callHelp(InterpreterContext context) throws IOException {
+ context.out.write(MESSAGE_HELP.toString() + "\n");
+ }
+
+ private void callInsert(ModifyOperation operation, InterpreterContext context) throws IOException {
+ if (statementOperationsMap.containsKey(context.getParagraphId())) {
+ List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId());
+ modifyOperations.add(operation);
+ } else {
+ callInserts(Collections.singletonList(operation), context);
+ }
+ }
+
+ private void callInserts(List<ModifyOperation> operations, InterpreterContext context) throws IOException {
+ if (!isBatch) {
+ context.getLocalProperties().put("flink.streaming.insert_into", "true");
+ }
+ TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
+ checkState(tableResult.getJobClient().isPresent());
+ try {
+ tableResult.await();
+ JobClient jobClient = tableResult.getJobClient().get();
+ if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
+ context.out.write("Insertion successfully.\n");
+ } else {
+ throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Flink job is interrupted", e);
+ } catch (ExecutionException e) {
+ throw new IOException("Flink job is failed", e);
+ }
+ }
+
+ private void callShowCreateTable(ShowCreateTableOperation showCreateTableOperation, InterpreterContext context) throws IOException {
+ try {
+ lock.lock();
+ TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(showCreateTableOperation);
+ String explanation =
+ Objects.requireNonNull(tableResult.collect().next().getField(0)).toString();
+ context.out.write(explanation + "\n");
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ }
+
+ private void callExplain(ExplainOperation explainOperation, InterpreterContext context) throws IOException {
+ try {
+ lock.lock();
+ TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(explainOperation);
+ String explanation =
+ Objects.requireNonNull(tableResult.collect().next().getField(0)).toString();
+ context.out.write(explanation + "\n");
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ }
+
+ public void callSelect(String sql, QueryOperation queryOperation, InterpreterContext context) throws IOException {
+ try {
+ lock.lock();
+ if (isBatch) {
+ callBatchInnerSelect(sql, context);
+ } else {
+ callStreamInnerSelect(sql, context);
+ }
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ }
+
+ public void callBatchInnerSelect(String sql, InterpreterContext context) throws IOException {
+ Table table = this.tbenv.sqlQuery(sql);
+ String result = z.showData(table);
+ context.out.write(result);
+ }
+
+ public void callStreamInnerSelect(String sql, InterpreterContext context) throws IOException {
+ flinkSqlContext.getStreamSqlSelectConsumer().accept(sql);
+ }
+
+ public void callSet(SetOperation setOperation, InterpreterContext context) throws IOException {
+ if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
+ // set a property
+ String key = setOperation.getKey().get().trim();
+ String value = setOperation.getValue().get().trim();
+ this.tbenv.getConfig().getConfiguration().setString(key, value);
+ LOGGER.info("Set table config: {}={}", key, value);
+ } else {
+ // show all properties
+ final Map<String, String> properties = this.tbenv.getConfig().getConfiguration().toMap();
+ List<String> prettyEntries = new ArrayList<>();
+ for (String key : properties.keySet()) {
+ prettyEntries.add(
+ String.format(
+ "'%s' = '%s'",
+ EncodingUtils.escapeSingleQuotes(key),
+ EncodingUtils.escapeSingleQuotes(properties.get(key))));
+ }
+ prettyEntries.sort(String::compareTo);
+ prettyEntries.forEach(entry -> {
+ try {
+ context.out.write(entry + "\n");
+ } catch (IOException e) {
+ LOGGER.warn("Fail to write output", e);
+ }
+ });
+ }
+ }
+
+ private void callBeginStatementSet(InterpreterContext context) throws IOException {
+ statementOperationsMap.put(context.getParagraphId(), new ArrayList<>());
+ }
+
+ private void callEndStatementSet(InterpreterContext context) throws IOException {
+ List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId());
+ if (modifyOperations != null && !modifyOperations.isEmpty()) {
+ callInserts(modifyOperations, context);
+ } else {
+ context.out.write(MESSAGE_NO_STATEMENT_IN_STATEMENT_SET);
+ }
+ }
+
+ private void callUseCatalog(String catalog, InterpreterContext context) throws IOException {
+ tbenv.executeSql("USE CATALOG `" + catalog + "`");
+ }
+
+ private void callUseDatabase(String databaseName,
+ InterpreterContext context) throws IOException {
+ this.tbenv.executeSql("USE `" + databaseName + "`");
+ }
+
+ private void callShowCatalogs(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Catalogs");
+ List<String> catalogs = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .collect(Collectors.toList());
+ context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + "\n");
+ }
+
+ private void callShowCurrentCatalog(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Current Catalog");
+ String catalog = tableResult.collect().next().getField(0).toString();
+ context.out.write("%text current catalog: " + catalog + "\n");
+ }
+
+ private void callShowDatabases(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Databases");
+ List<String> databases = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .collect(Collectors.toList());
+ context.out.write(
+ "%table database\n" + StringUtils.join(databases, "\n") + "\n");
+ }
+
+ private void callShowCurrentDatabase(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Current Database");
+ String database = tableResult.collect().next().getField(0).toString();
+ context.out.write("%text current database: " + database + "\n");
+ }
+
+ private void callShowTables(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Tables");
+ List<String> tables = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .filter(tbl -> !tbl.startsWith("UnnamedTable"))
+ .collect(Collectors.toList());
+ context.out.write(
+ "%table table\n" + StringUtils.join(tables, "\n") + "\n");
+ }
+
+ private void callShowFunctions(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Functions");
+ List<String> functions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .collect(Collectors.toList());
+ context.out.write(
+ "%table function\n" + StringUtils.join(functions, "\n") + "\n");
+ }
+
+ private void callShowModules(InterpreterContext context) throws IOException {
+ String[] modules = this.tbenv.listModules();
+ context.out.write("%table modules\n" + StringUtils.join(modules, "\n") + "\n");
+ }
+
+ private void callShowPartitions(String sql, InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql(sql);
+ List<String> partions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .collect(Collectors.toList());
+ context.out.write(
+ "%table partitions\n" + StringUtils.join(partions, "\n") + "\n");
+ }
+
+ private void callDDL(String sql, InterpreterContext context, String message) throws IOException {
+ try {
+ lock.lock();
+ this.tbenv.executeSql(sql);
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ context.out.write(message + "\n");
+ }
+
+ private void callDescribe(String name, InterpreterContext context) throws IOException {
+ TableResult tableResult = null;
+ try {
+ tableResult = tbenv.executeSql("DESCRIBE " + name);
+ } catch (Exception e) {
+ throw new IOException("Fail to describe table: " + name, e);
+ }
+ CloseableIterator<Row> result = tableResult.collect();
+ StringBuilder builder = new StringBuilder();
+ builder.append("Column\tType\n");
+ while (result.hasNext()) {
+ Row row = result.next();
+ builder.append(row.getField(0) + "\t" + row.getField(1) + "\n");
+ }
+ context.out.write("%table\n" + builder.toString());
+ }
+}
diff --git a/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java
new file mode 100644
index 0000000000..a35ad3a6cd
--- /dev/null
+++ b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.*;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.StringUtils;
+
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.zeppelin.flink.TimestampStringUtils.*;
+
+/**
+ * Copied from flink-project with minor modification.
+ * */
+public class PrintUtils {
+
+ public static final String NULL_COLUMN = "(NULL)";
+ private static final String COLUMN_TRUNCATED_FLAG = "...";
+
+ private PrintUtils() {}
+
+
+ public static String[] rowToString(
+ Row row, ResolvedSchema resolvedSchema, ZoneId sessionTimeZone) {
+ return rowToString(row, NULL_COLUMN, false, resolvedSchema, sessionTimeZone);
+ }
+
+ public static String[] rowToString(
+ Row row,
+ String nullColumn,
+ boolean printRowKind,
+ ResolvedSchema resolvedSchema,
+ ZoneId sessionTimeZone) {
+ final int len = printRowKind ? row.getArity() + 1 : row.getArity();
+ final List<String> fields = new ArrayList<>(len);
+ if (printRowKind) {
+ fields.add(row.getKind().shortString());
+ }
+ for (int i = 0; i < row.getArity(); i++) {
+ final Object field = row.getField(i);
+ final LogicalType fieldType =
+ resolvedSchema.getColumnDataTypes().get(i).getLogicalType();
+ if (field == null) {
+ fields.add(nullColumn);
+ } else {
+ fields.add(
+ StringUtils.arrayAwareToString(
+ formattedTimestamp(field, fieldType, sessionTimeZone)));
+ }
+ }
+ return fields.toArray(new String[0]);
+ }
+
+ /**
+ * Normalizes field that contains TIMESTAMP, TIMESTAMP_LTZ and TIME type data.
+ *
+ * <p>This method also supports nested type ARRAY, ROW, MAP.
+ */
+ private static Object formattedTimestamp(
+ Object field, LogicalType fieldType, ZoneId sessionTimeZone) {
+ final LogicalTypeRoot typeRoot = fieldType.getTypeRoot();
+ if (field == null) {
+ return "null";
+ }
+ switch (typeRoot) {
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return formatTimestampField(field, fieldType, sessionTimeZone);
+ case TIME_WITHOUT_TIME_ZONE:
+ return formatTimeField(field);
+ case ARRAY:
+ LogicalType elementType = ((ArrayType) fieldType).getElementType();
+ if (field instanceof List) {
+ List<?> array = (List<?>) field;
+ Object[] formattedArray = new Object[array.size()];
+ for (int i = 0; i < array.size(); i++) {
+ formattedArray[i] =
+ formattedTimestamp(array.get(i), elementType, sessionTimeZone);
+ }
+ return formattedArray;
+ } else if (field.getClass().isArray()) {
+ // primitive type
+ if (field.getClass() == byte[].class) {
+ byte[] array = (byte[]) field;
+ Object[] formattedArray = new Object[array.length];
+ for (int i = 0; i < array.length; i++) {
+ formattedArray[i] =
+ formattedTimestamp(array[i], elementType, sessionTimeZone);
+ }
+ return formattedArray;
+ } else if (field.getClass() == short[].class) {
+ short[] array = (short[]) field;
+ Object[] formattedArray = new Object[array.length];
+ for (int i = 0; i < array.length; i++) {
+ formattedArray[i] =
+ formattedTimestamp(array[i], elementType, sessionTimeZone);
+ }
+ return formattedArray;
+ } else if (field.getClass() == int[].class) {
+ int[] array = (int[]) field;
+ Object[] formattedArray = new Object[array.length];
+ for (int i = 0; i < array.length; i++) {
+ formattedArray[i] =
+ formattedTimestamp(array[i], elementType, sessionTimeZone);
+ }
+ return formattedArray;
+ } else if (field.getClass() == long[].class) {
+ long[] array = (long[]) field;
+ Object[] formattedArray = new Object[array.length];
+ for (int i = 0; i < array.length; i++) {
+ formattedArray[i] =
+ formattedTimestamp(array[i], elementType, sessionTimeZone);
+ }
+ return formattedArray;
+ } else if (field.getClass() == float[].class) {
+ float[] array = (float[]) field;
+ Object[] formattedArray = new Object[array.length];
+ for (int i = 0; i < array.length; i++) {
+ formattedArray[i] =
+ formattedTimestamp(array[i], elementType, sessionTimeZone);
+ }
+ return formattedArray;
+ } else if (field.getClass() == double[].class) {
+ double[] array = (double[]) field;
+ Object[] formattedArray = new Object[array.length];
+ for (int i = 0; i < array.length; i++) {
+ formattedArray[i] =
+ formattedTimestamp(array[i], elementType, sessionTimeZone);
+ }
+ return formattedArray;
+ } else if (field.getClass() == boolean[].class) {
+ boolean[] array = (boolean[]) field;
+ Object[] formattedArray = new Object[array.length];
+ for (int i = 0; i < array.length; i++) {
+ formattedArray[i] =
+ formattedTimestamp(array[i], elementType, sessionTimeZone);
+ }
+ return formattedArray;
+ } else if (field.getClass() == char[].class) {
+ char[] array = (char[]) field;
+ Object[] formattedArray = new Object[array.length];
+ for (int i = 0; i < array.length; i++) {
+ formattedArray[i] =
+ formattedTimestamp(array[i], elementType, sessionTimeZone);
+ }
+ return formattedArray;
+ } else {
+ // non-primitive type
+ Object[] array = (Object[]) field;
+ Object[] formattedArray = new Object[array.length];
+ for (int i = 0; i < array.length; i++) {
+ formattedArray[i] =
+ formattedTimestamp(array[i], elementType, sessionTimeZone);
+ }
+ return formattedArray;
+ }
+ } else {
+ return field;
+ }
+ case ROW:
+ if (fieldType instanceof RowType && field instanceof Row) {
+ Row row = (Row) field;
+ Row formattedRow = new Row(row.getKind(), row.getArity());
+ for (int i = 0; i < ((RowType) fieldType).getFields().size(); i++) {
+ LogicalType type = ((RowType) fieldType).getFields().get(i).getType();
+ formattedRow.setField(
+ i, formattedTimestamp(row.getField(i), type, sessionTimeZone));
+ }
+ return formattedRow;
+
+ } else if (fieldType instanceof RowType && field instanceof RowData) {
+ RowData rowData = (RowData) field;
+ Row formattedRow = new Row(rowData.getRowKind(), rowData.getArity());
+ for (int i = 0; i < ((RowType) fieldType).getFields().size(); i++) {
+ LogicalType type = ((RowType) fieldType).getFields().get(i).getType();
+ RowData.FieldGetter fieldGetter = RowData.createFieldGetter(type, i);
+ formattedRow.setField(
+ i,
+ formattedTimestamp(
+ fieldGetter.getFieldOrNull(rowData),
+ type,
+ sessionTimeZone));
+ }
+ return formattedRow;
+ } else {
+ return field;
+ }
+ case MAP:
+ LogicalType keyType = ((MapType) fieldType).getKeyType();
+ LogicalType valueType = ((MapType) fieldType).getValueType();
+ if (fieldType instanceof MapType && field instanceof Map) {
+ Map<Object, Object> map = ((Map) field);
+ Map<Object, Object> formattedMap = new HashMap<>(map.size());
+ for (Object key : map.keySet()) {
+ formattedMap.put(
+ formattedTimestamp(key, keyType, sessionTimeZone),
+ formattedTimestamp(map.get(key), valueType, sessionTimeZone));
+ }
+ return formattedMap;
+ } else if (fieldType instanceof MapType && field instanceof MapData) {
+ MapData map = ((MapData) field);
+ Map<Object, Object> formattedMap = new HashMap<>(map.size());
+ Object[] keyArray =
+ (Object[]) formattedTimestamp(map.keyArray(), keyType, sessionTimeZone);
+ Object[] valueArray =
+ (Object[])
+ formattedTimestamp(
+ map.valueArray(), valueType, sessionTimeZone);
+ for (int i = 0; i < keyArray.length; i++) {
+ formattedMap.put(keyArray[i], valueArray[i]);
+ }
+ return formattedMap;
+ } else {
+ return field;
+ }
+ default:
+ return field;
+ }
+ }
+
+ /**
+ * Formats the print content of TIMESTAMP and TIMESTAMP_LTZ type data, consider the user
+ * configured time zone.
+ */
+ private static Object formatTimestampField(
+ Object timestampField, LogicalType fieldType, ZoneId sessionTimeZone) {
+ switch (fieldType.getTypeRoot()) {
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ final int precision = getPrecision(fieldType);
+ if (timestampField instanceof java.sql.Timestamp) {
+ // conversion between java.sql.Timestamp and TIMESTAMP_WITHOUT_TIME_ZONE
+ return timestampToString(
+ ((Timestamp) timestampField).toLocalDateTime(), precision);
+ } else if (timestampField instanceof java.time.LocalDateTime) {
+ return timestampToString(((LocalDateTime) timestampField), precision);
+ } else if (timestampField instanceof TimestampData) {
+ return timestampToString(
+ ((TimestampData) timestampField).toLocalDateTime(), precision);
+ } else {
+ return timestampField;
+ }
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ Instant instant = null;
+ if (timestampField instanceof java.time.Instant) {
+ instant = ((Instant) timestampField);
+ } else if (timestampField instanceof java.sql.Timestamp) {
+ Timestamp timestamp = ((Timestamp) timestampField);
+ // conversion between java.sql.Timestamp and TIMESTAMP_WITH_LOCAL_TIME_ZONE
+ instant =
+ TimestampData.fromEpochMillis(
+ timestamp.getTime(), timestamp.getNanos() % 1000_000)
+ .toInstant();
+ } else if (timestampField instanceof TimestampData) {
+ instant = ((TimestampData) timestampField).toInstant();
+ } else if (timestampField instanceof Integer) {
+ instant = Instant.ofEpochSecond((Integer) timestampField);
+ } else if (timestampField instanceof Long) {
+ instant = Instant.ofEpochMilli((Long) timestampField);
+ }
+ if (instant != null) {
+ return timestampToString(
+ instant.atZone(sessionTimeZone).toLocalDateTime(),
+ getPrecision(fieldType));
+ } else {
+ return timestampField;
+ }
+ default:
+ return timestampField;
+ }
+ }
+
+ /** Formats the print content of TIME type data. */
+ private static Object formatTimeField(Object timeField) {
+ if (timeField.getClass().isAssignableFrom(int.class) || timeField instanceof Integer) {
+ return unixTimeToString((int) timeField);
+ } else if (timeField.getClass().isAssignableFrom(long.class) || timeField instanceof Long) {
+ return unixTimeToString(((Long) timeField).intValue());
+ } else if (timeField instanceof Time) {
+ return unixTimeToString(timeToInternal((Time) timeField));
+ } else if (timeField instanceof LocalTime) {
+ return unixTimeToString(localTimeToUnixDate((LocalTime) timeField));
+ } else {
+ return timeField;
+ }
+ }
+}
diff --git a/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java
new file mode 100644
index 0000000000..c52104e45a
--- /dev/null
+++ b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import java.sql.Time;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.TimeZone;
+
+/**
+ * Copied from flink-project with minor modification.
+ * */
+public class TimestampStringUtils {
+
+ private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+ public TimestampStringUtils() {
+ }
+
+ public static String timestampToString(LocalDateTime ldt, int precision) {
+ String fraction;
+ for(fraction = pad(9, (long)ldt.getNano()); fraction.length() > precision && fraction.endsWith("0"); fraction = fraction.substring(0, fraction.length() - 1)) {
+ }
+
+ StringBuilder ymdhms = ymdhms(new StringBuilder(), ldt.getYear(), ldt.getMonthValue(), ldt.getDayOfMonth(), ldt.getHour(), ldt.getMinute(), ldt.getSecond());
+ if (fraction.length() > 0) {
+ ymdhms.append(".").append(fraction);
+ }
+
+ return ymdhms.toString();
+ }
+
+ private static String pad(int length, long v) {
+ StringBuilder s = new StringBuilder(Long.toString(v));
+
+ while(s.length() < length) {
+ s.insert(0, "0");
+ }
+
+ return s.toString();
+ }
+
+ private static StringBuilder hms(StringBuilder b, int h, int m, int s) {
+ int2(b, h);
+ b.append(':');
+ int2(b, m);
+ b.append(':');
+ int2(b, s);
+ return b;
+ }
+
+ private static StringBuilder ymdhms(StringBuilder b, int year, int month, int day, int h, int m, int s) {
+ ymd(b, year, month, day);
+ b.append(' ');
+ hms(b, h, m, s);
+ return b;
+ }
+
+ private static StringBuilder ymd(StringBuilder b, int year, int month, int day) {
+ int4(b, year);
+ b.append('-');
+ int2(b, month);
+ b.append('-');
+ int2(b, day);
+ return b;
+ }
+
+ private static void int4(StringBuilder buf, int i) {
+ buf.append((char)(48 + i / 1000 % 10));
+ buf.append((char)(48 + i / 100 % 10));
+ buf.append((char)(48 + i / 10 % 10));
+ buf.append((char)(48 + i % 10));
+ }
+
+ private static void int2(StringBuilder buf, int i) {
+ buf.append((char)(48 + i / 10 % 10));
+ buf.append((char)(48 + i % 10));
+ }
+
+ public static String unixTimeToString(int time) {
+ StringBuilder buf = new StringBuilder(8);
+ unixTimeToString(buf, time, 0);
+ return buf.toString();
+ }
+
+ private static void unixTimeToString(StringBuilder buf, int time, int precision) {
+ while(time < 0) {
+ time = (int)((long)time + 86400000L);
+ }
+
+ int h = time / 3600000;
+ int time2 = time % 3600000;
+ int m = time2 / '\uea60';
+ int time3 = time2 % '\uea60';
+ int s = time3 / 1000;
+ int ms = time3 % 1000;
+ int2(buf, h);
+ buf.append(':');
+ int2(buf, m);
+ buf.append(':');
+ int2(buf, s);
+ if (precision > 0) {
+ buf.append('.');
+
+ while(precision > 0) {
+ buf.append((char)(48 + ms / 100));
+ ms %= 100;
+ ms *= 10;
+ if (ms == 0) {
+ break;
+ }
+
+ --precision;
+ }
+ }
+
+ }
+
+ public static int timeToInternal(Time time) {
+ long ts = time.getTime() + (long)LOCAL_TZ.getOffset(time.getTime());
+ return (int)(ts % 86400000L);
+ }
+
+ public static int localTimeToUnixDate(LocalTime time) {
+ return time.getHour() * 3600000 + time.getMinute() * '\uea60' + time.getSecond() * 1000 + time.getNano() / 1000000;
+ }
+}
diff --git a/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/shims115/CollectStreamTableSink.java b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/shims115/CollectStreamTableSink.java
new file mode 100644
index 0000000000..0025389b26
--- /dev/null
+++ b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/shims115/CollectStreamTableSink.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.shims115;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.experimental.CollectSink;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.UUID;
+
+/**
+ * Table sink for collecting the results locally using sockets.
+ */
+public class CollectStreamTableSink implements RetractStreamTableSink<Row> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class);
+
+ private final InetAddress targetAddress;
+ private final int targetPort;
+ private final TypeSerializer<Tuple2<Boolean, Row>> serializer;
+
+ private String[] fieldNames;
+ private TypeInformation<?>[] fieldTypes;
+
+ public CollectStreamTableSink(InetAddress targetAddress,
+ int targetPort,
+ TypeSerializer<Tuple2<Boolean, Row>> serializer) {
+ LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort);
+ this.targetAddress = targetAddress;
+ this.targetPort = targetPort;
+ this.serializer = serializer;
+ }
+
+ @Override
+ public String[] getFieldNames() {
+ return fieldNames;
+ }
+
+ @Override
+ public TypeInformation<?>[] getFieldTypes() {
+ return fieldTypes;
+ }
+
+ @Override
+ public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+ final CollectStreamTableSink copy =
+ new CollectStreamTableSink(targetAddress, targetPort, serializer);
+ copy.fieldNames = fieldNames;
+ copy.fieldTypes = fieldTypes;
+ return copy;
+ }
+
+ @Override
+ public TypeInformation<Row> getRecordType() {
+ return Types.ROW_NAMED(fieldNames, fieldTypes);
+ }
+
+ @Override
+ public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
+ // add sink
+ return stream
+ .addSink(new CollectSink<>(targetAddress, targetPort, serializer))
+ .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID())
+ .setParallelism(1);
+ }
+
+ @Override
+ public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
+ return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
+ }
+}
diff --git a/flink/pom.xml b/flink/pom.xml
index 02a9ec547d..dc1664a0cf 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -32,23 +32,61 @@
<name>Zeppelin: Flink Parent</name>
<description>Zeppelin Flink Support</description>
+
<modules>
<module>flink-scala-parent</module>
- <module>flink-scala-2.11</module>
- <module>flink-scala-2.12</module>
<module>flink-shims</module>
<module>flink1.12-shims</module>
<module>flink1.13-shims</module>
<module>flink1.14-shims</module>
+ <module>flink1.15-shims</module>
</modules>
<properties>
<flink1.12.version>1.12.4</flink1.12.version>
<flink1.13.version>1.13.2</flink1.13.version>
<flink1.14.version>1.14.0</flink1.14.version>
+ <flink1.15.version>1.15.0</flink1.15.version>
<flink.scala.version>2.11.12</flink.scala.version>
<flink.scala.binary.version>2.11</flink.scala.binary.version>
</properties>
-</project>
\ No newline at end of file
+ <profiles>
+ <profile>
+ <id>flink-115</id>
+ <!-- Flink 1.15 only support scala 2.12-->
+ <modules>
+ <module>flink-scala-2.12</module>
+ </modules>
+ </profile>
+
+ <profile>
+ <id>flink-114</id>
+ <modules>
+ <module>flink-scala-2.11</module>
+ <module>flink-scala-2.12</module>
+ </modules>
+ </profile>
+
+ <profile>
+ <id>flink-113</id>
+ <modules>
+ <module>flink-scala-2.11</module>
+ <module>flink-scala-2.12</module>
+ </modules>
+ </profile>
+
+ <profile>
+ <id>flink-112</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <modules>
+ <module>flink-scala-2.11</module>
+ <module>flink-scala-2.12</module>
+ </modules>
+ </profile>
+ </profiles>
+
+</project>
diff --git a/testing/env_python_3_with_flink_115.yml b/testing/env_python_3_with_flink_115.yml
new file mode 100644
index 0000000000..a434029e4e
--- /dev/null
+++ b/testing/env_python_3_with_flink_115.yml
@@ -0,0 +1,29 @@
+name: python_3_with_flink
+channels:
+ - conda-forge
+ - defaults
+dependencies:
+ - pycodestyle
+ - scipy
+ - numpy=1.19.5
+ - grpcio
+ - protobuf
+ - pandasql
+ - ipython
+ - ipython_genutils
+ - ipykernel
+ - jupyter_client=5
+ - hvplot
+ - plotnine
+ - seaborn
+ - intake
+ - intake-parquet
+ - intake-xarray
+ - altair
+ - vega_datasets
+ - plotly
+ - jinja2=3.0.3
+ - pip
+ - pip:
+ - apache-flink==1.15.0
+
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest115.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest115.java
new file mode 100644
index 0000000000..4fbf5a93c8
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest115.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+//@RunWith(value = Parameterized.class)
+public class ZeppelinFlinkClusterTest115 extends ZeppelinFlinkClusterTest {
+
+ @Parameterized.Parameters
+ public static List<Object[]> data() {
+ return Arrays.asList(new Object[][]{
+ {"1.15.0", "2.12"}
+ });
+ }
+
+ public ZeppelinFlinkClusterTest115(String flinkVersion, String scalaVersion) throws Exception {
+ super(flinkVersion, scalaVersion);
+ }
+}
diff --git a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
index 83a1f24b37..b7e5f83a72 100644
--- a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
@@ -136,7 +136,7 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
private String chooseFlinkAppJar(String flinkHome) throws IOException {
File flinkLibFolder = new File(flinkHome, "lib");
List<File> flinkDistFiles =
- Arrays.stream(flinkLibFolder.listFiles(file -> file.getName().contains("flink-dist_")))
+ Arrays.stream(flinkLibFolder.listFiles(file -> file.getName().contains("flink-dist")))
.collect(Collectors.toList());
if (flinkDistFiles.size() > 1) {
throw new IOException("More than 1 flink-dist files: " +
@@ -144,9 +144,15 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
.map(file -> file.getAbsolutePath())
.collect(Collectors.joining(",")));
}
- String scalaVersion = "2.11";
- if (flinkDistFiles.get(0).getName().contains("2.12")) {
- scalaVersion = "2.12";
+ if (flinkDistFiles.isEmpty()) {
+ throw new IOException(String.format("No flink-dist jar found under {0}", flinkHome + "/lib"));
+ }
+
+ // scala 2.12 is the only support scala version starting from Flink 1.15,
+ // so use 2.12 as the default value
+ String scalaVersion = "2.12";
+ if (flinkDistFiles.get(0).getName().contains("2.11")) {
+ scalaVersion = "2.11";
}
final String flinkScalaVersion = scalaVersion;
File flinkInterpreterFolder =
@@ -156,8 +162,11 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
.listFiles(file -> file.getName().endsWith(".jar")))
.filter(file -> file.getName().contains(flinkScalaVersion))
.collect(Collectors.toList());
- if (flinkScalaJars.size() > 1) {
- throw new IOException("More than 1 flink scala files: " +
+
+ if (flinkScalaJars.isEmpty()) {
+ throw new IOException("No flink scala jar file is found");
+ } else if (flinkScalaJars.size() > 1) {
+ throw new IOException("More than 1 flink scala jar files: " +
flinkScalaJars.stream()
.map(file -> file.getAbsolutePath())
.collect(Collectors.joining(",")));