You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@hudi.apache.org by co...@apache.org on 2022/04/06 07:56:58 UTC

[hudi] branch master updated: [HUDI-3800] Fixed preserve commit metadata for compaction for untouched records (#5232)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8683fb1d49 [HUDI-3800] Fixed preserve commit metadata for compaction for untouched records (#5232)
8683fb1d49 is described below

commit 8683fb1d49b408a5012185104b13ae59c376a709
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Wed Apr 6 00:56:53 2022 -0700

    [HUDI-3800] Fixed preserve commit metadata for compaction for untouched records (#5232)
---
 .../main/java/org/apache/hudi/io/HoodieMergeHandle.java   | 15 ++++++++++-----
 .../java/org/apache/hudi/io/HoodieSortedMergeHandle.java  |  4 ++--
 2 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 567ae63e1e..d09087d88c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -105,7 +105,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
   protected long recordsDeleted = 0;
   protected long updatedRecordsWritten = 0;
   protected long insertRecordsWritten = 0;
-  protected boolean useWriterSchema;
+  protected boolean useWriterSchemaForCompaction;
   protected Option<BaseKeyGenerator> keyGeneratorOpt;
   private HoodieBaseFile baseFileToMerge;
 
@@ -133,7 +133,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
                            HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
     super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
     this.keyToNewRecords = keyToNewRecords;
-    this.useWriterSchema = true;
+    this.useWriterSchemaForCompaction = true;
     this.preserveMetadata = config.isPreserveHoodieCommitMetadataForCompaction();
     init(fileId, this.partitionPath, dataFileToBeMerged);
     validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
@@ -267,7 +267,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
   }
 
   protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
-    Schema schema = useWriterSchema ? tableSchemaWithMetaFields : tableSchema;
+    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
     Option<IndexedRecord> insertRecord = hoodieRecord.getData().getInsertValue(schema, config.getProps());
     // just skip the ignored record
     if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
@@ -293,7 +293,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     try {
       if (indexedRecord.isPresent() && !isDelete) {
         // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
-        if (preserveMetadata && useWriterSchema) { // useWriteSchema will be true only in case of compaction.
+        if (preserveMetadata && useWriterSchemaForCompaction) { // useWriteSchema will be true only in case of compaction.
           fileWriter.writeAvro(hoodieRecord.getRecordKey(),
               rewriteRecordWithMetadata((GenericRecord) indexedRecord.get(), newFilePath.getName()));
         } else {
@@ -329,7 +329,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
       try {
         Option<IndexedRecord> combinedAvroRecord =
             hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
-              useWriterSchema ? tableSchemaWithMetaFields : tableSchema,
+              useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema,
                 config.getPayloadConfig().getProps());
 
         if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {
@@ -355,6 +355,11 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     if (copyOldRecord) {
       // this should work as it is, since this is an existing record
       try {
+        // rewrite file names
+        // do not preserve FILENAME_METADATA_FIELD
+        if (preserveMetadata && useWriterSchemaForCompaction) {
+          oldRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, newFilePath.getName());
+        }
         fileWriter.writeAvro(key, oldRecord);
       } catch (IOException | RuntimeException e) {
         String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index 897491b906..931b08c2fe 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -90,7 +90,7 @@ public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> ext
         throw new HoodieUpsertException("Insert/Update not in sorted order");
       }
       try {
-        if (useWriterSchema) {
+        if (useWriterSchemaForCompaction) {
           writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
         } else {
           writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
@@ -113,7 +113,7 @@ public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> ext
         String key = newRecordKeysSorted.poll();
         HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
         if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
-          if (useWriterSchema) {
+          if (useWriterSchemaForCompaction) {
             writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
           } else {
             writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));