Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/02 15:32:38 UTC

[incubator-streampark] branch dev updated: [Feature] Support Flink 1.16 submit (#1938)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1292a1215 [Feature] Support Flink 1.16 submit (#1938)
1292a1215 is described below

commit 1292a12153327c9ece08f3037db235d716d5f270
Author: lvshaokang <lv...@hotmail.com>
AuthorDate: Wed Nov 2 23:32:32 2022 +0800

    [Feature] Support Flink 1.16 submit (#1938)
    
    [Feature] Support flink 1.16
---
 .../streampark-console-service/pom.xml             |   7 +
 .../console/core/runner/EnvInitializer.java        |   2 +-
 .../src/test/java/java/util/regex/RegexTest.java   |   2 +-
 .../flink/packer/pipeline/BuildRequest.scala       |   1 +
 .../streampark/flink/proxy/FlinkShimsProxy.scala   |   8 +-
 streampark-flink/streampark-flink-shims/pom.xml    |   1 +
 .../streampark/flink/core/FlinkSqlValidator.scala  |   4 +-
 .../streampark/flink/core/FlinkTableTrait.scala    |  21 +--
 .../streampark-flink-shims_flink-1.16/pom.xml      | 149 +++++++++++++++++++++
 .../streampark/flink/core/FlinkClusterClient.scala |  35 +++++
 .../streampark/flink/core/StreamTableContext.scala | 104 ++++++++++++++
 .../streampark/flink/core/TableContext.scala       |  62 +++++++++
 .../apache/streampark/flink/core/TableExt.scala    |  42 ++++++
 .../flink/submit/impl/YarnApplicationSubmit.scala  |   1 +
 14 files changed, 414 insertions(+), 25 deletions(-)

diff --git a/streampark-console/streampark-console-service/pom.xml b/streampark-console/streampark-console-service/pom.xml
index cbe7a83bf..a0ee18875 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -520,6 +520,13 @@
                                     <version>${project.version}</version>
                                     <outputDirectory>${project.build.directory}/shims</outputDirectory>
                                 </dependency>
+                                <!-- flink 1.16 support-->
+                                <dependency>
+                                    <groupId>org.apache.streampark</groupId>
+                                    <artifactId>streampark-flink-shims_flink-1.16_${scala.binary.version}</artifactId>
+                                    <version>${project.version}</version>
+                                    <outputDirectory>${project.build.directory}/shims</outputDirectory>
+                                </dependency>
                                 <!-- flink-submit-core -->
                                 <dependency>
                                     <groupId>org.apache.streampark</groupId>
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 5ac81ba3c..8a9a76431 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -67,7 +67,7 @@ public class EnvInitializer implements ApplicationRunner {
     private final FileFilter fileFilter = p -> !".gitkeep".equals(p.getName());
 
     private static final Pattern PATTERN_FLINK_SHIMS_JAR = Pattern.compile(
-        "^streampark-flink-shims_flink-(1.12|1.13|1.14|1.15)_(2.11|2.12)-(.*).jar$", Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
+        "^streampark-flink-shims_flink-(1.12|1.13|1.14|1.15|1.16)_(2.11|2.12)-(.*).jar$", Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
 
     @Override
     public void run(ApplicationArguments args) throws Exception {
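
The only change here is "1.16" joining the alternation in the shims-jar pattern. A minimal Scala sketch of the effect, using hypothetical jar names (note that the dots in the pattern are unescaped, so each one matches any character):

    import java.util.regex.Pattern

    val shimsJar = Pattern.compile(
      "^streampark-flink-shims_flink-(1.12|1.13|1.14|1.15|1.16)_(2.11|2.12)-(.*).jar$",
      Pattern.CASE_INSENSITIVE | Pattern.DOTALL)

    // newly accepted by this change:
    shimsJar.matcher("streampark-flink-shims_flink-1.16_2.12-1.2.4-SNAPSHOT.jar").matches()  // true
    // still rejected, since 1.17 is not in the alternation:
    shimsJar.matcher("streampark-flink-shims_flink-1.17_2.12-1.2.4-SNAPSHOT.jar").matches()  // false
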
diff --git a/streampark-console/streampark-console-service/src/test/java/java/util/regex/RegexTest.java b/streampark-console/streampark-console-service/src/test/java/java/util/regex/RegexTest.java
index 4805aeb8d..caffe795c 100644
--- a/streampark-console/streampark-console-service/src/test/java/java/util/regex/RegexTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/java/util/regex/RegexTest.java
@@ -48,7 +48,7 @@ class RegexTest {
     void classLoader() throws MalformedURLException {
         List<URL> libCache = new ArrayList<>(0);
         List<URL> shimsCache = new ArrayList<>(0);
-        String regex = "(^|.*)streampark-flink-shims_flink-(1.12|1.13|1.14|1.15)-(.*).jar$";
+        String regex = "(^|.*)streampark-flink-shims_flink-(1.12|1.13|1.14|1.15|1.16)-(.*).jar$";
         Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
         String lib = "~/workspace/streampark/streampark-console-service-1.1.0-SNAPSHOT/lib";
 
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
index a997b2a56..9e6c7bc0c 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
@@ -66,6 +66,7 @@ sealed trait FlinkBuildParam extends BuildParam {
           case Array(1, 13, _) => s"${localWorkspace.APP_SHIMS}/flink-1.13"
           case Array(1, 14, _) => s"${localWorkspace.APP_SHIMS}/flink-1.14"
           case Array(1, 15, _) => s"${localWorkspace.APP_SHIMS}/flink-1.15"
+          case Array(1, 16, _) => s"${localWorkspace.APP_SHIMS}/flink-1.16"
           case _ => throw new UnsupportedOperationException(s"Unsupported flink version: $flinkVersion")
         }
       }
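
Both this build pipeline and the YARN application submit path (see the last diff below) resolve the shims directory by matching on the parsed major/minor version. A condensed restatement of the dispatch, assuming full version strings such as "1.16.0" (the helper name is invented):

    // standalone sketch mirroring the match in FlinkBuildParam
    def shimsDir(fullVersion: String, appShims: String): String =
      fullVersion.split("\\.").map(_.toInt) match {
        case Array(1, 15, _) => s"$appShims/flink-1.15"
        case Array(1, 16, _) => s"$appShims/flink-1.16"  // the case this commit adds
        case _ => throw new UnsupportedOperationException(s"Unsupported flink version: $fullVersion")
      }

    shimsDir("1.16.0", "/workspace/shims")  // returns "/workspace/shims/flink-1.16"
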
diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
index 72dcd10f2..77319d562 100644
--- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
+++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
@@ -36,7 +36,7 @@ object FlinkShimsProxy extends Logger {
   )
 
   private[this] val SHIMS_PATTERN = Pattern.compile(
-    "streampark-flink-shims_flink-(1.12|1.13|1.14|1.15)_(2.11|2.12)-(.*).jar",
+    "streampark-flink-shims_flink-(1.12|1.13|1.14|1.15|1.16)_(2.11|2.12)-(.*).jar",
     Pattern.CASE_INSENSITIVE | Pattern.DOTALL
   )
 
@@ -81,10 +81,10 @@ object FlinkShimsProxy extends Logger {
     ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
   }
 
-  // flink 1.12 1.13~1.14 1.15 parseSql class exist in different dependencies,
-  //need to load all flink-table dependencies compatible with different versions
+  // In flink 1.12, 1.13~1.14, 1.15 and 1.16 the parseSql class exists in different dependencies,
+  // so we need to load all flink-table dependencies compatible with different versions
   def getVerifySqlLibClassLoader(flinkVersion: FlinkVersion): ClassLoader = {
-    logInfo(s"add  verify sql lib,flink version:  $flinkVersion")
+    logInfo(s"add verify sql lib,flink version: $flinkVersion")
     VERIFY_SQL_CLASS_LOADER_CACHE.getOrElseUpdate(s"${flinkVersion.fullVersion}", {
       val getFlinkTable: File => Boolean = _.getName.startsWith("flink-table")
       // 1) flink/lib/flink-table*
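
Because the SQL parser lives in a different flink-table artifact depending on the version, the verify-SQL class loader is assembled from all flink-table jars and cached per full version. A rough sketch of that cache-and-build idea, assuming the jar files have already been collected:

    import java.io.File
    import java.net.URLClassLoader
    import scala.collection.mutable

    // hypothetical condensed form of getVerifySqlLibClassLoader's caching
    val cache = mutable.Map[String, ClassLoader]()

    def verifySqlLoader(fullVersion: String, tableJars: Array[File]): ClassLoader =
      cache.getOrElseUpdate(fullVersion,
        new URLClassLoader(tableJars.map(_.toURI.toURL), getClass.getClassLoader))
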
diff --git a/streampark-flink/streampark-flink-shims/pom.xml b/streampark-flink/streampark-flink-shims/pom.xml
index cfc0ab95a..bbda33c19 100644
--- a/streampark-flink/streampark-flink-shims/pom.xml
+++ b/streampark-flink/streampark-flink-shims/pom.xml
@@ -48,6 +48,7 @@
             <modules>
                 <!-- flink 1.15+ only support scala 2.12 -->
                 <module>streampark-flink-shims_flink-1.15</module>
+                <module>streampark-flink-shims_flink-1.16</module>
             </modules>
         </profile>
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
index 760586358..5b0f4389c 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
@@ -36,7 +36,7 @@ object FlinkSqlValidator extends Logger {
 
   private[this] val FLINK112_CALCITE_PARSER_CLASS = "org.apache.flink.table.planner.calcite.CalciteParser"
 
-  private[this] val FLINK113_CALCITE_PARSER_CLASS = "org.apache.flink.table.planner.parse.CalciteParser"
+  private[this] val FLINK113_PLUS_CALCITE_PARSER_CLASS = "org.apache.flink.table.planner.parse.CalciteParser"
 
   private[this] val SYNTAX_ERROR_REGEXP = ".*at\\sline\\s(\\d+),\\scolumn\\s(\\d+).*".r
 
@@ -82,7 +82,7 @@ object FlinkSqlValidator extends Logger {
             hasInsert = true
           }
           Try {
-            val calciteClass = Try(Class.forName(FLINK112_CALCITE_PARSER_CLASS)).getOrElse(Class.forName(FLINK113_CALCITE_PARSER_CLASS))
+            val calciteClass = Try(Class.forName(FLINK112_CALCITE_PARSER_CLASS)).getOrElse(Class.forName(FLINK113_PLUS_CALCITE_PARSER_CLASS))
             sqlDialect.toUpperCase() match {
               case "HIVE" | "DEFAULT" =>
               case _ =>
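
The rename records why the reflective lookup exists: CalciteParser moved from the planner's calcite package to its parse package in Flink 1.13 and has stayed there through 1.16, so the 1.13 constant now covers every later version. The fallback in isolation:

    import scala.util.Try

    // try the 1.12 location first, then fall back to the 1.13+ location
    val calciteClass: Class[_] =
      Try(Class.forName("org.apache.flink.table.planner.calcite.CalciteParser"))
        .getOrElse(Class.forName("org.apache.flink.table.planner.parse.CalciteParser"))
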
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
index ec2af84f4..34eb19524 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
@@ -29,19 +29,9 @@ import org.apache.flink.table.types.AbstractDataType
 import java.lang
 import java.util.Optional
 
-/**
- *
- * @param parameter
- * @param tableEnv
- */
 abstract class FlinkTableTrait(val parameter: ParameterTool,
                                private val tableEnv: TableEnvironment) extends TableEnvironment {
 
-  /**
-   * 推荐使用该Api启动任务...
-   *
-   * @return
-   */
   def start(): JobExecutionResult = {
     val appName = (parameter.get(KEY_APP_NAME(), null), parameter.get(KEY_FLINK_APP_NAME, null)) match {
       case (appName: String, _) => appName
@@ -51,7 +41,10 @@ abstract class FlinkTableTrait(val parameter: ParameterTool,
     execute(appName)
   }
 
-  def execute(jobName: String): JobExecutionResult
+  def execute(jobName: String): JobExecutionResult = {
+    printLogo(s"FlinkTable $jobName Starting...")
+    null
+  }
 
   def sql(sql: String = null): Unit = FlinkSqlExecutor.executeSql(sql, parameter, this)
 
@@ -135,12 +128,6 @@ abstract class FlinkTableTrait(val parameter: ParameterTool,
 
   override def createStatementSet(): StatementSet = tableEnv.createStatementSet()
 
-  /**
-   *
-   * @param name
-   * @param dataStream
-   * @tparam T
-   */
   @deprecated override def registerFunction(name: String, function: ScalarFunction): Unit = tableEnv.registerFunction(name, function)
 
   @deprecated override def registerTable(name: String, table: Table): Unit = tableEnv.registerTable(name, table)
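
With execute now given a default body, start() is the interesting entry point: it picks the application name from the parameters and hands it to execute. A standalone, Option-based restatement of that precedence (the real code reads the same two keys via ParameterTool; the helper and its empty-string default are assumptions for illustration):

    def resolveAppName(appName: Option[String], flinkAppName: Option[String]): String =
      (appName, flinkAppName) match {
        case (Some(name), _) => name   // KEY_APP_NAME wins if present
        case (_, Some(name)) => name   // otherwise fall back to KEY_FLINK_APP_NAME
        case _               => ""     // default when neither key is set (assumed)
      }

    resolveAppName(None, Some("etl-job"))  // "etl-job"
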
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/pom.xml
new file mode 100644
index 000000000..0b75ffeb6
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/pom.xml
@@ -0,0 +1,149 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       https://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.streampark</groupId>
+        <artifactId>streampark-flink-shims</artifactId>
+        <version>1.2.4-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>streampark-flink-shims_flink-1.16_${scala.binary.version}</artifactId>
+    <name>StreamPark : Flink Shims 1.16</name>
+
+    <properties>
+        <flink.version>1.16.0</flink.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            <artifactId>streampark-flink-shims-base_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!--flink-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-uber</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-statebackend-rocksdb</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-yarn</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-api</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-kubernetes</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include>
+                                </includes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
new file mode 100644
index 000000000..f7d48dc4d
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.JobID
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.core.execution.SavepointFormatType
+
+import java.util.concurrent.CompletableFuture
+
+class FlinkClusterClient[T](clusterClient: ClusterClient[T]) extends FlinkClientTrait[T](clusterClient) {
+
+  override def cancelWithSavepoint(jobID: JobID, savepointDirectory: String): CompletableFuture[String] = {
+    clusterClient.cancelWithSavepoint(jobID, savepointDirectory, SavepointFormatType.DEFAULT)
+  }
+
+  override def stopWithSavepoint(jobID: JobID, advanceToEndOfEventTime: Boolean, savepointDirectory: String): CompletableFuture[String] = {
+    clusterClient.stopWithSavepoint(jobID, advanceToEndOfEventTime, savepointDirectory, SavepointFormatType.DEFAULT)
+  }
+
+}
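
Flink 1.15 introduced a SavepointFormatType argument on these ClusterClient calls, so the 1.16 shim pins SavepointFormatType.DEFAULT to keep the shared FlinkClientTrait signatures identical across versions. A hedged usage sketch (the wrapped client, job id, and savepoint path are assumed to exist):

    // sketch only: clusterClient would come from a real Flink deployment
    val client = new FlinkClusterClient(clusterClient)
    val savepointPath: String = client
      .stopWithSavepoint(jobID, advanceToEndOfEventTime = false, "hdfs:///flink/savepoints")
      .get()  // CompletableFuture[String] resolving to the savepoint path
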
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
new file mode 100644
index 000000000..4d00bc39a
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.table.api.{CompiledPlan, PlanReference, Schema, Table, TableDescriptor, TableResult}
+import org.apache.flink.table.api.bridge.scala.{StreamStatementSet, StreamTableEnvironment}
+import org.apache.flink.table.connector.ChangelogMode
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.types.AbstractDataType
+import org.apache.flink.types.Row
+
+class StreamTableContext(override val parameter: ParameterTool,
+                         private val streamEnv: StreamExecutionEnvironment,
+                         private val tableEnv: StreamTableEnvironment)
+  extends FlinkStreamTableTrait(parameter, streamEnv, tableEnv) {
+
+  def this(args: (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment)) = this(args._1, args._2, args._3)
+
+  def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initialize(args))
+
+  override def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table = tableEnv.fromDataStream[T](dataStream, schema)
+
+  override def fromChangelogStream(dataStream: DataStream[Row]): Table = tableEnv.fromChangelogStream(dataStream)
+
+  override def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table = tableEnv.fromChangelogStream(dataStream, schema)
+
+  override def fromChangelogStream(dataStream: DataStream[Row], schema: Schema, changelogMode: ChangelogMode): Table = tableEnv.fromChangelogStream(dataStream, schema, changelogMode)
+
+  override def createTemporaryView[T](path: String, dataStream: DataStream[T], schema: Schema): Unit = tableEnv.createTemporaryView[T](path, dataStream, schema)
+
+  override def toDataStream(table: Table): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream(table)
+  }
+
+  override def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream[T](table, targetClass)
+  }
+
+  override def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream[T](table, targetDataType)
+  }
+
+  override def toChangelogStream(table: Table): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table)
+  }
+
+  override def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table, targetSchema)
+  }
+
+  override def toChangelogStream(table: Table, targetSchema: Schema, changelogMode: ChangelogMode): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table, targetSchema, changelogMode)
+  }
+
+  override def createStatementSet(): StreamStatementSet = tableEnv.createStatementSet()
+
+  override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
+
+  override def createTemporaryTable(path: String, descriptor: TableDescriptor): Unit = tableEnv.createTemporaryTable(path, descriptor)
+
+  override def createTable(path: String, descriptor: TableDescriptor): Unit = tableEnv.createTable(path, descriptor)
+
+  override def from(descriptor: TableDescriptor): Table = tableEnv.from(descriptor)
+
+  override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules()
+
+  /**
+   * @since 1.15
+   */
+  override def listTables(s: String, s1: String): Array[String] = tableEnv.listTables(s, s1)
+
+  /**
+   * @since 1.15
+   */
+  override def loadPlan(planReference: PlanReference): CompiledPlan = tableEnv.loadPlan(planReference)
+
+  /**
+   * @since 1.15
+   */
+  override def compilePlanSql(s: String): CompiledPlan = tableEnv.compilePlanSql(s)
+}
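
Most of this class is plain delegation; the toDataStream/toChangelogStream family additionally flips isConvertedToDataStream, presumably so the surrounding trait knows the job must now be executed through the stream environment. The delegate-and-flag pattern reduced to runnable pure Scala (not Flink API):

    // minimal illustration of recording a side effect while delegating
    class Delegating {
      var isConvertedToDataStream = false
      private def marked[R](result: R): R = { isConvertedToDataStream = true; result }
      def toDataStream(rows: List[String]): List[String] = marked(rows)
    }

    val ctx = new Delegating
    ctx.toDataStream(List("r1", "r2"))
    assert(ctx.isConvertedToDataStream)
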
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
new file mode 100644
index 000000000..964966a9a
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.table.api.{CompiledPlan, PlanReference, Table, TableDescriptor, TableEnvironment, TableResult}
+import org.apache.flink.table.module.ModuleEntry
+
+class TableContext(override val parameter: ParameterTool,
+                   private val tableEnv: TableEnvironment)
+  extends FlinkTableTrait(parameter, tableEnv) {
+
+  def this(args: (ParameterTool, TableEnvironment)) = this(args._1, args._2)
+
+  def this(args: TableEnvConfig) = this(FlinkTableInitializer.initialize(args))
+
+  override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
+
+  override def createTemporaryTable(path: String, descriptor: TableDescriptor): Unit = {
+    tableEnv.createTemporaryTable(path, descriptor)
+  }
+
+  override def createTable(path: String, descriptor: TableDescriptor): Unit = {
+    tableEnv.createTable(path, descriptor)
+  }
+
+  override def from(tableDescriptor: TableDescriptor): Table = {
+    tableEnv.from(tableDescriptor)
+  }
+
+  override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules()
+
+  /**
+   * @since 1.15
+   */
+  override def listTables(catalogName: String, databaseName: String): Array[String] = tableEnv.listTables(catalogName, databaseName)
+
+  /**
+   * @since 1.15
+   */
+  override def loadPlan(planReference: PlanReference): CompiledPlan = tableEnv.loadPlan(planReference)
+
+  /**
+   * @since 1.15
+   */
+  override def compilePlanSql(stmt: String): CompiledPlan = tableEnv.compilePlanSql(stmt)
+}
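
The three "@since 1.15" overrides expose Flink's compiled-plan feature through the shim. A hedged sketch of the round trip they enable, with a hypothetical table setup and file path (in these Flink versions compilePlanSql supports INSERT statements):

    import org.apache.flink.table.api.PlanReference

    // ctx: TableContext built elsewhere; sink_table and source_table are assumed to exist
    val plan = ctx.compilePlanSql("INSERT INTO sink_table SELECT * FROM source_table")
    plan.writeToFile("/tmp/job-plan.json")                           // persist the optimized plan
    val reloaded = ctx.loadPlan(PlanReference.fromFile("/tmp/job-plan.json"))
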
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableExt.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
new file mode 100644
index 000000000..f71e6fd94
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.bridge.scala.{TableConversions => FlinkTableConversions}
+import org.apache.flink.table.api.{Table => FlinkTable}
+import org.apache.flink.types.Row
+
+object TableExt {
+
+  class Table(val table: FlinkTable) {
+    def ->(field: String, fields: String*): FlinkTable = table.as(field, fields: _*)
+  }
+
+  class TableConversions(table: FlinkTable) extends FlinkTableConversions(table) {
+
+    def \\ : DataStream[Row] = toDataStream
+
+    def >>[T: TypeInformation](implicit context: StreamTableContext): DataStream[T] = {
+      context.isConvertedToDataStream = true
+      super.toAppendStream
+    }
+  }
+
+}
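
TableExt layers symbolic aliases over the Flink table API: -> renames columns via as, \\ converts a table to a DataStream, and >> produces a typed append stream while flipping the context's conversion flag. Intended usage, assuming implicit conversions elsewhere in the shims wrap a FlinkTable into these classes (written with explicit wrappers here):

    // hypothetical call sites for the aliases
    val renamed: FlinkTable = new TableExt.Table(someTable).->("id", "name")    // alias for as(...)
    val rows: DataStream[Row] = new TableExt.TableConversions(renamed).\\       // alias for toDataStream
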
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala
index baf7c059b..be7d08ad5 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala
@@ -71,6 +71,7 @@ object YarnApplicationSubmit extends YarnSubmitTrait {
             case Array(1, 13, _) => array += s"${workspace.APP_SHIMS}/flink-1.13"
             case Array(1, 14, _) => array += s"${workspace.APP_SHIMS}/flink-1.14"
             case Array(1, 15, _) => array += s"${workspace.APP_SHIMS}/flink-1.15"
+            case Array(1, 16, _) => array += s"${workspace.APP_SHIMS}/flink-1.16"
             case _ => throw new UnsupportedOperationException(s"Unsupported flink version: ${submitRequest.flinkVersion}")
           }
           val jobLib = s"${workspace.APP_WORKSPACE}/${submitRequest.id}/lib"