You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/01/31 18:33:27 UTC

[hudi] branch release-0.13.0 updated (8ffc7ce3d8f -> e6d44c015b7)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a change to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git


    from 8ffc7ce3d8f [HUDI-5632] Fix failure launching Spark jobs from hudi-cli-bundle (#7790)
     new fe75c9af0b1 [MINOR] Make `data_before_after` the default cdc logging mode (#7797)
     new 4768408ed43 [HUDI-5563] Check table exist before drop table (#7679)
     new 2ced537cf8c [HUDI-5568] Fix the BucketStreamWriteFunction to rebase the local filesystem instance instead (#7685)
     new 571c0f29c9e [HUDI-5655] Closing write client for spark ds writer in all cases (including exception) (#7799)
     new 138af1a43b3 [HUDI-5654] Fixing read of an empty rollback completed meta files from data table timeline w/ metadata reads (#7798)
     new bba80984359 [HUDI-5487] Reduce duplicate logs in ExternalSpillableMap (#7579)
     new fb612bef734 [MINOR] Standardise schema concepts on Flink Engine (#7761)
     new 9c21118e146 [HUDI-5567] Make the bootstrapping exception message more clear (#7684)
     new f0152acae28 [HUDI-5553] Prevent partition(s) from being dropped if there are pending… (#7669)
     new e6d44c015b7 [HUDI-5633] Fixing performance regression in `HoodieSparkRecord` (#7769)

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hudi/client/utils/DeletePartitionUtils.java    |  77 +++
 .../hudi/execution/HoodieLazyInsertIterable.java   |   4 +-
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  32 +-
 .../org/apache/hudi/io/HoodieBootstrapHandle.java  |  24 +-
 .../org/apache/hudi/io/HoodieCreateHandle.java     |  20 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  14 +-
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |   2 +-
 .../table/action/commit/HoodieMergeHelper.java     |  59 +-
 .../java/org/apache/hudi/util/ExecutorFactory.java |  32 +-
 .../client/utils/TestDeletePartitionUtils.java     | 110 ++++
 .../FlinkDeletePartitionCommitActionExecutor.java  |   3 +
 .../HoodieSparkBootstrapSchemaProvider.java        |   4 +-
 .../hudi/common/model/HoodieSparkRecord.java       | 104 ++--
 .../hudi/execution/SparkLazyInsertIterable.java    |  14 +-
 .../hudi/io/storage/HoodieSparkFileWriter.java     |  17 -
 .../hudi/io/storage/HoodieSparkParquetReader.java  |   3 +-
 .../hudi/io/storage/HoodieSparkParquetWriter.java  |  46 +-
 .../apache/hudi/keygen/BuiltinKeyGenerator.java    |   2 +-
 .../bootstrap/BaseBootstrapMetadataHandler.java    |   9 +-
 .../bootstrap/OrcBootstrapMetadataHandler.java     |   5 +-
 .../bootstrap/ParquetBootstrapMetadataHandler.java |  87 ++-
 .../SparkDeletePartitionCommitActionExecutor.java  |   3 +
 .../apache/hudi/util/HoodieSparkRecordUtils.java   |  69 ---
 .../hudi/HoodieDatasetBulkInsertHelper.scala       |   2 +
 .../apache/spark/sql/HoodieInternalRowUtils.scala  | 595 ++++++++++++---------
 .../apache/spark/sql/HoodieUnsafeRowUtils.scala    |  33 +-
 .../spark/sql/TestHoodieUnsafeRowUtils.scala       |  36 +-
 .../hudi/common/model/HoodieAvroIndexedRecord.java |  34 +-
 .../apache/hudi/common/model/HoodieAvroRecord.java |  42 +-
 .../hudi/common/model/HoodieEmptyRecord.java       |   9 +-
 .../org/apache/hudi/common/model/HoodieRecord.java |  20 +-
 .../apache/hudi/common/model/MetadataValues.java   |  76 ++-
 .../hudi/common/table/HoodieTableConfig.java       |   2 +-
 .../table/log/AbstractHoodieLogRecordReader.java   |  15 +-
 .../util/collection/ExternalSpillableMap.java      |   9 +-
 .../internal/schema/utils/InternalSchemaUtils.java |   4 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  18 +-
 .../sink/bucket/BucketStreamWriteFunction.java     |   2 +-
 .../hudi/table/format/InternalSchemaManager.java   |  57 +-
 .../apache/hudi/table/format/RecordIterators.java  |   8 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 133 ++---
 .../org/apache/hudi/HoodieStreamingSink.scala      |   6 +-
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |   3 +-
 .../apache/hudi/TestHoodieInternalRowUtils.scala   |  89 ---
 .../sql/hudi/TestAlterTableDropPartition.scala     | 129 ++++-
 .../org/apache/spark/sql/hudi/TestDropTable.scala  |  11 +
 .../sql/hudi/TestHoodieInternalRowUtils.scala}     |  99 +++-
 .../hudi/utilities/sources/HoodieIncrSource.java   |  37 +-
 .../TestHoodieDeltaStreamerWithMultiWriter.java    |   1 +
 49 files changed, 1319 insertions(+), 891 deletions(-)
 create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java
 create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestDeletePartitionUtils.java
 delete mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java
 delete mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala
 rename hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/{hudi/TestStructTypeSchemaEvolutionUtils.scala => spark/sql/hudi/TestHoodieInternalRowUtils.scala} (77%)


[hudi] 06/10: [HUDI-5487] Reduce duplicate logs in ExternalSpillableMap (#7579)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit bba809843593829840ffedb751e7a55a70776724
Author: cxzl25 <cx...@users.noreply.github.com>
AuthorDate: Tue Jan 31 13:35:27 2023 +0800

    [HUDI-5487] Reduce duplicate logs in ExternalSpillableMap (#7579)
---
 .../apache/hudi/common/util/collection/ExternalSpillableMap.java | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
index ee930e588d0..b540b204214 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
@@ -202,10 +202,13 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
   @Override
   public R put(T key, R value) {
     if (this.currentInMemoryMapSize >= maxInMemorySizeInBytes || inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0) {
-      this.estimatedPayloadSize = (long) (this.estimatedPayloadSize * 0.9 
-        + (keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value)) * 0.1);
+      long tmpEstimatedPayloadSize = (long) (this.estimatedPayloadSize * 0.9
+          + (keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value)) * 0.1);
+      if (this.estimatedPayloadSize != tmpEstimatedPayloadSize) {
+        LOG.info("Update Estimated Payload size to => " + this.estimatedPayloadSize);
+      }
+      this.estimatedPayloadSize = tmpEstimatedPayloadSize;
       this.currentInMemoryMapSize = this.inMemoryMap.size() * this.estimatedPayloadSize;
-      LOG.info("Update Estimated Payload size to => " + this.estimatedPayloadSize);
     }
 
     if (this.currentInMemoryMapSize < maxInMemorySizeInBytes || inMemoryMap.containsKey(key)) {


[hudi] 10/10: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord` (#7769)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit e6d44c015b774c732eff1182a1821e8980a51f35
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Tue Jan 31 08:36:26 2023 -0800

    [HUDI-5633] Fixing performance regression in `HoodieSparkRecord` (#7769)
    
    This change addresses a few performance regressions in `HoodieSparkRecord` identified during our recent benchmarking::
    
    1. `HoodieSparkRecord` rewrites records using `rewriteRecord` and `rewriteRecordWithNewSchema` which do Schema traversals for every record. Instead we should do schema traversal only once and produce a transformer that will directly create new record from the old one.
    
    2. `HoodieRecord`s currently could be rewritten multiple times even in cases when just meta-fields need to be mixed into the schema (in that case, `HoodieSparkRecord` simply wraps source `InternalRow` into `HoodieInternalRow` holding the meta-fields). This is problematic due to a) `UnsafeProjection` re-using mutable row (as a buffer) to avoid allocation of small objects leading to b) recursive overwriting of the same row.
    
    3. Records are currently copied for every Executor even for Simple one which actually is not buffering any records and therefore doesn't require records to be copied.
    
    To address aforementioned gaps following changes have been implemented:
    
     1. Row writing utils have been revisited to decouple `RowWriter` generation from actual application (to the source row; that way actual application is much more efficient). Additionally, considerable number of row-writing utilities have been eliminated as these are purely duplicative.
    
     2. `HoodieRecord.rewriteRecord` API is renamed into `prependMetaFields` to clearly disambiguate it from `rewriteRecordWithSchema`
    
     3. `WriteHandle` and `HoodieMergeHelper` implementations are substantially simplified and streamlined accommodating being rebased onto `prependMetaFields`
---
 .../hudi/execution/HoodieLazyInsertIterable.java   |   4 +-
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  32 +-
 .../org/apache/hudi/io/HoodieBootstrapHandle.java  |  24 +-
 .../org/apache/hudi/io/HoodieCreateHandle.java     |  20 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  14 +-
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |   2 +-
 .../table/action/commit/HoodieMergeHelper.java     |  59 +-
 .../java/org/apache/hudi/util/ExecutorFactory.java |  32 +-
 .../hudi/common/model/HoodieSparkRecord.java       | 104 ++--
 .../hudi/execution/SparkLazyInsertIterable.java    |  14 +-
 .../hudi/io/storage/HoodieSparkFileWriter.java     |  17 -
 .../hudi/io/storage/HoodieSparkParquetReader.java  |   3 +-
 .../hudi/io/storage/HoodieSparkParquetWriter.java  |  46 +-
 .../apache/hudi/keygen/BuiltinKeyGenerator.java    |   2 +-
 .../bootstrap/BaseBootstrapMetadataHandler.java    |   9 +-
 .../bootstrap/OrcBootstrapMetadataHandler.java     |   5 +-
 .../bootstrap/ParquetBootstrapMetadataHandler.java |  87 ++-
 .../apache/hudi/util/HoodieSparkRecordUtils.java   |  69 ---
 .../hudi/HoodieDatasetBulkInsertHelper.scala       |   2 +
 .../apache/spark/sql/HoodieInternalRowUtils.scala  | 595 ++++++++++++---------
 .../apache/spark/sql/HoodieUnsafeRowUtils.scala    |  33 +-
 .../spark/sql/TestHoodieUnsafeRowUtils.scala       |  36 +-
 .../hudi/common/model/HoodieAvroIndexedRecord.java |  34 +-
 .../apache/hudi/common/model/HoodieAvroRecord.java |  42 +-
 .../hudi/common/model/HoodieEmptyRecord.java       |   9 +-
 .../org/apache/hudi/common/model/HoodieRecord.java |  20 +-
 .../apache/hudi/common/model/MetadataValues.java   |  76 ++-
 .../table/log/AbstractHoodieLogRecordReader.java   |  15 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 110 ++--
 .../org/apache/hudi/HoodieStreamingSink.scala      |   6 +-
 .../apache/hudi/TestHoodieInternalRowUtils.scala   |  89 ---
 .../sql/hudi/TestHoodieInternalRowUtils.scala}     |  99 +++-
 .../hudi/utilities/sources/HoodieIncrSource.java   |  37 +-
 .../TestHoodieDeltaStreamerWithMultiWriter.java    |   1 +
 34 files changed, 904 insertions(+), 843 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
index 991e52982cf..8cc8c07a02e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
@@ -23,11 +23,11 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.LazyIterableIterator;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.queue.ExecutorType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.io.WriteHandleFactory;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.util.ExecutorFactory;
 
 import java.util.Iterator;
 import java.util.List;
@@ -104,7 +104,7 @@ public abstract class HoodieLazyInsertIterable<T>
     //       it since these records will be subsequently buffered (w/in the in-memory queue);
     //       Only case when we don't need to make a copy is when using [[SimpleExecutor]] which
     //       is guaranteed to not hold on to references to any records
-    boolean shouldClone = writeConfig.getExecutorType() != ExecutorType.SIMPLE;
+    boolean shouldClone = ExecutorFactory.isBufferingRecords(writeConfig);
 
     return record -> {
       HoodieRecord<T> clonedRecord = shouldClone ? record.copy() : record;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 02053127d9c..ef63e3ba727 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -236,21 +236,25 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
       // If the format can not record the operation field, nullify the DELETE payload manually.
       boolean nullifyPayload = HoodieOperation.isDelete(hoodieRecord.getOperation()) && !config.allowOperationMetadataField();
       recordProperties.put(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, String.valueOf(isUpdateRecord));
-      Option<HoodieRecord> finalRecord = nullifyPayload ? Option.empty() : Option.of(hoodieRecord);
+
+      Option<HoodieRecord> finalRecordOpt = nullifyPayload ? Option.empty() : Option.of(hoodieRecord);
       // Check for delete
-      if (finalRecord.isPresent() && !finalRecord.get().isDelete(schema, recordProperties)) {
-        // Check for ignore ExpressionPayload
-        if (finalRecord.get().shouldIgnore(schema, recordProperties)) {
-          return finalRecord;
+      if (finalRecordOpt.isPresent() && !finalRecordOpt.get().isDelete(schema, recordProperties)) {
+        HoodieRecord finalRecord = finalRecordOpt.get();
+        // Check if the record should be ignored (special case for [[ExpressionPayload]])
+        if (finalRecord.shouldIgnore(schema, recordProperties)) {
+          return finalRecordOpt;
         }
-        // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
-        HoodieRecord rewrittenRecord = schemaOnReadEnabled ? finalRecord.get().rewriteRecordWithNewSchema(schema, recordProperties, writeSchemaWithMetaFields)
-            : finalRecord.get().rewriteRecord(schema, recordProperties, writeSchemaWithMetaFields);
+
+        // Prepend meta-fields into the record
+        MetadataValues metadataValues = populateMetadataFields(finalRecord);
+        HoodieRecord populatedRecord =
+            finalRecord.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, recordProperties);
+
         // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
         //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
         //       it since these records will be put into the recordList(List).
-        HoodieRecord populatedRecord = populateMetadataFields(rewrittenRecord.copy(), writeSchemaWithMetaFields, recordProperties);
-        finalRecord = Option.of(populatedRecord);
+        finalRecordOpt = Option.of(populatedRecord.copy());
         if (isUpdateRecord || isLogCompaction) {
           updatedRecordsWritten++;
         } else {
@@ -258,7 +262,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
         }
         recordsWritten++;
       } else {
-        finalRecord = Option.empty();
+        finalRecordOpt = Option.empty();
         recordsDeleted++;
       }
 
@@ -267,7 +271,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
       // part of marking
       // record successful.
       hoodieRecord.deflate();
-      return finalRecord;
+      return finalRecordOpt;
     } catch (Exception e) {
       LOG.error("Error writing record  " + hoodieRecord, e);
       writeStatus.markFailure(hoodieRecord, e, recordMetadata);
@@ -275,7 +279,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
     return Option.empty();
   }
 
-  private HoodieRecord populateMetadataFields(HoodieRecord<T> hoodieRecord, Schema schema, Properties prop) throws IOException {
+  private MetadataValues populateMetadataFields(HoodieRecord<T> hoodieRecord) {
     MetadataValues metadataValues = new MetadataValues();
     if (config.populateMetaFields()) {
       String seqId =
@@ -292,7 +296,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
       metadataValues.setOperation(hoodieRecord.getOperation().getName());
     }
 
-    return hoodieRecord.updateMetadataValues(schema, prop, metadataValues);
+    return metadataValues;
   }
 
   private void initNewStatus() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieBootstrapHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieBootstrapHandle.java
index f110bf585db..e4985907e2e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieBootstrapHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieBootstrapHandle.java
@@ -18,13 +18,19 @@
 
 package org.apache.hudi.io;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.Schema;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
+
 /**
  * This class is essentially same as Create Handle but overrides two things
  * 1) Schema : Metadata bootstrap writes only metadata fields as part of write. So, setup the writer schema accordingly.
@@ -34,14 +40,28 @@ import org.apache.hudi.table.HoodieTable;
  */
 public class HoodieBootstrapHandle<T, I, K, O> extends HoodieCreateHandle<T, I, K, O> {
 
+  // NOTE: We have to use schema containing all the meta-fields in here b/c unlike for [[HoodieAvroRecord]],
+  //       [[HoodieSparkRecord]] requires records to always bear either all or no meta-fields in the
+  //       record schema (ie partial inclusion of the meta-fields in the schema is not allowed)
+  public static final Schema METADATA_BOOTSTRAP_RECORD_SCHEMA = createMetadataBootstrapRecordSchema();
+
   public HoodieBootstrapHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T, I, K, O> hoodieTable,
       String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
     super(config, commitTime, hoodieTable, partitionPath, fileId,
-        Option.of(HoodieAvroUtils.RECORD_KEY_SCHEMA), taskContextSupplier);
+        Option.of(METADATA_BOOTSTRAP_RECORD_SCHEMA), taskContextSupplier);
   }
 
   @Override
   public boolean canWrite(HoodieRecord record) {
     return true;
   }
+
+  private static Schema createMetadataBootstrapRecordSchema() {
+    List<Schema.Field> fields =
+        HoodieRecord.HOODIE_META_COLUMNS.stream()
+            .map(metaField ->
+                new Schema.Field(metaField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE))
+            .collect(Collectors.toList());
+    return Schema.createRecord("HoodieRecordKey", "", "", false, fields);
+  }
 }
\ No newline at end of file
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 96611959f9a..4d3f52a5ba6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -136,24 +136,22 @@ public class HoodieCreateHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
         if (record.shouldIgnore(schema, config.getProps())) {
           return;
         }
-        // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
-        HoodieRecord rewriteRecord;
-        if (schemaOnReadEnabled) {
-          rewriteRecord = record.rewriteRecordWithNewSchema(schema, config.getProps(), writeSchemaWithMetaFields);
-        } else {
-          rewriteRecord = record.rewriteRecord(schema, config.getProps(), writeSchemaWithMetaFields);
-        }
+
         MetadataValues metadataValues = new MetadataValues().setFileName(path.getName());
-        rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, config.getProps(), metadataValues);
+        HoodieRecord populatedRecord =
+            record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, config.getProps());
+
         if (preserveMetadata) {
-          fileWriter.write(record.getRecordKey(), rewriteRecord, writeSchemaWithMetaFields);
+          fileWriter.write(record.getRecordKey(), populatedRecord, writeSchemaWithMetaFields);
         } else {
-          fileWriter.writeWithMetadata(record.getKey(), rewriteRecord, writeSchemaWithMetaFields);
+          fileWriter.writeWithMetadata(record.getKey(), populatedRecord, writeSchemaWithMetaFields);
         }
-        // update the new location of record, so we know where to find it next
+
+        // Update the new location of record, so we know where to find it next
         record.unseal();
         record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));
         record.seal();
+
         recordsWritten++;
         insertRecordsWritten++;
       } else {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 0460f88101c..f92a1e73d58 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -374,20 +374,16 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
   }
 
   protected void writeToFile(HoodieKey key, HoodieRecord<T> record, Schema schema, Properties prop, boolean shouldPreserveRecordMetadata) throws IOException {
-    HoodieRecord rewriteRecord;
-    if (schemaOnReadEnabled) {
-      rewriteRecord = record.rewriteRecordWithNewSchema(schema, prop, writeSchemaWithMetaFields);
-    } else {
-      rewriteRecord = record.rewriteRecord(schema, prop, writeSchemaWithMetaFields);
-    }
     // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
     //       file holding this record even in cases when overall metadata is preserved
     MetadataValues metadataValues = new MetadataValues().setFileName(newFilePath.getName());
-    rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, prop, metadataValues);
+    HoodieRecord populatedRecord =
+        record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, prop);
+
     if (shouldPreserveRecordMetadata) {
-      fileWriter.write(key.getRecordKey(), rewriteRecord, writeSchemaWithMetaFields);
+      fileWriter.write(key.getRecordKey(), populatedRecord, writeSchemaWithMetaFields);
     } else {
-      fileWriter.writeWithMetadata(key, rewriteRecord, writeSchemaWithMetaFields);
+      fileWriter.writeWithMetadata(key, populatedRecord, writeSchemaWithMetaFields);
     }
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 8e470471db9..889d7a64769 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -95,7 +95,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I,
         !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
     this.taskContextSupplier = taskContextSupplier;
     this.writeToken = makeWriteToken();
-    schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
+    this.schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
     this.recordMerger = config.getRecordMerger();
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 7f46e211e73..3c8255a21b9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -27,8 +27,6 @@ import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.MappingIterator;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -94,8 +92,8 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper {
 
     // In case Advanced Schema Evolution is enabled we might need to rewrite currently
     // persisted records to adhere to an evolved schema
-    Option<Pair<Function<Schema, Function<HoodieRecord, HoodieRecord>>, Schema>> schemaEvolutionTransformerOpt =
-        composeSchemaEvolutionTransformer(writerSchema, baseFile, writeConfig, table.getMetaClient());
+    Option<Function<HoodieRecord, HoodieRecord>> schemaEvolutionTransformerOpt =
+        composeSchemaEvolutionTransformer(readerSchema, writerSchema, baseFile, writeConfig, table.getMetaClient());
 
     // Check whether the writer schema is simply a projection of the file's one, ie
     //   - Its field-set is a proper subset (of the reader schema)
@@ -130,29 +128,27 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper {
             (left, right) ->
                 left.joinWith(right, mergeHandle.getWriterSchemaWithMetaFields()));
         recordSchema = mergeHandle.getWriterSchemaWithMetaFields();
-      } else if (schemaEvolutionTransformerOpt.isPresent()) {
-        recordIterator = new MappingIterator<>(baseFileRecordIterator,
-            schemaEvolutionTransformerOpt.get().getLeft().apply(isPureProjection ? writerSchema : readerSchema));
-        recordSchema = schemaEvolutionTransformerOpt.get().getRight();
       } else {
         recordIterator = baseFileRecordIterator;
         recordSchema = isPureProjection ? writerSchema : readerSchema;
       }
 
+      boolean isBufferingRecords = ExecutorFactory.isBufferingRecords(writeConfig);
+
       wrapper = ExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> {
+        HoodieRecord newRecord;
+        if (schemaEvolutionTransformerOpt.isPresent()) {
+          newRecord = schemaEvolutionTransformerOpt.get().apply(record);
+        } else if (shouldRewriteInWriterSchema) {
+          newRecord = record.rewriteRecordWithNewSchema(recordSchema, writeConfig.getProps(), writerSchema);
+        } else {
+          newRecord = record;
+        }
+
         // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
         //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
         //       it since these records will be put into queue of QueueBasedExecutorFactory.
-        if (shouldRewriteInWriterSchema) {
-          try {
-            return record.rewriteRecordWithNewSchema(recordSchema, writeConfig.getProps(), writerSchema).copy();
-          } catch (IOException e) {
-            LOG.error("Error rewrite record with new schema", e);
-            throw new HoodieException(e);
-          }
-        } else {
-          return record.copy();
-        }
+        return isBufferingRecords ? newRecord.copy() : newRecord;
       }, table.getPreExecuteRunnable());
 
       wrapper.execute();
@@ -173,10 +169,11 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper {
     }
   }
 
-  private Option<Pair<Function<Schema, Function<HoodieRecord, HoodieRecord>>, Schema>> composeSchemaEvolutionTransformer(Schema writerSchema,
-                                                                                           HoodieBaseFile baseFile,
-                                                                                           HoodieWriteConfig writeConfig,
-                                                                                           HoodieTableMetaClient metaClient) {
+  private Option<Function<HoodieRecord, HoodieRecord>> composeSchemaEvolutionTransformer(Schema recordSchema,
+                                                                                         Schema writerSchema,
+                                                                                         HoodieBaseFile baseFile,
+                                                                                         HoodieWriteConfig writeConfig,
+                                                                                         HoodieTableMetaClient metaClient) {
     Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(writeConfig.getInternalSchema());
     // TODO support bootstrap
     if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
@@ -214,18 +211,12 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper {
           || SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
       if (needToReWriteRecord) {
         Map<String, String> renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
-        return Option.of(Pair.of(
-            (schema) -> (record) -> {
-              try {
-                return record.rewriteRecordWithNewSchema(
-                    schema,
-                    writeConfig.getProps(),
-                    newWriterSchema, renameCols);
-              } catch (IOException e) {
-                LOG.error("Error rewrite record with new schema", e);
-                throw new HoodieException(e);
-              }
-            }, newWriterSchema));
+        return Option.of(record -> {
+          return record.rewriteRecordWithNewSchema(
+              recordSchema,
+              writeConfig.getProps(),
+              newWriterSchema, renameCols);
+        });
       } else {
         return Option.empty();
       }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java
index 7baada74089..49e83733adf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java
@@ -33,31 +33,47 @@ import java.util.function.Function;
 
 public class ExecutorFactory {
 
-  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
+  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig config,
                                                    Iterator<I> inputItr,
                                                    HoodieConsumer<O, E> consumer,
                                                    Function<I, O> transformFunction) {
-    return create(hoodieConfig, inputItr, consumer, transformFunction, Functions.noop());
+    return create(config, inputItr, consumer, transformFunction, Functions.noop());
   }
 
-  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
+  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig config,
                                                    Iterator<I> inputItr,
                                                    HoodieConsumer<O, E> consumer,
                                                    Function<I, O> transformFunction,
                                                    Runnable preExecuteRunnable) {
-    ExecutorType executorType = hoodieConfig.getExecutorType();
-
+    ExecutorType executorType = config.getExecutorType();
     switch (executorType) {
       case BOUNDED_IN_MEMORY:
-        return new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, consumer,
+        return new BoundedInMemoryExecutor<>(config.getWriteBufferLimitBytes(), inputItr, consumer,
             transformFunction, preExecuteRunnable);
       case DISRUPTOR:
-        return new DisruptorExecutor<>(hoodieConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(), inputItr, consumer,
-            transformFunction, hoodieConfig.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable);
+        return new DisruptorExecutor<>(config.getWriteExecutorDisruptorWriteBufferLimitBytes(), inputItr, consumer,
+            transformFunction, config.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable);
       case SIMPLE:
         return new SimpleExecutor<>(inputItr, consumer, transformFunction);
       default:
         throw new HoodieException("Unsupported Executor Type " + executorType);
     }
   }
+
+  /**
+   * Checks whether configured {@link HoodieExecutor} buffer records (for ex, by holding them
+   * in the queue)
+   */
+  public static boolean isBufferingRecords(HoodieWriteConfig config) {
+    ExecutorType executorType = config.getExecutorType();
+    switch (executorType) {
+      case BOUNDED_IN_MEMORY:
+      case DISRUPTOR:
+        return true;
+      case SIMPLE:
+        return false;
+      default:
+        throw new HoodieException("Unsupported Executor Type " + executorType);
+    }
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index b119e75e217..7756d2502e8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -30,9 +30,9 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
-import org.apache.hudi.util.HoodieSparkRecordUtils;
 import org.apache.spark.sql.HoodieInternalRowUtils;
 import org.apache.spark.sql.HoodieUnsafeRowUtils;
 import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
@@ -44,14 +44,13 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
+import scala.Function1;
 
 import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
-import static org.apache.hudi.util.HoodieSparkRecordUtils.getNullableValAsString;
-import static org.apache.hudi.util.HoodieSparkRecordUtils.getValue;
 import static org.apache.spark.sql.types.DataTypes.BooleanType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
@@ -150,8 +149,9 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements Kryo
       return getRecordKey();
     }
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
-    return keyGeneratorOpt.isPresent() ? ((SparkKeyGeneratorInterface) keyGeneratorOpt.get())
-        .getRecordKey(data, structType).toString() : data.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
+    return keyGeneratorOpt.isPresent()
+        ? ((SparkKeyGeneratorInterface) keyGeneratorOpt.get()).getRecordKey(data, structType).toString()
+        : data.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
   }
 
   @Override
@@ -173,7 +173,11 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements Kryo
   @Override
   public Object[] getColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
-    return HoodieSparkRecordUtils.getRecordColumnValues(data, columns, structType, consistentLogicalTimestampEnabled);
+    Object[] objects = new Object[columns.length];
+    for (int i = 0; i < objects.length; i++) {
+      objects[i] = getValue(structType, columns[i], data);
+    }
+    return objects;
   }
 
   @Override
@@ -186,50 +190,27 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements Kryo
   }
 
   @Override
-  public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException {
+  public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) {
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
 
-    // TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter
-    InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecord(this.data, structType, targetStructType);
-    UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType).apply(rewriteRecord);
-
-    boolean containMetaFields = hasMetaFields(targetStructType);
-    UTF8String[] metaFields = tryExtractMetaFields(unsafeRow, targetStructType);
-    HoodieInternalRow internalRow = new HoodieInternalRow(metaFields, unsafeRow, containMetaFields);
+    HoodieInternalRow updatableRow = wrapIntoUpdatableOverlay(this.data, structType);
+    updateMetadataValuesInternal(updatableRow, metadataValues);
 
-    return new HoodieSparkRecord(getKey(), internalRow, targetStructType, getOperation(), this.currentLocation, this.newLocation, false);
+    return new HoodieSparkRecord(getKey(), updatableRow, targetStructType, getOperation(), this.currentLocation, this.newLocation, false);
   }
 
   @Override
-  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException {
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) {
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
 
-    // TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter
-    InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecordWithNewSchema(this.data, structType, newStructType, renameCols);
-    UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(newStructType, newStructType).apply(rewriteRecord);
+    Function1<InternalRow, UnsafeRow> unsafeRowWriter =
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, newStructType, renameCols);
 
-    boolean containMetaFields = hasMetaFields(newStructType);
-    UTF8String[] metaFields = tryExtractMetaFields(unsafeRow, newStructType);
-    HoodieInternalRow internalRow = new HoodieInternalRow(metaFields, unsafeRow, containMetaFields);
+    UnsafeRow unsafeRow = unsafeRowWriter.apply(this.data);
 
-    return new HoodieSparkRecord(getKey(), internalRow, newStructType, getOperation(), this.currentLocation, this.newLocation, false);
-  }
-
-  @Override
-  public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException {
-    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
-    HoodieInternalRow updatableRow = wrapIntoUpdatableOverlay(data, structType);
-
-    metadataValues.getKv().forEach((key, value) -> {
-      int pos = structType.fieldIndex(key);
-      if (value != null) {
-        updatableRow.update(pos, CatalystTypeConverters.convertToCatalyst(value));
-      }
-    });
-
-    return new HoodieSparkRecord(getKey(), updatableRow, structType, getOperation(), this.currentLocation, this.newLocation, copy);
+    return new HoodieSparkRecord(getKey(), unsafeRow, newStructType, getOperation(), this.currentLocation, this.newLocation, false);
   }
 
   @Override
@@ -317,12 +298,13 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements Kryo
   public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     String orderingField = ConfigUtils.getOrderingField(props);
-    if (!HoodieInternalRowUtils.existField(structType, orderingField)) {
-      return 0;
+    scala.Option<NestedFieldPath> cachedNestedFieldPath =
+        HoodieInternalRowUtils.getCachedPosList(structType, orderingField);
+    if (cachedNestedFieldPath.isDefined()) {
+      NestedFieldPath nestedFieldPath = cachedNestedFieldPath.get();
+      return (Comparable<?>) HoodieUnsafeRowUtils.getNestedInternalRowValue(data, nestedFieldPath);
     } else {
-      NestedFieldPath nestedFieldPath = HoodieInternalRowUtils.getCachedPosList(structType, orderingField);
-      Comparable<?> value = (Comparable<?>) HoodieUnsafeRowUtils.getNestedInternalRowValue(data, nestedFieldPath);
-      return value;
+      return 0;
     }
   }
 
@@ -368,21 +350,28 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements Kryo
     }
 
     boolean containsMetaFields = hasMetaFields(structType);
-    UTF8String[] metaFields = tryExtractMetaFields(data, structType);
+    UTF8String[] metaFields = extractMetaFields(data, structType);
     return new HoodieInternalRow(metaFields, data, containsMetaFields);
   }
 
-  private static UTF8String[] tryExtractMetaFields(InternalRow row, StructType structType) {
+  private static UTF8String[] extractMetaFields(InternalRow row, StructType structType) {
     boolean containsMetaFields = hasMetaFields(structType);
-    if (containsMetaFields && structType.size() == 1) {
-      // Support bootstrap with RECORD_KEY_SCHEMA
-      return new UTF8String[] {row.getUTF8String(0)};
-    } else if (containsMetaFields) {
+    if (containsMetaFields) {
       return HoodieRecord.HOODIE_META_COLUMNS.stream()
           .map(col -> row.getUTF8String(HOODIE_META_COLUMNS_NAME_TO_POS.get(col)))
           .toArray(UTF8String[]::new);
-    } else {
-      return new UTF8String[HoodieRecord.HOODIE_META_COLUMNS.size()];
+    }
+
+    return new UTF8String[HoodieRecord.HOODIE_META_COLUMNS.size()];
+  }
+
+  private static void updateMetadataValuesInternal(HoodieInternalRow updatableRow, MetadataValues metadataValues) {
+    String[] values = metadataValues.getValues();
+    for (int pos = 0; pos < values.length; ++pos) {
+      String value = values[pos];
+      if (value != null) {
+        updatableRow.update(pos, CatalystTypeConverters.convertToCatalyst(value));
+      }
     }
   }
 
@@ -416,7 +405,8 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements Kryo
         getValue(structType, recordKeyPartitionPathFieldPair.getRight(), record.data).toString());
 
     HoodieOperation operation = withOperationField
-        ? HoodieOperation.fromName(getNullableValAsString(structType, record.data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
+        ? HoodieOperation.fromName(record.data.getString(structType.fieldIndex(HoodieRecord.OPERATION_METADATA_FIELD)))
+        : null;
     return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), record.data, structType, operation, record.copy);
   }
 
@@ -434,4 +424,14 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements Kryo
 
     ValidationUtils.checkState(isValid);
   }
+
+  private static Object getValue(StructType structType, String fieldName, InternalRow row) {
+    scala.Option<NestedFieldPath> cachedNestedFieldPath =
+        HoodieInternalRowUtils.getCachedPosList(structType, fieldName);
+    if (cachedNestedFieldPath.isDefined()) {
+      return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, cachedNestedFieldPath.get());
+    } else {
+      throw new HoodieException(String.format("Field at %s is not present in %s", fieldName, structType));
+    }
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
index 147b5cf6b33..3b42d40a1a2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
@@ -38,19 +38,7 @@ import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 public class SparkLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> {
 
-  private boolean useWriterSchema;
-
-  public SparkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
-                                 boolean areRecordsSorted,
-                                 HoodieWriteConfig config,
-                                 String instantTime,
-                                 HoodieTable hoodieTable,
-                                 String idPrefix,
-                                 TaskContextSupplier taskContextSupplier,
-                                 boolean useWriterSchema) {
-    super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier);
-    this.useWriterSchema = useWriterSchema;
-  }
+  private final boolean useWriterSchema;
 
   public SparkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
                                  boolean areRecordsSorted,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriter.java
index 58ee01182c3..3c69f0ab5c2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriter.java
@@ -21,18 +21,11 @@ package org.apache.hudi.io.storage;
 import org.apache.avro.Schema;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.spark.sql.catalyst.CatalystTypeConverters;
 import org.apache.spark.sql.catalyst.InternalRow;
 
 import java.io.IOException;
 import java.util.Properties;
 
-import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD;
-import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD;
-import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.FILENAME_METADATA_FIELD;
-import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD;
-import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD;
-
 public interface HoodieSparkFileWriter extends HoodieFileWriter {
   boolean canWrite();
 
@@ -51,14 +44,4 @@ public interface HoodieSparkFileWriter extends HoodieFileWriter {
   default void writeWithMetadata(HoodieKey key, HoodieRecord record, Schema schema, Properties props) throws IOException {
     writeRowWithMetadata(key, (InternalRow) record.getData());
   }
-
-  default InternalRow prepRecordWithMetadata(HoodieKey key, InternalRow row, String instantTime, Integer partitionId, long recordIndex, String fileName)  {
-    String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex);
-    row.update(COMMIT_TIME_METADATA_FIELD.ordinal(), CatalystTypeConverters.convertToCatalyst(instantTime));
-    row.update(COMMIT_SEQNO_METADATA_FIELD.ordinal(), CatalystTypeConverters.convertToCatalyst(seqId));
-    row.update(RECORD_KEY_METADATA_FIELD.ordinal(), CatalystTypeConverters.convertToCatalyst(key.getRecordKey()));
-    row.update(PARTITION_PATH_METADATA_FIELD.ordinal(), CatalystTypeConverters.convertToCatalyst(key.getPartitionPath()));
-    row.update(FILENAME_METADATA_FIELD.ordinal(), CatalystTypeConverters.convertToCatalyst(fileName));
-    return row;
-  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index 320217aff1d..e24f03a4cb4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -122,7 +122,8 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
     // and therefore if we convert to Avro directly we'll lose logical type-info.
     MessageType messageType = ((ParquetUtils) parquetUtils).readSchema(conf, path);
     StructType structType = new ParquetToSparkSchemaConverter(conf).convert(messageType);
-    return SparkAdapterSupport$.MODULE$.sparkAdapter().getAvroSchemaConverters()
+    return SparkAdapterSupport$.MODULE$.sparkAdapter()
+        .getAvroSchemaConverters()
         .toAvroType(structType, true, messageType.getName(), StringUtils.EMPTY_STRING);
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java
index 3b4a86502d2..d601e6ded3e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java
@@ -21,22 +21,32 @@ package org.apache.hudi.io.storage;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.io.storage.row.HoodieRowParquetConfig;
 import org.apache.hudi.io.storage.row.HoodieRowParquetWriteSupport;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.IOException;
+import java.util.function.Function;
+
+import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD;
+import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD;
+import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.FILENAME_METADATA_FIELD;
+import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD;
+import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD;
 
 public class HoodieSparkParquetWriter extends HoodieBaseParquetWriter<InternalRow> implements HoodieSparkFileWriter {
 
-  // TODO: better code reuse
-  private final String fileName;
-  private final String instantTime;
-  private final TaskContextSupplier taskContextSupplier;
+  private final UTF8String fileName;
+  private final UTF8String instantTime;
+
   private final boolean populateMetaFields;
+
   private final HoodieRowParquetWriteSupport writeSupport;
 
+  private final Function<Long, String> seqIdGenerator;
+
   public HoodieSparkParquetWriter(Path file,
                                   HoodieRowParquetConfig parquetConfig,
                                   String instantTime,
@@ -44,19 +54,23 @@ public class HoodieSparkParquetWriter extends HoodieBaseParquetWriter<InternalRo
                                   boolean populateMetaFields) throws IOException {
     super(file, parquetConfig);
     this.writeSupport = parquetConfig.getWriteSupport();
-    this.fileName = file.getName();
-    this.instantTime = instantTime;
-    this.taskContextSupplier = taskContextSupplier;
+    this.fileName = UTF8String.fromString(file.getName());
+    this.instantTime = UTF8String.fromString(instantTime);
     this.populateMetaFields = populateMetaFields;
+    this.seqIdGenerator = recordIndex -> {
+      Integer partitionId = taskContextSupplier.getPartitionIdSupplier().get();
+      return HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex);
+    };
   }
 
   @Override
   public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws IOException {
     if (populateMetaFields) {
-      prepRecordWithMetadata(key, row, instantTime,
-          taskContextSupplier.getPartitionIdSupplier().get(), getWrittenRecordCount(), fileName);
+      UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
+      updateRecordMetadata(row, recordKey, key.getPartitionPath(), getWrittenRecordCount());
+
       super.write(row);
-      writeSupport.add(UTF8String.fromString(key.getRecordKey()));
+      writeSupport.add(recordKey);
     } else {
       super.write(row);
     }
@@ -74,4 +88,16 @@ public class HoodieSparkParquetWriter extends HoodieBaseParquetWriter<InternalRo
   public void close() throws IOException {
     super.close();
   }
+
+  protected void updateRecordMetadata(InternalRow row,
+                                      UTF8String recordKey,
+                                      String partitionPath,
+                                      long recordCount)  {
+    row.update(COMMIT_TIME_METADATA_FIELD.ordinal(), instantTime);
+    row.update(COMMIT_SEQNO_METADATA_FIELD.ordinal(), UTF8String.fromString(seqIdGenerator.apply(recordCount)));
+    row.update(RECORD_KEY_METADATA_FIELD.ordinal(), recordKey);
+    // TODO set partition path in ctor
+    row.update(PARTITION_PATH_METADATA_FIELD.ordinal(), UTF8String.fromString(partitionPath));
+    row.update(FILENAME_METADATA_FIELD.ordinal(), fileName);
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
index 40154d8675c..b95f1b1ef74 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
@@ -480,7 +480,7 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements Sp
     private HoodieUnsafeRowUtils.NestedFieldPath[] resolveNestedFieldPaths(List<String> fieldPaths, StructType schema, boolean returnNull) {
       try {
         return fieldPaths.stream()
-            .map(fieldPath -> HoodieUnsafeRowUtils$.MODULE$.composeNestedFieldPath(schema, fieldPath))
+            .map(fieldPath -> HoodieUnsafeRowUtils$.MODULE$.composeNestedFieldPath(schema, fieldPath).get())
             .toArray(HoodieUnsafeRowUtils.NestedFieldPath[]::new);
       } catch (Exception e) {
         if (returnNull) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java
index 0bc15fa2106..3e63d0bb22c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java
@@ -34,7 +34,6 @@ import org.apache.hudi.keygen.KeyGeneratorInterface;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.parquet.avro.AvroReadSupport;
 
 import java.io.IOException;
 import java.util.List;
@@ -62,10 +61,10 @@ public abstract class BaseBootstrapMetadataHandler implements BootstrapMetadataH
           .map(HoodieAvroUtils::getRootLevelFieldName)
           .collect(Collectors.toList());
       Schema recordKeySchema = HoodieAvroUtils.generateProjectionSchema(avroSchema, recordKeyColumns);
-      LOG.info("Schema to be used for reading record Keys :" + recordKeySchema);
-      AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), recordKeySchema);
-      AvroReadSupport.setRequestedProjection(table.getHadoopConf(), recordKeySchema);
-      executeBootstrap(bootstrapHandle, sourceFilePath, keyGenerator, partitionPath, avroSchema);
+
+      LOG.info("Schema to be used for reading record keys: " + recordKeySchema);
+
+      executeBootstrap(bootstrapHandle, sourceFilePath, keyGenerator, partitionPath, recordKeySchema);
     } catch (Exception e) {
       throw new HoodieException(e.getMessage(), e);
     }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
index d7a4a2b52ab..14a442c93b1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.table.action.bootstrap;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.client.bootstrap.BootstrapRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -47,6 +46,8 @@ import org.apache.orc.TypeDescription;
 
 import java.io.IOException;
 
+import static org.apache.hudi.io.HoodieBootstrapHandle.METADATA_BOOTSTRAP_RECORD_SCHEMA;
+
 class OrcBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
   private static final Logger LOG = LogManager.getLogger(OrcBootstrapMetadataHandler.class);
 
@@ -75,7 +76,7 @@ class OrcBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
       wrapper = new BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config.getWriteBufferLimitBytes(),
           new OrcReaderIterator(reader, avroSchema, orcSchema), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
         String recKey = keyGenerator.getKey(inp).getRecordKey();
-        GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
+        GenericRecord gr = new GenericData.Record(METADATA_BOOTSTRAP_RECORD_SCHEMA);
         gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
         BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
         HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index d008d7cf9fd..e5944225750 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -18,11 +18,15 @@
 
 package org.apache.hudi.table.action.bootstrap;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.client.bootstrap.BootstrapRecordPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -35,19 +39,24 @@ import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
+import org.apache.spark.sql.HoodieInternalRowUtils$;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.IOException;
-import java.util.Properties;
+import java.util.function.Function;
+
+import static org.apache.hudi.io.HoodieBootstrapHandle.METADATA_BOOTSTRAP_RECORD_SCHEMA;
 
 class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
-  private static final Logger LOG = LogManager.getLogger(ParquetBootstrapMetadataHandler.class);
 
   public ParquetBootstrapMetadataHandler(HoodieWriteConfig config, HoodieTable table, HoodieFileStatus srcFileStatus) {
     super(config, table, srcFileStatus);
@@ -62,31 +71,29 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
   }
 
   @Override
-  void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle,
-                        Path sourceFilePath, KeyGeneratorInterface keyGenerator, String partitionPath, Schema avroSchema) throws Exception {
+  protected void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle,
+                                  Path sourceFilePath,
+                                  KeyGeneratorInterface keyGenerator,
+                                  String partitionPath,
+                                  Schema schema) throws Exception {
     BoundedInMemoryExecutor<HoodieRecord, HoodieRecord, Void> wrapper = null;
-    HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(table.getConfig().getRecordMerger().getRecordType())
+    HoodieRecordMerger recordMerger = table.getConfig().getRecordMerger();
+
+    HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(recordMerger.getRecordType())
             .getFileReader(table.getHadoopConf(), sourceFilePath);
     try {
+      Function<HoodieRecord, HoodieRecord> transformer = record -> {
+        String recordKey = record.getRecordKey(schema, Option.of(keyGenerator));
+        return createNewMetadataBootstrapRecord(recordKey, partitionPath, recordMerger.getRecordType())
+            // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
+            //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
+            //       it since these records will be inserted into the queue later.
+            .copy();
+      };
+
       wrapper = new BoundedInMemoryExecutor<HoodieRecord, HoodieRecord, Void>(config.getWriteBufferLimitBytes(),
-          reader.getRecordIterator(), new BootstrapRecordConsumer(bootstrapHandle), record -> {
-        try {
-          String recKey = record.getRecordKey(reader.getSchema(), Option.of(keyGenerator));
-          // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
-          //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
-          //       it since these records will be inserted into the queue later.
-          HoodieRecord hoodieRecord = record
-              .rewriteRecord(reader.getSchema(), config.getProps(), HoodieAvroUtils.RECORD_KEY_SCHEMA)
-              .copy();
-          MetadataValues metadataValues = new MetadataValues().setRecordKey(recKey);
-          return hoodieRecord
-              .updateMetadataValues(HoodieAvroUtils.RECORD_KEY_SCHEMA, new Properties(), metadataValues)
-              .newInstance(new HoodieKey(recKey, partitionPath));
-        } catch (IOException e) {
-          LOG.error("Unable to overrideMetadataFieldValue", e);
-          return null;
-        }
-      }, table.getPreExecuteRunnable());
+          reader.getRecordIterator(schema), new BootstrapRecordConsumer(bootstrapHandle), transformer, table.getPreExecuteRunnable());
+
       wrapper.execute();
     } catch (Exception e) {
       throw new HoodieException(e);
@@ -99,5 +106,31 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
       bootstrapHandle.close();
     }
   }
+
+  private HoodieRecord createNewMetadataBootstrapRecord(String recordKey, String partitionPath, HoodieRecord.HoodieRecordType recordType) {
+    HoodieKey hoodieKey = new HoodieKey(recordKey, partitionPath);
+    switch (recordType) {
+      case AVRO:
+        GenericRecord avroRecord = new GenericData.Record(METADATA_BOOTSTRAP_RECORD_SCHEMA);
+        avroRecord.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recordKey);
+        BootstrapRecordPayload payload = new BootstrapRecordPayload(avroRecord);
+        return new HoodieAvroRecord<>(hoodieKey, payload);
+
+      case SPARK:
+        StructType schema = HoodieInternalRowUtils$.MODULE$.getCachedSchema(METADATA_BOOTSTRAP_RECORD_SCHEMA);
+        UnsafeProjection unsafeProjection = HoodieInternalRowUtils$.MODULE$.getCachedUnsafeProjection(schema, schema);
+
+        GenericInternalRow row = new GenericInternalRow(METADATA_BOOTSTRAP_RECORD_SCHEMA.getFields().size());
+        row.update(HoodieRecord.RECORD_KEY_META_FIELD_ORD, UTF8String.fromString(recordKey));
+
+        UnsafeRow unsafeRow = unsafeProjection.apply(row);
+
+        return new HoodieSparkRecord(hoodieKey, unsafeRow,false);
+
+      default:
+        throw new UnsupportedOperationException(String.format("Record type %s is not supported yet!", recordType));
+    }
+
+  }
 }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java
deleted file mode 100644
index 9a4aaa1dbc5..00000000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.util;
-
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-
-import org.apache.spark.sql.HoodieInternalRowUtils;
-import org.apache.spark.sql.HoodieUnsafeRowUtils;
-import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructType;
-
-public class HoodieSparkRecordUtils {
-
-  public static Object getValue(StructType structType, String fieldName, InternalRow row) {
-    NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, fieldName);
-    return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
-  }
-
-  /**
-   * Returns the string value of the given record {@code rec} and field {@code fieldName}. The field and value both could be missing.
-   *
-   * @param row       The record
-   * @param fieldName The field name
-   * @return the string form of the field or empty if the schema does not contain the field name or the value is null
-   */
-  public static Option<String> getNullableValAsString(StructType structType, InternalRow row, String fieldName) {
-    String fieldVal = !HoodieInternalRowUtils.existField(structType, fieldName)
-        ? null : StringUtils.objToString(getValue(structType, fieldName, row));
-    return Option.ofNullable(fieldVal);
-  }
-
-  /**
-   * Gets record column values into one object.
-   *
-   * @param row  InternalRow.
-   * @param columns Names of the columns to get values.
-   * @param structType  {@link StructType} instance.
-   * @return Column value if a single column, or concatenated String values by comma.
-   */
-  public static Object[] getRecordColumnValues(InternalRow row,
-      String[] columns,
-      StructType structType, boolean consistentLogicalTimestampEnabled) {
-    Object[] objects = new Object[columns.length];
-    for (int i = 0; i < objects.length; i++) {
-      NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, columns[i]);
-      Object value = HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
-      objects[i] = value;
-    }
-    return objects;
-  }
-}
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 238fed526a1..7e235993e33 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -25,6 +25,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.util.ReflectionUtils
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.index.SparkHoodieIndexFactory
 import org.apache.hudi.keygen.{BuiltinKeyGenerator, SparkKeyGeneratorInterface}
 import org.apache.hudi.table.action.commit.{BulkInsertDataInternalWriterHelper, ParallelismHelper}
@@ -175,6 +176,7 @@ object HoodieDatasetBulkInsertHelper
     val partitionPathMetaFieldOrd = schema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
     // NOTE: Pre-combine field could be a nested field
     val preCombineFieldPath = composeNestedFieldPath(schema, preCombineFieldRef)
+      .getOrElse(throw new HoodieException(s"Pre-combine field $preCombineFieldRef is missing in $schema"))
 
     rdd.map { row =>
         val rowKey = if (isGlobalIndex) {
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
index dfe3295cf00..4d2ee33d154 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
@@ -18,304 +18,413 @@
 
 package org.apache.spark.sql
 
-import java.nio.charset.StandardCharsets
-import java.util.HashMap
-import java.util.concurrent.ConcurrentHashMap
 import org.apache.avro.Schema
 import org.apache.hbase.thirdparty.com.google.common.base.Supplier
-import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
 import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
 import org.apache.hudi.exception.HoodieException
-import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, composeNestedFieldPath}
+import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.types.Decimal.ROUND_HALF_EVEN
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.{ArrayDeque => JArrayDeque, Collections => JCollections, Deque => JDeque, Map => JMap}
+import java.util.function.{Function => JFunction}
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
 
 object HoodieInternalRowUtils {
 
-  // Projection are all thread local. Projection is not thread-safe
-  val unsafeProjectionThreadLocal: ThreadLocal[HashMap[(StructType, StructType), UnsafeProjection]] =
-    ThreadLocal.withInitial(new Supplier[HashMap[(StructType, StructType), UnsafeProjection]] {
-      override def get(): HashMap[(StructType, StructType), UnsafeProjection] = new HashMap[(StructType, StructType), UnsafeProjection]
+  private type RenamedColumnMap = JMap[String, String]
+  private type UnsafeRowWriter = InternalRow => UnsafeRow
+
+  // NOTE: [[UnsafeProjection]] objects cache have to stay [[ThreadLocal]] since these are not thread-safe
+  private val unsafeWriterThreadLocal: ThreadLocal[mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]] {
+      override def get(): mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter] =
+        new mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]
     })
-  val schemaMap = new ConcurrentHashMap[Schema, StructType]
-  val orderPosListMap = new ConcurrentHashMap[(StructType, String), NestedFieldPath]
 
-  /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecord(org.apache.avro.generic.GenericRecord, org.apache.avro.Schema)
-   */
-  def rewriteRecord(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType): InternalRow = {
-    val newRow = new GenericInternalRow(Array.fill(newSchema.fields.length)(null).asInstanceOf[Array[Any]])
-
-    for ((field, pos) <- newSchema.fields.zipWithIndex) {
-      var oldValue: AnyRef = null
-      var oldType: DataType = null
-      if (existField(oldSchema, field.name)) {
-        val oldField = oldSchema(field.name)
-        val oldPos = oldSchema.fieldIndex(field.name)
-        oldType = oldField.dataType
-        oldValue = oldRecord.get(oldPos, oldType)
-      }
-      if (oldValue != null) {
-        field.dataType match {
-          case structType: StructType =>
-            val oldType = oldSchema(field.name).dataType.asInstanceOf[StructType]
-            val newValue = rewriteRecord(oldValue.asInstanceOf[InternalRow], oldType, structType)
-            newRow.update(pos, newValue)
-          case decimalType: DecimalType =>
-            val oldFieldSchema = oldSchema(field.name).dataType.asInstanceOf[DecimalType]
-            if (decimalType.scale != oldFieldSchema.scale || decimalType.precision != oldFieldSchema.precision) {
-              newRow.update(pos, Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
-              )
-            } else {
-              newRow.update(pos, oldValue)
-            }
-          case t if t == oldType => newRow.update(pos, oldValue)
-          // Type promotion
-          case _: ShortType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toShort)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: IntegerType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toInt)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toInt)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: LongType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toLong)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toLong)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toLong)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: FloatType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toFloat)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toFloat)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toFloat)
-              case _: LongType => newRow.update(pos, oldValue.asInstanceOf[Long].toFloat)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: DoubleType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toDouble)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toDouble)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toDouble)
-              case _: LongType => newRow.update(pos, oldValue.asInstanceOf[Long].toDouble)
-              case _: FloatType => newRow.update(pos, oldValue.asInstanceOf[Float].toDouble)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: BinaryType if oldType.isInstanceOf[StringType] => newRow.update(pos, oldValue.asInstanceOf[String].getBytes)
-          case _ => newRow.update(pos, oldValue)
-        }
-      } else {
-        // TODO default value in newSchema
-      }
-    }
+  // NOTE: [[UnsafeRowWriter]] objects cache have to stay [[ThreadLocal]] since these are not thread-safe
+  private val unsafeProjectionThreadLocal: ThreadLocal[mutable.HashMap[(StructType, StructType), UnsafeProjection]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, StructType), UnsafeProjection]] {
+      override def get(): mutable.HashMap[(StructType, StructType), UnsafeProjection] =
+        new mutable.HashMap[(StructType, StructType), UnsafeProjection]
+    })
 
-    newRow
-  }
+  private val schemaMap = new ConcurrentHashMap[Schema, StructType]
+  private val orderPosListMap = new ConcurrentHashMap[(StructType, String), Option[NestedFieldPath]]
 
   /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(org.apache.avro.generic.IndexedRecord, org.apache.avro.Schema, java.util.Map)
+   * Provides cached instance of [[UnsafeProjection]] transforming provided [[InternalRow]]s from
+   * one [[StructType]] and into another [[StructType]]
+   *
+   * For more details regarding its semantic, please check corresponding scala-doc for
+   * [[HoodieCatalystExpressionUtils.generateUnsafeProjection]]
    */
-  def rewriteRecordWithNewSchema(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType, renameCols: java.util.Map[String, String]): InternalRow = {
-    rewriteRecordWithNewSchema(oldRecord, oldSchema, newSchema, renameCols, new java.util.LinkedList[String]).asInstanceOf[InternalRow]
+  def getCachedUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
+    unsafeProjectionThreadLocal.get()
+      .getOrElseUpdate((from, to), generateUnsafeProjection(from, to))
   }
 
   /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(java.lang.Object, org.apache.avro.Schema, org.apache.avro.Schema, java.util.Map, java.util.Deque)
+   * Provides cached instance of [[UnsafeRowWriter]] transforming provided [[InternalRow]]s from
+   * one [[StructType]] and into another [[StructType]]
+   *
+   * Unlike [[UnsafeProjection]] requiring that [[from]] has to be a proper subset of [[to]] schema,
+   * [[UnsafeRowWriter]] is able to perform whole spectrum of schema-evolution transformations including:
+   *
+   * <ul>
+   *   <li>Transforming nested structs/maps/arrays</li>
+   *   <li>Handling type promotions (int -> long, etc)</li>
+   *   <li>Handling (field) renames</li>
+   * </ul>
    */
-  private def rewriteRecordWithNewSchema(oldRecord: Any, oldSchema: DataType, newSchema: DataType, renameCols: java.util.Map[String, String], fieldNames: java.util.Deque[String]): Any = {
-    if (oldRecord == null) {
-      null
-    } else {
-      newSchema match {
-        case targetSchema: StructType =>
-          if (!oldRecord.isInstanceOf[InternalRow]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldRow = oldRecord.asInstanceOf[InternalRow]
-          val helper = mutable.Map[Integer, Any]()
-
-          val oldStrucType = oldSchema.asInstanceOf[StructType]
-          targetSchema.fields.zipWithIndex.foreach { case (field, i) =>
-            fieldNames.push(field.name)
-            if (existField(oldStrucType, field.name)) {
-              val oldField = oldStrucType(field.name)
-              val oldPos = oldStrucType.fieldIndex(field.name)
-              helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
-            } else {
-              val fieldFullName = createFullName(fieldNames)
-              val colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.")
-              val lastColNameFromOldSchema = colNamePartsFromOldSchema(colNamePartsFromOldSchema.length - 1)
-              // deal with rename
-              if (!existField(oldStrucType, field.name) && existField(oldStrucType, lastColNameFromOldSchema)) {
-                // find rename
-                val oldField = oldStrucType(lastColNameFromOldSchema)
-                val oldPos = oldStrucType.fieldIndex(lastColNameFromOldSchema)
-                helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
-              }
-            }
-            fieldNames.pop()
-          }
-          val newRow = new GenericInternalRow(Array.fill(targetSchema.length)(null).asInstanceOf[Array[Any]])
-          targetSchema.fields.zipWithIndex.foreach { case (_, i) =>
-            if (helper.contains(i)) {
-              newRow.update(i, helper(i))
-            } else {
-              // TODO add default val
-              newRow.update(i, null)
-            }
-          }
+  def getCachedUnsafeRowWriter(from: StructType, to: StructType, renamedColumnsMap: JMap[String, String] = JCollections.emptyMap()): UnsafeRowWriter = {
+    unsafeWriterThreadLocal.get()
+      .getOrElseUpdate((from, to, renamedColumnsMap), genUnsafeRowWriter(from, to, renamedColumnsMap))
+  }
 
-          newRow
-        case targetSchema: ArrayType =>
-          if (!oldRecord.isInstanceOf[ArrayData]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldElementType = oldSchema.asInstanceOf[ArrayType].elementType
-          val oldArray = oldRecord.asInstanceOf[ArrayData]
-          val newElementType = targetSchema.elementType
-          val newArray = new GenericArrayData(Array.fill(oldArray.numElements())(null).asInstanceOf[Array[Any]])
-          fieldNames.push("element")
-          oldArray.toSeq[Any](oldElementType).zipWithIndex.foreach { case (value, i) => newArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldElementType, newElementType, renameCols, fieldNames)) }
-          fieldNames.pop()
-
-          newArray
-        case targetSchema: MapType =>
-          if (!oldRecord.isInstanceOf[MapData]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldValueType = oldSchema.asInstanceOf[MapType].valueType
-          val oldKeyType = oldSchema.asInstanceOf[MapType].keyType
-          val oldMap = oldRecord.asInstanceOf[MapData]
-          val newValueType = targetSchema.valueType
-          val newKeyArray = new GenericArrayData(Array.fill(oldMap.keyArray().numElements())(null).asInstanceOf[Array[Any]])
-          val newValueArray = new GenericArrayData(Array.fill(oldMap.valueArray().numElements())(null).asInstanceOf[Array[Any]])
-          val newMap = new ArrayBasedMapData(newKeyArray, newValueArray)
-          fieldNames.push("value")
-          oldMap.keyArray().toSeq[Any](oldKeyType).zipWithIndex.foreach { case (value, i) => newKeyArray.update(i, rewritePrimaryType(value, oldKeyType, oldKeyType)) }
-          oldMap.valueArray().toSeq[Any](oldValueType).zipWithIndex.foreach { case (value, i) => newValueArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldValueType, newValueType, renameCols, fieldNames)) }
-          fieldNames.pop()
-
-          newMap
-        case _ => rewritePrimaryType(oldRecord, oldSchema, newSchema)
-      }
+  def getCachedPosList(structType: StructType, field: String): Option[NestedFieldPath] = {
+    val nestedFieldPathOpt = orderPosListMap.get((structType, field))
+    // NOTE: This specifically designed to do 2 lookups (in case of cache-miss) to avoid
+    //       allocating the closure when using [[computeIfAbsent]] on more frequent cache-hit path
+    if (nestedFieldPathOpt != null) {
+      nestedFieldPathOpt
+    } else {
+      orderPosListMap.computeIfAbsent((structType, field), new JFunction[(StructType, String), Option[NestedFieldPath]] {
+        override def apply(t: (StructType, String)): Option[NestedFieldPath] =
+          composeNestedFieldPath(structType, field)
+      })
     }
   }
 
-  def getCachedPosList(structType: StructType, field: String): NestedFieldPath = {
-    val schemaPair = (structType, field)
-    if (!orderPosListMap.containsKey(schemaPair)) {
-      val posList = HoodieUnsafeRowUtils.composeNestedFieldPath(structType, field)
-      orderPosListMap.put(schemaPair, posList)
+  def getCachedSchema(schema: Schema): StructType = {
+    val structType = schemaMap.get(schema)
+    // NOTE: This specifically designed to do 2 lookups (in case of cache-miss) to avoid
+    //       allocating the closure when using [[computeIfAbsent]] on more frequent cache-hit path
+    if (structType != null) {
+      structType
+    } else {
+      schemaMap.computeIfAbsent(schema, new JFunction[Schema, StructType] {
+        override def apply(t: Schema): StructType =
+          convertAvroSchemaToStructType(schema)
+      })
     }
-    orderPosListMap.get(schemaPair)
   }
 
-  def getCachedUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
-    val schemaPair = (from, to)
-    val map = unsafeProjectionThreadLocal.get()
-    if (!map.containsKey(schemaPair)) {
-      val projection = HoodieCatalystExpressionUtils.generateUnsafeProjection(from, to)
-      map.put(schemaPair, projection)
+  private[sql] def genUnsafeRowWriter(prevSchema: StructType,
+                                      newSchema: StructType,
+                                      renamedColumnsMap: JMap[String, String]): UnsafeRowWriter = {
+    val writer = newWriterRenaming(prevSchema, newSchema, renamedColumnsMap, new JArrayDeque[String]())
+    val unsafeProjection = generateUnsafeProjection(newSchema, newSchema)
+    val phonyUpdater = new CatalystDataUpdater {
+      var value: InternalRow = _
+
+      override def set(ordinal: Int, value: Any): Unit =
+        this.value = value.asInstanceOf[InternalRow]
     }
-    map.get(schemaPair)
-  }
 
-  def getCachedSchema(schema: Schema): StructType = {
-    if (!schemaMap.containsKey(schema)) {
-      val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
-      schemaMap.put(schema, structType)
+    oldRow => {
+      writer(phonyUpdater, 0, oldRow)
+      unsafeProjection(phonyUpdater.value)
     }
-    schemaMap.get(schema)
   }
 
-  def existField(structType: StructType, name: String): Boolean = {
-    try {
-      HoodieUnsafeRowUtils.composeNestedFieldPath(structType, name)
-      true
-    } catch {
-      case _: IllegalArgumentException => false
+  private type RowFieldUpdater = (CatalystDataUpdater, Int, Any) => Unit
+
+  private def genUnsafeStructWriter(prevStructType: StructType,
+                                    newStructType: StructType,
+                                    renamedColumnsMap: JMap[String, String],
+                                    fieldNamesStack: JDeque[String]): (CatalystDataUpdater, Any) => Unit = {
+    // TODO need to canonicalize schemas (casing)
+    val fieldWriters = ArrayBuffer.empty[RowFieldUpdater]
+    val positionMap = ArrayBuffer.empty[Int]
+
+    for (newField <- newStructType.fields) {
+      fieldNamesStack.push(newField.name)
+
+      val (fieldWriter, prevFieldPos): (RowFieldUpdater, Int) =
+        prevStructType.getFieldIndex(newField.name) match {
+          case Some(prevFieldPos) =>
+            val prevField = prevStructType(prevFieldPos)
+            (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNamesStack), prevFieldPos)
+
+          case None =>
+            val newFieldQualifiedName = createFullName(fieldNamesStack)
+            val prevFieldName: String = lookupRenamedField(newFieldQualifiedName, renamedColumnsMap)
+
+            // Handle rename
+            prevStructType.getFieldIndex(prevFieldName) match {
+              case Some(prevFieldPos) =>
+                val prevField = prevStructType.fields(prevFieldPos)
+                (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNamesStack), prevFieldPos)
+
+              case None =>
+                val updater: RowFieldUpdater = (fieldUpdater, ordinal, _) => fieldUpdater.setNullAt(ordinal)
+                (updater, -1)
+            }
+        }
+
+      fieldWriters += fieldWriter
+      positionMap += prevFieldPos
+
+      fieldNamesStack.pop()
     }
-  }
 
-  private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, newSchema: DataType) = {
-    if (oldSchema.equals(newSchema) || (oldSchema.isInstanceOf[DecimalType] && newSchema.isInstanceOf[DecimalType])) {
-      oldSchema match {
-        case NullType | BooleanType | IntegerType | LongType | FloatType | DoubleType | DateType | TimestampType | BinaryType =>
-          oldValue
-        // Copy UTF8String before putting into GenericInternalRow
-        case StringType => UTF8String.fromString(oldValue.toString)
-        case DecimalType() =>
-          Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
-        case _ =>
-          throw new HoodieException("Unknown schema type: " + newSchema)
+    (fieldUpdater, row) => {
+      var pos = 0
+      while (pos < fieldWriters.length) {
+        val prevPos = positionMap(pos)
+        val prevValue = if (prevPos >= 0) {
+          row.asInstanceOf[InternalRow].get(prevPos, prevStructType.fields(prevPos).dataType)
+        } else {
+          null
+        }
+
+        fieldWriters(pos)(fieldUpdater, pos, prevValue)
+        pos += 1
       }
-    } else {
-      rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema)
     }
   }
 
-  private def rewritePrimaryTypeWithDiffSchemaType(oldValue: Any, oldSchema: DataType, newSchema: DataType): Any = {
-    val value = newSchema match {
-      case NullType | BooleanType =>
-      case DateType if oldSchema.equals(StringType) =>
-        CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(oldValue.toString))
-      case LongType =>
-        oldSchema match {
-          case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].longValue())
+  private def newWriterRenaming(prevDataType: DataType,
+                                newDataType: DataType,
+                                renamedColumnsMap: JMap[String, String],
+                                fieldNameStack: JDeque[String]): RowFieldUpdater = {
+    (newDataType, prevDataType) match {
+      case (newType, prevType) if prevType == newType =>
+        (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value)
+
+      case (newStructType: StructType, prevStructType: StructType) =>
+        val writer = genUnsafeStructWriter(prevStructType, newStructType, renamedColumnsMap, fieldNameStack)
+
+        val newRow = new SpecificInternalRow(newStructType.fields.map(_.dataType))
+        val rowUpdater = new RowUpdater(newRow)
+
+        (fieldUpdater, ordinal, value) => {
+          // Here new row is built in 2 stages:
+          //    - First, we pass mutable row (used as buffer/scratchpad) created above wrapped into [[RowUpdater]]
+          //      into generated row-writer
+          //    - Upon returning from row-writer, we call back into parent row's [[fieldUpdater]] to set returned
+          //      row as a value in it
+          writer(rowUpdater, value)
+          fieldUpdater.set(ordinal, newRow)
+        }
+
+      case (ArrayType(newElementType, _), ArrayType(prevElementType, containsNull)) =>
+        fieldNameStack.push("element")
+        val elementWriter = newWriterRenaming(prevElementType, newElementType, renamedColumnsMap, fieldNameStack)
+        fieldNameStack.pop()
+
+        (fieldUpdater, ordinal, value) => {
+          val prevArrayData = value.asInstanceOf[ArrayData]
+          val prevArray = prevArrayData.toObjectArray(prevElementType)
+
+          val newArrayData = createArrayData(newElementType, prevArrayData.numElements())
+          val elementUpdater = new ArrayDataUpdater(newArrayData)
+
+          var i = 0
+          while (i < prevArray.length) {
+            val element = prevArray(i)
+            if (element == null) {
+              if (!containsNull) {
+                throw new HoodieException(
+                  s"Array value at path '${fieldNameStack.asScala.mkString(".")}' is not allowed to be null")
+              } else {
+                elementUpdater.setNullAt(i)
+              }
+            } else {
+              elementWriter(elementUpdater, i, element)
+            }
+            i += 1
+          }
+
+          fieldUpdater.set(ordinal, newArrayData)
+        }
+
+      case (MapType(_, newValueType, _), MapType(_, prevValueType, valueContainsNull)) =>
+        fieldNameStack.push("value")
+        val valueWriter = newWriterRenaming(prevValueType, newValueType, renamedColumnsMap, fieldNameStack)
+        fieldNameStack.pop()
+
+        (updater, ordinal, value) =>
+          val mapData = value.asInstanceOf[MapData]
+          val prevKeyArrayData = mapData.keyArray
+          val prevValueArrayData = mapData.valueArray
+          val prevValueArray = prevValueArrayData.toObjectArray(prevValueType)
+
+          val newValueArray = createArrayData(newValueType, mapData.numElements())
+          val valueUpdater = new ArrayDataUpdater(newValueArray)
+          var i = 0
+          while (i < prevValueArray.length) {
+            val value = prevValueArray(i)
+            if (value == null) {
+              if (!valueContainsNull) {
+                throw new HoodieException(s"Map value at path ${fieldNameStack.asScala.mkString(".")} is not allowed to be null")
+              } else {
+                valueUpdater.setNullAt(i)
+              }
+            } else {
+              valueWriter(valueUpdater, i, value)
+            }
+            i += 1
+          }
+
+          // NOTE: Key's couldn't be transformed and have to always be of [[StringType]]
+          updater.set(ordinal, new ArrayBasedMapData(prevKeyArrayData, newValueArray))
+
+      case (newDecimal: DecimalType, _) =>
+        prevDataType match {
+          case IntegerType | LongType | FloatType | DoubleType | StringType =>
+            (fieldUpdater, ordinal, value) =>
+              val scale = newDecimal.scale
+              // TODO this has to be revisited to avoid loss of precision (for fps)
+              fieldUpdater.setDecimal(ordinal, Decimal.fromDecimal(BigDecimal(value.toString).setScale(scale, ROUND_HALF_EVEN)))
+
+          case _: DecimalType =>
+            (fieldUpdater, ordinal, value) =>
+              fieldUpdater.setDecimal(ordinal, Decimal.fromDecimal(value.asInstanceOf[Decimal].toBigDecimal.setScale(newDecimal.scale)))
+
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case FloatType =>
-        oldSchema match {
-          case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].floatValue())
-          case LongType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Long].floatValue())
+
+      case (_: ShortType, _) =>
+        prevDataType match {
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setShort(ordinal, value.asInstanceOf[Byte].toShort)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case DoubleType =>
-        oldSchema match {
-          case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].doubleValue())
-          case LongType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Long].doubleValue())
-          case FloatType => CatalystTypeConverters.convertToCatalyst(java.lang.Double.valueOf(oldValue.asInstanceOf[Float] + ""))
+
+      case (_: IntegerType, _) =>
+        prevDataType match {
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setInt(ordinal, value.asInstanceOf[Short].toInt)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setInt(ordinal, value.asInstanceOf[Byte].toInt)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case BinaryType =>
-        oldSchema match {
-          case StringType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[String].getBytes(StandardCharsets.UTF_8))
+
+      case (_: LongType, _) =>
+        prevDataType match {
+          case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Int].toLong)
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Short].toLong)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Byte].toLong)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case StringType =>
-        oldSchema match {
-          case BinaryType => CatalystTypeConverters.convertToCatalyst(new String(oldValue.asInstanceOf[Array[Byte]]))
-          case DateType => CatalystTypeConverters.convertToCatalyst(toJavaDate(oldValue.asInstanceOf[Integer]).toString)
-          case IntegerType | LongType | FloatType | DoubleType | DecimalType() => CatalystTypeConverters.convertToCatalyst(oldValue.toString)
+
+      case (_: FloatType, _) =>
+        prevDataType match {
+          case _: LongType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Long].toFloat)
+          case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Int].toFloat)
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Short].toFloat)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Byte].toFloat)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case DecimalType() =>
-        oldSchema match {
-          case IntegerType | LongType | FloatType | DoubleType | StringType =>
-            val scale = newSchema.asInstanceOf[DecimalType].scale
 
-            Decimal.fromDecimal(BigDecimal(oldValue.toString).setScale(scale))
+      case (_: DoubleType, _) =>
+        prevDataType match {
+          case _: FloatType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Float].toDouble)
+          case _: LongType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Long].toDouble)
+          case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Int].toDouble)
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Short].toDouble)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Byte].toDouble)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case _ =>
-    }
-    if (value == None) {
-      throw new HoodieException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema))
-    } else {
-      CatalystTypeConverters.convertToCatalyst(value)
+
+      case (_: BinaryType, _: StringType) =>
+        (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value.asInstanceOf[UTF8String].getBytes)
+
+      // TODO revisit this (we need to align permitted casting w/ Spark)
+      // NOTE: This is supported to stay compatible w/ [[HoodieAvroUtils.rewriteRecordWithNewSchema]]
+      case (_: StringType, _) =>
+        prevDataType match {
+          case BinaryType => (fieldUpdater, ordinal, value) =>
+            fieldUpdater.set(ordinal, UTF8String.fromBytes(value.asInstanceOf[Array[Byte]]))
+          case DateType => (fieldUpdater, ordinal, value) =>
+            fieldUpdater.set(ordinal, UTF8String.fromString(toJavaDate(value.asInstanceOf[Integer]).toString))
+          case IntegerType | LongType | FloatType | DoubleType | _: DecimalType =>
+            (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, UTF8String.fromString(value.toString))
+
+          case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
+        }
+
+      case (DateType, StringType) =>
+        (fieldUpdater, ordinal, value) =>
+          fieldUpdater.set(ordinal, CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(value.toString)))
+
+      case (_, _) =>
+        throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
     }
   }
 
-  def removeFields(schema: StructType, fieldsToRemove: java.util.List[String]): StructType = {
-    StructType(schema.fields.filter(field => !fieldsToRemove.contains(field.name)))
+  private def lookupRenamedField(newFieldQualifiedName: String, renamedColumnsMap: JMap[String, String]) = {
+    val prevFieldQualifiedName = renamedColumnsMap.getOrDefault(newFieldQualifiedName, "")
+    val prevFieldQualifiedNameParts = prevFieldQualifiedName.split("\\.")
+    val prevFieldName = prevFieldQualifiedNameParts(prevFieldQualifiedNameParts.length - 1)
+
+    prevFieldName
   }
+
+  private def createArrayData(elementType: DataType, length: Int): ArrayData = elementType match {
+    case BooleanType => UnsafeArrayData.fromPrimitiveArray(new Array[Boolean](length))
+    case ByteType => UnsafeArrayData.fromPrimitiveArray(new Array[Byte](length))
+    case ShortType => UnsafeArrayData.fromPrimitiveArray(new Array[Short](length))
+    case IntegerType => UnsafeArrayData.fromPrimitiveArray(new Array[Int](length))
+    case LongType => UnsafeArrayData.fromPrimitiveArray(new Array[Long](length))
+    case FloatType => UnsafeArrayData.fromPrimitiveArray(new Array[Float](length))
+    case DoubleType => UnsafeArrayData.fromPrimitiveArray(new Array[Double](length))
+    case _ => new GenericArrayData(new Array[Any](length))
+  }
+
+  sealed trait CatalystDataUpdater {
+    def set(ordinal: Int, value: Any): Unit
+    def setNullAt(ordinal: Int): Unit = set(ordinal, null)
+    def setBoolean(ordinal: Int, value: Boolean): Unit = set(ordinal, value)
+    def setByte(ordinal: Int, value: Byte): Unit = set(ordinal, value)
+    def setShort(ordinal: Int, value: Short): Unit = set(ordinal, value)
+    def setInt(ordinal: Int, value: Int): Unit = set(ordinal, value)
+    def setLong(ordinal: Int, value: Long): Unit = set(ordinal, value)
+    def setDouble(ordinal: Int, value: Double): Unit = set(ordinal, value)
+    def setFloat(ordinal: Int, value: Float): Unit = set(ordinal, value)
+    def setDecimal(ordinal: Int, value: Decimal): Unit = set(ordinal, value)
+  }
+
+  final class RowUpdater(row: InternalRow) extends CatalystDataUpdater {
+    override def set(ordinal: Int, value: Any): Unit = row.update(ordinal, value)
+    override def setNullAt(ordinal: Int): Unit = row.setNullAt(ordinal)
+    override def setBoolean(ordinal: Int, value: Boolean): Unit = row.setBoolean(ordinal, value)
+    override def setByte(ordinal: Int, value: Byte): Unit = row.setByte(ordinal, value)
+    override def setShort(ordinal: Int, value: Short): Unit = row.setShort(ordinal, value)
+    override def setInt(ordinal: Int, value: Int): Unit = row.setInt(ordinal, value)
+    override def setLong(ordinal: Int, value: Long): Unit = row.setLong(ordinal, value)
+    override def setDouble(ordinal: Int, value: Double): Unit = row.setDouble(ordinal, value)
+    override def setFloat(ordinal: Int, value: Float): Unit = row.setFloat(ordinal, value)
+    override def setDecimal(ordinal: Int, value: Decimal): Unit =
+      row.setDecimal(ordinal, value, value.precision)
+  }
+
+  final class ArrayDataUpdater(array: ArrayData) extends CatalystDataUpdater {
+    override def set(ordinal: Int, value: Any): Unit = array.update(ordinal, value)
+    override def setNullAt(ordinal: Int): Unit = array.setNullAt(ordinal)
+    override def setBoolean(ordinal: Int, value: Boolean): Unit = array.setBoolean(ordinal, value)
+    override def setByte(ordinal: Int, value: Byte): Unit = array.setByte(ordinal, value)
+    override def setShort(ordinal: Int, value: Short): Unit = array.setShort(ordinal, value)
+    override def setInt(ordinal: Int, value: Int): Unit = array.setInt(ordinal, value)
+    override def setLong(ordinal: Int, value: Long): Unit = array.setLong(ordinal, value)
+    override def setDouble(ordinal: Int, value: Double): Unit = array.setDouble(ordinal, value)
+    override def setFloat(ordinal: Int, value: Float): Unit = array.setFloat(ordinal, value)
+    override def setDecimal(ordinal: Int, value: Decimal): Unit = array.update(ordinal, value)
+  }
+
 }
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala
index c105142de0f..5486c9f6551 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala
@@ -93,29 +93,36 @@ object HoodieUnsafeRowUtils {
    *
    * This method produces nested-field path, that is subsequently used by [[getNestedInternalRowValue]], [[getNestedRowValue]]
    */
-  def composeNestedFieldPath(schema: StructType, nestedFieldRef: String): NestedFieldPath = {
+  def composeNestedFieldPath(schema: StructType, nestedFieldRef: String): Option[NestedFieldPath]= {
     val fieldRefParts = nestedFieldRef.split('.')
     val ordSeq = ArrayBuffer[(Int, StructField)]()
     var curSchema = schema
     var idx = 0
     while (idx < fieldRefParts.length) {
       val fieldRefPart = fieldRefParts(idx)
-      val ord = curSchema.fieldIndex(fieldRefPart)
-      val field = curSchema(ord)
-      // Append current field's (ordinal, data-type)
-      ordSeq.append((ord, field))
-      // Update current schema, unless terminal field-ref part
-      if (idx < fieldRefParts.length - 1) {
-        curSchema = field.dataType match {
-          case st: StructType => st
-          case dt@_ =>
-            throw new IllegalArgumentException(s"Invalid nested field reference ${fieldRefParts.drop(idx).mkString(".")} into $dt")
-        }
+      curSchema.getFieldIndex(fieldRefPart) match {
+        case Some(ord) =>
+          val field = curSchema(ord)
+          // Append current field's (ordinal, data-type)
+          ordSeq.append((ord, field))
+          // Update current schema, unless terminal field-ref part
+          if (idx < fieldRefParts.length - 1) {
+            curSchema = field.dataType match {
+              case st: StructType => st
+              case _ =>
+                // In case we've stumbled upon something other than the [[StructType]] means that
+                // provided nested field reference is invalid. In that case we simply return null
+                return None
+            }
+          }
+
+        // In case, field is not found we return null
+        case None => return None
       }
       idx += 1
     }
 
-    NestedFieldPath(ordSeq.toArray)
+    Some(NestedFieldPath(ordSeq.toArray))
   }
 
   case class NestedFieldPath(parts: Array[(Int, StructField)])
diff --git a/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/TestHoodieUnsafeRowUtils.scala b/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/TestHoodieUnsafeRowUtils.scala
index c23bbab99b4..104d1870eae 100644
--- a/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/TestHoodieUnsafeRowUtils.scala
+++ b/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/TestHoodieUnsafeRowUtils.scala
@@ -21,7 +21,7 @@ package org.apache.spark.sql
 import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue, getNestedRowValue}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types._
-import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
 import org.junit.jupiter.api.Test
 
 class TestHoodieUnsafeRowUtils {
@@ -41,11 +41,9 @@ class TestHoodieUnsafeRowUtils {
 
     assertEquals(
       Seq((1, schema(1)), (0, schema(1).dataType.asInstanceOf[StructType](0))),
-      composeNestedFieldPath(schema, "bar.baz").parts.toSeq)
+      composeNestedFieldPath(schema, "bar.baz").get.parts.toSeq)
 
-    assertThrows(classOf[IllegalArgumentException]) { () =>
-      composeNestedFieldPath(schema, "foo.baz")
-    }
+    assertTrue(composeNestedFieldPath(schema, "foo.baz").isEmpty)
   }
 
   @Test
@@ -65,36 +63,36 @@ class TestHoodieUnsafeRowUtils {
 
     assertEquals(
       123,
-      getNestedInternalRowValue(row, composeNestedFieldPath(schema, "bar.baz"))
+      getNestedInternalRowValue(row, composeNestedFieldPath(schema, "bar.baz").get)
     )
     assertEquals(
       456L,
-      getNestedInternalRowValue(row, composeNestedFieldPath(schema, "bar.bor"))
+      getNestedInternalRowValue(row, composeNestedFieldPath(schema, "bar.bor").get)
     )
     assertEquals(
       "str",
-      getNestedInternalRowValue(row, composeNestedFieldPath(schema, "foo"))
+      getNestedInternalRowValue(row, composeNestedFieldPath(schema, "foo").get)
     )
     assertEquals(
       row.getStruct(1, 2),
-      getNestedInternalRowValue(row, composeNestedFieldPath(schema, "bar"))
+      getNestedInternalRowValue(row, composeNestedFieldPath(schema, "bar").get)
     )
 
     val rowProperNullable = InternalRow("str", null)
 
     assertEquals(
       null,
-      getNestedInternalRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar.baz"))
+      getNestedInternalRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar.baz").get)
     )
     assertEquals(
       null,
-      getNestedInternalRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar"))
+      getNestedInternalRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar").get)
     )
 
     val rowInvalidNullable = InternalRow(null, InternalRow(123, 456L))
 
     assertThrows(classOf[IllegalArgumentException]) { () =>
-      getNestedInternalRowValue(rowInvalidNullable, composeNestedFieldPath(schema, "foo"))
+      getNestedInternalRowValue(rowInvalidNullable, composeNestedFieldPath(schema, "foo").get)
     }
   }
 
@@ -115,36 +113,36 @@ class TestHoodieUnsafeRowUtils {
 
     assertEquals(
       123,
-      getNestedRowValue(row, composeNestedFieldPath(schema, "bar.baz"))
+      getNestedRowValue(row, composeNestedFieldPath(schema, "bar.baz").get)
     )
     assertEquals(
       456L,
-      getNestedRowValue(row, composeNestedFieldPath(schema, "bar.bor"))
+      getNestedRowValue(row, composeNestedFieldPath(schema, "bar.bor").get)
     )
     assertEquals(
       "str",
-      getNestedRowValue(row, composeNestedFieldPath(schema, "foo"))
+      getNestedRowValue(row, composeNestedFieldPath(schema, "foo").get)
     )
     assertEquals(
       row.getStruct(1),
-      getNestedRowValue(row, composeNestedFieldPath(schema, "bar"))
+      getNestedRowValue(row, composeNestedFieldPath(schema, "bar").get)
     )
 
     val rowProperNullable = Row("str", null)
 
     assertEquals(
       null,
-      getNestedRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar.baz"))
+      getNestedRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar.baz").get)
     )
     assertEquals(
       null,
-      getNestedRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar"))
+      getNestedRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar").get)
     )
 
     val rowInvalidNullable = Row(null, Row(123, 456L))
 
     assertThrows(classOf[IllegalArgumentException]) { () =>
-      getNestedRowValue(rowInvalidNullable, composeNestedFieldPath(schema, "foo"))
+      getNestedRowValue(rowInvalidNullable, composeNestedFieldPath(schema, "foo").get)
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index a8c695240cd..db77c21e8f4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -112,28 +112,18 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
   }
 
   @Override
-  public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException {
-    GenericRecord record = HoodieAvroUtils.rewriteRecordWithNewSchema(data, targetSchema);
-    return new HoodieAvroIndexedRecord(key, record, operation, metaData);
+  public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) {
+    GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(data, targetSchema);
+    updateMetadataValuesInternal(newAvroRecord, metadataValues);
+    return new HoodieAvroIndexedRecord(key, newAvroRecord, operation, metaData);
   }
 
   @Override
-  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException {
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) {
     GenericRecord record = HoodieAvroUtils.rewriteRecordWithNewSchema(data, newSchema, renameCols);
     return new HoodieAvroIndexedRecord(key, record, operation, metaData);
   }
 
-  @Override
-  public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException {
-    metadataValues.getKv().forEach((key, value) -> {
-      if (value != null) {
-        ((GenericRecord) data).put(key, value);
-      }
-    });
-
-    return new HoodieAvroIndexedRecord(key, data, operation, metaData);
-  }
-
   @Override
   public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String keyFieldName) {
     ((GenericRecord) data).put(keyFieldName, StringUtils.EMPTY_STRING);
@@ -234,4 +224,18 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
 
     return kryo.readObjectOrNull(input, GenericRecord.class, avroSerializer);
   }
+
+  static void updateMetadataValuesInternal(GenericRecord avroRecord, MetadataValues metadataValues) {
+    if (metadataValues.isEmpty()) {
+      return; // no-op
+    }
+
+    String[] values = metadataValues.getValues();
+    for (int pos = 0; pos < values.length; ++pos) {
+      String value = values[pos];
+      if (value != null) {
+        avroRecord.put(HoodieMetadataField.values()[pos].getFieldName(), value);
+      }
+    }
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 62001356b4f..943c4e0953f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 
 import org.apache.avro.Schema;
@@ -37,6 +38,8 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.hudi.common.model.HoodieAvroIndexedRecord.updateMetadataValuesInternal;
+
 /**
  * Implementation of {@link HoodieRecord} using Avro payload.
  *
@@ -123,31 +126,26 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
   }
 
   @Override
-  public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException {
-    Option<IndexedRecord> avroRecordPayloadOpt = getData().getInsertValue(recordSchema, props);
-    GenericRecord avroPayloadInNewSchema =
-        HoodieAvroUtils.rewriteRecord((GenericRecord) avroRecordPayloadOpt.get(), targetSchema);
-    return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroPayloadInNewSchema), getOperation(), this.currentLocation, this.newLocation);
-  }
-
-  @Override
-  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException {
-    GenericRecord oldRecord = (GenericRecord) getData().getInsertValue(recordSchema, props).get();
-    GenericRecord rewriteRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchema, renameCols);
-    return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(rewriteRecord), getOperation(), this.currentLocation, this.newLocation);
+  public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) {
+    try {
+      Option<IndexedRecord> avroRecordOpt = getData().getInsertValue(recordSchema, props);
+      GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecordOpt.get(), targetSchema);
+      updateMetadataValuesInternal(newAvroRecord, metadataValues);
+      return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(newAvroRecord), getOperation(), this.currentLocation, this.newLocation);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to deserialize record!", e);
+    }
   }
 
   @Override
-  public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException {
-    GenericRecord avroRecordPayload = (GenericRecord) getData().getInsertValue(recordSchema, props).get();
-
-    metadataValues.getKv().forEach((key, value) -> {
-      if (value != null) {
-        avroRecordPayload.put(key, value);
-      }
-    });
-
-    return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroRecordPayload), getOperation(), this.currentLocation, this.newLocation);
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) {
+    try {
+      GenericRecord oldRecord = (GenericRecord) getData().getInsertValue(recordSchema, props).get();
+      GenericRecord rewriteRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchema, renameCols);
+      return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(rewriteRecord), getOperation(), this.currentLocation, this.newLocation);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to deserialize record!", e);
+    }
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
index e2f9334f8a2..3c7927b6b65 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
@@ -99,17 +99,12 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
   }
 
   @Override
-  public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException {
+  public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException {
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) {
     throw new UnsupportedOperationException();
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index a23316a2ff9..d78241aaeb4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -354,25 +354,21 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
   public abstract HoodieRecord joinWith(HoodieRecord other, Schema targetSchema);
 
   /**
-   * Rewrite record into new schema(add meta columns)
+   * Rewrites record into new target schema containing Hudi-specific meta-fields
+   *
+   * NOTE: This operation is idempotent
    */
-  public abstract HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException;
+  public abstract HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props);
 
   /**
    * Support schema evolution.
    */
-  public abstract HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException;
+  public abstract HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols);
 
-  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema) throws IOException {
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema) {
     return rewriteRecordWithNewSchema(recordSchema, props, newSchema, Collections.emptyMap());
   }
 
-  /**
-   * This method could change in the future.
-   * @temporary
-   */
-  public abstract HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException;
-
   public abstract boolean isDelete(Schema recordSchema, Properties props) throws IOException;
 
   /**
@@ -391,6 +387,10 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
     return instantTime + "_" + partitionId + "_" + recordIndex;
   }
 
+  protected static boolean hasMetaFields(Schema schema) {
+    return schema.getField(HoodieRecord.RECORD_KEY_METADATA_FIELD) != null;
+  }
+
   /**
    * A special record returned by {@link HoodieRecordPayload}, which means we should just skip this record.
    * This record is only used for {@link HoodieRecordPayload} currently, so it should not
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java b/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java
index baee0816338..361da5639f4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java
@@ -18,47 +18,95 @@
 
 package org.apache.hudi.common.model;
 
-import java.util.HashMap;
-import java.util.Map;
-
 public class MetadataValues {
-  private final Map<String, String> kv;
 
-  public MetadataValues() {
-    this.kv = new HashMap<>();
+  // NOTE: These fields are laid out in the same order as they are encoded in
+  //       each record and that should be preserved
+  private String commitTime;
+  private String commitSeqNo;
+  private String recordKey;
+  private String partitionPath;
+  private String fileName;
+  private String operation;
+
+  private boolean set = false;
+
+  public MetadataValues() {}
+
+  public String getCommitTime() {
+    return commitTime;
+  }
+
+  public String getCommitSeqNo() {
+    return commitSeqNo;
+  }
+
+  public String getRecordKey() {
+    return recordKey;
+  }
+
+  public String getPartitionPath() {
+    return partitionPath;
+  }
+
+  public String getFileName() {
+    return fileName;
+  }
+
+  public String getOperation() {
+    return operation;
   }
 
   public MetadataValues setCommitTime(String value) {
-    this.kv.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, value);
+    this.commitTime = value;
+    this.set = true;
     return this;
   }
 
   public MetadataValues setCommitSeqno(String value) {
-    this.kv.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, value);
+    this.commitSeqNo = value;
+    this.set = true;
     return this;
   }
 
   public MetadataValues setRecordKey(String value) {
-    this.kv.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, value);
+    this.recordKey = value;
+    this.set = true;
     return this;
   }
 
   public MetadataValues setPartitionPath(String value) {
-    this.kv.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, value);
+    this.partitionPath = value;
+    this.set = true;
     return this;
   }
 
   public MetadataValues setFileName(String value) {
-    this.kv.put(HoodieRecord.FILENAME_METADATA_FIELD, value);
+    this.fileName = value;
+    this.set = true;
     return this;
   }
 
   public MetadataValues setOperation(String value) {
-    this.kv.put(HoodieRecord.OPERATION_METADATA_FIELD, value);
+    this.operation = value;
+    this.set = true;
     return this;
   }
 
-  public Map<String, String> getKv() {
-    return kv;
+  public boolean isEmpty() {
+    return !set;
+  }
+
+  public String[] getValues() {
+    return new String[] {
+        // NOTE: These fields are laid out in the same order as they are encoded in
+        //       each record and that should be preserved
+        commitTime,
+        commitSeqNo,
+        recordKey,
+        partitionPath,
+        fileName,
+        operation
+    };
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 42babc775b9..9855c9323dc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -842,16 +842,11 @@ public abstract class AbstractHoodieLogRecordReader {
     Schema mergedAvroSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema, readerSchema.getFullName());
 
     return Option.of(Pair.of((record) -> {
-      try {
-        return record.rewriteRecordWithNewSchema(
-            dataBlock.getSchema(),
-            this.hoodieTableMetaClient.getTableConfig().getProps(),
-            mergedAvroSchema,
-            Collections.emptyMap());
-      } catch (IOException e) {
-        LOG.error("Error rewrite record with new schema", e);
-        throw new HoodieException(e);
-      }
+      return record.rewriteRecordWithNewSchema(
+          dataBlock.getSchema(),
+          this.hoodieTableMetaClient.getTableConfig().getProps(),
+          mergedAvroSchema,
+          Collections.emptyMap());
     }, mergedAvroSchema));
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 304a1303a3b..0092019dad5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -37,7 +37,7 @@ import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.util.{CommitUtils, StringUtils}
+import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME, KEYGEN_CLASS_NAME}
 import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException}
@@ -57,6 +57,7 @@ import org.apache.hudi.util.SparkKeyGenUtils
 import org.apache.log4j.LogManager
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.HoodieInternalRowUtils.getCachedUnsafeRowWriter
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -95,12 +96,11 @@ object HoodieSparkSqlWriter {
             optParams: Map[String, String],
             df: DataFrame,
             hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
-            hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
-            asyncCompactionTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty,
-            asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty,
-            extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] = Option.empty)
-  : (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String],
-    SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
+            hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty,
+            asyncCompactionTriggerFn: Option[SparkRDDWriteClient[_] => Unit] = Option.empty,
+            asyncClusteringTriggerFn: Option[SparkRDDWriteClient[_] => Unit] = Option.empty,
+            extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] = Option.empty):
+  (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
 
     assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
     val path = optParams("path")
@@ -255,7 +255,7 @@ object HoodieSparkSqlWriter {
       }
       // scalastyle:on
 
-      val (writeResult, writeClient: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]) =
+      val (writeResult, writeClient: SparkRDDWriteClient[_]) =
         operation match {
           case WriteOperationType.DELETE =>
             val genericRecords = HoodieSparkUtils.createRdd(df, avroRecordName, avroRecordNamespace)
@@ -271,7 +271,7 @@ object HoodieSparkSqlWriter {
             val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
               null, path, tblName,
               mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
-              .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
+              .asInstanceOf[SparkRDDWriteClient[_]]
 
             if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
               asyncCompactionTriggerFn.get.apply(client)
@@ -307,7 +307,7 @@ object HoodieSparkSqlWriter {
             val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
               schemaStr, path, tblName,
               mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
-              .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
+              .asInstanceOf[SparkRDDWriteClient[_]]
             // Issue delete partitions
             client.startCommitWithTime(instantTime, commitActionType)
             val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime)
@@ -330,31 +330,20 @@ object HoodieSparkSqlWriter {
               writerSchema
             }
 
-            // NOTE: Avro's [[Schema]] can't be effectively serialized by JVM native serialization framework
-            //       (due to containing cyclic refs), therefore we have to convert it to string before
-            //       passing onto the Executor
-            val dataFileSchemaStr = dataFileSchema.toString
-
             // Create a HoodieWriteClient & issue the write.
             val client = hoodieWriteClient.getOrElse {
               val finalOpts = addSchemaEvolutionParameters(parameters, internalSchemaOpt, Some(writerSchema)) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key
               // TODO(HUDI-4772) proper writer-schema has to be specified here
-              DataSourceUtils.createHoodieClient(jsc, dataFileSchemaStr, path, tblName, mapAsJavaMap(finalOpts))
-            }.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
+              DataSourceUtils.createHoodieClient(jsc, dataFileSchema.toString, path, tblName, mapAsJavaMap(finalOpts))
+            }
             val writeConfig = client.getConfig
             if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == HoodieTableType.MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
               throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.")
             }
             // Convert to RDD[HoodieRecord]
-            val hoodieRecords = createHoodieRecordRdd(
-              df,
-              writeConfig,
-              parameters,
-              avroRecordName,
-              avroRecordNamespace,
-              writerSchema,
-              dataFileSchemaStr,
-              operation)
+            val hoodieRecords =
+              createHoodieRecordRdd(df, writeConfig, parameters, avroRecordName, avroRecordNamespace, writerSchema,
+                dataFileSchema, operation)
 
             if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
               asyncCompactionTriggerFn.get.apply(client)
@@ -528,31 +517,6 @@ object HoodieSparkSqlWriter {
     HoodieAvroUtils.removeFields(schema, partitionColumns.toSet.asJava)
   }
 
-  def generateSparkSchemaWithoutPartitionColumns(partitionParam: String, schema: StructType): StructType = {
-    val partitionColumns = getPartitionColumns(partitionParam)
-    HoodieInternalRowUtils.removeFields(schema, partitionColumns)
-  }
-
-  def getAvroProcessedRecord(partitionParam: String, record: GenericRecord,
-                             dropPartitionColumns: Boolean): GenericRecord = {
-    var processedRecord = record
-    if (dropPartitionColumns) {
-      val writeSchema = generateSchemaWithoutPartitionColumns(partitionParam, record.getSchema)
-      processedRecord = HoodieAvroUtils.rewriteRecord(record, writeSchema)
-    }
-    processedRecord
-  }
-
-  def getProcessedRecord(partitionParam: String, record: GenericRecord,
-                         dropPartitionColumns: Boolean): GenericRecord = {
-    var processedRecord = record
-    if (dropPartitionColumns) {
-      val writeSchema = generateSchemaWithoutPartitionColumns(partitionParam, record.getSchema)
-      processedRecord = HoodieAvroUtils.rewriteRecord(record, writeSchema)
-    }
-    processedRecord
-  }
-
   def addSchemaEvolutionParameters(parameters: Map[String, String], internalSchemaOpt: Option[InternalSchema], writeSchemaOpt: Option[Schema] = None): Map[String, String] = {
     val schemaEvolutionEnable = if (internalSchemaOpt.isDefined) "true" else "false"
 
@@ -650,7 +614,7 @@ object HoodieSparkSqlWriter {
                 optParams: Map[String, String],
                 df: DataFrame,
                 hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
-                hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty): Boolean = {
+                hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty): Boolean = {
 
     assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
     val path = optParams("path")
@@ -921,7 +885,7 @@ object HoodieSparkSqlWriter {
                                              schema: StructType,
                                              writeResult: HoodieWriteResult,
                                              parameters: Map[String, String],
-                                             client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
+                                             client: SparkRDDWriteClient[_],
                                              tableConfig: HoodieTableConfig,
                                              jsc: JavaSparkContext,
                                              tableInstantInfo: TableInstantInfo,
@@ -988,7 +952,7 @@ object HoodieSparkSqlWriter {
     }
   }
 
-  private def isAsyncCompactionEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
+  private def isAsyncCompactionEnabled(client: SparkRDDWriteClient[_],
                                        tableConfig: HoodieTableConfig,
                                        parameters: Map[String, String], configuration: Configuration): Boolean = {
     log.info(s"Config.inlineCompactionEnabled ? ${client.getConfig.inlineCompactionEnabled}")
@@ -1000,7 +964,7 @@ object HoodieSparkSqlWriter {
     }
   }
 
-  private def isAsyncClusteringEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
+  private def isAsyncClusteringEnabled(client: SparkRDDWriteClient[_],
                                        parameters: Map[String, String]): Boolean = {
     log.info(s"Config.asyncClusteringEnabled ? ${client.getConfig.isAsyncClusteringEnabled}")
     asyncClusteringTriggerFnDefined && client.getConfig.isAsyncClusteringEnabled
@@ -1056,7 +1020,7 @@ object HoodieSparkSqlWriter {
                                     recordName: String,
                                     recordNameSpace: String,
                                     writerSchema: Schema,
-                                    dataFileSchemaStr: String,
+                                    dataFileSchema: Schema,
                                     operation: WriteOperationType) = {
     val shouldDropPartitionColumns = config.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
     val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps))
@@ -1067,7 +1031,13 @@ object HoodieSparkSqlWriter {
       parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
         HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
 
-    log.debug(s"Use $recordType")
+    // NOTE: Avro's [[Schema]] can't be effectively serialized by JVM native serialization framework
+    //       (due to containing cyclic refs), therefore we have to convert it to string before
+    //       passing onto the Executor
+    val dataFileSchemaStr = dataFileSchema.toString
+
+    log.debug(s"Creating HoodieRecords (as $recordType)")
+
     recordType match {
       case HoodieRecord.HoodieRecordType.AVRO =>
         val avroRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, recordName, recordNameSpace,
@@ -1097,29 +1067,27 @@ object HoodieSparkSqlWriter {
             hoodieRecord
           }
         }).toJavaRDD()
+
       case HoodieRecord.HoodieRecordType.SPARK =>
-        // ut will use AvroKeyGenerator, so we need to cast it in spark record
         val sparkKeyGenerator = keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
         val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)
         val dataFileStructType = HoodieInternalRowUtils.getCachedSchema(dataFileSchema)
         val writerStructType = HoodieInternalRowUtils.getCachedSchema(writerSchema)
         val sourceStructType = df.schema
-        df.queryExecution.toRdd.mapPartitions { iter =>
 
-          iter.map { internalRow =>
-            val recordKey = sparkKeyGenerator.getRecordKey(internalRow, sourceStructType)
-            val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, sourceStructType)
+        df.queryExecution.toRdd.mapPartitions { it =>
+          val targetStructType = if (shouldDropPartitionColumns) dataFileStructType else writerStructType
+          // NOTE: To make sure we properly transform records
+          val targetStructTypeRowWriter = getCachedUnsafeRowWriter(sourceStructType, targetStructType)
+
+          it.map { sourceRow =>
+            val recordKey = sparkKeyGenerator.getRecordKey(sourceRow, sourceStructType)
+            val partitionPath = sparkKeyGenerator.getPartitionPath(sourceRow, sourceStructType)
             val key = new HoodieKey(recordKey.toString, partitionPath.toString)
 
-            if (shouldDropPartitionColumns) {
-              val row = HoodieInternalRowUtils.getCachedUnsafeProjection(dataFileStructType, dataFileStructType)
-                .apply(HoodieInternalRowUtils.rewriteRecord(internalRow, sourceStructType, dataFileStructType))
-              new HoodieSparkRecord(key, row, dataFileStructType, false)
-            } else {
-              val row = HoodieInternalRowUtils.getCachedUnsafeProjection(writerStructType, writerStructType)
-                .apply(HoodieInternalRowUtils.rewriteRecord(internalRow, sourceStructType, writerStructType))
-              new HoodieSparkRecord(key, row, writerStructType, false)
-            }
+            val targetRow = targetStructTypeRowWriter(sourceRow)
+
+            new HoodieSparkRecord(key, targetRow, dataFileStructType, false)
           }
         }.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]]
     }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index 9830d323081..502d25f8287 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -89,7 +89,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   private var asyncCompactorService: AsyncCompactService = _
   private var asyncClusteringService: AsyncClusteringService = _
-  private var writeClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty
+  private var writeClient: Option[SparkRDDWriteClient[_]] = Option.empty
   private var hoodieTableConfig: Option[HoodieTableConfig] = Option.empty
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = this.synchronized {
@@ -253,7 +253,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     }
   }
 
-  protected def triggerAsyncCompactor(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]): Unit = {
+  protected def triggerAsyncCompactor(client: SparkRDDWriteClient[_]): Unit = {
     if (null == asyncCompactorService) {
       log.info("Triggering Async compaction !!")
       asyncCompactorService = new SparkStreamingAsyncCompactService(new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)),
@@ -282,7 +282,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     }
   }
 
-  protected def triggerAsyncClustering(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]): Unit = {
+  protected def triggerAsyncClustering(client: SparkRDDWriteClient[_]): Unit = {
     if (null == asyncClusteringService) {
       log.info("Triggering async clustering!")
       asyncClusteringService = new SparkStreamingAsyncClusteringService(new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala
deleted file mode 100644
index 26a343d2ff6..00000000000
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.testutils.HoodieClientTestUtils
-
-import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.{HoodieInternalRowUtils, Row, SparkSession}
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-
-class TestHoodieInternalRowUtils extends FunSuite with Matchers with BeforeAndAfterAll {
-
-  private var sparkSession: SparkSession = _
-
-  private val schema1 = StructType(
-    Array(
-      StructField("name", StringType),
-      StructField("age", IntegerType)
-    )
-  )
-  private val schema2 = StructType(
-    Array(
-      StructField("name1", StringType),
-      StructField("age1", IntegerType)
-    )
-  )
-  private val schemaMerge = StructType(schema1.fields ++ schema2.fields)
-  private val schema1WithMetaData = StructType(Array(
-    StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
-    StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType),
-    StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType),
-    StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType),
-    StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType),
-    StructField(HoodieRecord.OPERATION_METADATA_FIELD, StringType),
-    StructField(HoodieRecord.HOODIE_IS_DELETED_FIELD, BooleanType)
-  ) ++ schema1.fields)
-
-  override protected def beforeAll(): Unit = {
-    // Initialize a local spark env
-    val jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(classOf[TestHoodieInternalRowUtils].getName))
-    jsc.setLogLevel("ERROR")
-    sparkSession = SparkSession.builder.config(jsc.getConf).getOrCreate
-  }
-
-  override protected def afterAll(): Unit = {
-    sparkSession.close()
-  }
-
-  test("test rewrite") {
-    val data = sparkSession.sparkContext.parallelize(Seq(Row("like", 18, "like1", 181)))
-    val oldRow = sparkSession.createDataFrame(data, schemaMerge).queryExecution.toRdd.first()
-    val newRow1 = HoodieInternalRowUtils.rewriteRecord(oldRow, schemaMerge, schema1)
-    val newRow2 = HoodieInternalRowUtils.rewriteRecord(oldRow, schemaMerge, schema2)
-    assert(newRow1.get(0, StringType).toString.equals("like"))
-    assert(newRow1.get(1, IntegerType) == 18)
-    assert(newRow2.get(0, StringType).toString.equals("like1"))
-    assert(newRow2.get(1, IntegerType) == 181)
-  }
-
-  test("test rewrite with nullable value") {
-    val data = sparkSession.sparkContext.parallelize(Seq(Row("like", 18)))
-    val oldRow = sparkSession.createDataFrame(data, schema1).queryExecution.toRdd.first()
-    val newRow = HoodieInternalRowUtils.rewriteRecord(oldRow, schema1, schemaMerge)
-    assert(newRow.get(0, StringType).toString.equals("like"))
-    assert(newRow.get(1, IntegerType) == 18)
-    assert(newRow.get(2, StringType) == null)
-    assert(newRow.get(3, IntegerType) == null)
-  }
-
-
-}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestStructTypeSchemaEvolutionUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala
similarity index 77%
rename from hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestStructTypeSchemaEvolutionUtils.scala
rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala
index 31d36adbee5..be1dae8cc27 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestStructTypeSchemaEvolutionUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala
@@ -16,28 +16,30 @@
  * limitations under the License.
  */
 
-package org.apache.hudi
-
-import java.nio.ByteBuffer
-import java.util.{ArrayList, HashMap, Objects}
+package org.apache.spark.sql.hudi
 
 import org.apache.avro.generic.GenericData
 import org.apache.avro.{LogicalTypes, Schema}
+import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.SparkAdapterSupport.sparkAdapter
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.internal.schema.Types
 import org.apache.hudi.internal.schema.action.TableChanges
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.SchemaChangeUtils
 import org.apache.hudi.testutils.HoodieClientTestUtils
-
 import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.{HoodieInternalRowUtils, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.{HoodieInternalRowUtils, Row, SparkSession}
+import org.junit.jupiter.api.Assertions.assertEquals
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
-class TestStructTypeSchemaEvolutionUtils extends FunSuite with Matchers with BeforeAndAfterAll {
+import java.nio.ByteBuffer
+import java.util.{ArrayList, HashMap, Objects, Collections => JCollections}
+
+class TestHoodieInternalRowUtils extends FunSuite with Matchers with BeforeAndAfterAll {
   private var sparkSession: SparkSession = _
 
   override protected def beforeAll(): Unit = {
@@ -51,6 +53,54 @@ class TestStructTypeSchemaEvolutionUtils extends FunSuite with Matchers with Bef
     sparkSession.close()
   }
 
+  private val schema1 = StructType(Seq(
+    StructField("name", StringType),
+    StructField("age", IntegerType),
+    StructField("address",
+      StructType(Seq(
+        StructField("city", StringType),
+        StructField("street", StringType)
+      ))
+    )
+  ))
+
+  private val schema2 = StructType(Seq(
+    StructField("name1", StringType),
+    StructField("age1", IntegerType)
+  ))
+
+  private val mergedSchema = StructType(schema1.fields ++ schema2.fields)
+
+  test("Test simple row rewriting") {
+    val rows = Seq(
+      Row("Andrew", 18, Row("Mission st", "SF"), "John", 19)
+    )
+    val data = sparkSession.sparkContext.parallelize(rows)
+    val oldRow = sparkSession.createDataFrame(data, mergedSchema).queryExecution.toRdd.first()
+
+    val rowWriter1 = HoodieInternalRowUtils.genUnsafeRowWriter(mergedSchema, schema1, JCollections.emptyMap())
+    val newRow1 = rowWriter1(oldRow)
+
+    val serDe1 = sparkAdapter.createSparkRowSerDe(schema1)
+    assertEquals(serDe1.deserializeRow(newRow1), Row("Andrew", 18, Row("Mission st", "SF")));
+
+    val rowWriter2 = HoodieInternalRowUtils.genUnsafeRowWriter(mergedSchema, schema2, JCollections.emptyMap())
+    val newRow2 = rowWriter2(oldRow)
+
+    val serDe2 = sparkAdapter.createSparkRowSerDe(schema2)
+    assertEquals(serDe2.deserializeRow(newRow2), Row("John", 19));
+  }
+
+  test("Test simple rewriting (with nullable value)") {
+    val data = sparkSession.sparkContext.parallelize(Seq(Row("Rob", 18, null.asInstanceOf[StructType])))
+    val oldRow = sparkSession.createDataFrame(data, schema1).queryExecution.toRdd.first()
+    val rowWriter = HoodieInternalRowUtils.genUnsafeRowWriter(schema1, mergedSchema, JCollections.emptyMap())
+    val newRow = rowWriter(oldRow)
+
+    val serDe = sparkAdapter.createSparkRowSerDe(mergedSchema)
+    assertEquals(serDe.deserializeRow(newRow), Row("Rob", 18, null.asInstanceOf[StructType], null.asInstanceOf[StringType], null.asInstanceOf[IntegerType]))
+  }
+
   /**
    * test record data type changes.
    * int => long/float/double/string
@@ -61,7 +111,7 @@ class TestStructTypeSchemaEvolutionUtils extends FunSuite with Matchers with Bef
    * String => date/decimal
    * date => String
    */
-  test("test rewrite record with type changed") {
+  test("Test rewrite record with type changed") {
     val avroSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"h0_record\",\"namespace\":\"hoodie.h0\",\"fields\""
       + ":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null},"
       + "{\"name\":\"comb\",\"type\":[\"null\",\"int\"],\"default\":null},"
@@ -114,7 +164,22 @@ class TestStructTypeSchemaEvolutionUtils extends FunSuite with Matchers with Bef
     val internalSchema = AvroInternalSchemaConverter.convert(avroSchema)
     // do change type operation
     val updateChange = TableChanges.ColumnUpdateChange.get(internalSchema)
-    updateChange.updateColumnType("id", Types.LongType.get).updateColumnType("comb", Types.FloatType.get).updateColumnType("com1", Types.DoubleType.get).updateColumnType("col0", Types.StringType.get).updateColumnType("col1", Types.FloatType.get).updateColumnType("col11", Types.DoubleType.get).updateColumnType("col12", Types.StringType.get).updateColumnType("col2", Types.DoubleType.get).updateColumnType("col21", Types.StringType.get).updateColumnType("col3", Types.StringType.get).updateCo [...]
+    updateChange.updateColumnType("id", Types.LongType.get)
+      .updateColumnType("comb", Types.FloatType.get)
+      .updateColumnType("com1", Types.DoubleType.get)
+      .updateColumnType("col0", Types.StringType.get)
+      .updateColumnType("col1", Types.FloatType.get)
+      .updateColumnType("col11", Types.DoubleType.get)
+      .updateColumnType("col12", Types.StringType.get)
+      .updateColumnType("col2", Types.DoubleType.get)
+      .updateColumnType("col21", Types.StringType.get)
+      .updateColumnType("col3", Types.StringType.get)
+      .updateColumnType("col31", Types.DecimalType.get(18, 9))
+      .updateColumnType("col4", Types.DecimalType.get(18, 9))
+      .updateColumnType("col41", Types.StringType.get)
+      .updateColumnType("col5", Types.DateType.get)
+      .updateColumnType("col51", Types.DecimalType.get(18, 9))
+      .updateColumnType("col6", Types.StringType.get)
     val newSchema = SchemaChangeUtils.applyTableChanges2Schema(internalSchema, updateChange)
     val newAvroSchema = AvroInternalSchemaConverter.convert(newSchema, avroSchema.getName)
     val newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap[String, String])
@@ -125,11 +190,14 @@ class TestStructTypeSchemaEvolutionUtils extends FunSuite with Matchers with Bef
     val row = AvroConversionUtils.createAvroToInternalRowConverter(avroSchema, structTypeSchema).apply(avroRecord).get
     val newRowExpected = AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema, newStructTypeSchema)
       .apply(newRecord).get
-    val newRowActual = HoodieInternalRowUtils.rewriteRecordWithNewSchema(row, structTypeSchema, newStructTypeSchema, new HashMap[String, String])
-    internalRowCompare(newRowExpected, newRowActual, newStructTypeSchema)
+
+    val rowWriter = HoodieInternalRowUtils.genUnsafeRowWriter(structTypeSchema, newStructTypeSchema, new HashMap[String, String])
+    val newRow = rowWriter(row)
+
+    internalRowCompare(newRowExpected, newRow, newStructTypeSchema)
   }
 
-  test("test rewrite nest record") {
+  test("Test rewrite nest record") {
     val record = Types.RecordType.get(Types.Field.get(0, false, "id", Types.IntType.get()),
       Types.Field.get(1, true, "data", Types.StringType.get()),
       Types.Field.get(2, true, "preferences",
@@ -177,8 +245,11 @@ class TestStructTypeSchemaEvolutionUtils extends FunSuite with Matchers with Bef
     val newStructTypeSchema = HoodieInternalRowUtils.getCachedSchema(newAvroSchema)
     val row = AvroConversionUtils.createAvroToInternalRowConverter(schema, structTypeSchema).apply(avroRecord).get
     val newRowExpected = AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema, newStructTypeSchema).apply(newAvroRecord).get
-    val newRowActual = HoodieInternalRowUtils.rewriteRecordWithNewSchema(row, structTypeSchema, newStructTypeSchema, new HashMap[String, String])
-    internalRowCompare(newRowExpected, newRowActual, newStructTypeSchema)
+
+    val rowWriter = HoodieInternalRowUtils.genUnsafeRowWriter(structTypeSchema, newStructTypeSchema, new HashMap[String, String])
+    val newRow = rowWriter(row)
+
+    internalRowCompare(newRowExpected, newRow, newStructTypeSchema)
   }
 
   private def internalRowCompare(expected: Any, actual: Any, schema: DataType): Unit = {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 7a3aa3e3bf5..7134e8ff7cc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -89,12 +89,6 @@ public class HoodieIncrSource extends RowSource {
      */
     static final String SOURCE_FILE_FORMAT = "hoodie.deltastreamer.source.hoodieincr.file.format";
     static final String DEFAULT_SOURCE_FILE_FORMAT = "parquet";
-
-    /**
-     * Drops all meta fields from the source hudi table while ingesting into sink hudi table.
-     */
-    static final String HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE = "hoodie.deltastreamer.source.hoodieincr.drop.all.meta.fields.from.source";
-    public static final Boolean DEFAULT_HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE = false;
   }
 
   public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
@@ -159,35 +153,8 @@ public class HoodieIncrSource extends RowSource {
               queryTypeAndInstantEndpts.getRight().getRight()));
     }
 
-    /*
-     * log.info("Partition Fields are : (" + partitionFields + "). Initial Source Schema :" + source.schema());
-     *
-     * StructType newSchema = new StructType(source.schema().fields()); for (String field : partitionFields) { newSchema
-     * = newSchema.add(field, DataTypes.StringType, true); }
-     *
-     * /** Validates if the commit time is sane and also generates Partition fields from _hoodie_partition_path if
-     * configured
-     *
-     * Dataset<Row> validated = source.map((MapFunction<Row, Row>) (Row row) -> { // _hoodie_instant_time String
-     * instantTime = row.getString(0); IncrSourceHelper.validateInstantTime(row, instantTime, instantEndpts.getKey(),
-     * instantEndpts.getValue()); if (!partitionFields.isEmpty()) { // _hoodie_partition_path String hoodiePartitionPath
-     * = row.getString(3); List<Object> partitionVals =
-     * extractor.extractPartitionValuesInPath(hoodiePartitionPath).stream() .map(o -> (Object)
-     * o).collect(Collectors.toList()); ValidationUtils.checkArgument(partitionVals.size() == partitionFields.size(),
-     * "#partition-fields != #partition-values-extracted"); List<Object> rowObjs = new
-     * ArrayList<>(scala.collection.JavaConversions.seqAsJavaList(row.toSeq())); rowObjs.addAll(partitionVals); return
-     * RowFactory.create(rowObjs.toArray()); } return row; }, RowEncoder.apply(newSchema));
-     *
-     * log.info("Validated Source Schema :" + validated.schema());
-     */
-    boolean dropAllMetaFields = props.getBoolean(Config.HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE,
-        Config.DEFAULT_HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE);
-
-    // Remove Hoodie meta columns except partition path from input source
-    String[] colsToDrop = dropAllMetaFields ? HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new) :
-        HoodieRecord.HOODIE_META_COLUMNS.stream().filter(x -> !x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toArray(String[]::new);
-    final Dataset<Row> src = source.drop(colsToDrop);
-    // log.info("Final Schema from Source is :" + src.schema());
+    // Remove Hoodie meta columns
+    final Dataset<Row> src = source.drop(HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new));
     return Pair.of(Option.of(src), queryTypeAndInstantEndpts.getRight().getRight());
   }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
index fd684e95b2d..4c03e1c67f2 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -73,6 +73,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
   String propsFilePath;
   String tableBasePath;
 
+  @Disabled("HUDI-5653")
   @ParameterizedTest
   @EnumSource(HoodieTableType.class)
   void testUpsertsContinuousModeWithMultipleWritersForConflicts(HoodieTableType tableType) throws Exception {


[hudi] 04/10: [HUDI-5655] Closing write client for spark ds writer in all cases (including exception) (#7799)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 571c0f29c9ec55c23478d801c21a0fa4f19b2463
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Mon Jan 30 20:37:10 2023 -0800

    [HUDI-5655] Closing write client for spark ds writer in all cases (including exception) (#7799)
    
    Looks like we miss to close the writeClient on some of the failure cases while writing via spark-ds and spark-sql writes.
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 23 ++++++++++++++--------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7e234775faa..304a1303a3b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -376,12 +376,22 @@ object HoodieSparkSqlWriter {
         }
 
       // Check for errors and commit the write.
-      val (writeSuccessful, compactionInstant, clusteringInstant) =
-        commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
-          writeResult, parameters, writeClient, tableConfig, jsc,
-          TableInstantInfo(basePath, instantTime, commitActionType, operation), extraPreCommitFn)
+      try {
+        val (writeSuccessful, compactionInstant, clusteringInstant) =
+          commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
+            writeResult, parameters, writeClient, tableConfig, jsc,
+            TableInstantInfo(basePath, instantTime, commitActionType, operation), extraPreCommitFn)
 
-      (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
+        (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
+      } finally {
+        // close the write client in all cases
+        val asyncCompactionEnabled = isAsyncCompactionEnabled(writeClient, tableConfig, parameters, jsc.hadoopConfiguration())
+        val asyncClusteringEnabled = isAsyncClusteringEnabled(writeClient, parameters)
+        if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
+          log.info("Closing write client")
+          writeClient.close()
+        }
+      }
     }
   }
 
@@ -959,9 +969,6 @@ object HoodieSparkSqlWriter {
         tableInstantInfo.basePath, schema)
 
       log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
-      if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
-        client.close()
-      }
       (commitSuccess && metaSyncSuccess, compactionInstant, clusteringInstant)
     } else {
       log.error(s"${tableInstantInfo.operation} failed with errors")


[hudi] 02/10: [HUDI-5563] Check table exist before drop table (#7679)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4768408ed43f33f88e510781be9bea632ce200eb
Author: Zouxxyy <zo...@alibaba-inc.com>
AuthorDate: Tue Jan 31 09:43:43 2023 +0800

    [HUDI-5563] Check table exist before drop table (#7679)
---
 .../org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala   |  3 ++-
 .../test/scala/org/apache/spark/sql/hudi/TestDropTable.scala  | 11 +++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 21194eaaeeb..4875892b0ef 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -617,7 +617,8 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
         CreateHoodieTableCommand(table, ignoreIfExists)
       // Rewrite the DropTableCommand to DropHoodieTableCommand
       case DropTableCommand(tableName, ifExists, false, purge)
-        if sparkAdapter.isHoodieTable(tableName, sparkSession) =>
+        if sparkSession.sessionState.catalog.tableExists(tableName)
+          && sparkAdapter.isHoodieTable(tableName, sparkSession) =>
         DropHoodieTableCommand(tableName, ifExists, false, purge)
       // Rewrite the AlterTableDropPartitionCommand to AlterHoodieTableDropPartitionCommand
       case AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala
index 4470712e020..b86241eaca9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hadoop.fs.{LocalFileSystem, Path}
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 
@@ -51,6 +52,16 @@ class TestDropTable extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Drop Table with non existent table") {
+    // drop table if exists
+    spark.sql("drop table if exists non_existent_table")
+
+    // drop table
+    assertThrows[AnalysisException]{
+      spark.sql("drop table non_existent_table")
+    }
+  }
+
   test("Test Drop Table with purge") {
     withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>


[hudi] 07/10: [MINOR] Standardise schema concepts on Flink Engine (#7761)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit fb612bef734cbf8aef94f714f8bde24688b82add
Author: voonhous <vo...@gmail.com>
AuthorDate: Tue Jan 31 15:04:36 2023 +0800

    [MINOR] Standardise schema concepts on Flink Engine (#7761)
---
 .../internal/schema/utils/InternalSchemaUtils.java |  4 +-
 .../hudi/table/format/InternalSchemaManager.java   | 57 +++++++++++++---------
 .../apache/hudi/table/format/RecordIterators.java  |  8 +--
 3 files changed, 41 insertions(+), 28 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
index 4c926f9f293..cf66986e155 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
@@ -278,9 +278,9 @@ public class InternalSchemaUtils {
   public static Map<String, String> collectRenameCols(InternalSchema oldSchema, InternalSchema newSchema) {
     List<String> colNamesFromWriteSchema = oldSchema.getAllColsFullName();
     return colNamesFromWriteSchema.stream().filter(f -> {
-      int filedIdFromWriteSchema = oldSchema.findIdByName(f);
+      int fieldIdFromWriteSchema = oldSchema.findIdByName(f);
       // try to find the cols which has the same id, but have different colName;
-      return newSchema.getAllIds().contains(filedIdFromWriteSchema) && !newSchema.findfullName(filedIdFromWriteSchema).equalsIgnoreCase(f);
+      return newSchema.getAllIds().contains(fieldIdFromWriteSchema) && !newSchema.findfullName(fieldIdFromWriteSchema).equalsIgnoreCase(f);
     }).collect(Collectors.toMap(e -> newSchema.findfullName(oldSchema.findIdByName(e)), e -> {
       int lastDotIndex = e.lastIndexOf(".");
       return e.substring(lastDotIndex == -1 ? 0 : lastDotIndex + 1);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
index 7fa598bc834..3783e642c8d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
@@ -93,28 +93,39 @@ public class InternalSchemaManager implements Serializable {
     return querySchema;
   }
 
-  InternalSchema getFileSchema(String fileName) {
+  /**
+   * Attempts to merge the file and query schema to produce a mergeSchema, prioritising the use of fileSchema types.
+   * An emptySchema is returned if:
+   * <ul>
+   * <li>1. An empty querySchema is provided</li>
+   * <li>2. querySchema is equal to fileSchema</li>
+   * </ul>
+   * Note that this method returns an emptySchema if merging is not required to be performed.
+   * @param fileName Name of file to fetch commitTime/versionId for
+   * @return mergeSchema, i.e. the schema on which the file should be read with
+   */
+  InternalSchema getMergeSchema(String fileName) {
     if (querySchema.isEmptySchema()) {
       return querySchema;
     }
     long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(fileName));
-    InternalSchema fileSchemaUnmerged = InternalSchemaCache.getInternalSchemaByVersionId(
+    InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(
         commitInstantTime, tablePath, getHadoopConf(), validCommits);
-    if (querySchema.equals(fileSchemaUnmerged)) {
+    if (querySchema.equals(fileSchema)) {
       return InternalSchema.getEmptyInternalSchema();
     }
-    return new InternalSchemaMerger(fileSchemaUnmerged, querySchema, true, true).mergeSchema();
+    return new InternalSchemaMerger(fileSchema, querySchema, true, true).mergeSchema();
   }
 
   /**
-   * This method returns a mapping of columns that have type inconsistencies between the fileSchema and querySchema.
+   * This method returns a mapping of columns that have type inconsistencies between the mergeSchema and querySchema.
    * This is done by:
    * <li>1. Finding the columns with type changes</li>
    * <li>2. Get a map storing the index of these columns with type changes; Map of -> (colIdxInQueryFieldNames, colIdxInQuerySchema)</li>
    * <li>3. For each selectedField with type changes, build a castMap containing the cast/conversion details;
    * Map of -> (selectedPos, Cast([from] fileType, [to] queryType))</li>
    *
-   * @param fileSchema InternalSchema representation of the file's schema (acquired from commit/.schema metadata)
+   * @param mergeSchema InternalSchema representation of mergeSchema (prioritise use of fileSchemaType) that is used for reading base parquet files
    * @param queryFieldNames array containing the columns of a Hudi Flink table
    * @param queryFieldTypes array containing the field types of the columns of a Hudi Flink table
    * @param selectedFields array containing the index of the columns of interest required (indexes are based on queryFieldNames and queryFieldTypes)
@@ -122,31 +133,33 @@ public class InternalSchemaManager implements Serializable {
    *
    * @see CastMap
    */
-  CastMap getCastMap(InternalSchema fileSchema, String[] queryFieldNames, DataType[] queryFieldTypes, int[] selectedFields) {
+  CastMap getCastMap(InternalSchema mergeSchema, String[] queryFieldNames, DataType[] queryFieldTypes, int[] selectedFields) {
     Preconditions.checkArgument(!querySchema.isEmptySchema(), "querySchema cannot be empty");
-    Preconditions.checkArgument(!fileSchema.isEmptySchema(), "fileSchema cannot be empty");
+    Preconditions.checkArgument(!mergeSchema.isEmptySchema(), "mergeSchema cannot be empty");
 
     CastMap castMap = new CastMap();
     // map storing the indexes of columns with type changes Map of -> (colIdxInQueryFieldNames, colIdxInQuerySchema)
-    Map<Integer, Integer> posProxy = getPosProxy(fileSchema, queryFieldNames);
+    Map<Integer, Integer> posProxy = getPosProxy(mergeSchema, queryFieldNames);
     if (posProxy.isEmpty()) {
       // no type changes
       castMap.setFileFieldTypes(queryFieldTypes);
       return castMap;
     }
     List<Integer> selectedFieldList = IntStream.of(selectedFields).boxed().collect(Collectors.toList());
-    List<DataType> fileSchemaAsDataTypes = AvroSchemaConverter.convertToDataType(
-        AvroInternalSchemaConverter.convert(fileSchema, "tableName")).getChildren();
+    // mergeSchema is built with useColumnTypeFromFileSchema = true
+    List<DataType> mergeSchemaAsDataTypes = AvroSchemaConverter.convertToDataType(
+        AvroInternalSchemaConverter.convert(mergeSchema, "tableName")).getChildren();
     DataType[] fileFieldTypes = new DataType[queryFieldTypes.length];
     for (int i = 0; i < queryFieldTypes.length; i++) {
+      // position of ChangedType in querySchema
       Integer posOfChangedType = posProxy.get(i);
       if (posOfChangedType == null) {
         // no type change for column; fileFieldType == queryFieldType
         fileFieldTypes[i] = queryFieldTypes[i];
       } else {
         // type change detected for column;
-        DataType fileType = fileSchemaAsDataTypes.get(posOfChangedType);
-        // update fileFieldType match the type found in fileSchema
+        DataType fileType = mergeSchemaAsDataTypes.get(posOfChangedType);
+        // update fileFieldType match the type found in mergeSchema
         fileFieldTypes[i] = fileType;
         int selectedPos = selectedFieldList.indexOf(i);
         if (selectedPos != -1) {
@@ -162,34 +175,34 @@ public class InternalSchemaManager implements Serializable {
 
   /**
    * For columns that have been modified via the column renaming operation, the column name might be inconsistent
-   * between querySchema and fileSchema.
+   * between querySchema and mergeSchema.
    * <p>
    * As such, this method will identify all columns that have been renamed, and return a string array of column names
-   * corresponding to the column names found in the fileSchema.
+   * corresponding to the column names found in the mergeSchema.
    * <p>
    * This is done by:
    * <li>1. Get the rename mapping of -> (colNameFromNewSchema, colNameLastPartFromOldSchema)</li>
    * <li>2. For columns that have been renamed, replace them with the old column name</li>
    *
-   * @param fileSchema InternalSchema representation of the file's schema (acquired from commit/.schema metadata)
+   * @param mergeSchema InternalSchema representation of mergeSchema (prioritise use of fileSchemaType) that is used for reading base parquet files
    * @param queryFieldNames array containing the columns of a Hudi Flink table
-   * @return String array containing column names corresponding to the column names found in the fileSchema
+   * @return String array containing column names corresponding to the column names found in the mergeSchema
    *
    * @see InternalSchemaUtils#collectRenameCols(InternalSchema, InternalSchema)
    */
-  String[] getFileFieldNames(InternalSchema fileSchema, String[] queryFieldNames) {
+  String[] getMergeFieldNames(InternalSchema mergeSchema, String[] queryFieldNames) {
     Preconditions.checkArgument(!querySchema.isEmptySchema(), "querySchema cannot be empty");
-    Preconditions.checkArgument(!fileSchema.isEmptySchema(), "fileSchema cannot be empty");
+    Preconditions.checkArgument(!mergeSchema.isEmptySchema(), "mergeSchema cannot be empty");
 
-    Map<String, String> renamedCols = InternalSchemaUtils.collectRenameCols(fileSchema, querySchema);
+    Map<String, String> renamedCols = InternalSchemaUtils.collectRenameCols(mergeSchema, querySchema);
     if (renamedCols.isEmpty()) {
       return queryFieldNames;
     }
     return Arrays.stream(queryFieldNames).map(name -> renamedCols.getOrDefault(name, name)).toArray(String[]::new);
   }
 
-  private Map<Integer, Integer> getPosProxy(InternalSchema fileSchema, String[] queryFieldNames) {
-    Map<Integer, Pair<Type, Type>> changedCols = InternalSchemaUtils.collectTypeChangedCols(querySchema, fileSchema);
+  private Map<Integer, Integer> getPosProxy(InternalSchema mergeSchema, String[] queryFieldNames) {
+    Map<Integer, Pair<Type, Type>> changedCols = InternalSchemaUtils.collectTypeChangedCols(querySchema, mergeSchema);
     HashMap<Integer, Integer> posProxy = new HashMap<>(changedCols.size());
     List<String> fieldNameList = Arrays.asList(queryFieldNames);
     List<Types.Field> columns = querySchema.columns();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
index 8657f16ddc9..1bc02bcad40 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
@@ -50,8 +50,8 @@ public abstract class RecordIterators {
       Path path,
       long splitStart,
       long splitLength) throws IOException {
-    InternalSchema fileSchema = internalSchemaManager.getFileSchema(path.getName());
-    if (fileSchema.isEmptySchema()) {
+    InternalSchema mergeSchema = internalSchemaManager.getMergeSchema(path.getName());
+    if (mergeSchema.isEmptySchema()) {
       return new ParquetSplitRecordIterator(
           ParquetSplitReaderUtil.genPartColumnarRowReader(
               utcTimestamp,
@@ -66,14 +66,14 @@ public abstract class RecordIterators {
               splitStart,
               splitLength));
     } else {
-      CastMap castMap = internalSchemaManager.getCastMap(fileSchema, fieldNames, fieldTypes, selectedFields);
+      CastMap castMap = internalSchemaManager.getCastMap(mergeSchema, fieldNames, fieldTypes, selectedFields);
       Option<RowDataProjection> castProjection = castMap.toRowDataProjection(selectedFields);
       ClosableIterator<RowData> itr = new ParquetSplitRecordIterator(
           ParquetSplitReaderUtil.genPartColumnarRowReader(
               utcTimestamp,
               caseSensitive,
               conf,
-              internalSchemaManager.getFileFieldNames(fileSchema, fieldNames), // the reconciled field names
+              internalSchemaManager.getMergeFieldNames(mergeSchema, fieldNames), // the reconciled field names
               castMap.getFileFieldTypes(),                                     // the reconciled field types
               partitionSpec,
               selectedFields,


[hudi] 08/10: [HUDI-5567] Make the bootstrapping exception message more clear (#7684)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 9c21118e146ed591259d46d5291822c80674e946
Author: Ming Wei <29...@qq.com>
AuthorDate: Tue Jan 31 16:26:10 2023 +0800

    [HUDI-5567] Make the bootstrapping exception message more clear (#7684)
    
    Co-authored-by: jameswei <ja...@didiglobal.com>
---
 .../hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
index b161182b83a..bc0a1663c4b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
@@ -60,7 +60,7 @@ public class HoodieSparkBootstrapSchemaProvider extends HoodieBootstrapSchemaPro
           } else if (ORC.getFileExtension().equals(extension)) {
             return getBootstrapSourceSchemaOrc(writeConfig, context, filePath);
           } else {
-            throw new HoodieException("Could not determine schema from the data files.");
+            throw new HoodieException("Could not determine schema from the data files, supported file formats: [ORC, PARQUET].");
           }
         }
     ).filter(Objects::nonNull).findAny()
@@ -92,7 +92,7 @@ public class HoodieSparkBootstrapSchemaProvider extends HoodieBootstrapSchemaPro
     try {
       orcReader = OrcFile.createReader(filePath, OrcFile.readerOptions(context.getHadoopConf().get()));
     } catch (IOException e) {
-      throw new HoodieException("Could not determine schema from the data files.");
+      throw new HoodieException("Could not determine schema from the ORC data files.");
     }
     TypeDescription orcSchema = orcReader.getSchema();
     String tableName = HoodieAvroUtils.sanitizeName(writeConfig.getTableName());


[hudi] 03/10: [HUDI-5568] Fix the BucketStreamWriteFunction to rebase the local filesystem instance instead (#7685)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 2ced537cf8c3bf2c96b51ce55037d8cc382978f6
Author: luokey <85...@qq.com>
AuthorDate: Mon Jan 30 23:01:14 2023 -0500

    [HUDI-5568] Fix the BucketStreamWriteFunction to rebase the local filesystem instance instead (#7685)
    
    Should use `writeClient. getHoodieTable(). getHoodieView()` to determine the fileSystemView
---
 .../java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index c989b4eb29a..cf06dbc18d6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -156,7 +156,7 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
 
     // Load existing fileID belongs to this task
     Map<Integer, String> bucketToFileIDMap = new HashMap<>();
-    this.writeClient.getHoodieTable().getFileSystemView().getAllFileGroups(partition).forEach(fileGroup -> {
+    this.writeClient.getHoodieTable().getHoodieView().getAllFileGroups(partition).forEach(fileGroup -> {
       String fileID = fileGroup.getFileGroupId().getFileId();
       int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
       if (isBucketToLoad(bucketNumber, partition)) {


[hudi] 01/10: [MINOR] Make `data_before_after` the default cdc logging mode (#7797)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit fe75c9af0b1524126165d4aadaf4ff111292db94
Author: Shiyan Xu <27...@users.noreply.github.com>
AuthorDate: Mon Jan 30 19:31:51 2023 -0600

    [MINOR] Make `data_before_after` the default cdc logging mode (#7797)
---
 .../src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index e34aa1e6dad..c1f64d3baeb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -140,7 +140,7 @@ public class HoodieTableConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> CDC_SUPPLEMENTAL_LOGGING_MODE = ConfigProperty
       .key("hoodie.table.cdc.supplemental.logging.mode")
-      .defaultValue(op_key_only.name())
+      .defaultValue(data_before_after.name())
       .withValidValues(
           op_key_only.name(),
           data_before.name(),


[hudi] 05/10: [HUDI-5654] Fixing read of an empty rollback completed meta files from data table timeline w/ metadata reads (#7798)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 138af1a43b3976242a5a0b099bfdd618d5fc6a17
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Mon Jan 30 20:43:18 2023 -0800

    [HUDI-5654] Fixing read of an empty rollback completed meta files from data table timeline w/ metadata reads (#7798)
    
    Fixing metadata table to read rollback info even w/ empty rollback completed meta file.
---
 .../hudi/metadata/HoodieBackedTableMetadata.java       | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 0ab11d65e8b..1ccc14176a1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.data.HoodieData;
@@ -563,10 +564,21 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
    */
   private List<String> getRollbackedCommits(HoodieInstant instant, HoodieActiveTimeline timeline) {
     try {
+      List<String> commitsToRollback = null;
       if (instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) {
-        HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
-            timeline.getInstantDetails(instant).get());
-        return rollbackMetadata.getCommitsRollback();
+        try {
+          HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
+              timeline.getInstantDetails(instant).get());
+          commitsToRollback = rollbackMetadata.getCommitsRollback();
+        } catch (IOException e) {
+          // if file is empty, fetch the commits to rollback from rollback.requested file
+          HoodieRollbackPlan rollbackPlan = TimelineMetadataUtils.deserializeAvroMetadata(
+              timeline.readRollbackInfoAsBytes(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION,
+                  instant.getTimestamp())).get(), HoodieRollbackPlan.class);
+          commitsToRollback = Collections.singletonList(rollbackPlan.getInstantToRollback().getCommitTime());
+          LOG.warn("Had to fetch rollback info from requested instant since completed file is empty " + instant.toString());
+        }
+        return commitsToRollback;
       }
 
       List<String> rollbackedCommits = new LinkedList<>();


[hudi] 09/10: [HUDI-5553] Prevent partition(s) from being dropped if there are pending… (#7669)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit f0152acae28ebea4cf2b3f45f7d0d9104610d69c
Author: voonhous <vo...@gmail.com>
AuthorDate: Tue Jan 31 19:18:55 2023 +0800

    [HUDI-5553] Prevent partition(s) from being dropped if there are pending… (#7669)
---
 .../hudi/client/utils/DeletePartitionUtils.java    |  77 ++++++++++++
 .../client/utils/TestDeletePartitionUtils.java     | 110 ++++++++++++++++++
 .../FlinkDeletePartitionCommitActionExecutor.java  |   3 +
 .../SparkDeletePartitionCommitActionExecutor.java  |   3 +
 .../sql/hudi/TestAlterTableDropPartition.scala     | 129 ++++++++++++++++++++-
 5 files changed, 321 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java
new file mode 100644
index 00000000000..92c2065457e
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.exception.HoodieDeletePartitionException;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A utility class for helper functions when performing a delete partition operation.
+ */
+public class DeletePartitionUtils {
+
+  /**
+   * Check if there are any pending table service actions (requested + inflight) on a table affecting the partitions to
+   * be dropped.
+   * <p>
+   * This check is to prevent a drop-partition from proceeding should a partition have a table service action in
+   * the pending stage. If this is allowed to happen, the filegroup that is an input for a table service action, might
+   * also be a candidate for being replaced. As such, when the table service action and drop-partition commits are
+   * committed, there will be two commits replacing a single filegroup.
+   * <p>
+   * For example, a timeline might have an execution order as such:
+   * 000.replacecommit.requested (clustering filegroup_1 + filegroup_2 -> filegroup_3)
+   * 001.replacecommit.requested, 001.replacecommit.inflight, 0001.replacecommit (drop_partition to replace filegroup_1)
+   * 000.replacecommit.inflight (clustering is executed now)
+   * 000.replacecommit (clustering completed)
+   * For an execution order as shown above, 000.replacecommit and 001.replacecommit will both flag filegroup_1 to be replaced.
+   * This will cause  downstream duplicate key errors when a map is being constructed.
+   *
+   * @param table Table to perform validation on
+   * @param partitionsToDrop List of partitions to drop
+   */
+  public static void checkForPendingTableServiceActions(HoodieTable table, List<String> partitionsToDrop) {
+    List<String> instantsOfOffendingPendingTableServiceAction = new ArrayList<>();
+    // ensure that there are no pending inflight clustering/compaction operations involving this partition
+    SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();
+
+    // separating the iteration of pending compaction operations from clustering as they return different stream types
+    Stream.concat(fileSystemView.getPendingCompactionOperations(), fileSystemView.getPendingLogCompactionOperations())
+        .filter(op -> partitionsToDrop.contains(op.getRight().getPartitionPath()))
+        .forEach(op -> instantsOfOffendingPendingTableServiceAction.add(op.getLeft()));
+
+    fileSystemView.getFileGroupsInPendingClustering()
+        .filter(fgIdInstantPair -> partitionsToDrop.contains(fgIdInstantPair.getLeft().getPartitionPath()))
+        .forEach(x -> instantsOfOffendingPendingTableServiceAction.add(x.getRight().getTimestamp()));
+
+    if (instantsOfOffendingPendingTableServiceAction.size() > 0) {
+      throw new HoodieDeletePartitionException("Failed to drop partitions. "
+          + "Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: " + partitionsToDrop + ". "
+          + "Instant(s) of offending pending table service action: "
+          + instantsOfOffendingPendingTableServiceAction.stream().distinct().collect(Collectors.toList()));
+    }
+  }
+
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestDeletePartitionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestDeletePartitionUtils.java
new file mode 100644
index 00000000000..3a1632737ef
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestDeletePartitionUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieDeletePartitionException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class TestDeletePartitionUtils {
+
+  private static final String PARTITION_IN_PENDING_SERVICE_ACTION = "partition_with_pending_table_service_action";
+  private static final String HARDCODED_INSTANT_TIME = "0";
+
+  private final HoodieTable table = Mockito.mock(HoodieTable.class);
+
+  private final SyncableFileSystemView fileSystemView = Mockito.mock(SyncableFileSystemView.class);
+
+  public static Stream<Arguments> generateTruthValues() {
+    int noOfVariables = 3;
+    int noOfRows = 1 << noOfVariables;
+    Object[][] truthValues = new Object[noOfRows][noOfVariables];
+    for (int i = 0; i < noOfRows; i++) {
+      for (int j = noOfVariables - 1; j >= 0; j--) {
+        boolean out = (i / (1 << j)) % 2 != 0;
+        truthValues[i][j] = out;
+      }
+    }
+    return Stream.of(truthValues).map(Arguments::of);
+  }
+
+  @ParameterizedTest
+  @MethodSource("generateTruthValues")
+  public void testDeletePartitionUtils(
+      boolean hasPendingCompactionOperations,
+      boolean hasPendingLogCompactionOperations,
+      boolean hasFileGroupsInPendingClustering) {
+    System.out.printf("hasPendingCompactionOperations: %s, hasPendingLogCompactionOperations: %s, hasFileGroupsInPendingClustering: %s%n",
+        hasPendingCompactionOperations, hasPendingLogCompactionOperations, hasFileGroupsInPendingClustering);
+    Mockito.when(table.getSliceView()).thenReturn(fileSystemView);
+    Mockito.when(fileSystemView.getPendingCompactionOperations()).thenReturn(createPendingCompactionOperations(hasPendingCompactionOperations));
+    Mockito.when(fileSystemView.getPendingLogCompactionOperations()).thenReturn(createPendingCompactionOperations(hasPendingLogCompactionOperations));
+    Mockito.when(fileSystemView.getFileGroupsInPendingClustering()).thenReturn(createFileGroupsInPendingClustering(hasFileGroupsInPendingClustering));
+
+    boolean shouldThrowException = hasPendingCompactionOperations || hasPendingLogCompactionOperations || hasFileGroupsInPendingClustering;
+
+    if (shouldThrowException) {
+      assertThrows(HoodieDeletePartitionException.class,
+          () -> DeletePartitionUtils.checkForPendingTableServiceActions(table,
+              Collections.singletonList(PARTITION_IN_PENDING_SERVICE_ACTION)));
+    } else {
+      assertDoesNotThrow(() -> DeletePartitionUtils.checkForPendingTableServiceActions(table,
+          Collections.singletonList(PARTITION_IN_PENDING_SERVICE_ACTION)));
+    }
+  }
+
+  private static Stream<Pair<String, CompactionOperation>> createPendingCompactionOperations(boolean hasPendingCompactionOperations) {
+    return Stream.of(Pair.of(HARDCODED_INSTANT_TIME, getCompactionOperation(hasPendingCompactionOperations)));
+  }
+
+  private static CompactionOperation getCompactionOperation(boolean hasPendingJobInPartition) {
+    return new CompactionOperation(
+        "fileId", getPartitionName(hasPendingJobInPartition), HARDCODED_INSTANT_TIME, Option.empty(),
+        new ArrayList<>(), Option.empty(), Option.empty(), new HashMap<>());
+  }
+
+  private static Stream<Pair<HoodieFileGroupId, HoodieInstant>> createFileGroupsInPendingClustering(boolean hasFileGroupsInPendingClustering) {
+    HoodieFileGroupId hoodieFileGroupId = new HoodieFileGroupId(getPartitionName(hasFileGroupsInPendingClustering), "fileId");
+    HoodieInstant hoodieInstant = new HoodieInstant(true, "replacecommit", HARDCODED_INSTANT_TIME);
+    return Stream.of(Pair.of(hoodieFileGroupId, hoodieInstant));
+  }
+
+  private static String getPartitionName(boolean hasPendingTableServiceAction) {
+    return hasPendingTableServiceAction ? PARTITION_IN_PENDING_SERVICE_ACTION : "unaffected_partition";
+  }
+
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java
index a301ba228e4..3f19534d08c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.commit;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.DeletePartitionUtils;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -62,6 +63,8 @@ public class FlinkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayl
 
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    DeletePartitionUtils.checkForPendingTableServiceActions(table, partitions);
+
     try {
       HoodieTimer timer = new HoodieTimer().startTimer();
       context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
index 49134d604d2..b45a691fbad 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.DeletePartitionUtils;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -59,6 +60,8 @@ public class SparkDeletePartitionCommitActionExecutor<T>
 
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    DeletePartitionUtils.checkForPendingTableServiceActions(table, partitions);
+
     try {
       HoodieTimer timer = HoodieTimer.start();
       context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index 15b14ec77f5..02c558fc2f3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -18,14 +18,17 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.{HoodieCLIUtils, HoodieSparkUtils}
 import org.apache.hudi.common.model.HoodieCommitMetadata
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
+import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.common.util.{PartitionPathEncodeUtils, StringUtils}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
 import org.apache.spark.sql.SaveMode
 import org.junit.jupiter.api.Assertions
+import org.junit.jupiter.api.Assertions.assertTrue
 
 class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
 
@@ -396,4 +399,128 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Prevent a partition from being dropped if there are pending CLUSTERING jobs") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}t/$tableName"
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | options (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+           | partitioned by(ts)
+           | location '$basePath'
+           | """.stripMargin)
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+      val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
+
+      // Generate the first clustering plan
+      val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
+      client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
+
+      checkAnswer(s"call show_clustering('$tableName')")(
+        Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "*")
+      )
+
+      val partition = "ts=1002"
+      val errMsg = s"Failed to drop partitions. Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: [$partition]"
+      checkExceptionContain(s"ALTER TABLE $tableName DROP PARTITION($partition)")(errMsg)
+    }
+  }
+
+  test("Prevent a partition from being dropped if there are pending COMPACTs jobs") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}t/$tableName"
+      // Using INMEMORY index type to ensure that deltacommits generate log files instead of parquet
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | options (
+           |  primaryKey ='id',
+           |  type = 'mor',
+           |  preCombineField = 'ts',
+           |  hoodie.index.type = 'INMEMORY'
+           | )
+           | partitioned by(ts)
+           | location '$basePath'
+           | """.stripMargin)
+      // Create 5 deltacommits to ensure that it is > default `hoodie.compact.inline.max.delta.commits`
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+      spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
+      spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
+      val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
+
+      // Generate the first compaction plan
+      val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
+      assertTrue(client.scheduleCompactionAtInstant(firstScheduleInstant, HOption.empty()))
+
+      checkAnswer(s"call show_compaction('$tableName')")(
+        Seq(firstScheduleInstant, 5, HoodieInstant.State.REQUESTED.name())
+      )
+
+      val partition = "ts=1002"
+      val errMsg = s"Failed to drop partitions. Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: [$partition]"
+      checkExceptionContain(s"ALTER TABLE $tableName DROP PARTITION($partition)")(errMsg)
+    }
+  }
+
+  test("Prevent a partition from being dropped if there are pending LOG_COMPACT jobs") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}t/$tableName"
+      // Using INMEMORY index type to ensure that deltacommits generate log files instead of parquet
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | options (
+           |  primaryKey ='id',
+           |  type = 'mor',
+           |  preCombineField = 'ts',
+           |  hoodie.index.type = 'INMEMORY'
+           | )
+           | partitioned by(ts)
+           | location '$basePath'
+           | """.stripMargin)
+      // Create 5 deltacommits to ensure that it is > default `hoodie.compact.inline.max.delta.commits`
+      // Write everything into the same FileGroup but into separate blocks
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
+      spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
+      spark.sql(s"insert into $tableName values(5, 'a5', 10, 1000)")
+      val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
+
+      // Generate the first log_compaction plan
+      val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
+      assertTrue(client.scheduleLogCompactionAtInstant(firstScheduleInstant, HOption.empty()))
+
+      val partition = "ts=1000"
+      val errMsg = s"Failed to drop partitions. Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: [$partition]"
+      checkExceptionContain(s"ALTER TABLE $tableName DROP PARTITION($partition)")(errMsg)
+    }
+  }
 }