Posted to commits@hudi.apache.org by si...@apache.org on 2022/04/13 00:42:22 UTC

[hudi] branch master updated: [HUDI-3855] Fixing `FILENAME_METADATA_FIELD` not being correctly updated in `HoodieMergeHandle` (#5296)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b78dff45f [HUDI-3855] Fixing `FILENAME_METADATA_FIELD` not being correctly updated in `HoodieMergeHandle` (#5296)
7b78dff45f is described below

commit 7b78dff45f646597d442e3a2b950c5917eea92f6
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Tue Apr 12 17:42:15 2022 -0700

    [HUDI-3855] Fixing `FILENAME_METADATA_FIELD` not being correctly updated in `HoodieMergeHandle` (#5296)
    
    Fixing FILENAME_METADATA_FIELD not being correctly updated in HoodieMergeHandle in cases when the old record is carried over from the existing file as is.
    
    - Revisited the HoodieFileWriter API to accept HoodieKey instead of HoodieRecord (a usage sketch follows below)
    - Fixed FILENAME_METADATA_FIELD not being overridden in cases when the old record is simply carried over
    - Exposed the standard JVM debugger ports in the Docker setup
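    
    For illustration only (not part of the committed message or diff): a minimal sketch of how a
    caller interacts with the revised writer API, which now takes a HoodieKey rather than a full
    HoodieRecord. The wrapper class and method below are hypothetical; HoodieFileWriter, HoodieKey
    and writeAvroWithMetadata are the names appearing in the diff.
    
        import java.io.IOException;
    
        import org.apache.avro.generic.GenericRecord;
        import org.apache.hudi.common.model.HoodieKey;
        import org.apache.hudi.io.storage.HoodieFileWriter;
    
        class FileWriterUsageSketch {
          // Writes a single Avro record through the revised API: the writer derives the
          // record key and partition path for the Hudi metadata columns from the HoodieKey,
          // instead of pulling them off a full HoodieRecord as it did before this change.
          static void writeWithMetadata(HoodieFileWriter<GenericRecord> fileWriter,
                                        HoodieKey key,
                                        GenericRecord avroRecord) throws IOException {
            fileWriter.writeAvroWithMetadata(key, avroRecord);
          }
        }
    
    Passing only the key keeps the writer independent of the record payload, which is what lets
    HoodieMergeHandle route both new and carried-over records through a single writeToFile helper
    in the diff below.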
---
 .../docker-compose_hadoop284_hive233_spark244.yml  | 33 +++++++++++++++---
 .../org/apache/hudi/io/HoodieConcatHandle.java     |  4 ++-
 .../org/apache/hudi/io/HoodieCreateHandle.java     |  2 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java | 28 ++++++++--------
 .../apache/hudi/io/HoodieSortedMergeHandle.java    |  2 +-
 .../apache/hudi/io/storage/HoodieFileWriter.java   |  7 ++--
 .../apache/hudi/io/storage/HoodieHFileWriter.java  | 27 ++++++++-------
 .../apache/hudi/io/storage/HoodieOrcWriter.java    | 39 +++++++++++-----------
 .../hudi/io/storage/HoodieParquetWriter.java       | 15 ++++-----
 .../io/storage/TestHoodieHFileReaderWriter.java    |  5 +--
 .../org/apache/hudi/io/TestHoodieMergeHandle.java  | 26 ++++++++++++---
 11 files changed, 116 insertions(+), 72 deletions(-)

diff --git a/docker/compose/docker-compose_hadoop284_hive233_spark244.yml b/docker/compose/docker-compose_hadoop284_hive233_spark244.yml
index 086004f121..b8217fc0d0 100644
--- a/docker/compose/docker-compose_hadoop284_hive233_spark244.yml
+++ b/docker/compose/docker-compose_hadoop284_hive233_spark244.yml
@@ -26,6 +26,8 @@ services:
     ports:
       - "50070:50070"
       - "8020:8020"
+      # JVM debugging port (will be mapped to a random port on host)
+      - "5005"
     env_file:
       - ./hadoop.env
     healthcheck:
@@ -45,6 +47,8 @@ services:
     ports:
       - "50075:50075"
       - "50010:50010"
+      # JVM debugging port (will be mapped to a random port on host)
+      - "5005"
     links:
       - "namenode"
       - "historyserver"
@@ -99,6 +103,8 @@ services:
       SERVICE_PRECONDITION: "namenode:50070 hive-metastore-postgresql:5432"
     ports:
       - "9083:9083"
+      # JVM debugging port (will be mapped to a random port on host)
+      - "5005"
     healthcheck:
       test: ["CMD", "nc", "-z", "hivemetastore", "9083"]
       interval: 30s
@@ -118,6 +124,8 @@ services:
       SERVICE_PRECONDITION: "hivemetastore:9083"
     ports:
       - "10000:10000"
+      # JVM debugging port (will be mapped to a random port on host)
+      - "5005"
     depends_on:
       - "hivemetastore"
     links:
@@ -136,6 +144,8 @@ services:
     ports:
       - "8080:8080"
       - "7077:7077"
+      # JVM debugging port (will be mapped to a random port on host)
+      - "5005"
     environment:
       - INIT_DAEMON_STEP=setup_spark
     links:
@@ -154,6 +164,8 @@ services:
       - sparkmaster
     ports:
       - "8081:8081"
+      # JVM debugging port (will be mapped to a random port on host)
+      - "5005"
     environment:
       - "SPARK_MASTER=spark://sparkmaster:7077"
     links:
@@ -167,7 +179,7 @@ services:
     hostname: zookeeper
     container_name: zookeeper
     ports:
-      - '2181:2181'
+      - "2181:2181"
     environment:
       - ALLOW_ANONYMOUS_LOGIN=yes
 
@@ -176,7 +188,7 @@ services:
     hostname: kafkabroker
     container_name: kafkabroker
     ports:
-      - '9092:9092'
+      - "9092:9092"
     environment:
       - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
       - ALLOW_PLAINTEXT_LISTENER=yes
@@ -186,7 +198,9 @@ services:
     hostname: presto-coordinator-1
     image: apachehudi/hudi-hadoop_2.8.4-prestobase_0.271:latest
     ports:
-      - '8090:8090'
+      - "8090:8090"
+      # JVM debugging port (will be mapped to a random port on host)
+      - "5005"
     environment:
       - PRESTO_JVM_MAX_HEAP=512M
       - PRESTO_QUERY_MAX_MEMORY=1GB
@@ -226,7 +240,9 @@ services:
     hostname: trino-coordinator-1
     image: apachehudi/hudi-hadoop_2.8.4-trinocoordinator_368:latest
     ports:
-      - '8091:8091'
+      - "8091:8091"
+      # JVM debugging port (will be mapped to a random port on host)
+      - "5005"
     links:
       - "hivemetastore"
     volumes:
@@ -239,7 +255,9 @@ services:
     image: apachehudi/hudi-hadoop_2.8.4-trinoworker_368:latest
     depends_on: [ "trino-coordinator-1" ]
     ports:
-      - '8092:8092'
+      - "8092:8092"
+      # JVM debugging port (will be mapped to a random port on host)
+      - "5005"
     links:
       - "hivemetastore"
       - "hiveserver"
@@ -268,6 +286,8 @@ services:
       - sparkmaster
     ports:
       - '4040:4040'
+      # JVM debugging port (mapped to 5006 on the host)
+      - "5006:5005"
     environment:
       - "SPARK_MASTER=spark://sparkmaster:7077"
     links:
@@ -286,6 +306,9 @@ services:
     container_name: adhoc-2
     env_file:
       - ./hadoop.env
+    ports:
+      # JVM debugging port (mapped to 5005 on the host)
+      - "5005:5005"
     depends_on:
       - sparkmaster
     environment:
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
index c33c0f08ca..022f600b5e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
@@ -20,6 +20,7 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -93,7 +94,8 @@ public class HoodieConcatHandle<T extends HoodieRecordPayload, I, K, O> extends
   public void write(GenericRecord oldRecord) {
     String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
     try {
-      fileWriter.writeAvro(key, oldRecord);
+      // NOTE: We're enforcing preservation of the record metadata to keep existing semantic
+      writeToFile(new HoodieKey(key, partitionPath), oldRecord, true);
     } catch (IOException | RuntimeException e) {
       String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s",
           key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 91a7622bf8..41d583668a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -142,7 +142,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
           fileWriter.writeAvro(record.getRecordKey(),
               rewriteRecordWithMetadata((GenericRecord) avroRecord.get(), path.getName()));
         } else {
-          fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) avroRecord.get()), record);
+          fileWriter.writeAvroWithMetadata(record.getKey(), rewriteRecord((GenericRecord) avroRecord.get()));
         }
         // update the new location of record, so we know where to find it next
         record.unseal();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 06e752f59d..3363571ddf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -292,13 +293,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     }
     try {
       if (indexedRecord.isPresent() && !isDelete) {
-        // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
-        if (preserveMetadata && useWriterSchemaForCompaction) { // useWriteSchema will be true only in case of compaction.
-          fileWriter.writeAvro(hoodieRecord.getRecordKey(),
-              rewriteRecordWithMetadata((GenericRecord) indexedRecord.get(), newFilePath.getName()));
-        } else {
-          fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) indexedRecord.get()), hoodieRecord);
-        }
+        writeToFile(hoodieRecord.getKey(), (GenericRecord) indexedRecord.get(), preserveMetadata && useWriterSchemaForCompaction);
         recordsWritten++;
       } else {
         recordsDeleted++;
@@ -352,14 +347,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     }
 
     if (copyOldRecord) {
-      // this should work as it is, since this is an existing record
       try {
-        // rewrite file names
-        // do not preserve FILENAME_METADATA_FIELD
-        if (preserveMetadata && useWriterSchemaForCompaction) {
-          oldRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, newFilePath.getName());
-        }
-        fileWriter.writeAvro(key, oldRecord);
+        // NOTE: We're enforcing preservation of the record metadata to keep existing semantic
+        writeToFile(new HoodieKey(key, partitionPath), oldRecord, true);
       } catch (IOException | RuntimeException e) {
         String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
                 key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
@@ -370,6 +360,16 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     }
   }
 
+  protected void writeToFile(HoodieKey key, GenericRecord avroRecord, boolean shouldPreserveRecordMetadata) throws IOException {
+    if (shouldPreserveRecordMetadata) {
+      // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
+      //       file holding this record even in cases when overall metadata is preserved
+      fileWriter.writeAvro(key.getRecordKey(), rewriteRecordWithMetadata(avroRecord, newFilePath.getName()));
+    } else {
+      fileWriter.writeAvroWithMetadata(key, rewriteRecord(avroRecord));
+    }
+  }
+
   protected void writeIncomingRecords() throws IOException {
     // write out any pending records (this can happen when inserts are turned into updates)
     Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index 931b08c2fe..d6c1d1be40 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -47,7 +47,7 @@ import java.util.Queue;
  */
 public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
 
-  private Queue<String> newRecordKeysSorted = new PriorityQueue<>();
+  private final Queue<String> newRecordKeysSorted = new PriorityQueue<>();
 
   public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                                  Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
index 9f749566b2..1d1dd5c9ba 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io.storage;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 
 import org.apache.avro.generic.IndexedRecord;
@@ -29,7 +30,7 @@ import java.io.IOException;
 
 public interface HoodieFileWriter<R extends IndexedRecord> {
 
-  void writeAvroWithMetadata(R newRecord, HoodieRecord record) throws IOException;
+  void writeAvroWithMetadata(HoodieKey key, R newRecord) throws IOException;
 
   boolean canWrite();
 
@@ -37,9 +38,9 @@ public interface HoodieFileWriter<R extends IndexedRecord> {
 
   void writeAvro(String key, R oldRecord) throws IOException;
 
-  default void prepRecordWithMetadata(R avroRecord, HoodieRecord record, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
+  default void prepRecordWithMetadata(HoodieKey key, R avroRecord, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
     String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement());
-    HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
+    HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, key.getRecordKey(), key.getPartitionPath(), fileName);
     HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
     return;
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
index 1642eb2c42..91f79cefa2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
@@ -18,16 +18,6 @@
 
 package org.apache.hudi.io.storage;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
@@ -40,6 +30,15 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.io.Writable;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -111,13 +110,13 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
   }
 
   @Override
-  public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
+  public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
     if (populateMetaFields) {
-      prepRecordWithMetadata(avroRecord, record, instantTime,
+      prepRecordWithMetadata(key, avroRecord, instantTime,
           taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
-      writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
+      writeAvro(key.getRecordKey(), avroRecord);
     } else {
-      writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
+      writeAvro(key.getRecordKey(), avroRecord);
     }
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
index 3fe8be05c0..17d5ead3ef 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
@@ -18,34 +18,35 @@
 
 package org.apache.hudi.io.storage;
 
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.orc.storage.ql.exec.vector.ColumnVector;
-import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
-import org.apache.orc.OrcFile;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.Writer;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.AvroOrcUtils;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
 
 public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
     implements HoodieFileWriter<R>, Closeable {
@@ -94,10 +95,10 @@ public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRec
   }
 
   @Override
-  public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
-    prepRecordWithMetadata(avroRecord, record, instantTime,
+  public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
+    prepRecordWithMetadata(key, avroRecord, instantTime,
         taskContextSupplier.getPartitionIdSupplier().get(), RECORD_INDEX, file.getName());
-    writeAvro(record.getRecordKey(), avroRecord);
+    writeAvro(key.getRecordKey(), avroRecord);
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
index 957a0ff52e..5b3c69ddf9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
@@ -18,16 +18,15 @@
 
 package org.apache.hudi.io.storage;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
 
@@ -84,12 +83,12 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
   }
 
   @Override
-  public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
+  public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
     if (populateMetaFields) {
-      prepRecordWithMetadata(avroRecord, record, instantTime,
+      prepRecordWithMetadata(key, avroRecord, instantTime,
           taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
       super.write(avroRecord);
-      writeSupport.add(record.getRecordKey());
+      writeSupport.add(key.getRecordKey());
     } else {
       super.write(avroRecord);
     }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
index 2db8eb0204..da6f717258 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
@@ -154,8 +154,9 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
       record.put("time", Integer.toString(RANDOM.nextInt()));
       record.put("number", i);
       if (testAvroWithMeta) {
-        writer.writeAvroWithMetadata(record, new HoodieAvroRecord(new HoodieKey((String) record.get("_row_key"),
-            Integer.toString((Integer) record.get("number"))), new EmptyHoodieRecordPayload())); // payload does not matter. GenericRecord passed in is what matters
+        // payload does not matter. GenericRecord passed in is what matters
+        writer.writeAvroWithMetadata(new HoodieAvroRecord(new HoodieKey((String) record.get("_row_key"),
+                Integer.toString((Integer) record.get("number"))), new EmptyHoodieRecordPayload()).getKey(), record);
         // only HoodieKey will be looked up from the 2nd arg(HoodieRecord).
       } else {
         writer.writeAvro(key, record);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index 17ebccb153..72749160e6 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -21,13 +21,15 @@ package org.apache.hudi.io;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieCommonConfig;
-import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -36,8 +38,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
-
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -51,6 +51,8 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
@@ -94,7 +96,6 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
         .withProperties(properties)
         .build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
-      FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
 
       /**
        * Write 1 (only inserts) This will do a bulk insert of 44 records of which there are 2 records repeated 21 times
@@ -202,6 +203,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
       // Check the entire dataset has 47 records still
       dataSet = getRecords();
       assertEquals(47, dataSet.count(), "Must contain 47 records");
+
       Row[] rows = (Row[]) dataSet.collect();
       int record1Count = 0;
       int record2Count = 0;
@@ -228,6 +230,22 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
       // Assert that id2 record count which has been updated to rider-004 and driver-004 is 21, which is the total
       // number of records with row_key id2
       assertEquals(21, record2Count);
+
+      // Validate that all the records only reference the _latest_ base files as part of the
+      // FILENAME_METADATA_FIELD payload (entailing that corresponding metadata is in-sync with
+      // the state of the table
+      HoodieTableFileSystemView tableView =
+          getHoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), HoodieTestTable.of(metaClient).listAllBaseFiles());
+
+      Set<String> latestBaseFileNames = tableView.getLatestBaseFiles()
+          .map(BaseFile::getFileName)
+          .collect(Collectors.toSet());
+
+      Set<Object> metadataFilenameFieldRefs = dataSet.collectAsList().stream()
+          .map(row -> row.getAs(HoodieRecord.FILENAME_METADATA_FIELD))
+          .collect(Collectors.toSet());
+
+      assertEquals(latestBaseFileNames, metadataFilenameFieldRefs);
     }
   }