Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/08 14:38:53 UTC

[GitHub] [hudi] stayrascal commented on a change in pull request #4724: [HUDI-2815] add partial overwrite payload to support partial overwrit…

stayrascal commented on a change in pull request #4724:
URL: https://github.com/apache/hudi/pull/4724#discussion_r801702428



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
##########
@@ -105,7 +105,7 @@ public static FlinkWriteHelper newInstance() {
       // we cannot allow the user to change the key or partitionPath, since that will affect
       // everything
       // so pick it from one of the records.
-      boolean choosePrev = data1.equals(reducedData);
+      boolean choosePrev = data2.compareTo(data1) < 0;
       HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
       HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();

Review comment:
   Previously, `data2.preCombine(data1)` returned either `data1` or `data2`, chosen by their `orderingVal`. But if we merge `data1` and `data2` into a new payload (`reducedData`), `data1.equals(reducedData)` is always false. To get the `HoodieKey` and `HoodieOperation` for the new `HoodieRecord` that wraps `reducedData`, we need to take them from whichever of `data1` and `data2` is the latest, so `compareTo` replaces the `preCombine` result check and compares their `orderingVal` directly.
   
   ```
   @Override
   public int compareTo(OverwriteWithLatestAvroPayload oldValue) {
     return orderingVal.compareTo(oldValue.orderingVal);
   }
   ```
   
   ```
   @Test
   public void testCompareFunction() {
     GenericRecord record = new GenericData.Record(schema);
     record.put("id", "1");
     record.put("partition", "partition1");
     record.put("ts", 0L);
     record.put("_hoodie_is_deleted", false);
     record.put("city", "NY0");
     record.put("child", Arrays.asList("A"));

     // Same record contents, different ordering values (1 and 2).
     PartialOverwriteWithLatestAvroPayload payload1 = new PartialOverwriteWithLatestAvroPayload(record, 1);
     PartialOverwriteWithLatestAvroPayload payload2 = new PartialOverwriteWithLatestAvroPayload(record, 2);

     // assertEquals takes (expected, actual).
     assertEquals(-1, payload1.compareTo(payload2));
     assertEquals(1, payload2.compareTo(payload1));
     assertEquals(0, payload1.compareTo(payload1));
   }
   ```
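
To make the reasoning above concrete, here is a minimal, self-contained sketch of the reduce step. It is not the actual `FlinkWriteHelper` code: `SketchRecord`, `ReduceSketch`, and the single `city` field are hypothetical stand-ins, and the merge rule (the latest non-null value wins) is only an assumption about what a partial overwrite might do. It shows why an `equals` check against the merged result cannot pick the key and operation, while `compareTo` on the ordering value can.

```java
/** Hypothetical stand-in for a record: key, operation, one data field, and an ordering value. */
final class SketchRecord implements Comparable<SketchRecord> {
  final String key;        // stands in for HoodieKey
  final String operation;  // stands in for HoodieOperation
  final String city;       // null means "field not set in this record"
  final long orderingVal;

  SketchRecord(String key, String operation, String city, long orderingVal) {
    this.key = key;
    this.operation = operation;
    this.city = city;
    this.orderingVal = orderingVal;
  }

  @Override
  public int compareTo(SketchRecord other) {
    return Long.compare(orderingVal, other.orderingVal);
  }
}

final class ReduceSketch {
  /** Merges two records into a NEW record; key and operation come from the later one. */
  static SketchRecord reduce(SketchRecord rec1, SketchRecord rec2) {
    // Mirrors the diff: keep rec1's key/operation only when rec2 is strictly older.
    boolean choosePrev = rec2.compareTo(rec1) < 0;
    SketchRecord latest = choosePrev ? rec1 : rec2;
    SketchRecord older = choosePrev ? rec2 : rec1;
    // Assumed partial-overwrite rule: prefer the latest non-null value.
    String city = latest.city != null ? latest.city : older.city;
    // The result is a new object, so it equals neither input.
    return new SketchRecord(latest.key, latest.operation, city, latest.orderingVal);
  }
}
```

With `rec1` at ordering value 1 and `rec2` at ordering value 2, the merged record takes `rec2`'s key and operation but can still carry a field that only `rec1` had set. On a tie, `choosePrev` is false and `rec2` is chosen.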



