You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/02 15:32:38 UTC
[incubator-streampark] branch dev updated: [Feature] Support Flink 1.16 submit (#1938)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 1292a1215 [Feature] Support Flink 1.16 submit (#1938)
1292a1215 is described below
commit 1292a12153327c9ece08f3037db235d716d5f270
Author: lvshaokang <lv...@hotmail.com>
AuthorDate: Wed Nov 2 23:32:32 2022 +0800
[Feature] Support Flink 1.16 submit (#1938)
[Feature] Support flink 1.16
---
.../streampark-console-service/pom.xml | 7 +
.../console/core/runner/EnvInitializer.java | 2 +-
.../src/test/java/java/util/regex/RegexTest.java | 2 +-
.../flink/packer/pipeline/BuildRequest.scala | 1 +
.../streampark/flink/proxy/FlinkShimsProxy.scala | 8 +-
streampark-flink/streampark-flink-shims/pom.xml | 1 +
.../streampark/flink/core/FlinkSqlValidator.scala | 4 +-
.../streampark/flink/core/FlinkTableTrait.scala | 21 +--
.../streampark-flink-shims_flink-1.16/pom.xml | 149 +++++++++++++++++++++
.../streampark/flink/core/FlinkClusterClient.scala | 35 +++++
.../streampark/flink/core/StreamTableContext.scala | 104 ++++++++++++++
.../streampark/flink/core/TableContext.scala | 62 +++++++++
.../apache/streampark/flink/core/TableExt.scala | 42 ++++++
.../flink/submit/impl/YarnApplicationSubmit.scala | 1 +
14 files changed, 414 insertions(+), 25 deletions(-)
diff --git a/streampark-console/streampark-console-service/pom.xml b/streampark-console/streampark-console-service/pom.xml
index cbe7a83bf..a0ee18875 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -520,6 +520,13 @@
<version>${project.version}</version>
<outputDirectory>${project.build.directory}/shims</outputDirectory>
</dependency>
+ <!-- flink 1.16 support-->
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-flink-shims_flink-1.16_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <outputDirectory>${project.build.directory}/shims</outputDirectory>
+ </dependency>
<!-- flink-submit-core -->
<dependency>
<groupId>org.apache.streampark</groupId>
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 5ac81ba3c..8a9a76431 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -67,7 +67,7 @@ public class EnvInitializer implements ApplicationRunner {
private final FileFilter fileFilter = p -> !".gitkeep".equals(p.getName());
private static final Pattern PATTERN_FLINK_SHIMS_JAR = Pattern.compile(
- "^streampark-flink-shims_flink-(1.12|1.13|1.14|1.15)_(2.11|2.12)-(.*).jar$", Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
+ "^streampark-flink-shims_flink-(1.12|1.13|1.14|1.15|1.16)_(2.11|2.12)-(.*).jar$", Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
@Override
public void run(ApplicationArguments args) throws Exception {
diff --git a/streampark-console/streampark-console-service/src/test/java/java/util/regex/RegexTest.java b/streampark-console/streampark-console-service/src/test/java/java/util/regex/RegexTest.java
index 4805aeb8d..caffe795c 100644
--- a/streampark-console/streampark-console-service/src/test/java/java/util/regex/RegexTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/java/util/regex/RegexTest.java
@@ -48,7 +48,7 @@ class RegexTest {
void classLoader() throws MalformedURLException {
List<URL> libCache = new ArrayList<>(0);
List<URL> shimsCache = new ArrayList<>(0);
- String regex = "(^|.*)streampark-flink-shims_flink-(1.12|1.13|1.14|1.15)-(.*).jar$";
+ String regex = "(^|.*)streampark-flink-shims_flink-(1.12|1.13|1.14|1.15|1.16)-(.*).jar$";
Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
String lib = "~/workspace/streampark/streampark-console-service-1.1.0-SNAPSHOT/lib";
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
index a997b2a56..9e6c7bc0c 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
@@ -66,6 +66,7 @@ sealed trait FlinkBuildParam extends BuildParam {
case Array(1, 13, _) => s"${localWorkspace.APP_SHIMS}/flink-1.13"
case Array(1, 14, _) => s"${localWorkspace.APP_SHIMS}/flink-1.14"
case Array(1, 15, _) => s"${localWorkspace.APP_SHIMS}/flink-1.15"
+ case Array(1, 16, _) => s"${localWorkspace.APP_SHIMS}/flink-1.16"
case _ => throw new UnsupportedOperationException(s"Unsupported flink version: $flinkVersion")
}
}
diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
index 72dcd10f2..77319d562 100644
--- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
+++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
@@ -36,7 +36,7 @@ object FlinkShimsProxy extends Logger {
)
private[this] val SHIMS_PATTERN = Pattern.compile(
- "streampark-flink-shims_flink-(1.12|1.13|1.14|1.15)_(2.11|2.12)-(.*).jar",
+ "streampark-flink-shims_flink-(1.12|1.13|1.14|1.15|1.16)_(2.11|2.12)-(.*).jar",
Pattern.CASE_INSENSITIVE | Pattern.DOTALL
)
@@ -81,10 +81,10 @@ object FlinkShimsProxy extends Logger {
ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
}
- // flink 1.12 1.13~1.14 1.15 parseSql class exist in different dependencies,
- //need to load all flink-table dependencies compatible with different versions
+ // In flink 1.12, 1.13~1.14, 1.15 and 1.16 the parseSql class exists in different dependencies,
+ // so all flink-table dependencies compatible with those versions need to be loaded
def getVerifySqlLibClassLoader(flinkVersion: FlinkVersion): ClassLoader = {
- logInfo(s"add verify sql lib,flink version: $flinkVersion")
+ logInfo(s"add verify sql lib,flink version: $flinkVersion")
VERIFY_SQL_CLASS_LOADER_CACHE.getOrElseUpdate(s"${flinkVersion.fullVersion}", {
val getFlinkTable: File => Boolean = _.getName.startsWith("flink-table")
// 1) flink/lib/flink-table*
diff --git a/streampark-flink/streampark-flink-shims/pom.xml b/streampark-flink/streampark-flink-shims/pom.xml
index cfc0ab95a..bbda33c19 100644
--- a/streampark-flink/streampark-flink-shims/pom.xml
+++ b/streampark-flink/streampark-flink-shims/pom.xml
@@ -48,6 +48,7 @@
<modules>
<!-- flink 1.15+ only support scala 2.12 -->
<module>streampark-flink-shims_flink-1.15</module>
+ <module>streampark-flink-shims_flink-1.16</module>
</modules>
</profile>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
index 760586358..5b0f4389c 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
@@ -36,7 +36,7 @@ object FlinkSqlValidator extends Logger {
private[this] val FLINK112_CALCITE_PARSER_CLASS = "org.apache.flink.table.planner.calcite.CalciteParser"
- private[this] val FLINK113_CALCITE_PARSER_CLASS = "org.apache.flink.table.planner.parse.CalciteParser"
+ private[this] val FLINK113_PLUS_CALCITE_PARSER_CLASS = "org.apache.flink.table.planner.parse.CalciteParser"
private[this] val SYNTAX_ERROR_REGEXP = ".*at\\sline\\s(\\d+),\\scolumn\\s(\\d+).*".r
@@ -82,7 +82,7 @@ object FlinkSqlValidator extends Logger {
hasInsert = true
}
Try {
- val calciteClass = Try(Class.forName(FLINK112_CALCITE_PARSER_CLASS)).getOrElse(Class.forName(FLINK113_CALCITE_PARSER_CLASS))
+ val calciteClass = Try(Class.forName(FLINK112_CALCITE_PARSER_CLASS)).getOrElse(Class.forName(FLINK113_PLUS_CALCITE_PARSER_CLASS))
sqlDialect.toUpperCase() match {
case "HIVE" | "DEFAULT" =>
case _ =>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
index ec2af84f4..34eb19524 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
@@ -29,19 +29,9 @@ import org.apache.flink.table.types.AbstractDataType
import java.lang
import java.util.Optional
-/**
- *
- * @param parameter
- * @param tableEnv
- */
abstract class FlinkTableTrait(val parameter: ParameterTool,
private val tableEnv: TableEnvironment) extends TableEnvironment {
- /**
- * 推荐使用该Api启动任务...
- *
- * @return
- */
def start(): JobExecutionResult = {
val appName = (parameter.get(KEY_APP_NAME(), null), parameter.get(KEY_FLINK_APP_NAME, null)) match {
case (appName: String, _) => appName
@@ -51,7 +41,10 @@ abstract class FlinkTableTrait(val parameter: ParameterTool,
execute(appName)
}
- def execute(jobName: String): JobExecutionResult
+ def execute(jobName: String): JobExecutionResult = {
+ printLogo(s"FlinkTable $jobName Starting...")
+ null
+ }
def sql(sql: String = null): Unit = FlinkSqlExecutor.executeSql(sql, parameter, this)
@@ -135,12 +128,6 @@ abstract class FlinkTableTrait(val parameter: ParameterTool,
override def createStatementSet(): StatementSet = tableEnv.createStatementSet()
- /**
- *
- * @param name
- * @param dataStream
- * @tparam T
- */
@deprecated override def registerFunction(name: String, function: ScalarFunction): Unit = tableEnv.registerFunction(name, function)
@deprecated override def registerTable(name: String, table: Table): Unit = tableEnv.registerTable(name, table)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/pom.xml
new file mode 100644
index 000000000..0b75ffeb6
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/pom.xml
@@ -0,0 +1,149 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-flink-shims</artifactId>
+ <version>1.2.4-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>streampark-flink-shims_flink-1.16_${scala.binary.version}</artifactId>
+ <name>StreamPark : Flink Shims 1.16</name>
+
+ <properties>
+ <flink.version>1.16.0</flink.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-flink-shims-base_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!--flink-->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-uber</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-statebackend-rocksdb</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-yarn</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-kubernetes</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <artifactSet>
+ <includes>
+ <include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
new file mode 100644
index 000000000..f7d48dc4d
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.JobID
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.core.execution.SavepointFormatType
+
+import java.util.concurrent.CompletableFuture
+
+class FlinkClusterClient[T](clusterClient: ClusterClient[T]) extends FlinkClientTrait[T](clusterClient) {
+
+ override def cancelWithSavepoint(jobID: JobID, savepointDirectory: String): CompletableFuture[String] = {
+ clusterClient.cancelWithSavepoint(jobID, savepointDirectory, SavepointFormatType.DEFAULT)
+ }
+
+ override def stopWithSavepoint(jobID: JobID, advanceToEndOfEventTime: Boolean, savepointDirectory: String): CompletableFuture[String] = {
+ clusterClient.stopWithSavepoint(jobID, advanceToEndOfEventTime, savepointDirectory, SavepointFormatType.DEFAULT)
+ }
+
+}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
new file mode 100644
index 000000000..4d00bc39a
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.table.api.{CompiledPlan, PlanReference, Schema, Table, TableDescriptor, TableResult}
+import org.apache.flink.table.api.bridge.scala.{StreamStatementSet, StreamTableEnvironment}
+import org.apache.flink.table.connector.ChangelogMode
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.types.AbstractDataType
+import org.apache.flink.types.Row
+
+class StreamTableContext(override val parameter: ParameterTool,
+ private val streamEnv: StreamExecutionEnvironment,
+ private val tableEnv: StreamTableEnvironment)
+ extends FlinkStreamTableTrait(parameter, streamEnv, tableEnv) {
+
+ def this(args: (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment)) = this(args._1, args._2, args._3)
+
+ def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initialize(args))
+
+ override def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table = tableEnv.fromDataStream[T](dataStream, schema)
+
+ override def fromChangelogStream(dataStream: DataStream[Row]): Table = tableEnv.fromChangelogStream(dataStream)
+
+ override def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table = tableEnv.fromChangelogStream(dataStream, schema)
+
+ override def fromChangelogStream(dataStream: DataStream[Row], schema: Schema, changelogMode: ChangelogMode): Table = tableEnv.fromChangelogStream(dataStream, schema, changelogMode)
+
+ override def createTemporaryView[T](path: String, dataStream: DataStream[T], schema: Schema): Unit = tableEnv.createTemporaryView[T](path, dataStream, schema)
+
+ override def toDataStream(table: Table): DataStream[Row] = {
+ isConvertedToDataStream = true
+ tableEnv.toDataStream(table)
+ }
+
+ override def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T] = {
+ isConvertedToDataStream = true
+ tableEnv.toDataStream[T](table, targetClass)
+ }
+
+ override def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T] = {
+ isConvertedToDataStream = true
+ tableEnv.toDataStream[T](table, targetDataType)
+ }
+
+ override def toChangelogStream(table: Table): DataStream[Row] = {
+ isConvertedToDataStream = true
+ tableEnv.toChangelogStream(table)
+ }
+
+ override def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row] = {
+ isConvertedToDataStream = true
+ tableEnv.toChangelogStream(table, targetSchema)
+ }
+
+ override def toChangelogStream(table: Table, targetSchema: Schema, changelogMode: ChangelogMode): DataStream[Row] = {
+ isConvertedToDataStream = true
+ tableEnv.toChangelogStream(table, targetSchema, changelogMode)
+ }
+
+ override def createStatementSet(): StreamStatementSet = tableEnv.createStatementSet()
+
+ override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
+
+ override def createTemporaryTable(path: String, descriptor: TableDescriptor): Unit = tableEnv.createTemporaryTable(path, descriptor)
+
+ override def createTable(path: String, descriptor: TableDescriptor): Unit = tableEnv.createTable(path, descriptor)
+
+ override def from(descriptor: TableDescriptor): Table = tableEnv.from(descriptor)
+
+ override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules()
+
+ /**
+ * @since 1.15
+ */
+ override def listTables(s: String, s1: String): Array[String] = tableEnv.listTables(s, s1)
+
+ /**
+ * @since 1.15
+ */
+ override def loadPlan(planReference: PlanReference): CompiledPlan = tableEnv.loadPlan(planReference)
+
+ /**
+ * @since 1.15
+ */
+ override def compilePlanSql(s: String): CompiledPlan = tableEnv.compilePlanSql(s)
+}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
new file mode 100644
index 000000000..964966a9a
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.table.api.{CompiledPlan, PlanReference, Table, TableDescriptor, TableEnvironment, TableResult}
+import org.apache.flink.table.module.ModuleEntry
+
+class TableContext(override val parameter: ParameterTool,
+ private val tableEnv: TableEnvironment)
+ extends FlinkTableTrait(parameter, tableEnv) {
+
+ def this(args: (ParameterTool, TableEnvironment)) = this(args._1, args._2)
+
+ def this(args: TableEnvConfig) = this(FlinkTableInitializer.initialize(args))
+
+ override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
+
+ override def createTemporaryTable(path: String, descriptor: TableDescriptor): Unit = {
+ tableEnv.createTemporaryTable(path, descriptor)
+ }
+
+ override def createTable(path: String, descriptor: TableDescriptor): Unit = {
+ tableEnv.createTable(path, descriptor)
+ }
+
+ override def from(tableDescriptor: TableDescriptor): Table = {
+ tableEnv.from(tableDescriptor)
+ }
+
+ override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules()
+
+ /**
+ * @since 1.15
+ */
+ override def listTables(catalogName: String, databaseName: String): Array[String] = tableEnv.listTables(catalogName, databaseName)
+
+ /**
+ * @since 1.15
+ */
+ override def loadPlan(planReference: PlanReference): CompiledPlan = tableEnv.loadPlan(planReference)
+
+ /**
+ * @since 1.15
+ */
+ override def compilePlanSql(stmt: String): CompiledPlan = tableEnv.compilePlanSql(stmt)
+}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableExt.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
new file mode 100644
index 000000000..f71e6fd94
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.bridge.scala.{TableConversions => FlinkTableConversions}
+import org.apache.flink.table.api.{Table => FlinkTable}
+import org.apache.flink.types.Row
+
+object TableExt {
+
+ class Table(val table: FlinkTable) {
+ def ->(field: String, fields: String*): FlinkTable = table.as(field, fields: _*)
+ }
+
+ class TableConversions(table: FlinkTable) extends FlinkTableConversions(table) {
+
+ def \\ : DataStream[Row] = toDataStream
+
+ def >>[T: TypeInformation](implicit context: StreamTableContext): DataStream[T] = {
+ context.isConvertedToDataStream = true
+ super.toAppendStream
+ }
+ }
+
+}
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala
index baf7c059b..be7d08ad5 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala
@@ -71,6 +71,7 @@ object YarnApplicationSubmit extends YarnSubmitTrait {
case Array(1, 13, _) => array += s"${workspace.APP_SHIMS}/flink-1.13"
case Array(1, 14, _) => array += s"${workspace.APP_SHIMS}/flink-1.14"
case Array(1, 15, _) => array += s"${workspace.APP_SHIMS}/flink-1.15"
+ case Array(1, 16, _) => array += s"${workspace.APP_SHIMS}/flink-1.16"
case _ => throw new UnsupportedOperationException(s"Unsupported flink version: ${submitRequest.flinkVersion}")
}
val jobLib = s"${workspace.APP_WORKSPACE}/${submitRequest.id}/lib"