You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@hudi.apache.org by vb...@apache.org on 2023/04/23 02:21:59 UTC

[hudi] branch master updated: [HUDI-4920] fix PartialUpdatePayload cannot return deleted record in … (#6799)

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 71752e285dd [HUDI-4920] fix PartialUpdatePayload cannot return deleted record in … (#6799)
71752e285dd is described below

commit 71752e285ddfb3759a29aa6666ebc5d588930e2d
Author: 冯健 <fe...@gmail.com>
AuthorDate: Sun Apr 23 10:21:52 2023 +0800

    [HUDI-4920] fix PartialUpdatePayload cannot return deleted record in … (#6799)
    
    Co-authored-by: fengjian <fe...@dipeak.com>
---
 .../common/model/PartialUpdateAvroPayload.java     | 68 +++++++++++++++-------
 .../common/model/TestPartialUpdateAvroPayload.java | 36 +++++++++---
 2 files changed, 74 insertions(+), 30 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
index 08c020f63ca..7c09a38d56f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
@@ -18,17 +18,17 @@
 
 package org.apache.hudi.common.model;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.avro.generic.IndexedRecord;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
@@ -85,7 +85,7 @@ import java.util.Properties;
  *  Result data after preCombine or combineAndGetUpdateValue:
  *      id      ts      name    price
  *      1       2       name_1  price_1
- *</pre>
+ * </pre>
  */
 public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload {
 
@@ -100,14 +100,14 @@ public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvro
   @Override
   public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Schema schema, Properties properties) {
     if (oldValue.recordBytes.length == 0) {
-      // use natural order for delete record
+      // use natural order for deleted record
       return this;
     }
     // pick the payload with greater ordering value as insert record
     final boolean shouldPickOldRecord = oldValue.orderingVal.compareTo(orderingVal) > 0 ? true : false;
     try {
       GenericRecord oldRecord = HoodieAvroUtils.bytesToAvro(oldValue.recordBytes, schema);
-      Option<IndexedRecord> mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord);
+      Option<IndexedRecord> mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord, true);
       if (mergedRecord.isPresent()) {
         return new PartialUpdateAvroPayload((GenericRecord) mergedRecord.get(),
             shouldPickOldRecord ? oldValue.orderingVal : this.orderingVal);
@@ -120,12 +120,12 @@ public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvro
 
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
-    return this.mergeOldRecord(currentValue, schema, false);
+    return this.mergeOldRecord(currentValue, schema, false, false);
   }
 
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties prop) throws IOException {
-    return mergeOldRecord(currentValue, schema, isRecordNewer(orderingVal, currentValue, prop));
+    return mergeOldRecord(currentValue, schema, isRecordNewer(orderingVal, currentValue, prop), false);
   }
 
   /**
@@ -139,18 +139,31 @@ public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvro
   //  Utilities
   // -------------------------------------------------------------------------
 
+  /**
+   * Merge old record with new record.
+   *
+   * @param oldRecord
+   * @param schema
+   * @param isOldRecordNewer
+   * @param isPreCombining   flag for deleted record combine logic
+   *                         1 preCombine: if delete record is newer, return merged record with _hoodie_is_deleted = true
+   *                         2 combineAndGetUpdateValue:  if delete record is newer, return empty since we don't need to store deleted data to storage
+   * @return
+   * @throws IOException
+   */
   private Option<IndexedRecord> mergeOldRecord(IndexedRecord oldRecord,
-      Schema schema,
-      boolean isOldRecordNewer) throws IOException {
-    Option<IndexedRecord> recordOption = getInsertValue(schema);
-    if (!recordOption.isPresent()) {
+                                               Schema schema,
+                                               boolean isOldRecordNewer, boolean isPreCombining) throws IOException {
+    Option<IndexedRecord> recordOption = getInsertValue(schema, isPreCombining);
+
+    if (!recordOption.isPresent() && !isPreCombining) {
       // use natural order for delete record
       return Option.empty();
     }
 
     if (isOldRecordNewer && schema.getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD) != null) {
       // handling disorder, should use the metadata fields of the updating record
-      return mergeDisorderRecordsWithMetadata(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get());
+      return mergeDisorderRecordsWithMetadata(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get(), isPreCombining);
     } else if (isOldRecordNewer) {
       return mergeRecords(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get());
     } else {
@@ -158,20 +171,34 @@ public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvro
     }
   }
 
+  /**
+   * return itself as long as it called by preCombine
+   * @param schema
+   * @param isPreCombining
+   * @return
+   * @throws IOException
+   */
+  public Option<IndexedRecord> getInsertValue(Schema schema, boolean isPreCombining) throws IOException {
+    if (recordBytes.length == 0 || (!isPreCombining && isDeletedRecord)) {
+      return Option.empty();
+    }
+
+    return Option.of((IndexedRecord) HoodieAvroUtils.bytesToAvro(recordBytes, schema));
+  }
+
   /**
    * Merges the given disorder records with metadata.
    *
    * @param schema         The record schema
    * @param oldRecord      The current record from file
    * @param updatingRecord The incoming record
-   *
    * @return the merged record option
    */
   protected Option<IndexedRecord> mergeDisorderRecordsWithMetadata(
       Schema schema,
       GenericRecord oldRecord,
-      GenericRecord updatingRecord) {
-    if (isDeleteRecord(oldRecord)) {
+      GenericRecord updatingRecord, boolean isPreCombining) {
+    if (isDeleteRecord(oldRecord) && !isPreCombining) {
       return Option.empty();
     } else {
       final GenericRecordBuilder builder = new GenericRecordBuilder(schema);
@@ -197,9 +224,8 @@ public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvro
    * Returns whether the given record is newer than the record of this payload.
    *
    * @param orderingVal
-   * @param record The record
-   * @param prop   The payload properties
-   *
+   * @param record      The record
+   * @param prop        The payload properties
    * @return true if the given record is newer
    */
   private static boolean isRecordNewer(Comparable orderingVal, IndexedRecord record, Properties prop) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
index 2d5bb511981..6431b63899f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
@@ -18,11 +18,13 @@
 
 package org.apache.hudi.common.model;
 
-import org.apache.hudi.common.util.Option;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.Option;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -34,6 +36,8 @@ import java.util.Properties;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 
 /**
  * Unit tests {@link TestPartialUpdateAvroPayload}.
@@ -147,24 +151,38 @@ public class TestPartialUpdateAvroPayload {
     GenericRecord record2 = new GenericData.Record(schema);
     record2.put("id", "1");
     record2.put("partition", "partition0");
-    record2.put("ts", 0L);
-    record2.put("_hoodie_is_deleted", true);
+    record2.put("ts", 2L);
+    record2.put("_hoodie_is_deleted", false);
     record2.put("city", "NY0");
     record2.put("child", Collections.emptyList());
 
     PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L);
-    PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(delRecord1, 1L);
+    PartialUpdateAvroPayload delPayload = new PartialUpdateAvroPayload(delRecord1, 1L);
+    PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 2L);
+
+    PartialUpdateAvroPayload mergedPayload = payload1.preCombine(delPayload, schema, new Properties());
+    assertTrue(HoodieAvroUtils.bytesToAvro(mergedPayload.recordBytes, schema).get("_hoodie_is_deleted").equals(true));
+    assertArrayEquals(mergedPayload.recordBytes, delPayload.recordBytes);
+
+    mergedPayload = delPayload.preCombine(payload1, schema, new Properties());
+    assertTrue(HoodieAvroUtils.bytesToAvro(mergedPayload.recordBytes, schema).get("_hoodie_is_deleted").equals(true));
+    assertArrayEquals(mergedPayload.recordBytes, delPayload.recordBytes);
+
+    mergedPayload = payload2.preCombine(delPayload, schema, new Properties());
+    assertTrue(HoodieAvroUtils.bytesToAvro(mergedPayload.recordBytes, schema).get("_hoodie_is_deleted").equals(false));
+    assertArrayEquals(mergedPayload.recordBytes, payload2.recordBytes);
 
-    assertArrayEquals(payload1.preCombine(payload2).recordBytes, payload2.recordBytes);
-    assertArrayEquals(payload2.preCombine(payload1).recordBytes, payload2.recordBytes);
+    mergedPayload = delPayload.preCombine(payload2, schema, new Properties());
+    assertTrue(HoodieAvroUtils.bytesToAvro(mergedPayload.recordBytes, schema).get("_hoodie_is_deleted").equals(false));
+    assertArrayEquals(mergedPayload.recordBytes, payload2.recordBytes);
 
     assertEquals(record1, payload1.getInsertValue(schema).get());
-    assertFalse(payload2.getInsertValue(schema).isPresent());
+    assertFalse(delPayload.getInsertValue(schema).isPresent());
 
     Properties properties = new Properties();
     properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts");
     assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema, properties), Option.empty());
-    assertFalse(payload2.combineAndGetUpdateValue(record1, schema, properties).isPresent());
+    assertFalse(delPayload.combineAndGetUpdateValue(record1, schema, properties).isPresent());
   }
 
   @Test