Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/07/25 16:01:33 UTC

[GitHub] [iceberg] Reo-LEI opened a new pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Reo-LEI opened a new pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863


   Forked from #1996.
   
   author: @openinx 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on pull request #2863: Flink: Add streaming upsert write option.

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#issuecomment-917792092


   @Reo-LEI, the PR looks good to me now. Thanks for the patient contribution, and thanks to everyone for reviewing (@kbendick & @stevenzwu). I will get this merged once Travis CI passes!




[GitHub] [iceberg] Reo-LEI commented on pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#issuecomment-886224031


   I picked up #1996 in this PR. Could you take a look? @rdblue @openinx




[GitHub] [iceberg] openinx commented on pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#issuecomment-914791848


   Retrying CI ...




[GitHub] [iceberg] Reo-LEI commented on pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#issuecomment-890277167


   ping @openinx @rdblue 




[GitHub] [iceberg] Reo-LEI commented on pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#issuecomment-913093349


   I have fixed the unit test. Could you rerun the CI? :) @openinx




[GitHub] [iceberg] openinx commented on a change in pull request #2863: Flink: Add streaming upsert write option.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r704865520



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -222,4 +222,7 @@ private TableProperties() {
 
   public static final String MERGE_CARDINALITY_CHECK_ENABLED = "write.merge.cardinality-check.enabled";
   public static final boolean MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT = true;
+
+  public static final String UPSERT_MODE_ENABLE = "write.upsert.enable";
+  public static final boolean UPSERT_MODE_ENABLE_DEFAULT = false;

Review comment:
       > Is this only used in streaming mode now? Or does this work with Flink batch sink as well?
   
   Yes, it's only used in streaming mode right now. Batch upsert semantics are already implemented correctly by the [MERGE INTO](https://iceberg.apache.org/spark-writes/#merge-into) clause.
   
   > could we possibly use it to add our own support for CDC on top of Spark Structured Streaming?
   
   In theory, it's possible to add CDC support for Spark Structured Streaming, though Spark Structured Streaming does not support CDC events natively: Flink supports INSERT/DELETE/UPDATE_BEFORE/UPDATE_AFTER events natively, while Spark streaming doesn't unless we add an extra field to indicate the operation type. I think @XuQianJin-Stars and @chenjunjiedada's team are working on this in their own repo.
   






[GitHub] [iceberg] openinx closed pull request #2863: Flink: Add streaming upsert write option.

Posted by GitBox <gi...@apache.org>.
openinx closed pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863


   




[GitHub] [iceberg] Reo-LEI commented on pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#issuecomment-888450604


   > @Reo-LEI any update?
   
   Yep, I will keep pushing this PR forward, but I only have time at night, so progress will be slow.




[GitHub] [iceberg] openinx commented on a change in pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r677426263



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
##########
@@ -69,6 +72,9 @@ public void write(RowData row) throws IOException {
 
     switch (row.getRowKind()) {
       case INSERT:
+        if (upsert) {
+          writer.delete(row);
+        }

Review comment:
       For an update operation in Flink, the `UPDATE_AFTER` event must always be emitted downstream, while emitting `UPDATE_BEFORE` is best-effort or depends on how the upstream Flink source is configured. Take a look at this [GroupAggFunction](https://github.com/apache/flink/blob/ab70dcfa19827febd2c3cdc5cb81e942caa5b2f0/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L186): if the Flink source is configured to produce `UPDATE_AFTER` only, it won't emit any `UPDATE_BEFORE` downstream.
   
   For the downstream Iceberg sink, we need to handle every `UPDATE_AFTER` as an `UPSERT`. That also means we need to do nothing for `UPDATE_BEFORE`, because the previous key will be removed by the next `UPDATE_AFTER` event.
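   The routing described above can be sketched as follows. This is a minimal, self-contained illustration, not the PR's code: `RowKind` is a local stand-in for Flink's enum of the same name, and `RecordingWriter` is a hypothetical writer that only logs the operations it receives. In upsert mode, INSERT and UPDATE_AFTER both become a DELETE of the key followed by an INSERT, and UPDATE_BEFORE is ignored.

```java
import java.util.ArrayList;
import java.util.List;

public class UpsertRoutingSketch {

  // Local stand-in for Flink's RowKind (same four constants); not the Flink class itself.
  enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

  // Hypothetical writer that records each operation so the routing can be inspected.
  static final class RecordingWriter {
    final List<String> ops = new ArrayList<>();

    void insert(String key) { ops.add("INSERT " + key); }

    void delete(String key) { ops.add("DELETE " + key); }
  }

  final RecordingWriter writer = new RecordingWriter();
  private final boolean upsert;

  UpsertRoutingSketch(boolean upsert) {
    this.upsert = upsert;
  }

  void write(RowKind kind, String key) {
    switch (kind) {
      case INSERT:
      case UPDATE_AFTER:
        if (upsert) {
          // Remove any previous row with the same equality key before appending the new one.
          writer.delete(key);
        }
        writer.insert(key);
        break;
      case UPDATE_BEFORE:
        if (!upsert) {
          writer.delete(key);
        }
        // In upsert mode UPDATE_BEFORE is skipped: the following UPDATE_AFTER deletes the old key.
        break;
      case DELETE:
        writer.delete(key);
        break;
      default:
        throw new IllegalArgumentException("Unknown row kind: " + kind);
    }
  }

  public static void main(String[] args) {
    UpsertRoutingSketch sink = new UpsertRoutingSketch(true);
    // A source configured to emit UPDATE_AFTER only (no UPDATE_BEFORE).
    sink.write(RowKind.INSERT, "user-1");
    sink.write(RowKind.UPDATE_AFTER, "user-1");
    System.out.println(sink.writer.ops);
  }
}
```

   Running `main` prints `[DELETE user-1, INSERT user-1, DELETE user-1, INSERT user-1]`: every incoming row in upsert mode costs one extra delete record.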






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r695007331



##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -63,6 +72,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         .tableLoader(tableLoader)
         .tableSchema(tableSchema)
         .equalityFieldColumns(equalityColumns)
+        .upsert(PropertyUtil.propertyAsBoolean(tableProperties, UPSERT_MODE_ENABLED, UPSERT_MODE_ENABLED_DEFAULT))

Review comment:
       Do we need to pass in the table properties for the upsert flag? The `FlinkSink.Builder.build()` method internally opens the table from the `TableLoader` (if the table is not passed in). Can we extract the property inside the `build()` method?
   
   Also, we can probably add a validation that `upsert` and `overwrite` aren't enabled at the same time?
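   A sketch of the build-time resolution suggested above, assuming the table properties are available as a plain `Map<String, String>` once `build()` has loaded the table. The key string matches the `write.upsert.enable` spelling in this revision of the diff; the helper name `resolveUpsert` is hypothetical. Upsert is on if either the job-level flag or the table property enables it, and combining it with `overwrite` or with an empty set of equality fields is rejected.

```java
import java.util.Map;
import java.util.Set;

public class UpsertModeResolver {

  // Property key as spelled in this revision of the diff (a rename to "enabled" was suggested in review).
  static final String UPSERT_MODE_ENABLE = "write.upsert.enable";
  static final boolean UPSERT_MODE_ENABLE_DEFAULT = false;

  // Hypothetical helper: resolves upsert mode and validates it against the other sink options.
  static boolean resolveUpsert(boolean jobLevelUpsert, boolean overwrite,
                               Set<Integer> equalityFieldIds, Map<String, String> tableProperties) {
    String raw = tableProperties.get(UPSERT_MODE_ENABLE);
    boolean fromTable = raw == null ? UPSERT_MODE_ENABLE_DEFAULT : Boolean.parseBoolean(raw);
    // Enabled if either the job-level flag or the table property turns it on.
    boolean upsertMode = jobLevelUpsert || fromTable;

    if (upsertMode) {
      if (overwrite) {
        throw new IllegalStateException("OVERWRITE mode shouldn't be enabled when using an UPSERT data stream.");
      }
      if (equalityFieldIds.isEmpty()) {
        throw new IllegalStateException("Equality field columns shouldn't be empty when using an UPSERT data stream.");
      }
    }
    return upsertMode;
  }

  public static void main(String[] args) {
    Set<Integer> keys = Set.of(1);
    System.out.println(resolveUpsert(false, false, keys, Map.of(UPSERT_MODE_ENABLE, "true"))); // true
    System.out.println(resolveUpsert(false, false, keys, Map.of()));                          // false
    try {
      resolveUpsert(true, true, keys, Map.of());
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```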






[GitHub] [iceberg] openinx commented on a change in pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r677393295



##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -63,6 +63,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         .tableLoader(tableLoader)
         .tableSchema(tableSchema)
         .equalityFieldColumns(equalityColumns)
+        .upsert(equalityColumns.size() > 0)

Review comment:
       Upsert mode produces an extra DELETE for each INSERT operation, which means a reading scan job has to join more data, and that will affect read performance. In CDC cases, where every update is produced as an UPDATE_BEFORE and an UPDATE_AFTER and the stream guarantees key uniqueness when applying each event, we don't need to enable the upsert switch.
   
   I suggest introducing a table property named `write.upsert.enable` for Flink tables, so that people can choose whether they need real UPSERT semantics.






[GitHub] [iceberg] openinx commented on a change in pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r677376359



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -41,6 +41,7 @@
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionField;

Review comment:
       Nit: this import is not in alphabetical order (`PartitionField` should come before `PartitionSpec`)?






[GitHub] [iceberg] coolderli commented on pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
coolderli commented on pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#issuecomment-888326244


   @Reo-LEI  any update?




[GitHub] [iceberg] openinx commented on a change in pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r700913685



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -321,7 +339,27 @@ private String operatorName(String suffix) {
           equalityFieldIds.add(field.fieldId());
         }
       }
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
+
+      // Fallback to use upsert mode parsed from table properties if don't specify in job level.
+      boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+          UPSERT_MODE_ENABLED, UPSERT_MODE_ENABLED_DEFAULT);
+
+      // Validate the equality fields and partition fields if we enable the upsert mode.
+      if (upsertMode) {
+        Preconditions.checkState(!overwrite,
+            "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
+        Preconditions.checkState(!equalityFieldIds.isEmpty(),
+            "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
+        if (!table.spec().isUnpartitioned()) {

Review comment:
       Suppose we have a table with `user_id` and `hour` columns, and the business primary key is `user_id`, which means the table should have at most one row for each given `user_id`. Now let's talk about the partition strategy.
   
   If we just partition the table by the `hour` field, two different hour partitions may contain the same `user_id`, because people may insert the same `user_id` in both `hour=01` and `hour=02`. To keep the primary key semantics, we would need to delete the old `user_id` row in `hour=01` first, then insert the new one in `hour=02`. But when an INSERT comes, we don't know which partition holds that `user_id`, so we would have to broadcast the DELETE to all partitions, which is quite inefficient.
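   The hour-partition problem can be reproduced with a toy in-memory model. This is a hypothetical illustration, not Iceberg code: each partition is a list of `user_id` values, and the upsert deletes only inside the partition the new row is written to, which is what happens if the DELETE is not broadcast.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionedUpsertPitfall {

  // Hypothetical in-memory table: partition value (hour) -> user_ids stored in that partition.
  private final Map<String, List<String>> partitions = new HashMap<>();

  // Upsert whose DELETE is scoped to the partition the new row lands in.
  void upsertWithinPartition(String hour, String userId) {
    List<String> rows = partitions.computeIfAbsent(hour, h -> new ArrayList<>());
    rows.remove(userId); // DELETE, but only within this hour partition
    rows.add(userId);    // INSERT
  }

  // Counts rows for a key across all partitions.
  long countRows(String userId) {
    return partitions.values().stream()
        .flatMap(List::stream)
        .filter(userId::equals)
        .count();
  }

  public static void main(String[] args) {
    PartitionedUpsertPitfall table = new PartitionedUpsertPitfall();
    table.upsertWithinPartition("01", "user-42");
    table.upsertWithinPartition("02", "user-42");
    // The old row in hour=01 survives, so user_id is no longer unique.
    System.out.println(table.countRows("user-42")); // 2
  }
}
```

   If both upserts target the same hour, the count stays at 1; the duplicate appears only when the key moves between partitions.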
   
   






[GitHub] [iceberg] Reo-LEI commented on pull request #2863: Flink: Add streaming upsert write option.

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#issuecomment-917646396


   @openinx @stevenzwu @kbendick do you have any other concerns about this PR? I think this feature is very important for Flink users, and we should merge this PR as soon as possible. :)




[GitHub] [iceberg] stevenzwu commented on a change in pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r695012740



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
##########
@@ -69,6 +72,9 @@ public void write(RowData row) throws IOException {
 
     switch (row.getRowKind()) {
       case INSERT:
+        if (upsert) {
+          writer.delete(row);
+        }

Review comment:
       Going by the name, I thought INSERT meant "add a new row", in which case we wouldn't need to add a delete for it. But I guess it actually means "append a row (new or updated)".






[GitHub] [iceberg] openinx closed pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
openinx closed pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863


   




[GitHub] [iceberg] openinx commented on a change in pull request #2863: Flink: Add streaming upsert write option.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r704872255



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -321,7 +339,27 @@ private String operatorName(String suffix) {
           equalityFieldIds.add(field.fieldId());
         }
       }
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
+
+      // Fallback to use upsert mode parsed from table properties if don't specify in job level.
+      boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+          UPSERT_MODE_ENABLE, UPSERT_MODE_ENABLE_DEFAULT);
+
+      // Validate the equality fields and partition fields if we enable the upsert mode.
+      if (upsertMode) {
+        Preconditions.checkState(!overwrite,
+            "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");

Review comment:
       Agreed !






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r695916643



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -222,4 +222,7 @@ private TableProperties() {
 
   public static final String MERGE_CARDINALITY_CHECK_ENABLED = "write.merge.cardinality-check.enabled";
   public static final boolean MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT = true;
+
+  public static final String UPSERT_MODE_ENABLED = "write.upsert.enable";

Review comment:
       nit: looks like the convention is `enabled`






[GitHub] [iceberg] Reo-LEI commented on a change in pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r695846000



##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -63,6 +72,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         .tableLoader(tableLoader)
         .tableSchema(tableSchema)
         .equalityFieldColumns(equalityColumns)
+        .upsert(PropertyUtil.propertyAsBoolean(tableProperties, UPSERT_MODE_ENABLED, UPSERT_MODE_ENABLED_DEFAULT))

Review comment:
       Sounds good to me! I will extract this property in `build()` and add a validation for `overwrite` when `upsert` is enabled. :)






[GitHub] [iceberg] Reo-LEI commented on a change in pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r678369778



##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -63,6 +63,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         .tableLoader(tableLoader)
         .tableSchema(tableSchema)
         .equalityFieldColumns(equalityColumns)
+        .upsert(equalityColumns.size() > 0)

Review comment:
       I overlooked the CDC cases; making upsert mode configurable makes sense.






[GitHub] [iceberg] Reo-LEI commented on a change in pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r676167969



##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -63,6 +63,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         .tableLoader(tableLoader)
         .tableSchema(tableSchema)
         .equalityFieldColumns(equalityColumns)
+        .upsert(equalityColumns.size() > 0)

Review comment:
       I think we could enable `upsert` mode in Flink SQL when a primary key has been set.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
##########
@@ -69,6 +72,9 @@ public void write(RowData row) throws IOException {
 
     switch (row.getRowKind()) {
       case INSERT:
+        if (upsert) {
+          writer.delete(row);
+        }

Review comment:
       I think we only need to delete the row on INSERT. I don't think there is a situation where we get only the `UPDATE_AFTER` row and lose the `UPDATE_BEFORE`. @openinx please check this.






[GitHub] [iceberg] Reo-LEI commented on a change in pull request #2863: Flink: Add streaming upsert write option.

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r704949371



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -321,7 +339,27 @@ private String operatorName(String suffix) {
           equalityFieldIds.add(field.fieldId());
         }
       }
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
+
+      // Fallback to use upsert mode parsed from table properties if don't specify in job level.
+      boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+          UPSERT_MODE_ENABLE, UPSERT_MODE_ENABLE_DEFAULT);
+
+      // Validate the equality fields and partition fields if we enable the upsert mode.
+      if (upsertMode) {
+        Preconditions.checkState(!overwrite,
+            "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");

Review comment:
       I have added a unit test to cover this, thanks for the reminder! :) @kbendick






[GitHub] [iceberg] Reo-LEI commented on a change in pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r697884300



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -321,7 +339,27 @@ private String operatorName(String suffix) {
           equalityFieldIds.add(field.fieldId());
         }
       }
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
+
+      // Fallback to use upsert mode parsed from table properties if don't specify in job level.
+      boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+          UPSERT_MODE_ENABLED, UPSERT_MODE_ENABLED_DEFAULT);
+
+      // Validate the equality fields and partition fields if we enable the upsert mode.
+      if (upsertMode) {
+        Preconditions.checkState(!overwrite,
+            "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
+        Preconditions.checkState(!equalityFieldIds.isEmpty(),
+            "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
+        if (!table.spec().isUnpartitioned()) {

Review comment:
       https://github.com/apache/iceberg/blob/4d33f18f5fcd4c20aea6d4118bd03e0d181271d0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java#L222
   As @openinx commented above, we should restrict the partition fields to be a subset of the equality fields, to ensure we can delete the old data in the same partition.
   
   > e.g., we can have an equality field (like user_id) and table can be partitioned by hour. would that be a valid scenario?
   
   I don't think that is a valid scenario. Keeping `user_id` unique across all the different `hour` partitions makes no sense here, because an equality delete written to one partition only applies to data files in that same partition.
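To make the partition argument concrete, here is a toy sketch (hypothetical classes, not Iceberg's API) that models an upsert as an equality delete scoped to the new row's `hour` partition followed by an insert. It assumes, as described above, that the delete never reaches rows in other partitions; with `user_id` as the only equality field, the old row survives and the key ends up with two live rows.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, not Iceberg's API): an equality delete written for an
// upsert only removes matching rows inside the partition it is written to.
final class PartitionScopedUpsert {

  record Row(String userId, int hour, String value) {}

  // Upsert = equality delete on user_id, scoped to the new row's hour partition, then insert.
  static List<Row> upsert(List<Row> table, Row incoming) {
    List<Row> result = new ArrayList<>();
    for (Row existing : table) {
      boolean samePartition = existing.hour() == incoming.hour();
      boolean sameKey = existing.userId().equals(incoming.userId());
      if (!(samePartition && sameKey)) {
        result.add(existing); // rows in other partitions are never touched by this delete
      }
    }
    result.add(incoming);
    return result;
  }

  public static void main(String[] args) {
    List<Row> table = new ArrayList<>();
    table = upsert(table, new Row("u1", 10, "v1"));
    table = upsert(table, new Row("u1", 11, "v2")); // same user_id, later hour
    // The hour=10 row survives, so user_id "u1" now has two live rows.
    System.out.println(table.size()); // prints 2
  }
}
```

If `hour` were also an equality field, rows with different hours would simply be different keys, which is why requiring partition fields to be a subset of the equality fields keeps upserts consistent.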






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r695920761



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -321,7 +339,27 @@ private String operatorName(String suffix) {
           equalityFieldIds.add(field.fieldId());
         }
       }
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
+
+      // Fallback to use upsert mode parsed from table properties if don't specify in job level.
+      boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+          UPSERT_MODE_ENABLED, UPSERT_MODE_ENABLED_DEFAULT);
+
+      // Validate the equality fields and partition fields if we enable the upsert mode.
+      if (upsertMode) {
+        Preconditions.checkState(!overwrite,
+            "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
+        Preconditions.checkState(!equalityFieldIds.isEmpty(),
+            "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
+        if (!table.spec().isUnpartitioned()) {

Review comment:
       For my own learning: must the partition fields be included in the equality fields?
   
   E.g., we could have an equality field (like `user_id`) while the table is partitioned by hour. Would that be a valid scenario?






[GitHub] [iceberg] openinx merged pull request #2863: Flink: Add streaming upsert write option.

Posted by GitBox <gi...@apache.org>.
openinx merged pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863


   




[GitHub] [iceberg] openinx commented on pull request #2863: Flink: Add streaming upsert write option.

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#issuecomment-914860458


   LGTM if the Travis CI says OK! Thanks @Reo-LEI for picking this up!




[GitHub] [iceberg] kbendick commented on a change in pull request #2863: Flink: Add streaming upsert write option.

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r704770231



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -321,7 +339,27 @@ private String operatorName(String suffix) {
           equalityFieldIds.add(field.fieldId());
         }
       }
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
+
+      // Fallback to use upsert mode parsed from table properties if don't specify in job level.
+      boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+          UPSERT_MODE_ENABLE, UPSERT_MODE_ENABLE_DEFAULT);
+
+      // Validate the equality fields and partition fields if we enable the upsert mode.
+      if (upsertMode) {
+        Preconditions.checkState(!overwrite,
+            "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");

Review comment:
       Can you add a test that verifies the builder doesn't allow `overwrite` together with `upsert` / `upsertMode`?
   
   Maybe I missed it, but this seems like an important thing to have a test for, in case of future code refactors.
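A minimal sketch of what such a test could pin down, assuming the upsert checks from the hunk above are pulled into a standalone method. The class, method names, and the partition-subset error message are hypothetical; plain `Set`/`List` values stand in for Iceberg's `Table` and `PartitionSpec`, so this is not the actual `FlinkSink.Builder` code.

```java
import java.util.List;
import java.util.Set;

// Hypothetical standalone version of the upsert-mode checks; plain collections
// stand in for the equality field ids and the partition fields' source ids.
final class UpsertChecks {

  static void validate(boolean upsertMode, boolean overwrite,
                       Set<Integer> equalityFieldIds, List<Integer> partitionSourceIds) {
    if (!upsertMode) {
      return; // nothing extra to validate for plain append/overwrite writes
    }
    if (overwrite) {
      throw new IllegalStateException(
          "OVERWRITE mode shouldn't be enabled when configuring to use UPSERT data stream.");
    }
    if (equalityFieldIds.isEmpty()) {
      throw new IllegalStateException(
          "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
    }
    for (int sourceId : partitionSourceIds) {
      if (!equalityFieldIds.contains(sourceId)) {
        throw new IllegalStateException(
            "Partition source field " + sourceId + " must be one of the equality fields in upsert mode.");
      }
    }
  }

  // Test helper: true if validate(...) rejects the given configuration.
  static boolean rejects(boolean upsertMode, boolean overwrite,
                         Set<Integer> equalityFieldIds, List<Integer> partitionSourceIds) {
    try {
      validate(upsertMode, overwrite, equalityFieldIds, partitionSourceIds);
      return false;
    } catch (IllegalStateException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    // overwrite + upsert must be rejected: the case a regression test should lock in
    System.out.println(rejects(true, true, Set.of(1), List.of())); // prints true
  }
}
```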






[GitHub] [iceberg] Reo-LEI commented on a change in pull request #2863: Flink: Add streaming upsert write option.

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r704949371



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -321,7 +339,27 @@ private String operatorName(String suffix) {
           equalityFieldIds.add(field.fieldId());
         }
       }
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
+
+      // Fallback to use upsert mode parsed from table properties if don't specify in job level.
+      boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+          UPSERT_MODE_ENABLE, UPSERT_MODE_ENABLE_DEFAULT);
+
+      // Validate the equality fields and partition fields if we enable the upsert mode.
+      if (upsertMode) {
+        Preconditions.checkState(!overwrite,
+            "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");

Review comment:
       I have added a unittest to cover this, thanks for your reminder!:) @kbendick 






[GitHub] [iceberg] Reo-LEI commented on pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#issuecomment-903826338


   This PR has been open for some time. I think this feature is very important for users, and it is just waiting for someone to review and merge it. Could anybody take a look at this? @rdblue @openinx @aokolnychyi @stevenzwu @kbendick 




[GitHub] [iceberg] openinx commented on a change in pull request #2863: Flink: Add streaming upsert write option.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r704872129



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
##########
@@ -70,11 +73,19 @@ public void write(RowData row) throws IOException {
     switch (row.getRowKind()) {
       case INSERT:
       case UPDATE_AFTER:
+        if (upsert) {
+          writer.delete(row);
+        }
         writer.write(row);
         break;
 
-      case DELETE:
       case UPDATE_BEFORE:
+        if (upsert) {
+          break;  // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice
+        }

Review comment:
       It's a good question, @kbendick! Let's look at the out-of-order problem in two dimensions:
   
   1. Is it possible to produce disordered events within a single Iceberg transaction? First, if we want to maintain correct data semantics between the source table and the Iceberg sink table, the records consumed from the source table must be in the correct order. Second, the streaming job needs to shuffle by the equality fields so that records with the same key are dispatched to the same parallel task; otherwise, out-of-order writes can happen when different tasks write records with the same equality fields to the Iceberg table. Together, these guarantee the order within a single transaction.
   
   2. Out-of-order writes between two consecutive transactions. In our Flink streaming integration, we guarantee the [exact commit order](https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L281) even if a failover happens. For Spark streaming, I think this issue will need more consideration.
   
   Hopefully I've answered your question, @kbendick :-)
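Point 1 relies on routing every record with the same equality key to the same writer task. The sketch below illustrates that idea with a simplified `hashCode() mod parallelism` router; this is an assumption for illustration only and is NOT Flink's actual key-group assignment used by `keyBy`. The `KeyedRouting` class and its `Change` record are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified hash routing (NOT Flink's key-group algorithm): same key -> same subtask,
// so each subtask sees every change for its keys in source order.
final class KeyedRouting {

  record Change(String key, String value) {}

  static int subtaskFor(String key, int parallelism) {
    return Math.floorMod(key.hashCode(), parallelism);
  }

  // Split an ordered change stream into per-subtask queues, preserving arrival order.
  static Map<Integer, List<Change>> route(List<Change> changes, int parallelism) {
    Map<Integer, List<Change>> queues = new TreeMap<>();
    for (Change change : changes) {
      queues.computeIfAbsent(subtaskFor(change.key(), parallelism), k -> new ArrayList<>())
          .add(change);
    }
    return queues;
  }

  // Number of subtasks that received at least one change for the given key.
  static long subtasksHoldingKey(Map<Integer, List<Change>> queues, String key) {
    return queues.values().stream()
        .filter(q -> q.stream().anyMatch(c -> c.key().equals(key)))
        .count();
  }

  // Values seen for a key, in the order the owning subtask would write them.
  static List<String> valuesForKey(Map<Integer, List<Change>> queues, String key) {
    List<String> values = new ArrayList<>();
    for (List<Change> queue : queues.values()) {
      for (Change c : queue) {
        if (c.key().equals(key)) {
          values.add(c.value());
        }
      }
    }
    return values;
  }

  public static void main(String[] args) {
    List<Change> stream = List.of(
        new Change("a", "1"), new Change("b", "1"), new Change("a", "2"),
        new Change("c", "1"), new Change("a", "3"));
    Map<Integer, List<Change>> queues = route(stream, 4);
    System.out.println(subtasksHoldingKey(queues, "a") + " " + valuesForKey(queues, "a")); // 1 [1, 2, 3]
  }
}
```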






[GitHub] [iceberg] kbendick commented on a change in pull request #2863: Flink: Add streaming upsert write option.

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r704767022



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
##########
@@ -70,11 +73,19 @@ public void write(RowData row) throws IOException {
     switch (row.getRowKind()) {
       case INSERT:
       case UPDATE_AFTER:
+        if (upsert) {
+          writer.delete(row);
+        }
         writer.write(row);
         break;
 
-      case DELETE:
       case UPDATE_BEFORE:
+        if (upsert) {
+          break;  // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice
+        }

Review comment:
       Non-blocking question:
   
   Are there possible concerns with events arriving out of order for some reason? I guess that since the table commits are serializable this isn't a concern, as the same row for these equality fields shouldn't be updated twice in the same commit?
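The concern matters because an upsert is "last writer wins" per key, so the final state depends on the order in which changes are applied. A toy sketch (hypothetical `UpsertOrder` class, with a `Map` standing in for the table and `put` standing in for delete + insert on the equality key):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy last-writer-wins table keyed by the equality field.
final class UpsertOrder {

  record Change(String key, String value) {}

  static Map<String, String> apply(List<Change> changes) {
    Map<String, String> table = new LinkedHashMap<>();
    for (Change c : changes) {
      table.put(c.key(), c.value()); // models: delete old row for key, insert new row
    }
    return table;
  }

  public static void main(String[] args) {
    Change v1 = new Change("id-1", "v1");
    Change v2 = new Change("id-1", "v2");
    System.out.println(apply(List.of(v1, v2)).get("id-1")); // v2
    System.out.println(apply(List.of(v2, v1)).get("id-1")); // v1: the stale value wins
  }
}
```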

##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -222,4 +222,7 @@ private TableProperties() {
 
   public static final String MERGE_CARDINALITY_CHECK_ENABLED = "write.merge.cardinality-check.enabled";
   public static final boolean MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT = true;
+
+  public static final String UPSERT_MODE_ENABLE = "write.upsert.enable";
+  public static final boolean UPSERT_MODE_ENABLE_DEFAULT = false;

Review comment:
       Two questions, one of which is somewhat unrelated:
   
   1. Is this only used in streaming mode now? Or does it work with the Flink batch sink as well?
   2. (Somewhat unrelated / thinking out loud) If we have this new `write.upsert.enabled` flag, could we possibly use it to add our own CDC support on top of Spark Structured Streaming? 
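For reference, the FlinkSink hunk above resolves upsert mode as `upsert || <table property>`. The sketch below reproduces that fallback with a hypothetical helper that imitates, but is not, Iceberg's `PropertyUtil.propertyAsBoolean`; the property name and default are taken from this revision of `TableProperties` (`write.upsert.enable`, `false`).

```java
import java.util.Map;

// Hypothetical mirror of the upsert-mode fallback: the job-level flag OR the table property.
final class UpsertModeResolution {
  static final String UPSERT_MODE_ENABLE = "write.upsert.enable";
  static final boolean UPSERT_MODE_ENABLE_DEFAULT = false;

  // Imitation of a property lookup with a default; Boolean.parseBoolean ignores case.
  static boolean propertyAsBoolean(Map<String, String> props, String key, boolean defaultValue) {
    String value = props.get(key);
    return value != null ? Boolean.parseBoolean(value) : defaultValue;
  }

  static boolean resolve(boolean jobLevelUpsert, Map<String, String> tableProps) {
    return jobLevelUpsert
        || propertyAsBoolean(tableProps, UPSERT_MODE_ENABLE, UPSERT_MODE_ENABLE_DEFAULT);
  }

  public static void main(String[] args) {
    System.out.println(resolve(false, Map.of()));                           // false
    System.out.println(resolve(false, Map.of(UPSERT_MODE_ENABLE, "true"))); // true
  }
}
```

One consequence of using `||`: a job-level `false` cannot switch upsert off for a table whose property is already `true`; the job flag can only turn it on.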






[GitHub] [iceberg] Reo-LEI commented on a change in pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r678365099



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
##########
@@ -69,6 +72,9 @@ public void write(RowData row) throws IOException {
 
     switch (row.getRowKind()) {
       case INSERT:
+        if (upsert) {
+          writer.delete(row);
+        }

Review comment:
       I think we can delete the row only on `UPDATE_AFTER` and let `UPDATE_BEFORE` do nothing, to avoid deleting one row twice.
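A sketch tracing which writer operations each change kind would produce under the rule above. `Kind` is a hypothetical stand-in for Flink's `RowKind`, and the returned strings stand in for the delta writer's `delete`/`write` calls. The `DELETE` branch is an assumption: the hunk moves `DELETE` out of the shared case but does not show its new body.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace of the upsert dispatch in BaseDeltaTaskWriter.write(...).
final class UpsertDispatch {
  enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

  static List<String> write(Kind kind, String row, boolean upsert) {
    List<String> ops = new ArrayList<>();
    switch (kind) {
      case INSERT:
      case UPDATE_AFTER:
        if (upsert) {
          ops.add("delete " + row); // remove any previous row with the same key
        }
        ops.add("write " + row);
        break;
      case UPDATE_BEFORE:
        if (upsert) {
          break; // the delete already happens on UPDATE_AFTER; avoid deleting twice
        }
        ops.add("delete " + row);
        break;
      case DELETE:
        ops.add("delete " + row); // assumed: DELETE still removes the row
        break;
      default:
        throw new IllegalArgumentException("Unknown kind: " + kind);
    }
    return ops;
  }

  public static void main(String[] args) {
    System.out.println(write(Kind.UPDATE_BEFORE, "r", true)); // []
    System.out.println(write(Kind.UPDATE_AFTER, "r", true));  // [delete r, write r]
  }
}
```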






[GitHub] [iceberg] Reo-LEI commented on a change in pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r695856538



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
##########
@@ -69,6 +72,9 @@ public void write(RowData row) throws IOException {
 
     switch (row.getRowKind()) {
       case INSERT:
+        if (upsert) {
+          writer.delete(row);
+        }

Review comment:
       As @openinx mentioned in https://github.com/apache/iceberg/pull/1996#issue-546072705, we need to transform INSERT/UPDATE_AFTER into an UPSERT (delete + insert). If we don't add a delete for an INSERT row when upsert mode is enabled, we will get duplicate rows for the same primary key.
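A toy illustration of the duplicate-row problem (hypothetical `InsertAsUpsert` class; a `List` stands in for the table and `removeIf` for an equality delete on the primary key). Two INSERTs for the same `id` leave two rows unless each insert is preceded by a delete.

```java
import java.util.ArrayList;
import java.util.List;

// Toy table showing why INSERT must become delete + insert in upsert mode.
final class InsertAsUpsert {

  record Row(int id, String value) {}

  static List<Row> applyInserts(List<Row> inserts, boolean upsert) {
    List<Row> table = new ArrayList<>();
    for (Row row : inserts) {
      if (upsert) {
        table.removeIf(r -> r.id() == row.id()); // equality delete on the primary key
      }
      table.add(row);
    }
    return table;
  }

  static long countId(List<Row> table, int id) {
    return table.stream().filter(r -> r.id() == id).count();
  }

  public static void main(String[] args) {
    List<Row> inserts = List.of(new Row(1, "a"), new Row(1, "b"));
    System.out.println(countId(applyInserts(inserts, false), 1)); // 2: duplicate key
    System.out.println(countId(applyInserts(inserts, true), 1));  // 1
  }
}
```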






[GitHub] [iceberg] openinx commented on pull request #2863: Flink: Add streaming upsert write option.

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#issuecomment-917982669


   Reopened PR to retry the CI.




[GitHub] [iceberg] openinx commented on a change in pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r677382619



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -263,8 +279,20 @@ public Builder uidPrefix(String newPrefix) {
       // Distribute the records from input data stream based on the write.distribution-mode.
       rowDataInput = distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
 
+      // Convert the INSERT stream to be an UPSERT stream if needed.

Review comment:
       I think this comment should be written as:
   
   ```
   // Validate the equality fields and partition fields if we enable the upsert stream.
   ```
   
   Because we don't do any data stream conversion here, contrary to what the stale comment says.



