Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/12/23 03:15:05 UTC

[GitHub] [iceberg] openinx commented on a change in pull request #1974: Flink: Add ChangeLog DataStream end-to-end unit tests.

openinx commented on a change in pull request #1974:
URL: https://github.com/apache/iceberg/pull/1974#discussion_r547616494



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -169,6 +172,17 @@ public Builder writeParallelism(int newWriteParallelism) {
       return this;
     }
 
+    /**
+     * Configure the equality field columns for an iceberg table that accepts CDC or UPSERT events.
+     *
+     * @param columns defines the iceberg table's key.
+     * @return {@link Builder} to connect the iceberg table.
+     */
+    public Builder equalityFieldColumns(List<String> columns) {

Review comment:
       In the next PR https://github.com/openinx/incubator-iceberg/commit/a863c66eb3d72dd975ea64c75ed2ac35984c17fe, the Flink table SQL's primary key will act as the equality field columns. The semantics of Iceberg's equality columns are almost the same as a primary key; the one difference I can think of is that key uniqueness is not enforced. As noted in this [discussion](https://github.com/apache/iceberg/pull/1663#discussion_r528278694), we don't guarantee uniqueness when writing a key that was already written in a previously committed txn. That means if:
   
   ```java
   Txn-1:  INSERT key1,  txn commit; 
   Txn-2:  INSERT key1,  txn commit;
   ```
   
   Then the table will have two records with the same key. 
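   
   For context, here is a minimal sketch of how a job could wire a changelog stream into the sink with these equality field columns. Only `forRowData`, `equalityFieldColumns` and `writeParallelism` come from this class in the diff; `TableLoader.fromHadoopTable`, `tableLoader()` and `build()` are the surrounding API as I remember it, so treat those exact calls as assumptions:
   
   ```java
   import java.util.Arrays;
   
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.table.data.RowData;
   import org.apache.iceberg.flink.TableLoader;
   import org.apache.iceberg.flink.sink.FlinkSink;
   
   public class ChangeLogSinkSketch {
     // Sketch only: equalityFieldColumns()/writeParallelism() are from this Builder,
     // tableLoader()/build() are assumed to be the surrounding API.
     static void appendSink(DataStream<RowData> changeLogStream, String tablePath) {
       TableLoader tableLoader = TableLoader.fromHadoopTable(tablePath);
       FlinkSink.forRowData(changeLogStream)
           .tableLoader(tableLoader)
           .equalityFieldColumns(Arrays.asList("id"))  // "id" acts as the table's key
           .writeParallelism(4)
           .build();
     }
   }
   ```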
   
   If people really need Iceberg to maintain key uniqueness, they will have to turn every `INSERT` into an `UPSERT`, which means a `DELETE` first and then an `INSERT` of the new values.
   
   That introduces another issue: each `INSERT` is treated as an `UPSERT`, so it writes both a `DELETE` and an `INSERT`. Eventually the delete files will be almost as large as the data files, and merge-on-read will be quite inefficient because there are too many useless `DELETE` records to JOIN.
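   
   A rough, hypothetical sketch of that write amplification (the `EqualityDeltaWriter` interface below is made up purely for illustration; it is not the writer in this PR):
   
   ```java
   import org.apache.flink.table.data.RowData;
   
   // Hypothetical writer interface, only to illustrate the amplification.
   interface EqualityDeltaWriter {
     void writeData(RowData row);            // appended to a data file
     void writeEqualityDelete(RowData row);  // appended to an equality-delete file
   }
   
   class UpsertSketch {
     // Treating every INSERT as an UPSERT: one equality delete plus one insert per row,
     // so the delete files end up roughly as large as the data files.
     static void upsert(EqualityDeltaWriter writer, RowData row) {
       writer.writeEqualityDelete(row);  // drop any previous row with the same key
       writer.writeData(row);            // then insert the new values
     }
   }
   ```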
   
   A direct way to reduce the useless `DELETE`s is to use bloom filters: we would generate a bloom filter for each committed data file. When bootstrapping the Flink/Spark job we would prefetch all the bloom filter binaries from the Parquet/Avro data files' metadata. Before writing an equality delete, we check the bloom filters; if they indicate that none of the committed data files contain the given key, we can skip appending that equality delete. That would eliminate a lot of useless `DELETE`s from the delete files. Of course, bloom filters have false positives, but with that probability kept below 1% we would only append a small amount of deletes whose keys don't exist in the current table. In my view, that should be OK.
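   
   A sketch of that check, using Guava's `BloomFilter` purely for illustration (how the filters would actually be serialized into the parquet/avro file metadata and prefetched at bootstrap is left open here):
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.List;
   
   import com.google.common.hash.BloomFilter;
   import com.google.common.hash.Funnels;
   
   class EqualityDeleteSkipper {
     // One bloom filter per committed data file, prefetched when the job bootstraps.
     private final List<BloomFilter<CharSequence>> committedFileFilters;
   
     EqualityDeleteSkipper(List<BloomFilter<CharSequence>> committedFileFilters) {
       this.committedFileFilters = committedFileFilters;
     }
   
     // True if no committed data file might contain the key, so the equality delete can be
     // skipped. A false positive only means we append one harmless extra delete.
     boolean canSkipDelete(String key) {
       for (BloomFilter<CharSequence> filter : committedFileFilters) {
         if (filter.mightContain(key)) {
           return false;
         }
       }
       return true;
     }
   
     // Building one filter per data file while writing it, with ~1% false positives.
     static BloomFilter<CharSequence> newFileFilter(long expectedKeys) {
       return BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), expectedKeys, 0.01);
     }
   }
   ```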
   
   In summary, I think it's reasonable to regard these equality fields as the primary key of an Iceberg table. People could then choose between `UNIQUENESS ENFORCED` and `UNIQUENESS NOT-ENFORCED`, trading off strong semantics against performance, as sketched below.
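   
   For example (purely hypothetical naming, nothing in this PR adds such a switch), that choice could be an extra flag on this Builder, reusing the stream and table loader from the first sketch above:
   
   ```java
   // Hypothetical upsert(boolean) switch: true = UNIQUENESS ENFORCED (every INSERT becomes
   // DELETE + INSERT), false = NOT-ENFORCED (plain appends, duplicate keys possible).
   FlinkSink.forRowData(changeLogStream)
       .tableLoader(tableLoader)
       .equalityFieldColumns(Arrays.asList("id"))
       .upsert(true)   // hypothetical flag, not part of this PR
       .build();
   ```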




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org