You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/05/24 09:27:18 UTC

[incubator-seatunnel] branch dev updated: The toreactstream is converted to the changlogstream method. Support update, delete message. (#1895)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8882f041 The toreactstream is converted to the changlogstream method. Support update, delete message. (#1895)
8882f041 is described below

commit 8882f041ca52bcbc57fd5f383797807d5fc728e8
Author: chao chen <59...@users.noreply.github.com>
AuthorDate: Tue May 24 17:27:14 2022 +0800

    The toreactstream is converted to the changlogstream method. Support update, delete message. (#1895)
---
 .../org/apache/seatunnel/flink/stream/FlinkStreamExecution.java     | 2 +-
 .../src/main/java/org/apache/seatunnel/flink/util/TableUtil.java    | 6 +-----
 2 files changed, 2 insertions(+), 6 deletions(-)

diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
index 43fa7daa..3514bae4 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
@@ -99,7 +99,7 @@ public class FlinkStreamExecution implements Execution<FlinkStreamSource, FlinkS
         if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
             StreamTableEnvironment tableEnvironment = flinkEnvironment.getStreamTableEnvironment();
             Table table = tableEnvironment.scan(pluginConfig.getString(SOURCE_TABLE_NAME));
-            return Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, true));
+            return Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, false));
         }
         return Optional.empty();
     }
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java
index e1877eff..c1c856e5 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java
@@ -39,11 +39,7 @@ public final class TableUtil {
         if (isAppend) {
             return tableEnvironment.toAppendStream(table, typeInfo);
         }
-        return tableEnvironment
-                .toRetractStream(table, typeInfo)
-                .filter(row -> row.f0)
-                .map(row -> row.f1)
-                .returns(typeInfo);
+        return tableEnvironment.toChangelogStream(table);
     }
 
     public static DataSet<Row> tableToDataSet(BatchTableEnvironment tableEnvironment, Table table) {