You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/05/30 01:07:12 UTC

[incubator-seatunnel] branch revert-1895-dev created (now 685cc918)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a change to branch revert-1895-dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


      at 685cc918 Revert "The toreactstream is converted to the changlogstream method. Support update, delete message. (#1895)"

This branch includes the following new commits:

     new 685cc918 Revert "The toreactstream is converted to the changlogstream method. Support update, delete message. (#1895)"

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-seatunnel] 01/01: Revert "The toreactstream is converted to the changlogstream method. Support update, delete message. (#1895)"

Posted by ki...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch revert-1895-dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git

commit 685cc918a43f7dd67651beba8d4d5c969c5b5edb
Author: Kirs <AC...@yeah.net>
AuthorDate: Mon May 30 09:07:07 2022 +0800

    Revert "The toreactstream is converted to the changlogstream method. Support update, delete message. (#1895)"
    
    This reverts commit 8882f041ca52bcbc57fd5f383797807d5fc728e8.
---
 .../org/apache/seatunnel/flink/stream/FlinkStreamExecution.java     | 2 +-
 .../src/main/java/org/apache/seatunnel/flink/util/TableUtil.java    | 6 +++++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
index 3514bae4..43fa7daa 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
@@ -99,7 +99,7 @@ public class FlinkStreamExecution implements Execution<FlinkStreamSource, FlinkS
         if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
             StreamTableEnvironment tableEnvironment = flinkEnvironment.getStreamTableEnvironment();
             Table table = tableEnvironment.scan(pluginConfig.getString(SOURCE_TABLE_NAME));
-            return Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, false));
+            return Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, true));
         }
         return Optional.empty();
     }
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java
index c1c856e5..e1877eff 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java
@@ -39,7 +39,11 @@ public final class TableUtil {
         if (isAppend) {
             return tableEnvironment.toAppendStream(table, typeInfo);
         }
-        return tableEnvironment.toChangelogStream(table);
+        return tableEnvironment
+                .toRetractStream(table, typeInfo)
+                .filter(row -> row.f0)
+                .map(row -> row.f1)
+                .returns(typeInfo);
     }
 
     public static DataSet<Row> tableToDataSet(BatchTableEnvironment tableEnvironment, Table table) {