Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/09/24 17:39:16 UTC

[GitHub] [iceberg] Reo-LEI commented on a change in pull request #2898: Flink: Fix duplicate rows when sync CDC data by FlinkSQL

Reo-LEI commented on a change in pull request #2898:
URL: https://github.com/apache/iceberg/pull/2898#discussion_r715795062



##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -59,12 +60,22 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         .map(UniqueConstraint::getColumns)
         .orElseGet(ImmutableList::of);
 
-    return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
-        .tableLoader(tableLoader)
-        .tableSchema(tableSchema)
-        .equalityFieldColumns(equalityColumns)
-        .overwrite(overwrite)
-        .build();
+    return (DataStreamSinkProvider) dataStream -> {
+      // For the CDC case in Flink SQL, when the job's default parallelism is greater than 1, the changelog is
+      // distributed to the Filter operator by rebalance (the default partitioning strategy). This reorders the
+      // changelog and produces wrong results in Iceberg (e.g. +U arriving before -U). Here we try to set the
+      // Filter operator's parallelism to match its input, so that the Filter is chained to its input and the
+      // rebalance is avoided.
+      Transformation<?> forwardOpr = dataStream.getTransformation();
+      if (forwardOpr.getName().equals("Filter") && forwardOpr.getInputs().size() == 1) {

Review comment:
       This addresses https://github.com/apache/iceberg/issues/2918#issuecomment-926804817
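To illustrate the ordering problem the code comment describes, here is a toy model in plain Java with no Flink or Iceberg dependencies. The class and method names are hypothetical; the round-robin `rebalance` method only mimics the behavior of Flink's rebalance partitioner, and the map-based `apply` is a simplified upsert sink, not Iceberg's actual equality-delete semantics.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical toy model (not Flink or Iceberg code): shows how round-robin redistribution
 * of a changelog across parallel subtasks can reorder a -U/+U pair for the same key,
 * while a single forward path preserves the original order.
 */
class ChangelogOrderDemo {

  /** A changelog record: kind is "-U" (retract old row) or "+U" (emit new row). */
  static final class Change {
    final String kind;
    final String key;
    final String value;

    Change(String kind, String key, String value) {
      this.kind = kind;
      this.key = key;
      this.value = value;
    }
  }

  /** Round-robin distribution over n downstream subtasks, similar in spirit to rebalance. */
  static List<List<Change>> rebalance(List<Change> upstream, int n) {
    List<List<Change>> subtasks = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      subtasks.add(new ArrayList<>());
    }
    for (int i = 0; i < upstream.size(); i++) {
      subtasks.get(i % n).add(upstream.get(i));
    }
    return subtasks;
  }

  /** Simplified upsert sink: -U deletes the key, +U writes it, applied in arrival order. */
  static Map<String, String> apply(List<Change> arrivals) {
    Map<String, String> table = new LinkedHashMap<>();
    for (Change c : arrivals) {
      if (c.kind.equals("-U")) {
        table.remove(c.key);
      } else {
        table.put(c.key, c.value);
      }
    }
    return table;
  }

  public static void main(String[] args) {
    List<Change> upstream =
        List.of(new Change("-U", "id1", "old"), new Change("+U", "id1", "new"));

    // Forward (same parallelism): one subtask sees the records in their original order.
    System.out.println("forward:   " + apply(upstream));

    // Rebalance to 2 subtasks: -U goes to subtask 0 and +U to subtask 1. The subtasks run
    // independently, so one possible arrival order at the sink is subtask 1 before subtask 0.
    List<List<Change>> parts = rebalance(upstream, 2);
    List<Change> arrivals = new ArrayList<>(parts.get(1));
    arrivals.addAll(parts.get(0));
    System.out.println("rebalance: " + apply(arrivals));
  }
}
```

Running `main` prints `{id1=new}` for the forward case and `{}` for the reordered case: in this simplified model the +U is written and then removed by the late -U, so the updated row is lost. When the Filter has the same parallelism as its input and no explicit partitioner is set, Flink should connect the two with a forward connection (and can typically chain them), which corresponds to the first case.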




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
For additional commands, e-mail: issues-help@iceberg.apache.org