Posted to commits@streampark.apache.org by be...@apache.org on 2024/04/19 16:05:58 UTC

(incubator-streampark) branch flink-119 created (now 1bfdafa3d)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a change to branch flink-119
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


      at 1bfdafa3d [Improve] 2.1.4 upgrade sql bug fixed.

This branch includes the following new commits:

     new a39950c2e [Improve] apache flink 1.19 support
     new 1bfdafa3d [Improve] 2.1.4 upgrade sql bug fixed.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



(incubator-streampark) 01/02: [Improve] apache flink 1.19 support

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch flink-119
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit a39950c2ed2734c944a6686de16dbf2bb6d58003
Author: benjobs <be...@gmail.com>
AuthorDate: Fri Apr 19 15:17:34 2024 +0800

    [Improve] apache flink 1.19 support
---
 .../streampark/common/conf/FlinkVersion.scala      |   2 +-
 .../streampark-console-service/pom.xml             |   7 +
 .../streampark/console/core/entity/FlinkEnv.java   |  26 +++-
 .../console/core/runner/EnvInitializer.java        |   2 +-
 .../core/service/impl/FlinkEnvServiceImpl.java     |   2 +-
 streampark-flink/streampark-flink-shims/pom.xml    |   1 +
 .../streampark-flink-shims_flink-1.19/pom.xml      | 154 ++++++++++++++++++++
 .../streampark/flink/core/FlinkClusterClient.scala |  49 +++++++
 .../flink/core/FlinkKubernetesClient.scala         |  31 ++++
 .../streampark/flink/core/StreamTableContext.scala | 161 +++++++++++++++++++++
 .../streampark/flink/core/TableContext.scala       | 103 +++++++++++++
 .../apache/streampark/flink/core/TableExt.scala    |  42 ++++++
 12 files changed, 575 insertions(+), 5 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index b6e5e6a90..6dec3b29b 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -116,7 +116,7 @@ class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logg
 
   def checkVersion(throwException: Boolean = true): Boolean = {
     version.split("\\.").map(_.trim.toInt) match {
-      case Array(1, v, _) if v >= 12 && v <= 18 => true
+      case Array(1, v, _) if v >= 12 && v <= 19 => true
       case _ =>
         if (throwException) {
           throw new UnsupportedOperationException(s"Unsupported flink version: $version")
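
Note: this hunk widens the accepted Flink minor-version window from 1.12-1.18 to
1.12-1.19. A standalone sketch of the same check (illustrative only; the real
method lives in FlinkVersion.scala and reads the version string from the Flink
distribution, and the `false` fallback below is assumed from the method's
Boolean contract, since the hunk cuts off before it):

    def checkVersion(version: String, throwException: Boolean = true): Boolean = {
      // Accepts exactly three numeric parts, 1.12.x through 1.19.x.
      version.split("\\.").map(_.trim.toInt) match {
        case Array(1, v, _) if v >= 12 && v <= 19 => true
        case _ =>
          if (throwException) {
            throw new UnsupportedOperationException(s"Unsupported flink version: $version")
          }
          false
      }
    }

    checkVersion("1.19.0")                         // true
    checkVersion("1.11.6", throwException = false) // false
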
diff --git a/streampark-console/streampark-console-service/pom.xml b/streampark-console/streampark-console-service/pom.xml
index 2d7a7a520..aa85f698f 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -618,6 +618,13 @@
                                     <version>${project.version}</version>
                                     <outputDirectory>${project.build.directory}/shims</outputDirectory>
                                 </dependency>
+                                <!-- flink 1.19 support-->
+                                <dependency>
+                                    <groupId>org.apache.streampark</groupId>
+                                    <artifactId>streampark-flink-shims_flink-1.19_${scala.binary.version}</artifactId>
+                                    <version>${project.version}</version>
+                                    <outputDirectory>${project.build.directory}/shims</outputDirectory>
+                                </dependency>
                                 <!-- flink-submit-core -->
                                 <dependency>
                                     <groupId>org.apache.streampark</groupId>
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index 6decd5de5..b1ed53da8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -20,6 +20,7 @@ package org.apache.streampark.console.core.entity;
 import org.apache.streampark.common.conf.FlinkVersion;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApiDetailException;
 
 import org.apache.commons.io.FileUtils;
@@ -33,6 +34,7 @@ import lombok.Setter;
 
 import java.io.File;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.util.Date;
 import java.util.Map;
 import java.util.Properties;
@@ -67,9 +69,29 @@ public class FlinkEnv implements Serializable {
   private transient String streamParkScalaVersion = scala.util.Properties.versionNumberString();
 
   public void doSetFlinkConf() throws ApiDetailException {
+    File yaml;
+    float ver = Float.parseFloat(getVersionOfFirst().concat(".").concat(getVersionOfMiddle()));
+    if (ver < 1.19f) {
+      yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
+      if (!yaml.exists()) {
+        throw new ApiAlertException("cannot find flink-conf.yaml in flink/conf ");
+      }
+    } else if (ver == 1.19f) {
+      yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
+      if (!yaml.exists()) {
+        yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
+      }
+      if (!yaml.exists()) {
+        throw new ApiAlertException("cannot find config.yaml|flink-conf.yaml in flink/conf ");
+      }
+    } else {
+      yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
+      if (!yaml.exists()) {
+        throw new ApiAlertException("cannot find config.yaml in flink/conf ");
+      }
+    }
     try {
-      File yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
-      String flinkConf = FileUtils.readFileToString(yaml);
+      String flinkConf = FileUtils.readFileToString(yaml, StandardCharsets.UTF_8);
       this.flinkConf = DeflaterUtils.zipString(flinkConf);
     } catch (Exception e) {
       throw new ApiDetailException(e);
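
Note: Flink 1.19 introduced config.yaml alongside the legacy flink-conf.yaml,
and this hunk makes conf-file resolution version-aware: below 1.19 only
flink-conf.yaml is accepted, at exactly 1.19 either file works (legacy name
first), and above 1.19 only config.yaml. The committed Java compares the
major.minor pair as a parsed float; the sketch below expresses the same rule
with integer matching purely for illustration (a hypothetical helper, not part
of the commit, and it throws IllegalStateException where the commit uses
ApiAlertException):

    import java.io.File

    def resolveConfFile(flinkHome: String, major: Int, minor: Int): File = {
      val legacy = new File(s"$flinkHome/conf/flink-conf.yaml") // pre-1.19 name
      val modern = new File(s"$flinkHome/conf/config.yaml")     // 1.19+ name
      (major, minor) match {
        case (1, m) if m < 19 =>
          if (legacy.exists()) legacy
          else throw new IllegalStateException("cannot find flink-conf.yaml in flink/conf")
        case (1, 19) =>
          // 1.19 ships the new name but still honors the old one.
          if (legacy.exists()) legacy
          else if (modern.exists()) modern
          else throw new IllegalStateException("cannot find config.yaml|flink-conf.yaml in flink/conf")
        case _ =>
          if (modern.exists()) modern
          else throw new IllegalStateException("cannot find config.yaml in flink/conf")
      }
    }
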
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 841f954fa..0e87a9361 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -68,7 +68,7 @@ public class EnvInitializer implements ApplicationRunner {
 
   private static final Pattern PATTERN_FLINK_SHIMS_JAR =
       Pattern.compile(
-          "^streampark-flink-shims_flink-(1.1[2-8])_(2.11|2.12)-(.*).jar$",
+          "^streampark-flink-shims_flink-(1.1[2-9])_(2.11|2.12)-(.*).jar$",
           Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
 
   @Override
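
Note: the shim-loader pattern now accepts shim jars for 1.12 through 1.19 (the
dots are left unescaped exactly as committed, so they match any character). A
quick check of the updated pattern; the matcher lines are illustrative:

    import java.util.regex.Pattern

    val shimsJar = Pattern.compile(
      "^streampark-flink-shims_flink-(1.1[2-9])_(2.11|2.12)-(.*).jar$",
      Pattern.CASE_INSENSITIVE | Pattern.DOTALL)

    shimsJar.matcher("streampark-flink-shims_flink-1.19_2.12-2.1.4.jar").matches() // true
    shimsJar.matcher("streampark-flink-shims_flink-1.11_2.12-2.1.4.jar").matches() // false
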
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
index e83bd65bb..5a53e587e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
@@ -84,8 +84,8 @@ public class FlinkEnvServiceImpl extends ServiceImpl<FlinkEnvMapper, FlinkEnv>
     long count = this.baseMapper.selectCount(null);
     version.setIsDefault(count == 0);
     version.setCreateTime(new Date());
-    version.doSetFlinkConf();
     version.doSetVersion();
+    version.doSetFlinkConf();
     return save(version);
   }
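
Note: the swap is an ordering fix, not a cosmetic one. The new doSetFlinkConf()
above consults the parsed version parts (getVersionOfFirst/getVersionOfMiddle)
to decide between flink-conf.yaml and config.yaml, so doSetVersion() must
populate them first.
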
 
diff --git a/streampark-flink/streampark-flink-shims/pom.xml b/streampark-flink/streampark-flink-shims/pom.xml
index 587c5bd4e..1b53bef49 100644
--- a/streampark-flink/streampark-flink-shims/pom.xml
+++ b/streampark-flink/streampark-flink-shims/pom.xml
@@ -45,6 +45,7 @@
                 <module>streampark-flink-shims_flink-1.16</module>
                 <module>streampark-flink-shims_flink-1.17</module>
                 <module>streampark-flink-shims_flink-1.18</module>
+                <module>streampark-flink-shims_flink-1.19</module>
             </modules>
         </profile>
     </profiles>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml
new file mode 100644
index 000000000..2e0751ecb
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.streampark</groupId>
+        <artifactId>streampark-flink-shims</artifactId>
+        <version>2.1.4</version>
+    </parent>
+
+    <artifactId>streampark-flink-shims_flink-1.19_${scala.binary.version}</artifactId>
+    <name>StreamPark : Flink Shims 1.19</name>
+
+    <properties>
+        <flink.version>1.19.0</flink.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            <artifactId>streampark-flink-shims-base_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!--flink-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-uber</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-statebackend-rocksdb</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-yarn</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-api</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-runtime</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-kubernetes</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <createDependencyReducedPom>true</createDependencyReducedPom>
+                            <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include>
+                                </includes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
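
Note: like the other shim modules, the new 1.19 POM marks the Flink
dependencies provided (or optional) and shades only
flink-table-api-scala-bridge into the shim jar, so the artifact stays thin and
carries just the version-specific bridge code; signature files are stripped
from META-INF, as is usual for shaded jars.
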
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
new file mode 100644
index 000000000..4f6336f5a
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.JobID
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.core.execution.SavepointFormatType
+
+import java.util.concurrent.CompletableFuture
+
+class FlinkClusterClient[T](clusterClient: ClusterClient[T])
+  extends FlinkClientTrait[T](clusterClient) {
+
+  override def triggerSavepoint(jobID: JobID, savepointDir: String): CompletableFuture[String] = {
+    clusterClient.triggerSavepoint(jobID, savepointDir, SavepointFormatType.DEFAULT)
+  }
+
+  override def cancelWithSavepoint(
+      jobID: JobID,
+      savepointDirectory: String): CompletableFuture[String] = {
+    clusterClient.cancelWithSavepoint(jobID, savepointDirectory, SavepointFormatType.DEFAULT)
+  }
+
+  override def stopWithSavepoint(
+      jobID: JobID,
+      advanceToEndOfEventTime: Boolean,
+      savepointDirectory: String): CompletableFuture[String] = {
+    clusterClient.stopWithSavepoint(
+      jobID,
+      advanceToEndOfEventTime,
+      savepointDirectory,
+      SavepointFormatType.DEFAULT)
+  }
+
+}
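
Note: the 1.19 shim pins SavepointFormatType.DEFAULT on all three savepoint
paths, matching the ClusterClient signatures Flink has carried since savepoint
format selection was added in 1.15. A hedged usage sketch (the helper name,
job-id argument, and directory are placeholders, not part of the commit):

    import org.apache.flink.api.common.JobID
    import org.apache.flink.client.program.ClusterClient

    def savepoint[T](clusterClient: ClusterClient[T], jobIdHex: String, dir: String): String = {
      val client = new FlinkClusterClient(clusterClient)
      // Blocks until the savepoint completes and returns its path.
      client.triggerSavepoint(JobID.fromHexString(jobIdHex), dir).get()
    }
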
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
new file mode 100644
index 000000000..f388c8e9f
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.core
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
+
+import java.util.Optional
+
+class FlinkKubernetesClient(kubeClient: FlinkKubeClient)
+  extends FlinkKubernetesClientTrait(kubeClient) {
+
+  override def getService(serviceName: String): Optional[KubernetesService] = {
+    kubeClient.getService(serviceName)
+  }
+
+}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
new file mode 100644
index 000000000..65f715c75
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, PlanReference, Schema, Table, TableDescriptor, TableResult}
+import org.apache.flink.table.api.bridge.scala.{StreamStatementSet, StreamTableEnvironment}
+import org.apache.flink.table.catalog.CatalogDescriptor
+import org.apache.flink.table.connector.ChangelogMode
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.resource.ResourceUri
+import org.apache.flink.table.types.AbstractDataType
+import org.apache.flink.types.Row
+
+import java.util.{List => JList}
+
+class StreamTableContext(
+    override val parameter: ParameterTool,
+    private val streamEnv: StreamExecutionEnvironment,
+    private val tableEnv: StreamTableEnvironment)
+  extends FlinkStreamTableTrait(parameter, streamEnv, tableEnv) {
+
+  def this(args: (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment)) =
+    this(args._1, args._2, args._3)
+
+  def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initialize(args))
+
+  override def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table =
+    tableEnv.fromDataStream[T](dataStream, schema)
+
+  override def fromChangelogStream(dataStream: DataStream[Row]): Table =
+    tableEnv.fromChangelogStream(dataStream)
+
+  override def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table =
+    tableEnv.fromChangelogStream(dataStream, schema)
+
+  override def fromChangelogStream(
+      dataStream: DataStream[Row],
+      schema: Schema,
+      changelogMode: ChangelogMode): Table =
+    tableEnv.fromChangelogStream(dataStream, schema, changelogMode)
+
+  override def createTemporaryView[T](
+      path: String,
+      dataStream: DataStream[T],
+      schema: Schema): Unit = tableEnv.createTemporaryView[T](path, dataStream, schema)
+
+  override def toDataStream(table: Table): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream(table)
+  }
+
+  override def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream[T](table, targetClass)
+  }
+
+  override def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream[T](table, targetDataType)
+  }
+
+  override def toChangelogStream(table: Table): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table)
+  }
+
+  override def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table, targetSchema)
+  }
+
+  override def toChangelogStream(
+      table: Table,
+      targetSchema: Schema,
+      changelogMode: ChangelogMode): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table, targetSchema, changelogMode)
+  }
+
+  override def createStatementSet(): StreamStatementSet = tableEnv.createStatementSet()
+
+  override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
+
+  override def createTemporaryTable(path: String, descriptor: TableDescriptor): Unit =
+    tableEnv.createTemporaryTable(path, descriptor)
+
+  override def createTable(path: String, descriptor: TableDescriptor): Unit =
+    tableEnv.createTable(path, descriptor)
+
+  override def from(descriptor: TableDescriptor): Table = tableEnv.from(descriptor)
+
+  override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules()
+
+  /** @since 1.15 */
+  override def listTables(s: String, s1: String): Array[String] = tableEnv.listTables(s, s1)
+
+  /** @since 1.15 */
+  override def loadPlan(planReference: PlanReference): CompiledPlan =
+    tableEnv.loadPlan(planReference)
+
+  /** @since 1.15 */
+  override def compilePlanSql(s: String): CompiledPlan = tableEnv.compilePlanSql(s)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri],
+      ignoreIfExists: Boolean): Unit =
+    tableEnv.createFunction(path, className, resourceUris, ignoreIfExists)
+
+  /** @since 1.17 */
+  override def createTemporaryFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporaryFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createTemporarySystemFunction(
+      name: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporarySystemFunction(name, className, resourceUris)
+
+  /** @since 1.17 */
+  override def explainSql(
+      statement: String,
+      format: ExplainFormat,
+      extraDetails: ExplainDetail*): String =
+    tableEnv.explainSql(statement, format, extraDetails: _*)
+
+  /** @since 1.18 */
+  override def createCatalog(catalog: String, catalogDescriptor: CatalogDescriptor): Unit = {
+    tableEnv.createCatalog(catalog, catalogDescriptor)
+  }
+}
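
Note: aside from plain delegation to the 1.19 bridge API, every to*Stream
override flips isConvertedToDataStream before handing off, presumably so the
surrounding trait knows a DataStream conversion happened and can choose the
right execution path; the @since tags mark the Flink release that introduced
each overridden method.
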
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
new file mode 100644
index 000000000..e8f704f39
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, PlanReference, Table, TableDescriptor, TableEnvironment, TableResult}
+import org.apache.flink.table.catalog.CatalogDescriptor
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.resource.ResourceUri
+
+import java.util.{List => JList}
+
+class TableContext(override val parameter: ParameterTool, private val tableEnv: TableEnvironment)
+  extends FlinkTableTrait(parameter, tableEnv) {
+
+  def this(args: (ParameterTool, TableEnvironment)) = this(args._1, args._2)
+
+  def this(args: TableEnvConfig) = this(FlinkTableInitializer.initialize(args))
+
+  override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
+
+  override def createTemporaryTable(path: String, descriptor: TableDescriptor): Unit = {
+    tableEnv.createTemporaryTable(path, descriptor)
+  }
+
+  override def createTable(path: String, descriptor: TableDescriptor): Unit = {
+    tableEnv.createTable(path, descriptor)
+  }
+
+  override def from(tableDescriptor: TableDescriptor): Table = {
+    tableEnv.from(tableDescriptor)
+  }
+
+  override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules()
+
+  /** @since 1.15 */
+  override def listTables(catalogName: String, databaseName: String): Array[String] =
+    tableEnv.listTables(catalogName, databaseName)
+
+  /** @since 1.15 */
+  override def loadPlan(planReference: PlanReference): CompiledPlan =
+    tableEnv.loadPlan(planReference)
+
+  /** @since 1.15 */
+  override def compilePlanSql(stmt: String): CompiledPlan = tableEnv.compilePlanSql(stmt)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri],
+      ignoreIfExists: Boolean): Unit =
+    tableEnv.createFunction(path, className, resourceUris, ignoreIfExists)
+
+  /** @since 1.17 */
+  override def createTemporaryFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporaryFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createTemporarySystemFunction(
+      name: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporarySystemFunction(name, className, resourceUris)
+
+  /** @since 1.17 */
+  override def explainSql(
+      statement: String,
+      format: ExplainFormat,
+      extraDetails: ExplainDetail*): String =
+    tableEnv.explainSql(statement, format, extraDetails: _*)
+
+  /** @since 1.18 */
+  override def createCatalog(catalog: String, catalogDescriptor: CatalogDescriptor): Unit = {
+    tableEnv.createCatalog(catalog, catalogDescriptor)
+  }
+
+}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableExt.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
new file mode 100644
index 000000000..cab368e36
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.{Table => FlinkTable}
+import org.apache.flink.table.api.bridge.scala.{TableConversions => FlinkTableConversions}
+import org.apache.flink.types.Row
+
+object TableExt {
+
+  class Table(val table: FlinkTable) {
+    def ->(field: String, fields: String*): FlinkTable = table.as(field, fields: _*)
+  }
+
+  class TableConversions(table: FlinkTable) extends FlinkTableConversions(table) {
+
+    def \\ : DataStream[Row] = toDataStream
+
+    def >>[T: TypeInformation](implicit context: StreamTableContext): DataStream[T] = {
+      context.isConvertedToDataStream = true
+      super.toAppendStream
+    }
+  }
+
+}
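
Note: TableExt is thin symbolic sugar over the bridge API: -> renames columns
via Table.as, \\ converts a Table to a DataStream[Row], and >> converts to a
typed append stream while flagging the implicit context. Roughly how it reads
at a call site (illustrative only; the names here are not part of the commit):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.{Table => FlinkTable}
    import org.apache.streampark.flink.core.{StreamTableContext, TableExt}

    // Assumes a Table `users` and an implicit StreamTableContext in scope.
    def demo(users: FlinkTable)(implicit ctx: StreamTableContext): Unit = {
      val renamed = new TableExt.Table(users) -> ("userId", "userName")      // users.as(...)
      val rows    = new TableExt.TableConversions(users).\\                  // DataStream[Row]
      val typed   = new TableExt.TableConversions(users).>>[(Long, String)]  // typed append stream
    }
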


(incubator-streampark) 02/02: [Improve] 2.1.4 upgrade sql bug fixed.

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch flink-119
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 1bfdafa3d38271e103d2a95f1b4c41d634b16779
Author: benjobs <be...@gmail.com>
AuthorDate: Fri Apr 19 15:43:36 2024 +0800

    [Improve] 2.1.4 upgrade sql bug fixed.
---
 .../src/main/assembly/script/upgrade/mysql/2.1.4.sql                    | 2 +-
 .../src/main/assembly/script/upgrade/pgsql/2.1.4.sql                    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
index 04b7f6ea4..33607f396 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
@@ -27,7 +27,7 @@ SET a.`flink_cluster_id` = c.`id`;
 
 UPDATE `t_flink_app`
 SET `cluster_id` = `app_id`
-WHERE `execution_mode` IN (2,3,5);
+WHERE `execution_mode` IN (2,3,4);
 
 ALTER TABLE `t_flink_app` DROP COLUMN `app_id`;
 
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql
index 6b216e6ec..6e6668214 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql
@@ -23,6 +23,6 @@ WHERE t_flink_app.cluster_id = t_flink_cluster.cluster_id
 
 UPDATE t_flink_app
 SET cluster_id = app_id
-WHERE execution_mode IN (2,3,5);
+WHERE execution_mode IN (2,3,4);
 
 ALTER TABLE t_flink_app DROP COLUMN app_id;
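
Note: the 2.1.4 upgrade scripts copy app_id into cluster_id only for the
execution modes where that identifier is meaningful; judging by the fix,
the original IN (2,3,5) list included a wrong mode code, and replacing 5 with
4 selects the intended set, apparently the YARN-based modes whose stored
app_id is the YARN application id. The correction is applied identically to
the MySQL and PostgreSQL variants before app_id is dropped.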