You are viewing a plain-text version of this content. The canonical link to it was a hyperlink and is not preserved in this plain-text copy.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/09/16 05:56:28 UTC
[hudi] branch master updated: [HUDI-4853] Get field by name for OverwriteNonDefaultsWithLatestAvroPayload to avoid schema mismatch (#6689)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f70678f435 [HUDI-4853] Get field by name for OverwriteNonDefaultsWithLatestAvroPayload to avoid schema mismatch (#6689)
f70678f435 is described below
commit f70678f4354c6264b6a1e38900dd7a11cb345b96
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri Sep 16 13:56:23 2022 +0800
[HUDI-4853] Get field by name for OverwriteNonDefaultsWithLatestAvroPayload to avoid schema mismatch (#6689)
---
.../OverwriteNonDefaultsWithLatestAvroPayload.java | 2 +-
...tOverwriteNonDefaultsWithLatestAvroPayload.java | 59 ++++++++++++++++++++--
2 files changed, 57 insertions(+), 4 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java
index 6ce99aae21..9ce241bc78 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java
@@ -70,7 +70,7 @@ public class OverwriteNonDefaultsWithLatestAvroPayload extends OverwriteWithLate
if (!overwriteField(value, defaultValue)) {
builder.set(field, value);
} else {
- builder.set(field, currentRecord.get(field.pos()));
+ builder.set(field, currentRecord.get(field.name()));
}
});
return Option.of(builder.build());
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java
index 9e3405b304..0807b41f61 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java
@@ -23,6 +23,7 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -54,13 +55,15 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload {
@Test
public void testActiveRecords() throws IOException {
+ Schema writerSchema = HoodieAvroUtils.addMetadataFields(schema);
+
GenericRecord record1 = new GenericData.Record(schema);
record1.put("id", "1");
record1.put("partition", "partition1");
record1.put("ts", 0L);
record1.put("_hoodie_is_deleted", false);
record1.put("city", "NY0");
- record1.put("child", Arrays.asList("A"));
+ record1.put("child", Collections.singletonList("A"));
GenericRecord record2 = new GenericData.Record(schema);
record2.put("id", "2");
@@ -76,11 +79,38 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload {
record3.put("ts", 1L);
record3.put("_hoodie_is_deleted", false);
record3.put("city", "NY0");
- record3.put("child", Arrays.asList("A"));
-
+ record3.put("child", Collections.singletonList("A"));
+
+ // same content with record1 plus metadata fields
+ GenericRecord record4 = createRecordWithMetadataFields(writerSchema, "1", "partition1");
+ record4.put("id", "1");
+ record4.put("partition", "partition1");
+ record4.put("ts", 0L);
+ record4.put("_hoodie_is_deleted", false);
+ record4.put("city", "NY0");
+ record4.put("child", Collections.singletonList("A"));
+
+ // same content with record2 plus metadata fields
+ GenericRecord record5 = createRecordWithMetadataFields(writerSchema, "2", "");
+ record5.put("id", "2");
+ record5.put("partition", "");
+ record5.put("ts", 1L);
+ record5.put("_hoodie_is_deleted", false);
+ record5.put("city", "NY");
+ record5.put("child", Collections.emptyList());
+
+ // same content with record3 plus metadata fields
+ GenericRecord record6 = createRecordWithMetadataFields(writerSchema, "2", "");
+ record6.put("id", "2");
+ record6.put("partition", "partition1");
+ record6.put("ts", 1L);
+ record6.put("_hoodie_is_deleted", false);
+ record6.put("city", "NY0");
+ record6.put("child", Collections.singletonList("A"));
OverwriteNonDefaultsWithLatestAvroPayload payload1 = new OverwriteNonDefaultsWithLatestAvroPayload(record1, 1);
OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(record2, 2);
+ OverwriteNonDefaultsWithLatestAvroPayload payload5 = new OverwriteNonDefaultsWithLatestAvroPayload(record5, 2);
assertEquals(payload1.preCombine(payload2), payload2);
assertEquals(payload2.preCombine(payload1), payload2);
@@ -94,6 +124,19 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload {
IndexedRecord combinedVal2 = payload2.combineAndGetUpdateValue(record1, schema).get();
assertEquals(combinedVal2, record3);
assertNotSame(combinedVal2, record3);
+
+ // the real case in production is: the current record to be combined includes the metadata fields,
+ // the payload record could include the metadata fields (for compaction) or not (for normal writer path).
+
+ // case1: validate normal writer path
+ IndexedRecord combinedVal3 = payload2.combineAndGetUpdateValue(record4, schema).get();
+ assertEquals(combinedVal3, record3);
+ assertNotSame(combinedVal3, record3);
+
+ // case2: validate compaction path
+ IndexedRecord combinedVal4 = payload5.combineAndGetUpdateValue(record4, writerSchema).get();
+ assertEquals(combinedVal4, record6);
+ assertNotSame(combinedVal4, record6);
}
@Test
@@ -164,4 +207,14 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload {
OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(record2, 1);
assertEquals(payload2.combineAndGetUpdateValue(record1, avroSchema).get(), record3);
}
+
+ private static GenericRecord createRecordWithMetadataFields(Schema schema, String recordKey, String partitionPath) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, "001");
+ record.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, "123");
+ record.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recordKey);
+ record.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionPath);
+ record.put(HoodieRecord.FILENAME_METADATA_FIELD, "file1");
+ return record;
+ }
}