You are viewing a plain-text version of this content. The link to the canonical (original) version was not preserved in this plain-text copy.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/09/16 05:56:28 UTC

[hudi] branch master updated: [HUDI-4853] Get field by name for OverwriteNonDefaultsWithLatestAvroPayload to avoid schema mismatch (#6689)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f70678f435 [HUDI-4853] Get field by name for OverwriteNonDefaultsWithLatestAvroPayload to avoid schema mismatch (#6689)
f70678f435 is described below

commit f70678f4354c6264b6a1e38900dd7a11cb345b96
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri Sep 16 13:56:23 2022 +0800

    [HUDI-4853] Get field by name for OverwriteNonDefaultsWithLatestAvroPayload to avoid schema mismatch (#6689)
---
 .../OverwriteNonDefaultsWithLatestAvroPayload.java |  2 +-
 ...tOverwriteNonDefaultsWithLatestAvroPayload.java | 59 ++++++++++++++++++++--
 2 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java
index 6ce99aae21..9ce241bc78 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java
@@ -70,7 +70,7 @@ public class OverwriteNonDefaultsWithLatestAvroPayload extends OverwriteWithLate
         if (!overwriteField(value, defaultValue)) {
           builder.set(field, value);
         } else {
-          builder.set(field, currentRecord.get(field.pos()));
+          builder.set(field, currentRecord.get(field.name()));
         }
       });
       return Option.of(builder.build());
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java
index 9e3405b304..0807b41f61 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java
@@ -23,6 +23,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -54,13 +55,15 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload {
 
   @Test
   public void testActiveRecords() throws IOException {
+    Schema writerSchema = HoodieAvroUtils.addMetadataFields(schema);
+
     GenericRecord record1 = new GenericData.Record(schema);
     record1.put("id", "1");
     record1.put("partition", "partition1");
     record1.put("ts", 0L);
     record1.put("_hoodie_is_deleted", false);
     record1.put("city", "NY0");
-    record1.put("child", Arrays.asList("A"));
+    record1.put("child", Collections.singletonList("A"));
 
     GenericRecord record2 = new GenericData.Record(schema);
     record2.put("id", "2");
@@ -76,11 +79,38 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload {
     record3.put("ts", 1L);
     record3.put("_hoodie_is_deleted", false);
     record3.put("city", "NY0");
-    record3.put("child", Arrays.asList("A"));
-
+    record3.put("child", Collections.singletonList("A"));
+
+    // same content with record1 plus metadata fields
+    GenericRecord record4 = createRecordWithMetadataFields(writerSchema, "1", "partition1");
+    record4.put("id", "1");
+    record4.put("partition", "partition1");
+    record4.put("ts", 0L);
+    record4.put("_hoodie_is_deleted", false);
+    record4.put("city", "NY0");
+    record4.put("child", Collections.singletonList("A"));
+
+    // same content with record2 plus metadata fields
+    GenericRecord record5 = createRecordWithMetadataFields(writerSchema, "2", "");
+    record5.put("id", "2");
+    record5.put("partition", "");
+    record5.put("ts", 1L);
+    record5.put("_hoodie_is_deleted", false);
+    record5.put("city", "NY");
+    record5.put("child", Collections.emptyList());
+
+    // same content with record3 plus metadata fields
+    GenericRecord record6 = createRecordWithMetadataFields(writerSchema, "2", "");
+    record6.put("id", "2");
+    record6.put("partition", "partition1");
+    record6.put("ts", 1L);
+    record6.put("_hoodie_is_deleted", false);
+    record6.put("city", "NY0");
+    record6.put("child", Collections.singletonList("A"));
 
     OverwriteNonDefaultsWithLatestAvroPayload payload1 = new OverwriteNonDefaultsWithLatestAvroPayload(record1, 1);
     OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(record2, 2);
+    OverwriteNonDefaultsWithLatestAvroPayload payload5 = new OverwriteNonDefaultsWithLatestAvroPayload(record5, 2);
     assertEquals(payload1.preCombine(payload2), payload2);
     assertEquals(payload2.preCombine(payload1), payload2);
 
@@ -94,6 +124,19 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload {
     IndexedRecord combinedVal2 = payload2.combineAndGetUpdateValue(record1, schema).get();
     assertEquals(combinedVal2, record3);
     assertNotSame(combinedVal2, record3);
+
+    // the real case in production is: the current record to be combined includes the metadata fields,
+    // the payload record could include the metadata fields (for compaction) or not (for normal writer path).
+
+    // case1: validate normal writer path
+    IndexedRecord combinedVal3 = payload2.combineAndGetUpdateValue(record4, schema).get();
+    assertEquals(combinedVal3, record3);
+    assertNotSame(combinedVal3, record3);
+
+    // case2: validate compaction path
+    IndexedRecord combinedVal4 = payload5.combineAndGetUpdateValue(record4, writerSchema).get();
+    assertEquals(combinedVal4, record6);
+    assertNotSame(combinedVal4, record6);
   }
 
   @Test
@@ -164,4 +207,14 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload {
     OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(record2, 1);
     assertEquals(payload2.combineAndGetUpdateValue(record1, avroSchema).get(), record3);
   }
+
+  private static GenericRecord createRecordWithMetadataFields(Schema schema, String recordKey, String partitionPath) {
+    GenericRecord record = new GenericData.Record(schema);
+    record.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, "001");
+    record.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, "123");
+    record.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recordKey);
+    record.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionPath);
+    record.put(HoodieRecord.FILENAME_METADATA_FIELD, "file1");
+    return record;
+  }
 }