Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/09/02 09:29:14 UTC

[GitHub] [iceberg] openinx commented on a change in pull request #2863: Flink: Transform INSERT as one DELETE following one INSERT

openinx commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r700913685



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -321,7 +339,27 @@ private String operatorName(String suffix) {
           equalityFieldIds.add(field.fieldId());
         }
       }
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
+
+      // Fallback to use upsert mode parsed from table properties if don't specify in job level.
+      boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+          UPSERT_MODE_ENABLED, UPSERT_MODE_ENABLED_DEFAULT);
+
+      // Validate the equality fields and partition fields if we enable the upsert mode.
+      if (upsertMode) {
+        Preconditions.checkState(!overwrite,
+            "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
+        Preconditions.checkState(!equalityFieldIds.isEmpty(),
+            "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
+        if (!table.spec().isUnpartitioned()) {

Review comment:
       If we have a table with `user_id` and `hour` columns, and the business primary key is `user_id`, then the table should have at most one row for each given `user_id`. Now let's talk about the partition strategy.
   
   If we just partition the table by the `hour` field, two different hour partitions may contain the same `user_id`, because people may insert the same `user_id` in both `hour=01` and `hour=02`. If we want to keep the primary key semantics, we need to delete the old `user_id` row in `hour=01` first, and then insert the new `user_id` row in `hour=02`. But when an INSERT comes, we don't know which partition holds that `user_id`, so we have to broadcast the DELETE to all partitions, which is quite inefficient.
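
To make the cost concrete, here is a minimal, self-contained sketch of the scenario above. It is not Iceberg or Flink code: the class `UpsertBroadcastSketch`, its methods, and the in-memory map of hour partitions are all hypothetical names invented for illustration. It only models the idea that, when the key is `user_id` alone, each upsert has to issue a delete against every `hour` partition, whereas when the key also contains the partition column `hour`, the delete can be routed to a single partition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of a table partitioned by `hour`.
// Nothing here is an Iceberg or Flink API; it only illustrates the argument.
final class UpsertBroadcastSketch {
  // hour partition -> (user_id -> payload)
  private final Map<Integer, Map<String, String>> partitions = new TreeMap<>();
  // Number of partitions that received a DELETE so far.
  private int partitionsTouched = 0;

  UpsertBroadcastSketch(int hours) {
    for (int h = 0; h < hours; h++) {
      partitions.put(h, new HashMap<>());
    }
  }

  // Key is `user_id` only: we don't know which partition holds the old row,
  // so the DELETE is broadcast to every partition before the INSERT.
  void upsertByUserId(String userId, int hour, String payload) {
    for (Map<String, String> rows : partitions.values()) {
      partitionsTouched++;
      rows.remove(userId);
    }
    partitions.get(hour).put(userId, payload);
  }

  // Key is (`user_id`, `hour`): the old row, if any, can only live in the
  // partition the new row goes to, so the DELETE touches one partition.
  void upsertByUserIdAndHour(String userId, int hour, String payload) {
    Map<String, String> rows = partitions.get(hour);
    partitionsTouched++;
    rows.remove(userId);
    rows.put(userId, payload);
  }

  // Number of partitions that currently contain a row for this user_id.
  int rowsFor(String userId) {
    int count = 0;
    for (Map<String, String> rows : partitions.values()) {
      if (rows.containsKey(userId)) {
        count++;
      }
    }
    return count;
  }

  int partitionsTouched() {
    return partitionsTouched;
  }

  public static void main(String[] args) {
    UpsertBroadcastSketch byUser = new UpsertBroadcastSketch(24);
    byUser.upsertByUserId("u1", 1, "first");
    byUser.upsertByUserId("u1", 2, "second");
    System.out.println("key=user_id        rows=" + byUser.rowsFor("u1")
        + " partitionsTouched=" + byUser.partitionsTouched());

    UpsertBroadcastSketch byUserAndHour = new UpsertBroadcastSketch(24);
    byUserAndHour.upsertByUserIdAndHour("u1", 1, "first");
    byUserAndHour.upsertByUserIdAndHour("u1", 2, "second");
    System.out.println("key=(user_id,hour) rows=" + byUserAndHour.rowsFor("u1")
        + " partitionsTouched=" + byUserAndHour.partitionsTouched());
  }
}
```

Note the trade-off the sketch exposes: including `hour` in the key avoids the broadcast, but the uniqueness guarantee then becomes "one row per (`user_id`, `hour`)" rather than "one row per `user_id`".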
   
   




