Posted to commits@hudi.apache.org by yi...@apache.org on 2022/07/22 15:35:13 UTC

[hudi] branch master updated: [HUDI-4039] Make sure all builtin `KeyGenerator`s properly implement Spark specific APIs (#5523)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new eea4a692c0 [HUDI-4039] Make sure all builtin `KeyGenerator`s properly implement Spark specific APIs (#5523)
eea4a692c0 is described below

commit eea4a692c09694c778354c726fdcd4a3d75e13d0
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Fri Jul 22 08:35:07 2022 -0700

    [HUDI-4039] Make sure all builtin `KeyGenerator`s properly implement Spark specific APIs (#5523)
    
    This set of changes makes sure that all builtin `KeyGenerator`s properly implement Spark-specific APIs in a performant way, minimizing key-generation overhead.
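
For illustration, a minimal sketch of the Spark-native record-key path this change enables. It assumes a subclass of BuiltinKeyGenerator and relies on the helpers introduced further down in this diff (tryInitRowAccessor, rowAccessor, combineRecordKeyUnsafe); it is not the exact code of any built-in generator in the commit:

    // Minimal sketch, assuming it lives inside a BuiltinKeyGenerator subclass.
    @Override
    public UTF8String getRecordKey(InternalRow row, StructType schema) {
      tryInitRowAccessor(schema);                              // lazily resolve record-key field paths once
      Object[] keyParts = rowAccessor.getRecordKeyParts(row);  // extract raw values without converting the row
      return combineRecordKeyUnsafe(keyParts);                 // stay in UTF8String, avoiding an Avro detour
    }
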
---
 .../hudi/keygen/ComplexAvroKeyGenerator.java       |   2 +-
 .../apache/hudi/keygen/CustomAvroKeyGenerator.java |   4 +-
 .../hudi/keygen/GlobalAvroDeleteKeyGenerator.java  |   2 +-
 .../java/org/apache/hudi/keygen/KeyGenUtils.java   |   1 +
 .../keygen/NonpartitionedAvroKeyGenerator.java     |   6 +-
 .../apache/hudi/keygen/SimpleAvroKeyGenerator.java |   2 +-
 .../keygen/TimestampBasedAvroKeyGenerator.java     |   4 +-
 .../hudi/client/model/HoodieInternalRow.java       |  76 ++-
 .../row/HoodieInternalRowFileWriterFactory.java    |  65 +-
 .../hudi/io/storage/row/HoodieRowCreateHandle.java |  85 +--
 .../storage/row/HoodieRowParquetWriteSupport.java  |   6 +-
 .../apache/hudi/keygen/BuiltinKeyGenerator.java    | 594 ++++++++++++++---
 .../apache/hudi/keygen/ComplexKeyGenerator.java    |  33 +-
 .../org/apache/hudi/keygen/CustomKeyGenerator.java |  45 +-
 .../hudi/keygen/GlobalDeleteKeyGenerator.java      |  27 +-
 .../hudi/keygen/NonpartitionedKeyGenerator.java    |  34 +-
 .../org/apache/hudi/keygen/RowKeyGenUtils.java     |  59 --
 .../apache/hudi/keygen/RowKeyGeneratorHelper.java  | 355 ----------
 .../org/apache/hudi/keygen/SimpleKeyGenerator.java |  50 +-
 .../hudi/keygen/SparkKeyGeneratorInterface.java    |  51 +-
 .../hudi/keygen/TimestampBasedKeyGenerator.java    |  54 +-
 .../bootstrap/BaseBootstrapMetadataHandler.java    |  13 +-
 .../org/apache/hudi/unsafe/UTF8StringBuilder.java  | 100 +++
 .../apache/spark/sql/HoodieUnsafeRowUtils.scala    |  24 +-
 .../row/TestHoodieInternalRowParquetWriter.java    |   3 +-
 .../io/storage/row/TestHoodieRowCreateHandle.java  |  16 +-
 .../hudi/testutils/SparkDatasetTestUtils.java      |   1 +
 .../hudi/keygen/TestRowGeneratorHelper.scala       | 102 ---
 .../spark/sql/TestHoodieUnsafeRowUtils.scala       |   3 +-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |   4 +-
 .../org/apache/hudi/common/model/HoodieRecord.java |   6 +-
 .../hudi/common/table/HoodieTableMetaClient.java   |   8 +-
 .../apache/hudi/common/util/CollectionUtils.java   |  12 +-
 .../org/apache/hudi/common/util/HoodieTimer.java   |  14 +-
 .../org/apache/hudi/common/util/ParquetUtils.java  |   2 +-
 .../org/apache/hudi/keygen/BaseKeyGenerator.java   |  14 +-
 .../java/org/apache/hudi/keygen/KeyGenerator.java  |   2 +-
 .../apache/hudi/common/util/TestParquetUtils.java  |   2 +-
 .../integ/testsuite/generator/DeltaGenerator.java  |   2 +-
 .../java/org/apache/hudi/integ/ITTestBase.java     |   5 +-
 .../hudi/connect/utils/KafkaConnectUtils.java      |   9 +-
 .../BulkInsertDataInternalWriterHelper.java        |  36 +-
 .../hudi/HoodieDatasetBulkInsertHelper.scala       |   7 +-
 .../hudi/keygen/TestComplexKeyGenerator.java       |  22 +-
 .../apache/hudi/keygen/TestCustomKeyGenerator.java |  43 +-
 .../keygen/TestGlobalDeleteRecordGenerator.java    |   2 -
 .../keygen/TestNonpartitionedKeyGenerator.java     |   5 +-
 .../apache/hudi/keygen/TestSimpleKeyGenerator.java |  33 +-
 .../keygen/TestTimestampBasedKeyGenerator.java     |  15 +-
 .../hudi/testutils/KeyGeneratorTestUtilities.java  |   2 +-
 .../org/apache/hudi/ScalaAssertionSupport.scala    |  30 +-
 .../org/apache/hudi/TestDataSourceDefaults.scala   | 715 ++++++++++-----------
 52 files changed, 1457 insertions(+), 1350 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
index dca0c25775..9ff5c522e4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
@@ -44,7 +44,7 @@ public class ComplexAvroKeyGenerator extends BaseKeyGenerator {
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFields(), isConsistentLogicalTimestampEnabled());
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
index 370b57b530..77377de7ab 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
@@ -103,13 +103,13 @@ public class CustomAvroKeyGenerator extends BaseKeyGenerator {
   @Override
   public String getRecordKey(GenericRecord record) {
     validateRecordKeyFields();
-    return getRecordKeyFields().size() == 1
+    return getRecordKeyFieldNames().size() == 1
         ? new SimpleAvroKeyGenerator(config).getRecordKey(record)
         : new ComplexAvroKeyGenerator(config).getRecordKey(record);
   }
 
   private void validateRecordKeyFields() {
-    if (getRecordKeyFields() == null || getRecordKeyFields().isEmpty()) {
+    if (getRecordKeyFieldNames() == null || getRecordKeyFieldNames().isEmpty()) {
       throw new HoodieKeyException("Unable to find field names for record key in cfg");
     }
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
index 10a5760238..dc0bc3cef2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
@@ -40,7 +40,7 @@ public class GlobalAvroDeleteKeyGenerator extends BaseKeyGenerator {
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFields(), isConsistentLogicalTimestampEnabled());
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index f1e41296f1..362ef208d4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -42,6 +42,7 @@ public class KeyGenUtils {
 
   protected static final String HUDI_DEFAULT_PARTITION_PATH = PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
   public static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+  public static final String DEFAULT_RECORD_KEY_PARTS_SEPARATOR = ",";
 
   /**
    * Fetches record key from the GenericRecord.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
index db7596993d..5b5cedcbf8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
@@ -56,10 +56,10 @@ public class NonpartitionedAvroKeyGenerator extends BaseKeyGenerator {
     // for backward compatibility, we need to use the right format according to the number of record key fields
     // 1. if there is only one record key field, the format of record key is just "<value>"
     // 2. if there are multiple record key fields, the format is "<field1>:<value1>,<field2>:<value2>,..."
-    if (getRecordKeyFields().size() == 1) {
-      return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0), isConsistentLogicalTimestampEnabled());
+    if (getRecordKeyFieldNames().size() == 1) {
+      return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
     }
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFields(), isConsistentLogicalTimestampEnabled());
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
   }
 
   public String getEmptyPartition() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
index 943091225a..c7398e94ec 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
@@ -47,7 +47,7 @@ public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0), isConsistentLogicalTimestampEnabled());
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
index c543fd2604..60ccc694f9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
@@ -111,7 +111,7 @@ public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator {
     try {
       return getPartitionPath(partitionVal);
     } catch (Exception e) {
-      throw new HoodieKeyGeneratorException("Unable to parse input partition field :" + partitionVal, e);
+      throw new HoodieKeyGeneratorException("Unable to parse input partition field: " + partitionVal, e);
     }
   }
 
@@ -181,7 +181,7 @@ public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator {
       timeMs = convertLongTimeToMillis(((LocalDate) partitionVal).toEpochDay());
     } else if (partitionVal instanceof CharSequence) {
       if (!inputFormatter.isPresent()) {
-        throw new HoodieException("Missing inputformatter. Ensure " + KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
+        throw new HoodieException("Missing input formatter. Ensure " + KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
       }
       DateTime parsedDateTime = inputFormatter.get().parseDateTime(partitionVal.toString());
       if (this.outputDateTimeZone == null) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
index c2f86bd6b8..4651c0d4b4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client.model;
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.MapData;
 import org.apache.spark.sql.types.DataType;
@@ -34,7 +35,7 @@ import java.util.Arrays;
  * Hudi internal implementation of the {@link InternalRow} allowing to extend arbitrary
  * {@link InternalRow} overlaying Hudi-internal meta-fields on top of it.
  *
- * Capable of overlaying meta-fields in both cases: whether original {@link #row} contains
+ * Capable of overlaying meta-fields in both cases: whether original {@link #sourceRow} contains
  * meta columns or not. This allows to handle following use-cases allowing to avoid any
  * manipulation (reshuffling) of the source row, by simply creating new instance
  * of {@link HoodieInternalRow} with all the meta-values provided
@@ -50,22 +51,27 @@ public class HoodieInternalRow extends InternalRow {
 
   /**
    * Collection of meta-fields as defined by {@link HoodieRecord#HOODIE_META_COLUMNS}
+   *
+   * NOTE: {@code HoodieInternalRow} *always* overlays its own meta-fields even in case
+   *       when source row also contains them, to make sure these fields are mutable and
+   *       can be updated (for ex, {@link UnsafeRow} doesn't support mutations due to
+   *       its memory layout, as it persists field offsets)
    */
   private final UTF8String[] metaFields;
-  private final InternalRow row;
+  private final InternalRow sourceRow;
 
   /**
-   * Specifies whether source {@link #row} contains meta-fields
+   * Specifies whether source {@link #sourceRow} contains meta-fields
    */
-  private final boolean containsMetaFields;
+  private final boolean sourceContainsMetaFields;
 
   public HoodieInternalRow(UTF8String commitTime,
                            UTF8String commitSeqNumber,
                            UTF8String recordKey,
                            UTF8String partitionPath,
                            UTF8String fileName,
-                           InternalRow row,
-                           boolean containsMetaFields) {
+                           InternalRow sourceRow,
+                           boolean sourceContainsMetaFields) {
     this.metaFields = new UTF8String[] {
         commitTime,
         commitSeqNumber,
@@ -74,21 +80,21 @@ public class HoodieInternalRow extends InternalRow {
         fileName
     };
 
-    this.row = row;
-    this.containsMetaFields = containsMetaFields;
+    this.sourceRow = sourceRow;
+    this.sourceContainsMetaFields = sourceContainsMetaFields;
   }
 
   private HoodieInternalRow(UTF8String[] metaFields,
-                           InternalRow row,
-                           boolean containsMetaFields) {
+                           InternalRow sourceRow,
+                           boolean sourceContainsMetaFields) {
     this.metaFields = metaFields;
-    this.row = row;
-    this.containsMetaFields = containsMetaFields;
+    this.sourceRow = sourceRow;
+    this.sourceContainsMetaFields = sourceContainsMetaFields;
   }
 
   @Override
   public int numFields() {
-    return row.numFields();
+    return sourceRow.numFields();
   }
 
   @Override
@@ -96,7 +102,7 @@ public class HoodieInternalRow extends InternalRow {
     if (ordinal < metaFields.length) {
       metaFields[ordinal] = null;
     } else {
-      row.setNullAt(rebaseOrdinal(ordinal));
+      sourceRow.setNullAt(rebaseOrdinal(ordinal));
     }
   }
 
@@ -112,7 +118,7 @@ public class HoodieInternalRow extends InternalRow {
             String.format("Could not update the row at (%d) with value of type (%s), either UTF8String or String are expected", ordinal, value.getClass().getSimpleName()));
       }
     } else {
-      row.update(rebaseOrdinal(ordinal), value);
+      sourceRow.update(rebaseOrdinal(ordinal), value);
     }
   }
 
@@ -121,113 +127,113 @@ public class HoodieInternalRow extends InternalRow {
     if (ordinal < metaFields.length) {
       return metaFields[ordinal] == null;
     }
-    return row.isNullAt(rebaseOrdinal(ordinal));
+    return sourceRow.isNullAt(rebaseOrdinal(ordinal));
   }
 
   @Override
   public UTF8String getUTF8String(int ordinal) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+    if (ordinal < metaFields.length) {
       return metaFields[ordinal];
     }
-    return row.getUTF8String(rebaseOrdinal(ordinal));
+    return sourceRow.getUTF8String(rebaseOrdinal(ordinal));
   }
 
   @Override
   public Object get(int ordinal, DataType dataType) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+    if (ordinal < metaFields.length) {
       validateMetaFieldDataType(dataType);
       return metaFields[ordinal];
     }
-    return row.get(rebaseOrdinal(ordinal), dataType);
+    return sourceRow.get(rebaseOrdinal(ordinal), dataType);
   }
 
   @Override
   public boolean getBoolean(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, Boolean.class);
-    return row.getBoolean(rebaseOrdinal(ordinal));
+    return sourceRow.getBoolean(rebaseOrdinal(ordinal));
   }
 
   @Override
   public byte getByte(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, Byte.class);
-    return row.getByte(rebaseOrdinal(ordinal));
+    return sourceRow.getByte(rebaseOrdinal(ordinal));
   }
 
   @Override
   public short getShort(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, Short.class);
-    return row.getShort(rebaseOrdinal(ordinal));
+    return sourceRow.getShort(rebaseOrdinal(ordinal));
   }
 
   @Override
   public int getInt(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, Integer.class);
-    return row.getInt(rebaseOrdinal(ordinal));
+    return sourceRow.getInt(rebaseOrdinal(ordinal));
   }
 
   @Override
   public long getLong(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, Long.class);
-    return row.getLong(rebaseOrdinal(ordinal));
+    return sourceRow.getLong(rebaseOrdinal(ordinal));
   }
 
   @Override
   public float getFloat(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, Float.class);
-    return row.getFloat(rebaseOrdinal(ordinal));
+    return sourceRow.getFloat(rebaseOrdinal(ordinal));
   }
 
   @Override
   public double getDouble(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, Double.class);
-    return row.getDouble(rebaseOrdinal(ordinal));
+    return sourceRow.getDouble(rebaseOrdinal(ordinal));
   }
 
   @Override
   public Decimal getDecimal(int ordinal, int precision, int scale) {
     ruleOutMetaFieldsAccess(ordinal, Decimal.class);
-    return row.getDecimal(rebaseOrdinal(ordinal), precision, scale);
+    return sourceRow.getDecimal(rebaseOrdinal(ordinal), precision, scale);
   }
 
   @Override
   public byte[] getBinary(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, Byte[].class);
-    return row.getBinary(rebaseOrdinal(ordinal));
+    return sourceRow.getBinary(rebaseOrdinal(ordinal));
   }
 
   @Override
   public CalendarInterval getInterval(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, CalendarInterval.class);
-    return row.getInterval(rebaseOrdinal(ordinal));
+    return sourceRow.getInterval(rebaseOrdinal(ordinal));
   }
 
   @Override
   public InternalRow getStruct(int ordinal, int numFields) {
     ruleOutMetaFieldsAccess(ordinal, InternalRow.class);
-    return row.getStruct(rebaseOrdinal(ordinal), numFields);
+    return sourceRow.getStruct(rebaseOrdinal(ordinal), numFields);
   }
 
   @Override
   public ArrayData getArray(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, ArrayData.class);
-    return row.getArray(rebaseOrdinal(ordinal));
+    return sourceRow.getArray(rebaseOrdinal(ordinal));
   }
 
   @Override
   public MapData getMap(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, MapData.class);
-    return row.getMap(rebaseOrdinal(ordinal));
+    return sourceRow.getMap(rebaseOrdinal(ordinal));
   }
 
   @Override
   public InternalRow copy() {
-    return new HoodieInternalRow(Arrays.copyOf(metaFields, metaFields.length), row.copy(), containsMetaFields);
+    return new HoodieInternalRow(Arrays.copyOf(metaFields, metaFields.length), sourceRow.copy(), sourceContainsMetaFields);
   }
 
   private int rebaseOrdinal(int ordinal) {
     // NOTE: In cases when source row does not contain meta fields, we will have to
     //       rebase ordinal onto its indexes
-    return containsMetaFields ? ordinal : ordinal - metaFields.length;
+    return sourceContainsMetaFields ? ordinal : ordinal - metaFields.length;
   }
 
   private void validateMetaFieldDataType(DataType dataType) {
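
To make the field renames above concrete, a minimal usage sketch (the helper class and all values are hypothetical, not code from this commit): a HoodieInternalRow always overlays its own five meta-fields, and rebases ordinals onto the source row when the source carries no meta columns.

    import org.apache.hudi.client.model.HoodieInternalRow;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.unsafe.types.UTF8String;

    class HoodieInternalRowSketch {
      // Hypothetical helper for illustration only.
      static InternalRow overlayMetaFields(InternalRow sourceRow) {
        // sourceRow is assumed to contain only data columns, so
        // sourceContainsMetaFields = false and ordinals >= 5 are rebased
        // onto the source row (ordinal - 5) by rebaseOrdinal().
        return new HoodieInternalRow(
            UTF8String.fromString("20220722083507"),      // commit time (made-up)
            UTF8String.fromString("20220722083507_0_1"),  // commit seq number (made-up)
            UTF8String.fromString("key-1"),               // record key (made-up)
            UTF8String.fromString("2022/07/22"),          // partition path (made-up)
            UTF8String.fromString("file-1.parquet"),      // file name (made-up)
            sourceRow,
            false);
      }
    }
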
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
index eb408f81c1..e68873f92e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
@@ -18,15 +18,14 @@
 
 package org.apache.hudi.io.storage.row;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.table.HoodieTable;
-
-import org.apache.hadoop.fs.Path;
 import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
@@ -42,32 +41,34 @@ public class HoodieInternalRowFileWriterFactory {
    * Factory method to assist in instantiating an instance of {@link HoodieInternalRowFileWriter}.
    * @param path path of the RowFileWriter.
    * @param hoodieTable instance of {@link HoodieTable} in use.
-   * @param config instance of {@link HoodieWriteConfig} to use.
+   * @param writeConfig instance of {@link HoodieWriteConfig} to use.
    * @param schema schema of the dataset in use.
    * @return the instantiated {@link HoodieInternalRowFileWriter}.
    * @throws IOException if format is not supported or if any exception during instantiating the RowFileWriter.
    *
    */
-  public static HoodieInternalRowFileWriter getInternalRowFileWriter(
-      Path path, HoodieTable hoodieTable, HoodieWriteConfig config, StructType schema)
+  public static HoodieInternalRowFileWriter getInternalRowFileWriter(Path path,
+                                                                     HoodieTable hoodieTable,
+                                                                     HoodieWriteConfig writeConfig,
+                                                                     StructType schema)
       throws IOException {
     final String extension = FSUtils.getFileExtension(path.getName());
     if (PARQUET.getFileExtension().equals(extension)) {
-      return newParquetInternalRowFileWriter(path, config, schema, hoodieTable);
+      return newParquetInternalRowFileWriter(path, hoodieTable, writeConfig, schema, tryInstantiateBloomFilter(writeConfig));
     }
     throw new UnsupportedOperationException(extension + " format not supported yet.");
   }
 
-  private static HoodieInternalRowFileWriter newParquetInternalRowFileWriter(
-      Path path, HoodieWriteConfig writeConfig, StructType structType, HoodieTable table)
+  private static HoodieInternalRowFileWriter newParquetInternalRowFileWriter(Path path,
+                                                                             HoodieTable table,
+                                                                             HoodieWriteConfig writeConfig,
+                                                                             StructType structType,
+                                                                             Option<BloomFilter> bloomFilterOpt
+  )
       throws IOException {
-    BloomFilter filter = BloomFilterFactory.createBloomFilter(
-            writeConfig.getBloomFilterNumEntries(),
-            writeConfig.getBloomFilterFPP(),
-            writeConfig.getDynamicBloomFilterMaxNumEntries(),
-            writeConfig.getBloomFilterType());
     HoodieRowParquetWriteSupport writeSupport =
-            new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, filter, writeConfig);
+            new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, bloomFilterOpt, writeConfig);
+
     return new HoodieInternalRowParquetWriter(
         path,
         new HoodieParquetConfig<>(
@@ -82,30 +83,18 @@ public class HoodieInternalRowFileWriterFactory {
         ));
   }
 
-  public static HoodieInternalRowFileWriter getInternalRowFileWriterWithoutMetaFields(
-      Path path, HoodieTable hoodieTable, HoodieWriteConfig config, StructType schema)
-      throws IOException {
-    if (PARQUET.getFileExtension().equals(hoodieTable.getBaseFileExtension())) {
-      return newParquetInternalRowFileWriterWithoutMetaFields(path, config, schema, hoodieTable);
+  private static Option<BloomFilter> tryInstantiateBloomFilter(HoodieWriteConfig writeConfig) {
+    // NOTE: Currently Bloom Filter is only going to be populated if meta-fields are populated
+    if (writeConfig.populateMetaFields()) {
+      BloomFilter bloomFilter = BloomFilterFactory.createBloomFilter(
+          writeConfig.getBloomFilterNumEntries(),
+          writeConfig.getBloomFilterFPP(),
+          writeConfig.getDynamicBloomFilterMaxNumEntries(),
+          writeConfig.getBloomFilterType());
+
+      return Option.of(bloomFilter);
     }
-    throw new HoodieIOException(hoodieTable.getBaseFileExtension() + " format not supported yet in row writer path");
-  }
 
-  private static HoodieInternalRowFileWriter newParquetInternalRowFileWriterWithoutMetaFields(
-      Path path, HoodieWriteConfig writeConfig, StructType structType, HoodieTable table)
-      throws IOException {
-    HoodieRowParquetWriteSupport writeSupport =
-        new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, null, writeConfig);
-    return new HoodieInternalRowParquetWriter(
-        path, new HoodieParquetConfig<>(
-        writeSupport,
-        writeConfig.getParquetCompressionCodec(),
-        writeConfig.getParquetBlockSize(),
-        writeConfig.getParquetPageSize(),
-        writeConfig.getParquetMaxFileSize(),
-        writeSupport.getHadoopConf(),
-        writeConfig.getParquetCompressionRatio(),
-        writeConfig.parquetDictionaryEnabled())
-    );
+    return Option.empty();
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
index 310afd4f14..e7c6ccd6fa 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.hadoop.CachingPath;
@@ -56,11 +57,6 @@ public class HoodieRowCreateHandle implements Serializable {
   private static final Logger LOG = LogManager.getLogger(HoodieRowCreateHandle.class);
   private static final AtomicLong GLOBAL_SEQ_NO = new AtomicLong(1);
 
-  private static final Integer RECORD_KEY_META_FIELD_ORD =
-      HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.RECORD_KEY_METADATA_FIELD);
-  private static final Integer PARTITION_PATH_META_FIELD_ORD =
-      HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
-
   private final HoodieTable table;
   private final HoodieWriteConfig writeConfig;
 
@@ -87,14 +83,13 @@ public class HoodieRowCreateHandle implements Serializable {
                                int taskPartitionId,
                                long taskId,
                                long taskEpochId,
-                               StructType structType,
-                               boolean populateMetaFields) {
+                               StructType structType) {
     this.partitionPath = partitionPath;
     this.table = table;
     this.writeConfig = writeConfig;
     this.fileId = fileId;
 
-    this.currTimer = new HoodieTimer(true);
+    this.currTimer = HoodieTimer.start();
 
     FileSystem fs = table.getMetaClient().getFs();
 
@@ -102,7 +97,7 @@ public class HoodieRowCreateHandle implements Serializable {
     String fileName = FSUtils.makeBaseFileName(instantTime, writeToken, this.fileId, table.getBaseFileExtension());
     this.path = makeNewPath(fs, partitionPath, fileName, writeConfig);
 
-    this.populateMetaFields = populateMetaFields;
+    this.populateMetaFields = writeConfig.populateMetaFields();
     this.fileName = UTF8String.fromString(path.getName());
     this.commitTime = UTF8String.fromString(instantTime);
     this.seqIdGenerator = (id) -> HoodieRecord.generateSequenceId(instantTime, taskPartitionId, id);
@@ -121,12 +116,15 @@ public class HoodieRowCreateHandle implements Serializable {
               FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath),
               table.getPartitionMetafileFormat());
       partitionMetadata.trySave(taskPartitionId);
+
       createMarkerFile(partitionPath, fileName, instantTime, table, writeConfig);
-      this.fileWriter = createNewFileWriter(path, table, writeConfig, structType);
+
+      this.fileWriter = HoodieInternalRowFileWriterFactory.getInternalRowFileWriter(path, table, writeConfig, structType);
     } catch (IOException e) {
       throw new HoodieInsertException("Failed to initialize file writer for path " + path, e);
     }
-    LOG.info("New handle created for partition :" + partitionPath + " with fileId " + fileId);
+
+    LOG.info("New handle created for partition: " + partitionPath + " with fileId " + fileId);
   }
 
   /**
@@ -137,47 +135,59 @@ public class HoodieRowCreateHandle implements Serializable {
    * @throws IOException
    */
   public void write(InternalRow row) throws IOException {
+    if (populateMetaFields) {
+      writeRow(row);
+    } else {
+      writeRowNoMetaFields(row);
+    }
+  }
+
+  private void writeRow(InternalRow row) {
     try {
       // NOTE: PLEASE READ THIS CAREFULLY BEFORE MODIFYING
       //       This code lays in the hot-path, and substantial caution should be
       //       exercised making changes to it to minimize amount of excessive:
-      //          - Conversions b/w Spark internal (low-level) types and JVM native ones (like
-      //         [[UTF8String]] and [[String]])
+      //          - Conversions b/w Spark internal types and JVM native ones (like [[UTF8String]]
+      //          and [[String]])
       //          - Repeated computations (for ex, converting file-path to [[UTF8String]] over and
       //          over again)
-      UTF8String recordKey = row.getUTF8String(RECORD_KEY_META_FIELD_ORD);
-
-      InternalRow updatedRow;
-      // In cases when no meta-fields need to be added we simply relay provided row to
-      // the writer as is
-      if (!populateMetaFields) {
-        updatedRow = row;
-      } else {
-        UTF8String partitionPath = row.getUTF8String(PARTITION_PATH_META_FIELD_ORD);
-        // This is the only meta-field that is generated dynamically, hence conversion b/w
-        // [[String]] and [[UTF8String]] is unavoidable
-        UTF8String seqId = UTF8String.fromString(seqIdGenerator.apply(GLOBAL_SEQ_NO.getAndIncrement()));
-
-        updatedRow = new HoodieInternalRow(commitTime, seqId, recordKey,
-            partitionPath, fileName, row, true);
-      }
+      UTF8String recordKey = row.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_ORD);
+      UTF8String partitionPath = row.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_ORD);
+      // This is the only meta-field that is generated dynamically, hence conversion b/w
+      // [[String]] and [[UTF8String]] is unavoidable
+      UTF8String seqId = UTF8String.fromString(seqIdGenerator.apply(GLOBAL_SEQ_NO.getAndIncrement()));
+
+      InternalRow updatedRow = new HoodieInternalRow(commitTime, seqId, recordKey,
+          partitionPath, fileName, row, true);
 
       try {
         fileWriter.writeRow(recordKey, updatedRow);
         // NOTE: To avoid conversion on the hot-path we only convert [[UTF8String]] into [[String]]
         //       in cases when successful records' writes are being tracked
         writeStatus.markSuccess(writeStatus.isTrackingSuccessfulWrites() ? recordKey.toString() : null);
-      } catch (Throwable t) {
+      } catch (Exception t) {
         writeStatus.markFailure(recordKey.toString(), t);
       }
-    } catch (Throwable ge) {
-      writeStatus.setGlobalError(ge);
-      throw ge;
+    } catch (Exception e) {
+      writeStatus.setGlobalError(e);
+      throw e;
+    }
+  }
+
+  private void writeRowNoMetaFields(InternalRow row) {
+    try {
+      // TODO make sure writing w/ and w/o meta fields is consistent (currently writing w/o
+      //      meta-fields would fail if any record will, while when writing w/ meta-fields it won't)
+      fileWriter.writeRow(row);
+      writeStatus.markSuccess();
+    } catch (Exception e) {
+      writeStatus.setGlobalError(e);
+      throw new HoodieException("Exception thrown while writing spark InternalRows to file ", e);
     }
   }
 
   /**
-   * @returns {@code true} if this handle can take in more writes. else {@code false}.
+   * Returns {@code true} if this handle can take in more writes. else {@code false}.
    */
   public boolean canWrite() {
     return fileWriter.canWrite();
@@ -188,7 +198,6 @@ public class HoodieRowCreateHandle implements Serializable {
    * status of the writes to this handle.
    *
    * @return the {@link HoodieInternalWriteStatus} containing the stats and status of the writes to this handle.
-   * @throws IOException
    */
   public HoodieInternalWriteStatus close() throws IOException {
     fileWriter.close();
@@ -245,10 +254,4 @@ public class HoodieRowCreateHandle implements Serializable {
     return taskPartitionId + "-" + taskId + "-" + taskEpochId;
   }
 
-  protected HoodieInternalRowFileWriter createNewFileWriter(
-      Path path, HoodieTable hoodieTable, HoodieWriteConfig config, StructType schema)
-      throws IOException {
-    return HoodieInternalRowFileWriterFactory.getInternalRowFileWriter(
-        path, hoodieTable, config, schema);
-  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
index 46c2a6d835..ce26bcb474 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io.storage.row;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
@@ -45,14 +46,13 @@ public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
   private UTF8String minRecordKey;
   private UTF8String maxRecordKey;
 
-  public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, BloomFilter bloomFilter, HoodieWriteConfig writeConfig) {
-    super();
+  public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, Option<BloomFilter> bloomFilterOpt, HoodieWriteConfig writeConfig) {
     Configuration hadoopConf = new Configuration(conf);
     hadoopConf.set("spark.sql.parquet.writeLegacyFormat", writeConfig.parquetWriteLegacyFormatEnabled());
     hadoopConf.set("spark.sql.parquet.outputTimestampType", writeConfig.parquetOutputTimestampType());
     this.hadoopConf = hadoopConf;
     setSchema(structType, hadoopConf);
-    this.bloomFilter = bloomFilter;
+    this.bloomFilter = bloomFilterOpt.orElse(null);
   }
 
   public Configuration getHadoopConf() {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
index b7cdcf851a..afda4c0a7f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
@@ -19,132 +19,560 @@
 package org.apache.hudi.keygen;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
 import scala.Function1;
 
-import java.util.HashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.apache.hudi.common.util.CollectionUtils.tail;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
 
 /**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link SparkKeyGeneratorInterface}, which
+ *       by default however fallback to Avro implementation. For maximum performance (to avoid
+ *       conversion from Spark's internal data-types to Avro) you should override these methods
+ *       in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
  */
+@ThreadSafe
 public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements SparkKeyGeneratorInterface {
 
-  private static final String STRUCT_NAME = "hoodieRowTopLevelField";
-  private static final String NAMESPACE = "hoodieRow";
-  private Function1<Row, GenericRecord> converterFn = null;
-  private final AtomicBoolean validatePartitionFields = new AtomicBoolean(false);
-  protected StructType structType;
+  private static final Logger LOG = LogManager.getLogger(BuiltinKeyGenerator.class);
+
+  private static final String COMPOSITE_KEY_FIELD_VALUE_INFIX = ":";
+
+  protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 = UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
+  protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
+  protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER);
+
 
-  protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo = new HashMap<>();
-  protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo = new HashMap<>();
+  protected transient volatile SparkRowConverter rowConverter;
+  protected transient volatile SparkRowAccessor rowAccessor;
 
   protected BuiltinKeyGenerator(TypedProperties config) {
     super(config);
   }
 
-  /**
-   * Fetch record key from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which record key is requested.
-   * @return the record key of interest from {@link Row}.
-   */
   @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getRecordKey(Row row) {
-    // TODO avoid conversion to avro
-    //      since converterFn is transient this will be repeatedly initialized over and over again
-    if (null == converterFn) {
-      converterFn = AvroConversionUtils.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
-    }
-    return getKey(converterFn.apply(row)).getRecordKey();
+    tryInitRowConverter(row.schema());
+    // NOTE: This implementation has considerable computational overhead and has to be overridden
+    //       to provide for optimal performance on Spark. This implementation provided exclusively
+    //       for compatibility reasons.
+    return getRecordKey(rowConverter.convertToAvro(row));
   }
 
   @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  public String getRecordKey(InternalRow internalRow, StructType schema) {
-    try {
-      // TODO fix
-      buildFieldSchemaInfoIfNeeded(schema);
-      return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow, getRecordKeyFields(), recordKeySchemaInfo, false);
-    } catch (Exception e) {
-      throw new HoodieException("Conversion of InternalRow to Row failed with exception", e);
+  public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+    tryInitRowConverter(schema);
+    // NOTE: This implementation has considerable computational overhead and has to be overridden
+    //       to provide for optimal performance on Spark. This implementation provided exclusively
+    //       for compatibility reasons.
+    return UTF8String.fromString(getRecordKey(rowConverter.convertToAvro(internalRow)));
+  }
+
+  @Override
+  public String getPartitionPath(Row row) {
+    tryInitRowConverter(row.schema());
+    // NOTE: This implementation has considerable computational overhead and has to be overridden
+    //       to provide for optimal performance on Spark. This implementation provided exclusively
+    //       for compatibility reasons.
+    return getPartitionPath(rowConverter.convertToAvro(row));
+  }
+
+  @Override
+  public UTF8String getPartitionPath(InternalRow internalRow, StructType schema) {
+    tryInitRowConverter(schema);
+    // NOTE: This implementation has considerable computational overhead and has to be overridden
+    //       to provide for optimal performance on Spark. This implementation provided exclusively
+    //       for compatibility reasons.
+    GenericRecord avroRecord = rowConverter.convertToAvro(internalRow);
+    return UTF8String.fromString(getPartitionPath(avroRecord));
+  }
+
+  protected void tryInitRowAccessor(StructType schema) {
+    if (this.rowAccessor == null) {
+      synchronized (this) {
+        if (this.rowAccessor == null) {
+          this.rowAccessor = new SparkRowAccessor(schema);
+        }
+      }
     }
   }
+
   /**
-   * Fetch partition path from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which partition path is requested
-   * @return the partition path of interest from {@link Row}.
+   * NOTE: This method has to stay final (so that it's easier for JIT compiler to apply certain
+   *       optimizations, like inlining)
    */
+  protected final String combinePartitionPath(Object... partitionPathParts) {
+    return combinePartitionPathInternal(
+        JavaStringBuilder::new,
+        BuiltinKeyGenerator::toString,
+        this::tryEncodePartitionPath,
+        BuiltinKeyGenerator::handleNullOrEmptyPartitionPathPart,
+        partitionPathParts
+    );
+  }
 
-  @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  public String getPartitionPath(Row row) {
-    if (null == converterFn) {
-      converterFn = AvroConversionUtils.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
+  /**
+   * NOTE: This method has to stay final (so that it's easier for JIT compiler to apply certain
+   *       optimizations, like inlining)
+   */
+  protected final UTF8String combinePartitionPathUnsafe(Object... partitionPathParts) {
+    return combinePartitionPathInternal(
+        UTF8StringBuilder::new,
+        BuiltinKeyGenerator::toUTF8String,
+        this::tryEncodePartitionPathUTF8,
+        BuiltinKeyGenerator::handleNullOrEmptyPartitionPathPartUTF8,
+        partitionPathParts
+    );
+  }
+
+  /**
+   * NOTE: This method has to stay final (so that it's easier for JIT compiler to apply certain
+   *       optimizations, like inlining)
+   */
+  protected final String combineRecordKey(Object... recordKeyParts) {
+    return combineRecordKeyInternal(
+        JavaStringBuilder::new,
+        BuiltinKeyGenerator::toString,
+        BuiltinKeyGenerator::handleNullRecordKey,
+        recordKeyParts
+    );
+  }
+
+  /**
+   * NOTE: This method has to stay final (so that it's easier for JIT compiler to apply certain
+   *       optimizations, like inlining)
+   */
+  protected final UTF8String combineRecordKeyUnsafe(Object... recordKeyParts) {
+    return combineRecordKeyInternal(
+        UTF8StringBuilder::new,
+        BuiltinKeyGenerator::toUTF8String,
+        BuiltinKeyGenerator::handleNullRecordKey,
+        recordKeyParts
+    );
+  }
+
+  /**
+   * NOTE: This method has to stay final (so that it's easier for JIT compiler to apply certain
+   *       optimizations, like inlining)
+   */
+  protected final String combineCompositeRecordKey(Object... recordKeyParts) {
+    return combineCompositeRecordKeyInternal(
+        JavaStringBuilder::new,
+        BuiltinKeyGenerator::toString,
+        BuiltinKeyGenerator::handleNullOrEmptyCompositeKeyPart,
+        BuiltinKeyGenerator::isNullOrEmptyCompositeKeyPart,
+        recordKeyParts
+    );
+  }
+
+  /**
+   * NOTE: This method has to stay final (so that it's easier for JIT compiler to apply certain
+   *       optimizations, like inlining)
+   */
+  protected final UTF8String combineCompositeRecordKeyUnsafe(Object... recordKeyParts) {
+    return combineCompositeRecordKeyInternal(
+        UTF8StringBuilder::new,
+        BuiltinKeyGenerator::toUTF8String,
+        BuiltinKeyGenerator::handleNullOrEmptyCompositeKeyPartUTF8,
+        BuiltinKeyGenerator::isNullOrEmptyCompositeKeyPartUTF8,
+        recordKeyParts
+    );
+  }
+
+  private <S> S combineRecordKeyInternal(
+      Supplier<StringBuilder<S>> builderFactory,
+      Function<Object, S> converter,
+      Function<S, S> emptyKeyPartHandler,
+      Object... recordKeyParts
+  ) {
+    if (recordKeyParts.length == 1) {
+      return emptyKeyPartHandler.apply(converter.apply(recordKeyParts[0]));
     }
-    return getKey(converterFn.apply(row)).getPartitionPath();
+
+    StringBuilder<S> sb = builderFactory.get();
+    for (int i = 0; i < recordKeyParts.length; ++i) {
+      // NOTE: If record-key part has already been a string [[toString]] will be a no-op
+      sb.append(emptyKeyPartHandler.apply(converter.apply(recordKeyParts[i])));
+
+      if (i < recordKeyParts.length - 1) {
+        sb.appendJava(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
+      }
+    }
+
+    return sb.build();
+  }
+
+  private <S> S combineCompositeRecordKeyInternal(
+      Supplier<StringBuilder<S>> builderFactory,
+      Function<Object, S> converter,
+      Function<S, S> emptyKeyPartHandler,
+      Predicate<S> isNullOrEmptyKeyPartPredicate,
+      Object... recordKeyParts
+  ) {
+    boolean hasNonNullNonEmptyPart = false;
+
+    StringBuilder<S> sb = builderFactory.get();
+    for (int i = 0; i < recordKeyParts.length; ++i) {
+      // NOTE: If record-key part has already been a string [[toString]] will be a no-op
+      S convertedKeyPart = emptyKeyPartHandler.apply(converter.apply(recordKeyParts[i]));
+
+      sb.appendJava(recordKeyFields.get(i));
+      sb.appendJava(COMPOSITE_KEY_FIELD_VALUE_INFIX);
+      sb.append(convertedKeyPart);
+      // This check is to validate that overall composite-key has at least one non-null, non-empty
+      // segment
+      hasNonNullNonEmptyPart |= !isNullOrEmptyKeyPartPredicate.test(convertedKeyPart);
+
+      if (i < recordKeyParts.length - 1) {
+        sb.appendJava(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
+      }
+    }
+
+    if (hasNonNullNonEmptyPart) {
+      return sb.build();
+    } else {
+      throw new HoodieKeyException(String.format("All of the values for (%s) were either null or empty", recordKeyFields));
+    }
+  }
+
+  private <S> S combinePartitionPathInternal(Supplier<StringBuilder<S>> builderFactory,
+                                             Function<Object, S> converter,
+                                             Function<S, S> encoder,
+                                             Function<S, S> emptyHandler,
+                                             Object... partitionPathParts) {
+    checkState(partitionPathParts.length == partitionPathFields.size());
+    // Avoid creating [[StringBuilder]] in case there's just one partition-path part,
+    // and Hive-style of partitioning is not required
+    if (!hiveStylePartitioning && partitionPathParts.length == 1) {
+      return emptyHandler.apply(converter.apply(partitionPathParts[0]));
+    }
+
+    StringBuilder<S> sb = builderFactory.get();
+    for (int i = 0; i < partitionPathParts.length; ++i) {
+      S partitionPathPartStr = encoder.apply(emptyHandler.apply(converter.apply(partitionPathParts[i])));
+
+      if (hiveStylePartitioning) {
+        sb.appendJava(partitionPathFields.get(i))
+            .appendJava("=")
+            .append(partitionPathPartStr);
+      } else {
+        sb.append(partitionPathPartStr);
+      }
+
+      if (i < partitionPathParts.length - 1) {
+        sb.appendJava(DEFAULT_PARTITION_PATH_SEPARATOR);
+      }
+    }
+
+    return sb.build();
+  }
+
+  private String tryEncodePartitionPath(String partitionPathPart) {
+    return encodePartitionPath ? PartitionPathEncodeUtils.escapePathName(partitionPathPart) : partitionPathPart;
+  }
+
+  private UTF8String tryEncodePartitionPathUTF8(UTF8String partitionPathPart) {
+    // NOTE: This method avoids [[UTF8String]] to [[String]] conversion (and back) unless
+    //       partition-path encoding is enabled
+    return encodePartitionPath ? UTF8String.fromString(PartitionPathEncodeUtils.escapePathName(partitionPathPart.toString())) : partitionPathPart;
+  }
+
+  private void tryInitRowConverter(StructType structType) {
+    if (rowConverter == null) {
+      synchronized (this) {
+        if (rowConverter == null) {
+          rowConverter = new SparkRowConverter(structType);
+        }
+      }
+    }
+  }
+
+  protected static String requireNonNullNonEmptyKey(String key) {
+    if (key != null && key.length() > 0) {
+      return key;
+    } else {
+      throw new HoodieKeyException("Record key has to be non-empty string!");
+    }
+  }
+
+  protected static UTF8String requireNonNullNonEmptyKey(UTF8String key) {
+    if (key != null && key.numChars() > 0) {
+      return key;
+    } else {
+      throw new HoodieKeyException("Record key has to be non-empty string!");
+    }
+  }
+
+  protected static <S> S handleNullRecordKey(S s) {
+    if (s == null || s.toString().isEmpty()) {
+      throw new HoodieKeyException("Record key has to be non-null!");
+    }
+
+    return s;
+  }
+
+  private static UTF8String toUTF8String(Object o) {
+    if (o == null) {
+      return null;
+    } else if (o instanceof UTF8String) {
+      return (UTF8String) o;
+    } else {
+      // NOTE: If object is a [[String]], [[toString]] would be a no-op
+      return UTF8String.fromString(o.toString());
+    }
+  }
+
+  private static String toString(Object o) {
+    return o == null ? null : o.toString();
+  }
+
+  private static String handleNullOrEmptyCompositeKeyPart(Object keyPart) {
+    if (keyPart == null) {
+      return NULL_RECORDKEY_PLACEHOLDER;
+    } else {
+      // NOTE: [[toString]] is a no-op if key-part was already a [[String]]
+      String keyPartStr = keyPart.toString();
+      return !keyPartStr.isEmpty() ? keyPartStr : EMPTY_RECORDKEY_PLACEHOLDER;
+    }
+  }
+
+  private static UTF8String handleNullOrEmptyCompositeKeyPartUTF8(UTF8String keyPart) {
+    if (keyPart == null) {
+      return NULL_RECORD_KEY_PLACEHOLDER_UTF8;
+    } else if (keyPart.numChars() == 0) {
+      return EMPTY_RECORD_KEY_PLACEHOLDER_UTF8;
+    }
+
+    return keyPart;
+  }
+
+  @SuppressWarnings("StringEquality")
+  private static boolean isNullOrEmptyCompositeKeyPart(String keyPart) {
+    // NOTE: Converted key-part is compared against null/empty stub using ref-equality
+    //       for performance reasons (it relies on the fact that we're using internalized
+    //       constants)
+    return keyPart == NULL_RECORDKEY_PLACEHOLDER || keyPart == EMPTY_RECORDKEY_PLACEHOLDER;
+  }
+
+  private static boolean isNullOrEmptyCompositeKeyPartUTF8(UTF8String keyPart) {
+    // NOTE: Converted key-part is compared against null/empty stub using ref-equality
+    //       for performance reasons (it relies on the fact that we're using internalized
+    //       constants)
+    return keyPart == NULL_RECORD_KEY_PLACEHOLDER_UTF8 || keyPart == EMPTY_RECORD_KEY_PLACEHOLDER_UTF8;
+  }
+
+  private static String handleNullOrEmptyPartitionPathPart(Object partitionPathPart) {
+    if (partitionPathPart == null) {
+      return HUDI_DEFAULT_PARTITION_PATH;
+    } else {
+      // NOTE: [[toString]] is a no-op if key-part was already a [[String]]
+      String keyPartStr = partitionPathPart.toString();
+      return keyPartStr.isEmpty() ? HUDI_DEFAULT_PARTITION_PATH : keyPartStr;
+    }
+  }
+
+  private static UTF8String handleNullOrEmptyPartitionPathPartUTF8(UTF8String keyPart) {
+    if (keyPart == null || keyPart.numChars() == 0) {
+      return HUDI_DEFAULT_PARTITION_PATH_UTF8;
+    }
+
+    return keyPart;
   }
 
   /**
-   * Fetch partition path from {@link InternalRow}.
+   * Converts provided (raw) value extracted from the {@link InternalRow} object into a deserialized,
+   * JVM native format (for ex, converting {@code Long} into {@link Instant},
+   * {@code Integer} to {@link LocalDate}, etc)
+   *
+   * This method allows to avoid costly full-row deserialization sequence. Note, that this method
+   * should be maintained in sync w/
+   *
+   * <ol>
+   *   <li>{@code RowEncoder#deserializerFor}, as well as</li>
+   *   <li>{@code HoodieAvroUtils#convertValueForAvroLogicalTypes}</li>
+   * </ol>
    *
-   * @param internalRow {@link InternalRow} instance from which partition path needs to be fetched from.
-   * @param structType  schema of the internalRow.
-   * @return the partition path.
+   * @param dataType target data-type of the given value
+   * @param value target value to be converted
    */
-  @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  public String getPartitionPath(InternalRow internalRow, StructType structType) {
-    try {
-      buildFieldSchemaInfoIfNeeded(structType);
-      return RowKeyGeneratorHelper.getPartitionPathFromInternalRow(internalRow, getPartitionPathFields(),
-          hiveStylePartitioning, partitionPathSchemaInfo);
-    } catch (Exception e) {
-      throw new HoodieException("Conversion of InternalRow to Row failed with exception", e);
-    }
-  }
-
-  void buildFieldSchemaInfoIfNeeded(StructType structType) {
-    if (this.structType == null) {
-      this.structType = structType;
-      getRecordKeyFields()
-          .stream().filter(f -> !f.isEmpty())
-          .forEach(f -> recordKeySchemaInfo.put(f, RowKeyGeneratorHelper.getFieldSchemaInfo(structType, f, true)));
-      if (getPartitionPathFields() != null) {
-        getPartitionPathFields().stream().filter(f -> !f.isEmpty())
-            .forEach(f -> partitionPathSchemaInfo.put(f, RowKeyGeneratorHelper.getFieldSchemaInfo(structType, f, false)));
+  private static Object convertToLogicalDataType(DataType dataType, Object value) {
+    if (value == null) {
+      return null;
+    } else if (dataType instanceof TimestampType) {
+      // Provided value has to be [[Long]] in this case, representing micros since epoch
+      return new Timestamp((Long) value / 1000);
+    } else if (dataType instanceof DateType) {
+      // Provided value has to be [[Int]] in this case
+      return LocalDate.ofEpochDay((Integer) value);
+    }
+
+    return value;
+  }
+
+  protected static class SparkRowConverter {
+    private static final String STRUCT_NAME = "hoodieRowTopLevelField";
+    private static final String NAMESPACE = "hoodieRow";
+
+    private final Function1<Row, GenericRecord> avroConverter;
+    private final SparkRowSerDe rowSerDe;
+
+    SparkRowConverter(StructType schema) {
+      this.rowSerDe = HoodieSparkUtils.getDeserializer(schema);
+      this.avroConverter = AvroConversionUtils.createConverterToAvro(schema, STRUCT_NAME, NAMESPACE);
+    }
+
+    GenericRecord convertToAvro(Row row) {
+      return avroConverter.apply(row);
+    }
+
+    GenericRecord convertToAvro(InternalRow row) {
+      return avroConverter.apply(rowSerDe.deserializeRow(row));
+    }
+  }
+
+  protected class SparkRowAccessor {
+    private final HoodieUnsafeRowUtils.NestedFieldPath[] recordKeyFieldsPaths;
+    private final HoodieUnsafeRowUtils.NestedFieldPath[] partitionPathFieldsPaths;
+
+    SparkRowAccessor(StructType schema) {
+      this.recordKeyFieldsPaths = resolveNestedFieldPaths(getRecordKeyFieldNames(), schema);
+      this.partitionPathFieldsPaths = resolveNestedFieldPaths(getPartitionPathFields(), schema);
+    }
+
+    public Object[] getRecordKeyParts(Row row) {
+      return getNestedFieldValues(row, recordKeyFieldsPaths);
+    }
+
+    public Object[] getRecordPartitionPathValues(Row row) {
+      return getNestedFieldValues(row, partitionPathFieldsPaths);
+    }
+
+    public Object[] getRecordKeyParts(InternalRow row) {
+      return getNestedFieldValues(row, recordKeyFieldsPaths);
+    }
+
+    public Object[] getRecordPartitionPathValues(InternalRow row) {
+      return getNestedFieldValues(row, partitionPathFieldsPaths);
+    }
+
+    private Object[] getNestedFieldValues(Row row, HoodieUnsafeRowUtils.NestedFieldPath[] nestedFieldsPaths) {
+      Object[] nestedFieldValues = new Object[nestedFieldsPaths.length];
+      for (int i = 0; i < nestedFieldsPaths.length; ++i) {
+        nestedFieldValues[i] = HoodieUnsafeRowUtils$.MODULE$.getNestedRowValue(row, nestedFieldsPaths[i]);
+      }
+      return nestedFieldValues;
+    }
+
+    private Object[] getNestedFieldValues(InternalRow row, HoodieUnsafeRowUtils.NestedFieldPath[] nestedFieldsPaths) {
+      Object[] nestedFieldValues = new Object[nestedFieldsPaths.length];
+      for (int i = 0; i < nestedFieldsPaths.length; ++i) {
+        Object rawValue = HoodieUnsafeRowUtils$.MODULE$.getNestedInternalRowValue(row, nestedFieldsPaths[i]);
+        DataType dataType = tail(nestedFieldsPaths[i].parts())._2.dataType();
+
+        nestedFieldValues[i] = convertToLogicalDataType(dataType, rawValue);
+      }
+
+      return nestedFieldValues;
+    }
+
+    private HoodieUnsafeRowUtils.NestedFieldPath[] resolveNestedFieldPaths(List<String> fieldPaths, StructType schema) {
+      try {
+        return fieldPaths.stream()
+            .map(fieldPath -> HoodieUnsafeRowUtils$.MODULE$.composeNestedFieldPath(schema, fieldPath))
+            .toArray(HoodieUnsafeRowUtils.NestedFieldPath[]::new);
+      } catch (Exception e) {
+        LOG.error(String.format("Failed to resolve nested field-paths (%s) in schema (%s)", fieldPaths, schema), e);
+        throw new HoodieException("Failed to resolve nested field-paths", e);
       }
     }
   }
 
-  protected String getPartitionPathInternal(InternalRow row, StructType structType) {
-    buildFieldSchemaInfoIfNeeded(structType);
-    validatePartitionFieldsForInternalRow();
-    return RowKeyGeneratorHelper.getPartitionPathFromInternalRow(row, getPartitionPathFields(),
-        hiveStylePartitioning, partitionPathSchemaInfo);
+  /**
+   * This is a generic interface closing the gap and unifying the {@link java.lang.StringBuilder} with
+   * {@link org.apache.hudi.unsafe.UTF8StringBuilder} implementations, allowing us to avoid code-duplication by performing
+   * most of the key-generation in a generic and unified way
+   *
+   * @param <S> target string type this builder is producing (could either be native {@link String}
+   *           or alternatively {@link UTF8String})
+   */
+  private interface StringBuilder<S> {
+    default StringBuilder<S> append(S s) {
+      return appendJava(s.toString());
+    }
+
+    StringBuilder<S> appendJava(String s);
+
+    S build();
   }
 
-  protected void validatePartitionFieldsForInternalRow() {
-    if (!validatePartitionFields.getAndSet(true)) {
-      partitionPathSchemaInfo.values().forEach(entry -> {
-        if (entry.getKey().size() > 1) {
-          throw new IllegalArgumentException("Nested column for partitioning is not supported with disabling meta columns");
-        }
-      });
+  private static class JavaStringBuilder implements StringBuilder<String> {
+    private final java.lang.StringBuilder sb = new java.lang.StringBuilder();
+
+    @Override
+    public StringBuilder<String> appendJava(String s) {
+      sb.append(s);
+      return this;
+    }
+
+    @Override
+    public String build() {
+      return sb.toString();
+    }
+  }
+
+  private static class UTF8StringBuilder implements StringBuilder<UTF8String> {
+    private final org.apache.hudi.unsafe.UTF8StringBuilder sb = new org.apache.hudi.unsafe.UTF8StringBuilder();
+
+    @Override
+    public StringBuilder<UTF8String> appendJava(String s) {
+      sb.append(s);
+      return this;
+    }
+
+    @Override
+    public StringBuilder<UTF8String> append(UTF8String s) {
+      sb.append(s);
+      return this;
+    }
+
+    @Override
+    public UTF8String build() {
+      return sb.build();
     }
   }
 }
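
A minimal, self-contained sketch of the conversion performed by convertToLogicalDataType above, assuming Spark's physical encodings (TimestampType as micros-since-epoch Long, DateType as days-since-epoch Integer); the class and method names here are illustrative only:

    import java.sql.Timestamp;
    import java.time.LocalDate;

    class LogicalTypeConversionSketch {
      // Mirrors the TimestampType/DateType branches above; all other values pass through unchanged.
      static Object convert(Object value, boolean isTimestampType, boolean isDateType) {
        if (value == null) {
          return null;
        } else if (isTimestampType) {
          return new Timestamp((Long) value / 1000);    // micros since epoch -> millis since epoch
        } else if (isDateType) {
          return LocalDate.ofEpochDay((Integer) value); // days since epoch -> LocalDate
        }
        return value;
      }

      public static void main(String[] args) {
        System.out.println(convert(1658448000000000L, true, false)); // prints the converted java.sql.Timestamp
        System.out.println(convert(19195, false, true));             // prints 2022-07-22
      }
    }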
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
index b09ff0755a..1eac757975 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
@@ -17,19 +17,23 @@
 
 package org.apache.hudi.keygen;
 
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
-
-import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 
 import java.util.Arrays;
 import java.util.stream.Collectors;
 
 /**
- * Complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
+ * Key generator that prefixes each record-key part with its corresponding field name.
+ *
+ * <p/>
+ * For example, for the schema of {@code { "key": string, "value": bytes }}, and corresponding record
+ * {@code { "key": "foo" }}, record-key "key:foo" will be produced.
  */
 public class ComplexKeyGenerator extends BuiltinKeyGenerator {
 
@@ -45,7 +49,7 @@ public class ComplexKeyGenerator extends BuiltinKeyGenerator {
         .map(String::trim)
         .filter(s -> !s.isEmpty())
         .collect(Collectors.toList());
-    complexAvroKeyGenerator = new ComplexAvroKeyGenerator(props);
+    this.complexAvroKeyGenerator = new ComplexAvroKeyGenerator(props);
   }
 
   @Override
@@ -60,26 +64,25 @@ public class ComplexKeyGenerator extends BuiltinKeyGenerator {
 
   @Override
   public String getRecordKey(Row row) {
-    buildFieldSchemaInfoIfNeeded(row.schema());
-    return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, true);
+    tryInitRowAccessor(row.schema());
+    return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row));
   }
 
   @Override
-  public String getRecordKey(InternalRow internalRow, StructType schema) {
-    buildFieldSchemaInfoIfNeeded(schema);
-    return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow, getRecordKeyFields(), recordKeySchemaInfo, true);
+  public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+    tryInitRowAccessor(schema);
+    return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow));
   }
 
   @Override
   public String getPartitionPath(Row row) {
-    buildFieldSchemaInfoIfNeeded(row.schema());
-    return RowKeyGeneratorHelper.getPartitionPathFromRow(row, getPartitionPathFields(),
-        hiveStylePartitioning, partitionPathSchemaInfo);
+    tryInitRowAccessor(row.schema());
+    return combinePartitionPath(rowAccessor.getRecordPartitionPathValues(row));
   }
 
   @Override
-  public String getPartitionPath(InternalRow row, StructType structType) {
-    return getPartitionPathInternal(row, structType);
+  public UTF8String getPartitionPath(InternalRow row, StructType schema) {
+    tryInitRowAccessor(schema);
+    return combinePartitionPathUnsafe(rowAccessor.getRecordPartitionPathValues(row));
   }
-
 }
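
An illustrative, dependency-free sketch of the composite record-key format ComplexKeyGenerator produces ("<field>:<value>" parts joined by ","); the "__null__"/"__empty__" placeholders are assumptions standing in for KeyGenUtils' null/empty record-key placeholders:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    class ComplexKeyFormatSketch {
      // Builds "field:value" pairs joined by ",", substituting placeholders for null/empty parts.
      static String compositeKey(List<String> fields, List<Object> values) {
        return IntStream.range(0, fields.size())
            .mapToObj(i -> {
              Object v = values.get(i);
              String s = (v == null) ? "__null__"
                  : v.toString().isEmpty() ? "__empty__" : v.toString();
              return fields.get(i) + ":" + s;
            })
            .collect(Collectors.joining(","));
      }

      public static void main(String[] args) {
        // prints "key:foo,ts:42"
        System.out.println(compositeKey(Arrays.asList("key", "ts"), Arrays.asList("foo", 42)));
      }
    }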
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
index c43892af45..fcd94bb4f1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
@@ -29,9 +29,11 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.stream.Collectors;
 
 /**
@@ -46,16 +48,29 @@ import java.util.stream.Collectors;
  * field in the partition path, use field1:simple 3. If you want your table to be non partitioned, simply leave it as blank.
  *
  * RecordKey is internally generated using either SimpleKeyGenerator or ComplexKeyGenerator.
+ *
+ * @deprecated
  */
+@Deprecated
 public class CustomKeyGenerator extends BuiltinKeyGenerator {
 
   private final CustomAvroKeyGenerator customAvroKeyGenerator;
 
   public CustomKeyGenerator(TypedProperties props) {
-    super(props);
-    this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList());
-    this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList());
-    customAvroKeyGenerator = new CustomAvroKeyGenerator(props);
+    // NOTE: We have to strip partition-path configuration, since it can only be interpreted by
+    //       this key-gen
+    super(stripPartitionPathConfig(props));
+    this.recordKeyFields =
+        Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
+            .map(String::trim)
+            .collect(Collectors.toList());
+    String partitionPathFields = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
+    this.partitionPathFields = partitionPathFields == null
+        ? Collections.emptyList()
+        : Arrays.stream(partitionPathFields.split(",")).map(String::trim).collect(Collectors.toList());
+    this.customAvroKeyGenerator = new CustomAvroKeyGenerator(props);
+
+    validateRecordKeyFields();
   }
 
   @Override
@@ -70,9 +85,8 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
 
   @Override
   public String getRecordKey(Row row) {
-    validateRecordKeyFields();
-    return getRecordKeyFields().size() == 1
-        ? new SimpleKeyGenerator(config).getRecordKey(row)
+    return getRecordKeyFieldNames().size() == 1
+        ? new SimpleKeyGenerator(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()), null).getRecordKey(row)
         : new ComplexKeyGenerator(config).getRecordKey(row);
   }
 
@@ -82,8 +96,8 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
   }
 
   @Override
-  public String getPartitionPath(InternalRow row, StructType structType) {
-    return getPartitionPath(Option.empty(), Option.empty(), Option.of(Pair.of(row, structType)));
+  public UTF8String getPartitionPath(InternalRow row, StructType schema) {
+    return UTF8String.fromString(getPartitionPath(Option.empty(), Option.empty(), Option.of(Pair.of(row, schema))));
   }
 
   private String getPartitionPath(Option<GenericRecord> record, Option<Row> row, Option<Pair<InternalRow, StructType>> internalRowStructTypePair) {
@@ -99,7 +113,7 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
       return "";
     }
     for (String field : getPartitionPathFields()) {
-      String[] fieldWithType = field.split(customAvroKeyGenerator.SPLIT_REGEX);
+      String[] fieldWithType = field.split(CustomAvroKeyGenerator.SPLIT_REGEX);
       if (fieldWithType.length != 2) {
         throw new HoodieKeyGeneratorException("Unable to find field names for partition path in proper format");
       }
@@ -142,9 +156,18 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
   }
 
   private void validateRecordKeyFields() {
-    if (getRecordKeyFields() == null || getRecordKeyFields().isEmpty()) {
+    if (getRecordKeyFieldNames() == null || getRecordKeyFieldNames().isEmpty()) {
       throw new HoodieKeyException("Unable to find field names for record key in cfg");
     }
   }
+
+  private static TypedProperties stripPartitionPathConfig(TypedProperties props) {
+    TypedProperties filtered = new TypedProperties(props);
+    // NOTE: We have to stub it out w/ empty string, since the properties are:
+    //         - Expected to bear this config
+    //         - Can't be stubbed out w/ null
+    filtered.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "");
+    return filtered;
+  }
 }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
index 6cf674f18e..7fcc16094e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
@@ -25,6 +25,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -40,7 +41,12 @@ public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator {
   public GlobalDeleteKeyGenerator(TypedProperties config) {
     super(config);
     this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","));
-    globalAvroDeleteKeyGenerator = new GlobalAvroDeleteKeyGenerator(config);
+    this.globalAvroDeleteKeyGenerator = new GlobalAvroDeleteKeyGenerator(config);
+  }
+
+  @Override
+  public List<String> getPartitionPathFields() {
+    return new ArrayList<>();
   }
 
   @Override
@@ -53,21 +59,16 @@ public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator {
     return globalAvroDeleteKeyGenerator.getPartitionPath(record);
   }
 
-  @Override
-  public List<String> getPartitionPathFields() {
-    return new ArrayList<>();
-  }
-
   @Override
   public String getRecordKey(Row row) {
-    buildFieldSchemaInfoIfNeeded(row.schema());
-    return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, true);
+    tryInitRowAccessor(row.schema());
+    return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row));
   }
 
   @Override
-  public String getRecordKey(InternalRow internalRow, StructType schema) {
-    buildFieldSchemaInfoIfNeeded(schema);
-    return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow, getRecordKeyFields(), recordKeySchemaInfo, true);
+  public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+    tryInitRowAccessor(schema);
+    return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow));
   }
 
   @Override
@@ -76,8 +77,8 @@ public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator {
   }
 
   @Override
-  public String getPartitionPath(InternalRow row, StructType structType) {
-    return globalAvroDeleteKeyGenerator.getEmptyPartition();
+  public UTF8String getPartitionPath(InternalRow row, StructType schema) {
+    return UTF8String.EMPTY_UTF8;
   }
 }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
index dc8b253b0f..ccd37dc9ce 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
@@ -18,13 +18,13 @@
 
 package org.apache.hudi.keygen;
 
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
-
-import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -41,9 +41,16 @@ public class NonpartitionedKeyGenerator extends BuiltinKeyGenerator {
   public NonpartitionedKeyGenerator(TypedProperties props) {
     super(props);
     this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
-        .split(",")).map(String::trim).collect(Collectors.toList());
+        .split(","))
+        .map(String::trim)
+        .collect(Collectors.toList());
     this.partitionPathFields = Collections.emptyList();
-    nonpartitionedAvroKeyGenerator = new NonpartitionedAvroKeyGenerator(props);
+    this.nonpartitionedAvroKeyGenerator = new NonpartitionedAvroKeyGenerator(props);
+  }
+
+  @Override
+  public List<String> getPartitionPathFields() {
+    return nonpartitionedAvroKeyGenerator.getPartitionPathFields();
   }
 
   @Override
@@ -52,19 +59,20 @@ public class NonpartitionedKeyGenerator extends BuiltinKeyGenerator {
   }
 
   @Override
-  public String getPartitionPath(GenericRecord record) {
-    return nonpartitionedAvroKeyGenerator.getPartitionPath(record);
+  public String getRecordKey(Row row) {
+    tryInitRowAccessor(row.schema());
+    return combineRecordKey(rowAccessor.getRecordKeyParts(row));
   }
 
   @Override
-  public List<String> getPartitionPathFields() {
-    return nonpartitionedAvroKeyGenerator.getPartitionPathFields();
+  public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+    tryInitRowAccessor(schema);
+    return combineRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow));
   }
 
   @Override
-  public String getRecordKey(Row row) {
-    buildFieldSchemaInfoIfNeeded(row.schema());
-    return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, false);
+  public String getPartitionPath(GenericRecord record) {
+    return nonpartitionedAvroKeyGenerator.getPartitionPath(record);
   }
 
   @Override
@@ -73,8 +81,8 @@ public class NonpartitionedKeyGenerator extends BuiltinKeyGenerator {
   }
 
   @Override
-  public String getPartitionPath(InternalRow internalRow, StructType structType) {
-    return nonpartitionedAvroKeyGenerator.getEmptyPartition();
+  public UTF8String getPartitionPath(InternalRow row, StructType schema) {
+    return UTF8String.EMPTY_UTF8;
   }
 }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGenUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGenUtils.java
deleted file mode 100644
index 9616212378..0000000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGenUtils.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.keygen;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.DateType;
-import org.apache.spark.sql.types.TimestampType;
-
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.time.LocalDate;
-
-public class RowKeyGenUtils {
-
-  /**
-   * Converts provided (raw) value extracted from the {@link InternalRow} object into a deserialized,
-   * JVM native format (for ex, converting {@code Long} into {@link Instant},
-   * {@code Integer} to {@link LocalDate}, etc)
-   *
-   * This method allows to avoid costly full-row deserialization sequence. Note, that this method
-   * should be maintained in sync w/
-   *
-   * <ol>
-   *   <li>{@code RowEncoder#deserializerFor}, as well as</li>
-   *   <li>{@code HoodieAvroUtils#convertValueForAvroLogicalTypes}</li>
-   * </ol>
-   *
-   * @param dataType target data-type of the given value
-   * @param value target value to be converted
-   */
-  public static Object convertToLogicalDataType(DataType dataType, Object value) {
-    if (dataType instanceof TimestampType) {
-      // Provided value have to be [[Long]] in this case, representing micros since epoch
-      return new Timestamp((Long) value / 1000);
-    } else if (dataType instanceof DateType) {
-      // Provided value have to be [[Int]] in this case
-      return LocalDate.ofEpochDay((Integer) value);
-    }
-
-    return value;
-  }
-}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
deleted file mode 100644
index c79481bd2b..0000000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.keygen;
-
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieKeyException;
-
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import org.apache.spark.sql.types.StructType$;
-import scala.Option;
-
-import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
-import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
-import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
-import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
-import static org.apache.hudi.keygen.RowKeyGenUtils.convertToLogicalDataType;
-
-/**
- * Helper class to fetch fields from Row.
- *
- * TODO cleanup
- */
-@Deprecated
-public class RowKeyGeneratorHelper {
-
-  public static String getRecordKeyFromInternalRow(InternalRow internalRow, List<String> recordKeyFields,
-                                                   Map<String, Pair<List<Integer>, DataType>> recordKeyPositions, boolean prefixFieldName) {
-    AtomicBoolean keyIsNullOrEmpty = new AtomicBoolean(true);
-    String toReturn = recordKeyFields.stream().map(field -> {
-      String val = null;
-      List<Integer> fieldPositions = recordKeyPositions.get(field).getKey();
-      if (fieldPositions.size() == 1) { // simple field
-        Integer fieldPos = fieldPositions.get(0);
-        if (internalRow.isNullAt(fieldPos)) {
-          val = NULL_RECORDKEY_PLACEHOLDER;
-        } else {
-          DataType dataType = recordKeyPositions.get(field).getValue();
-          val = convertToLogicalDataType(dataType, internalRow.get(fieldPos, dataType)).toString();
-          if (val.isEmpty()) {
-            val = EMPTY_RECORDKEY_PLACEHOLDER;
-          } else {
-            keyIsNullOrEmpty.set(false);
-          }
-        }
-      } else { // nested fields
-        val = getNestedFieldVal(internalRow, recordKeyPositions.get(field)).toString();
-        if (!val.contains(NULL_RECORDKEY_PLACEHOLDER) && !val.contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
-          keyIsNullOrEmpty.set(false);
-        }
-      }
-      return prefixFieldName ? (field + ":" + val) : val;
-    }).collect(Collectors.joining(","));
-    if (keyIsNullOrEmpty.get()) {
-      throw new HoodieKeyException("recordKey value: \"" + toReturn + "\" for fields: \"" + Arrays.toString(recordKeyFields.toArray()) + "\" cannot be null or empty.");
-    }
-    return toReturn;
-  }
-
-  /**
-   * Generates record key for the corresponding {@link Row}.
-   *
-   * @param row                instance of {@link Row} of interest
-   * @param recordKeyFields    record key fields as a list
-   * @param recordKeyPositions record key positions for the corresponding record keys in {@code recordKeyFields}
-   * @param prefixFieldName    {@code true} if field name need to be prefixed in the returned result. {@code false} otherwise.
-   * @return the record key thus generated
-   */
-  public static String getRecordKeyFromRow(Row row, List<String> recordKeyFields, Map<String, Pair<List<Integer>, DataType>> recordKeyPositions, boolean prefixFieldName) {
-    AtomicBoolean keyIsNullOrEmpty = new AtomicBoolean(true);
-    String toReturn = recordKeyFields.stream().map(field -> {
-      String val = null;
-      List<Integer> fieldPositions = recordKeyPositions.get(field).getKey();
-      if (fieldPositions.size() == 1) { // simple field
-        Integer fieldPos = fieldPositions.get(0);
-        if (row.isNullAt(fieldPos)) {
-          val = NULL_RECORDKEY_PLACEHOLDER;
-        } else {
-          val = row.getAs(field).toString();
-          if (val.isEmpty()) {
-            val = EMPTY_RECORDKEY_PLACEHOLDER;
-          } else {
-            keyIsNullOrEmpty.set(false);
-          }
-        }
-      } else { // nested fields
-        val = getNestedFieldVal(row, recordKeyPositions.get(field).getKey()).toString();
-        if (!val.contains(NULL_RECORDKEY_PLACEHOLDER) && !val.contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
-          keyIsNullOrEmpty.set(false);
-        }
-      }
-      return prefixFieldName ? (field + ":" + val) : val;
-    }).collect(Collectors.joining(","));
-    if (keyIsNullOrEmpty.get()) {
-      throw new HoodieKeyException("recordKey value: \"" + toReturn + "\" for fields: \"" + Arrays.toString(recordKeyFields.toArray()) + "\" cannot be null or empty.");
-    }
-    return toReturn;
-  }
-
-  /**
-   * Generates partition path for the corresponding {@link Row}.
-   *
-   * @param row                    instance of {@link Row} of interest
-   * @param partitionPathFields    partition path fields as a list
-   * @param hiveStylePartitioning  {@code true} if hive style partitioning is set. {@code false} otherwise
-   * @param partitionPathPositions partition path positions for the corresponding fields in {@code partitionPathFields}
-   * @return the generated partition path for the row
-   */
-  public static String getPartitionPathFromRow(Row row, List<String> partitionPathFields, boolean hiveStylePartitioning, Map<String, Pair<List<Integer>, DataType>> partitionPathPositions) {
-    return IntStream.range(0, partitionPathFields.size()).mapToObj(idx -> {
-      String field = partitionPathFields.get(idx);
-      String val = null;
-      List<Integer> fieldPositions = partitionPathPositions.get(field).getKey();
-      if (fieldPositions.size() == 1) { // simple
-        Integer fieldPos = fieldPositions.get(0);
-        // for partition path, if field is not found, index will be set to -1
-        if (fieldPos == -1 || row.isNullAt(fieldPos)) {
-          val = HUDI_DEFAULT_PARTITION_PATH;
-        } else {
-          Object data = row.get(fieldPos);
-          val = convertToTimestampIfInstant(data).toString();
-          if (val.isEmpty()) {
-            val = HUDI_DEFAULT_PARTITION_PATH;
-          }
-        }
-        if (hiveStylePartitioning) {
-          val = field + "=" + val;
-        }
-      } else { // nested
-        Object data = getNestedFieldVal(row, partitionPathPositions.get(field).getKey());
-        data = convertToTimestampIfInstant(data);
-        if (data.toString().contains(NULL_RECORDKEY_PLACEHOLDER) || data.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
-          val = hiveStylePartitioning ? field + "=" + HUDI_DEFAULT_PARTITION_PATH : HUDI_DEFAULT_PARTITION_PATH;
-        } else {
-          val = hiveStylePartitioning ? field + "=" + data.toString() : data.toString();
-        }
-      }
-      return val;
-    }).collect(Collectors.joining(DEFAULT_PARTITION_PATH_SEPARATOR));
-  }
-
-  public static String getPartitionPathFromInternalRow(InternalRow internalRow, List<String> partitionPathFields, boolean hiveStylePartitioning,
-                                                       Map<String, Pair<List<Integer>, DataType>> partitionPathPositions) {
-    return IntStream.range(0, partitionPathFields.size()).mapToObj(idx -> {
-      String field = partitionPathFields.get(idx);
-      String val = null;
-      List<Integer> fieldPositions = partitionPathPositions.get(field).getKey();
-      DataType dataType = partitionPathPositions.get(field).getValue();
-      if (fieldPositions.size() == 1) { // simple
-        Integer fieldPos = fieldPositions.get(0);
-        // for partition path, if field is not found, index will be set to -1
-        if (fieldPos == -1 || internalRow.isNullAt(fieldPos)) {
-          val = HUDI_DEFAULT_PARTITION_PATH;
-        } else {
-          Object value = convertToLogicalDataType(dataType, internalRow.get(fieldPos, dataType));
-          if (value == null || value.toString().isEmpty()) {
-            val = HUDI_DEFAULT_PARTITION_PATH;
-          } else {
-            val = value.toString();
-          }
-        }
-        if (hiveStylePartitioning) {
-          val = field + "=" + val;
-        }
-      } else { // nested
-        throw new IllegalArgumentException("Nested partitioning is not supported with disabling meta columns.");
-      }
-      return val;
-    }).collect(Collectors.joining(DEFAULT_PARTITION_PATH_SEPARATOR));
-  }
-
-  public static Object getFieldValFromInternalRow(InternalRow internalRow,
-                                                  Integer partitionPathPosition,
-                                                  DataType partitionPathDataType) {
-    Object val = null;
-    if (internalRow.isNullAt(partitionPathPosition)) {
-      return HUDI_DEFAULT_PARTITION_PATH;
-    } else {
-      Object value = partitionPathDataType == DataTypes.StringType ? internalRow.getString(partitionPathPosition) : internalRow.get(partitionPathPosition, partitionPathDataType);
-      if (value == null || value.toString().isEmpty()) {
-        val = HUDI_DEFAULT_PARTITION_PATH;
-      } else {
-        val = value;
-      }
-    }
-    return val;
-  }
-
-
-  /**
-   * Fetch the field value located at the positions requested for.
-   * <p>
-   * The fetching logic recursively goes into the nested field based on the position list to get the field value.
-   * For example, given the row [4357686,key1,2020-03-21,pi,[val1,10]] with the following schema, which has the fourth
-   * field as a nested field, and positions list as [4,0],
-   * <p>
-   * 0 = "StructField(timestamp,LongType,false)"
-   * 1 = "StructField(_row_key,StringType,false)"
-   * 2 = "StructField(ts_ms,StringType,false)"
-   * 3 = "StructField(pii_col,StringType,false)"
-   * 4 = "StructField(nested_col,StructType(StructField(prop1,StringType,false), StructField(prop2,LongType,false)),false)"
-   * <p>
-   * the logic fetches the value from field nested_col.prop1.
-   * If any level of the nested field is null, {@link KeyGenUtils#NULL_RECORDKEY_PLACEHOLDER} is returned.
-   * If the field value is an empty String, {@link KeyGenUtils#EMPTY_RECORDKEY_PLACEHOLDER} is returned.
-   *
-   * @param row       instance of {@link Row} of interest
-   * @param positions tree style positions where the leaf node need to be fetched and returned
-   * @return the field value as per the positions requested for.
-   */
-  public static Object getNestedFieldVal(Row row, List<Integer> positions) {
-    if (positions.size() == 1 && positions.get(0) == -1) {
-      return HUDI_DEFAULT_PARTITION_PATH;
-    }
-    int index = 0;
-    int totalCount = positions.size();
-    Row valueToProcess = row;
-    Object toReturn = null;
-
-    while (index < totalCount) {
-      if (valueToProcess.isNullAt(positions.get(index))) {
-        toReturn = NULL_RECORDKEY_PLACEHOLDER;
-        break;
-      }
-
-      if (index < totalCount - 1) {
-        valueToProcess = (Row) valueToProcess.get(positions.get(index));
-      } else { // last index
-        if (valueToProcess.getAs(positions.get(index)).toString().isEmpty()) {
-          toReturn = EMPTY_RECORDKEY_PLACEHOLDER;
-          break;
-        }
-        toReturn = valueToProcess.getAs(positions.get(index));
-      }
-      index++;
-    }
-    return toReturn;
-  }
-
-  public static Object getNestedFieldVal(InternalRow internalRow, Pair<List<Integer>, DataType> positionsAndType) {
-    if (positionsAndType.getKey().size() == 1 && positionsAndType.getKey().get(0) == -1) {
-      return HUDI_DEFAULT_PARTITION_PATH;
-    }
-    int index = 0;
-    int totalCount = positionsAndType.getKey().size();
-    InternalRow valueToProcess = internalRow;
-    Object toReturn = null;
-
-    while (index < totalCount) {
-      if (valueToProcess.isNullAt(positionsAndType.getKey().get(index))) {
-        toReturn = NULL_RECORDKEY_PLACEHOLDER;
-        break;
-      }
-
-      if (index < totalCount - 1) {
-        valueToProcess = (InternalRow) valueToProcess.get(positionsAndType.getKey().get(index), StructType$.MODULE$.defaultConcreteType());
-      } else { // last index
-        if (valueToProcess.get(positionsAndType.getKey().get(index), positionsAndType.getValue()).toString().isEmpty()) {
-          toReturn = EMPTY_RECORDKEY_PLACEHOLDER;
-          break;
-        }
-        toReturn = valueToProcess.get(positionsAndType.getKey().get(index), positionsAndType.getValue());
-      }
-      index++;
-    }
-    return toReturn;
-  }
-
-  /**
-   * Generate the tree style positions for the field requested for as per the defined struct type.
-   *
-   * @param structType  schema of interest
-   * @param field       field of interest for which the positions are requested for
-   * @param isRecordKey {@code true} if the field requested for is a record key. {@code false} in case of a partition path.
-   * @return the positions of the field as per the struct type and the leaf field's datatype.
-   */
-  public static Pair<List<Integer>, DataType> getFieldSchemaInfo(StructType structType, String field, boolean isRecordKey) {
-    String[] slices = field.split("\\.");
-    List<Integer> positions = new ArrayList<>();
-    int index = 0;
-    int totalCount = slices.length;
-    DataType leafFieldDataType = null;
-    while (index < totalCount) {
-      String slice = slices[index];
-      Option<Object> curIndexOpt = structType.getFieldIndex(slice);
-      if (curIndexOpt.isDefined()) {
-        int curIndex = (int) curIndexOpt.get();
-        positions.add(curIndex);
-        final StructField nestedField = structType.fields()[curIndex];
-        if (index < totalCount - 1) {
-          if (!(nestedField.dataType() instanceof StructType)) {
-            if (isRecordKey) {
-              throw new HoodieKeyException("Nested field should be of type StructType " + nestedField);
-            } else {
-              positions = Collections.singletonList(-1);
-              break;
-            }
-          }
-          structType = (StructType) nestedField.dataType();
-        } else {
-          // leaf node.
-          leafFieldDataType = nestedField.dataType();
-        }
-      } else {
-        if (isRecordKey) {
-          throw new HoodieKeyException("Can't find " + slice + " in StructType for the field " + field);
-        } else {
-          positions = Collections.singletonList(-1);
-          break;
-        }
-      }
-      index++;
-    }
-    return Pair.of(positions, leafFieldDataType);
-  }
-
-  private static Object convertToTimestampIfInstant(Object data) {
-    if (data instanceof Instant) {
-      return Timestamp.from((Instant) data);
-    }
-    return data;
-  }
-}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
index 2f139a61ea..dcffdf3cdb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
@@ -18,13 +18,13 @@
 
 package org.apache.hudi.keygen;
 
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
-
-import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 
 import java.util.Collections;
 
@@ -46,11 +46,9 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator {
 
   SimpleKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) {
     super(props);
-    this.recordKeyFields = recordKeyField == null
-        ? Collections.emptyList() : Collections.singletonList(recordKeyField);
-    this.partitionPathFields = partitionPathField == null
-        ? Collections.emptyList() : Collections.singletonList(partitionPathField);
-    simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, recordKeyField, partitionPathField);
+    this.recordKeyFields = recordKeyField == null ? Collections.emptyList() : Collections.singletonList(recordKeyField);
+    this.partitionPathFields = partitionPathField == null ? Collections.emptyList() : Collections.singletonList(partitionPathField);
+    this.simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, recordKeyField, partitionPathField);
   }
 
   @Override
@@ -65,19 +63,43 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator {
 
   @Override
   public String getRecordKey(Row row) {
-    buildFieldSchemaInfoIfNeeded(row.schema());
-    return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, false);
+    tryInitRowAccessor(row.schema());
+
+    Object[] recordKeys = rowAccessor.getRecordKeyParts(row);
+    // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite)
+    //       record-key field
+    if (recordKeys[0] == null) {
+      return handleNullRecordKey(null);
+    } else {
+      return requireNonNullNonEmptyKey(recordKeys[0].toString());
+    }
+  }
+
+  @Override
+  public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+    tryInitRowAccessor(schema);
+
+    Object[] recordKeyValues = rowAccessor.getRecordKeyParts(internalRow);
+    // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite)
+    //       record-key field
+    if (recordKeyValues[0] == null) {
+      return handleNullRecordKey(null);
+    } else if (recordKeyValues[0] instanceof UTF8String) {
+      return requireNonNullNonEmptyKey((UTF8String) recordKeyValues[0]);
+    } else {
+      return requireNonNullNonEmptyKey(UTF8String.fromString(recordKeyValues[0].toString()));
+    }
   }
 
   @Override
   public String getPartitionPath(Row row) {
-    buildFieldSchemaInfoIfNeeded(row.schema());
-    return RowKeyGeneratorHelper.getPartitionPathFromRow(row, getPartitionPathFields(),
-        hiveStylePartitioning, partitionPathSchemaInfo);
+    tryInitRowAccessor(row.schema());
+    return combinePartitionPath(rowAccessor.getRecordPartitionPathValues(row));
   }
 
   @Override
-  public String getPartitionPath(InternalRow row, StructType structType) {
-    return getPartitionPathInternal(row, structType);
+  public UTF8String getPartitionPath(InternalRow row, StructType schema) {
+    tryInitRowAccessor(schema);
+    return combinePartitionPathUnsafe(rowAccessor.getRecordPartitionPathValues(row));
   }
 }
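
A minimal sketch of the single-field fast path used above: a key part that is already a UTF8String is returned as-is, anything else is converted exactly once; the exception below merely stands in for handleNullRecordKey/requireNonNullNonEmptyKey:

    import org.apache.spark.unsafe.types.UTF8String;

    class SimpleKeyFastPathSketch {
      static UTF8String asRecordKey(Object keyPart) {
        if (keyPart == null) {
          // stand-in for handleNullRecordKey(null); the real code raises a Hudi-specific exception
          throw new IllegalStateException("Record key has to be non-null");
        }
        return (keyPart instanceof UTF8String)
            ? (UTF8String) keyPart                       // zero-copy fast path
            : UTF8String.fromString(keyPart.toString()); // single conversion otherwise
      }
    }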
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
index bbceaf900b..977ff709bb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
@@ -18,20 +18,65 @@
 
 package org.apache.hudi.keygen;
 
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIMethod;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 
 /**
- * Spark key generator interface.
+ * Spark-specific {@link KeyGenerator} interface extension allowing implementations to
+ * generate record-keys and partition-paths directly from Spark's internal representation,
+ * w/o the need for (expensive) conversion (for ex, to Avro)
  */
 public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface {
 
+  /**
+   * Extracts record key from Spark's {@link Row}
+   *
+   * @param row instance of {@link Row} from which record-key is extracted
+   * @return record's (primary) key
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   String getRecordKey(Row row);
 
-  String getRecordKey(InternalRow row, StructType schema);
+  /**
+   * Extracts record key from Spark's {@link InternalRow}
+   *
+   * NOTE: Difference b/w {@link Row} and {@link InternalRow} is that {@link InternalRow} could
+   *       internally hold just a binary representation of the data, while {@link Row} has it
+   *       deserialized into JVM-native representation (like {@code Integer}, {@code Long},
+   *       {@code String}, etc)
+   *
+   * @param row instance of {@link InternalRow} from which record-key is extracted
+   * @param schema schema {@link InternalRow} is adhering to
+   * @return record-key as instance of {@link UTF8String}
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  UTF8String getRecordKey(InternalRow row, StructType schema);
 
+  /**
+   * Extracts partition-path from {@link Row}
+   *
+   * @param row instance of {@link Row} from which partition-path is extracted
+   * @return record's partition-path
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   String getPartitionPath(Row row);
 
-  String getPartitionPath(InternalRow internalRow, StructType structType);
+  /**
+   * Extracts partition-path from Spark's {@link InternalRow}
+   *
+   * NOTE: Difference b/w {@link Row} and {@link InternalRow} is that {@link InternalRow} could
+   *       internally hold just a binary representation of the data, while {@link Row} has it
+   *       deserialized into JVM-native representation (like {@code Integer}, {@code Long},
+   *       {@code String}, etc)
+   *
+   * @param row instance of {@link InternalRow} from which partition-path is extracted
+   * @param schema schema {@link InternalRow} is adhering to
+   * @return partition-path as instance of {@link UTF8String}
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  UTF8String getPartitionPath(InternalRow row, StructType schema);
 }
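
A hypothetical caller-side sketch showing how the new UTF8String-returning overloads are meant to be used: both key parts are derived straight from the InternalRow, so no Row conversion or String materialization is required (the helper name and shape below are made up for illustration):

    import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.unsafe.types.UTF8String;

    class KeyPartsSketch {
      // Returns {recordKey, partitionPath} without leaving Spark's internal representation.
      static UTF8String[] keyParts(SparkKeyGeneratorInterface keyGen, InternalRow row, StructType schema) {
        return new UTF8String[] {
            keyGen.getRecordKey(row, schema),
            keyGen.getPartitionPath(row, schema)
        };
      }
    }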
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
index 004753f246..f090320ccb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
@@ -18,20 +18,19 @@
 
 package org.apache.hudi.keygen;
 
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.exception.HoodieKeyGeneratorException;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
-
-import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.IOException;
+import java.util.Objects;
 
-import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
 import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
-import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
 
 /**
  * Key generator, that relies on timestamps for partitioning field. Still picks record key by name.
@@ -61,39 +60,44 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
 
   @Override
   public String getRecordKey(Row row) {
-    buildFieldSchemaInfoIfNeeded(row.schema());
-    return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, false);
+    tryInitRowAccessor(row.schema());
+    return combineRecordKey(rowAccessor.getRecordKeyParts(row));
+  }
+
+  @Override
+  public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+    tryInitRowAccessor(schema);
+    return combineRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow));
   }
 
   @Override
   public String getPartitionPath(Row row) {
-    buildFieldSchemaInfoIfNeeded(row.schema());
-    Object partitionPathFieldVal = RowKeyGeneratorHelper.getNestedFieldVal(row, partitionPathSchemaInfo.get(getPartitionPathFields().get(0)).getKey());
-    return getTimestampBasedPartitionPath(partitionPathFieldVal);
+    tryInitRowAccessor(row.schema());
+    Object[] partitionPathValues = rowAccessor.getRecordPartitionPathValues(row);
+    return getFormattedPartitionPath(partitionPathValues[0]);
   }
 
   @Override
-  public String getPartitionPath(InternalRow internalRow, StructType structType) {
-    buildFieldSchemaInfoIfNeeded(structType);
-    validatePartitionFieldsForInternalRow();
-    Object partitionPathFieldVal = RowKeyGeneratorHelper.getFieldValFromInternalRow(internalRow,
-        partitionPathSchemaInfo.get(getPartitionPathFields().get(0)).getKey().get(0),
-        partitionPathSchemaInfo.get(getPartitionPathFields().get(0)).getValue());
-    return getTimestampBasedPartitionPath(partitionPathFieldVal);
+  public UTF8String getPartitionPath(InternalRow row, StructType schema) {
+    tryInitRowAccessor(schema);
+    Object[] partitionPathValues = rowAccessor.getRecordPartitionPathValues(row);
+    return UTF8String.fromString(getFormattedPartitionPath(partitionPathValues[0]));
   }
 
-  private String getTimestampBasedPartitionPath(Object partitionPathFieldVal) {
-    Object fieldVal = null;
+  private String getFormattedPartitionPath(Object partitionPathPart) {
+    Object fieldVal;
+    if (partitionPathPart == null || Objects.equals(partitionPathPart, HUDI_DEFAULT_PARTITION_PATH)) {
+      fieldVal = timestampBasedAvroKeyGenerator.getDefaultPartitionVal();
+    } else if (partitionPathPart instanceof UTF8String) {
+      fieldVal = partitionPathPart.toString();
+    } else {
+      fieldVal = partitionPathPart;
+    }
+
     try {
-      if (partitionPathFieldVal == null || partitionPathFieldVal.toString().contains(HUDI_DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER)
-          || partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
-        fieldVal = timestampBasedAvroKeyGenerator.getDefaultPartitionVal();
-      } else {
-        fieldVal = partitionPathFieldVal;
-      }
       return timestampBasedAvroKeyGenerator.getPartitionPath(fieldVal);
     } catch (Exception e) {
-      throw new HoodieKeyGeneratorException("Unable to parse input partition field :" + fieldVal, e);
+      throw new HoodieKeyGeneratorException(String.format("Failed to properly format partition-path (%s)", fieldVal), e);
     }
   }
 }
\ No newline at end of file
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java
index 45a0e91335..0bc15fa210 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.table.action.bootstrap;
 
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.client.bootstrap.BootstrapWriteStatus;
@@ -30,14 +32,13 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.HoodieBootstrapHandle;
 import org.apache.hudi.keygen.KeyGeneratorInterface;
 import org.apache.hudi.table.HoodieTable;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroReadSupport;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
 
 public abstract class BaseBootstrapMetadataHandler implements BootstrapMetadataHandler {
   private static final Logger LOG = LogManager.getLogger(ParquetBootstrapMetadataHandler.class);
@@ -57,8 +58,10 @@ public abstract class BaseBootstrapMetadataHandler implements BootstrapMetadataH
         table, partitionPath, FSUtils.createNewFileIdPfx(), table.getTaskContextSupplier());
     try {
       Schema avroSchema = getAvroSchema(sourceFilePath);
-      Schema recordKeySchema = HoodieAvroUtils.generateProjectionSchema(avroSchema,
-          keyGenerator.getRecordKeyFieldNames());
+      List<String> recordKeyColumns = keyGenerator.getRecordKeyFieldNames().stream()
+          .map(HoodieAvroUtils::getRootLevelFieldName)
+          .collect(Collectors.toList());
+      Schema recordKeySchema = HoodieAvroUtils.generateProjectionSchema(avroSchema, recordKeyColumns);
       LOG.info("Schema to be used for reading record Keys :" + recordKeySchema);
       AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), recordKeySchema);
       AvroReadSupport.setRequestedProjection(table.getHadoopConf(), recordKeySchema);
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/unsafe/UTF8StringBuilder.java b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/unsafe/UTF8StringBuilder.java
new file mode 100644
index 0000000000..3d9f060515
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/unsafe/UTF8StringBuilder.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.unsafe;
+
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A helper class to write {@link UTF8String}s to an internal buffer and build the concatenated
+ * {@link UTF8String} at the end.
+ */
+public class UTF8StringBuilder {
+
+  private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
+
+  private byte[] buffer;
+  private int cursor = Platform.BYTE_ARRAY_OFFSET;
+
+  public UTF8StringBuilder() {
+    // Since initial buffer size is 16 in `StringBuilder`, we set the same size here
+    this(16);
+  }
+
+  public UTF8StringBuilder(int initialSize) {
+    if (initialSize < 0) {
+      throw new IllegalArgumentException("Size must be non-negative");
+    }
+    if (initialSize > ARRAY_MAX) {
+      throw new IllegalArgumentException(
+          "Size " + initialSize + " exceeded maximum size of " + ARRAY_MAX);
+    }
+    this.buffer = new byte[initialSize];
+  }
+
+  // Grows the buffer by at least `neededSize`
+  private void grow(int neededSize) {
+    if (neededSize > ARRAY_MAX - totalSize()) {
+      throw new UnsupportedOperationException(
+          "Cannot grow internal buffer by size " + neededSize + " because the size after growing " +
+              "exceeds size limitation " + ARRAY_MAX);
+    }
+    final int length = totalSize() + neededSize;
+    if (buffer.length < length) {
+      int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
+      final byte[] tmp = new byte[newLength];
+      Platform.copyMemory(
+          buffer,
+          Platform.BYTE_ARRAY_OFFSET,
+          tmp,
+          Platform.BYTE_ARRAY_OFFSET,
+          totalSize());
+      buffer = tmp;
+    }
+  }
+
+  private int totalSize() {
+    return cursor - Platform.BYTE_ARRAY_OFFSET;
+  }
+
+  public void append(UTF8String value) {
+    grow(value.numBytes());
+    value.writeToMemory(buffer, cursor);
+    cursor += value.numBytes();
+  }
+
+  public void append(String value) {
+    append(UTF8String.fromString(value));
+  }
+
+  public void appendBytes(Object base, long offset, int length) {
+    grow(length);
+    Platform.copyMemory(
+        base,
+        offset,
+        buffer,
+        cursor,
+        length);
+    cursor += length;
+  }
+
+  public UTF8String build() {
+    return UTF8String.fromBytes(buffer, 0, totalSize());
+  }
+}
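
A brief usage sketch of the class above (illustrative only): segments are appended into a single growable byte buffer and materialized exactly once, which is what lets the Spark key generators avoid intermediate String allocations.

    UTF8StringBuilder sb = new UTF8StringBuilder();
    sb.append(UTF8String.fromString("year=2022"));
    sb.append("/");                                    // String overload delegates to the UTF8String one
    sb.append(UTF8String.fromString("month=07"));
    UTF8String partitionPath = sb.build();             // "year=2022/month=07"
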
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala
index 10d6a2276e..c105142de0 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala
@@ -29,16 +29,16 @@ object HoodieUnsafeRowUtils {
    * Fetches (nested) value w/in provided [[Row]] uniquely identified by the provided nested-field path
    * previously composed by [[composeNestedFieldPath]]
    */
-  def getNestedRowValue(row: Row, nestedFieldPath: Array[(Int, StructField)]): Any = {
+  def getNestedRowValue(row: Row, nestedFieldPath: NestedFieldPath): Any = {
     var curRow = row
-    for (idx <- nestedFieldPath.indices) {
-      val (ord, f) = nestedFieldPath(idx)
+    for (idx <- nestedFieldPath.parts.indices) {
+      val (ord, f) = nestedFieldPath.parts(idx)
       if (curRow.isNullAt(ord)) {
         // scalastyle:off return
         if (f.nullable) return null
         else throw new IllegalArgumentException(s"Found null value for the field that is declared as non-nullable: $f")
         // scalastyle:on return
-      } else if (idx == nestedFieldPath.length - 1) {
+      } else if (idx == nestedFieldPath.parts.length - 1) {
         // scalastyle:off return
         return curRow.get(ord)
         // scalastyle:on return
@@ -57,21 +57,21 @@ object HoodieUnsafeRowUtils {
    * Fetches (nested) value w/in provided [[InternalRow]] uniquely identified by the provided nested-field path
    * previously composed by [[composeNestedFieldPath]]
    */
-  def getNestedInternalRowValue(row: InternalRow, nestedFieldPath: Array[(Int, StructField)]): Any = {
-    if (nestedFieldPath.length == 0) {
+  def getNestedInternalRowValue(row: InternalRow, nestedFieldPath: NestedFieldPath): Any = {
+    if (nestedFieldPath.parts.length == 0) {
       throw new IllegalArgumentException("Nested field-path could not be empty")
     }
 
     var curRow = row
     var idx = 0
-    while (idx < nestedFieldPath.length) {
-      val (ord, f) = nestedFieldPath(idx)
+    while (idx < nestedFieldPath.parts.length) {
+      val (ord, f) = nestedFieldPath.parts(idx)
       if (curRow.isNullAt(ord)) {
         // scalastyle:off return
         if (f.nullable) return null
         else throw new IllegalArgumentException(s"Found null value for the field that is declared as non-nullable: $f")
         // scalastyle:on return
-      } else if (idx == nestedFieldPath.length - 1) {
+      } else if (idx == nestedFieldPath.parts.length - 1) {
         // scalastyle:off return
         return curRow.get(ord, f.dataType)
         // scalastyle:on return
@@ -93,7 +93,7 @@ object HoodieUnsafeRowUtils {
    *
    * This method produces nested-field path, that is subsequently used by [[getNestedInternalRowValue]], [[getNestedRowValue]]
    */
-  def composeNestedFieldPath(schema: StructType, nestedFieldRef: String): Array[(Int, StructField)] = {
+  def composeNestedFieldPath(schema: StructType, nestedFieldRef: String): NestedFieldPath = {
     val fieldRefParts = nestedFieldRef.split('.')
     val ordSeq = ArrayBuffer[(Int, StructField)]()
     var curSchema = schema
@@ -115,6 +115,8 @@ object HoodieUnsafeRowUtils {
       idx += 1
     }
 
-    ordSeq.toArray
+    NestedFieldPath(ordSeq.toArray)
   }
+
+  case class NestedFieldPath(parts: Array[(Int, StructField)])
 }
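
A hedged Java-side sketch of how the refactored helper is meant to be used (assumes the Scala object's static forwarders; the schema and `internalRow` are illustrative): the nested-field path is now carried in the NestedFieldPath wrapper rather than a bare Array[(Int, StructField)].

    StructType schema = new StructType()
        .add("foo", DataTypes.StringType)
        .add("bar", new StructType().add("baz", DataTypes.IntegerType));
    // Resolve "bar.baz" once into ordinals + fields, then reuse it for every row
    Object baz = HoodieUnsafeRowUtils.getNestedInternalRowValue(
        internalRow, HoodieUnsafeRowUtils.composeNestedFieldPath(schema, "bar.baz"));
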
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
index d6c060c6bd..481cda00d6 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io.storage.row;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
@@ -115,6 +116,6 @@ public class TestHoodieInternalRowParquetWriter extends HoodieClientTestHarness
         writeConfig.getBloomFilterFPP(),
         writeConfig.getDynamicBloomFilterMaxNumEntries(),
         writeConfig.getBloomFilterType());
-    return new HoodieRowParquetWriteSupport(hadoopConf, SparkDatasetTestUtils.STRUCT_TYPE, filter, writeConfig);
+    return new HoodieRowParquetWriteSupport(hadoopConf, SparkDatasetTestUtils.STRUCT_TYPE, Option.of(filter), writeConfig);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
index f6e65adad7..ad73a256a6 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
@@ -80,9 +80,11 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
   @ValueSource(booleans = { true, false })
   public void testRowCreateHandle(boolean populateMetaFields) throws Exception {
     // init config and table
-    HoodieWriteConfig cfg =
-        SparkDatasetTestUtils.getConfigBuilder(basePath, timelineServicePort).build();
-    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieWriteConfig config = SparkDatasetTestUtils.getConfigBuilder(basePath, timelineServicePort)
+        .withPopulateMetaFields(populateMetaFields)
+        .build();
+
+    HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
     List<String> fileNames = new ArrayList<>();
     List<String> fileAbsPaths = new ArrayList<>();
 
@@ -95,8 +97,8 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
       String fileId = UUID.randomUUID().toString();
       String instantTime = "000";
 
-      HoodieRowCreateHandle handle = new HoodieRowCreateHandle(table, cfg, partitionPath, fileId, instantTime,
-          RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE, populateMetaFields);
+      HoodieRowCreateHandle handle = new HoodieRowCreateHandle(table, config, partitionPath, fileId, instantTime,
+          RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE);
       int size = 10 + RANDOM.nextInt(1000);
       // Generate inputs
       Dataset<Row> inputRows = SparkDatasetTestUtils.getRandomRows(sqlContext, size, partitionPath, false);
@@ -133,7 +135,7 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
     String instantTime = "000";
 
     HoodieRowCreateHandle handle =
-        new HoodieRowCreateHandle(table, cfg, partitionPath, fileId, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE, true);
+        new HoodieRowCreateHandle(table, cfg, partitionPath, fileId, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE);
     int size = 10 + RANDOM.nextInt(1000);
     int totalFailures = 5;
     // Generate first batch of valid rows
@@ -186,7 +188,7 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
 
     try {
       HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
-      new HoodieRowCreateHandle(table, cfg, " def", UUID.randomUUID().toString(), "001", RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE, true);
+      new HoodieRowCreateHandle(table, cfg, " def", UUID.randomUUID().toString(), "001", RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE);
       fail("Should have thrown exception");
     } catch (HoodieInsertException ioe) {
       // expected without metadata table
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java
index fd8ece1e06..918462ac0a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java
@@ -179,6 +179,7 @@ public class SparkDatasetTestUtils {
 
   public static HoodieWriteConfig.Builder getConfigBuilder(String basePath, int timelineServicePort) {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+        .withPopulateMetaFields(true)
         .withParallelism(2, 2)
         .withDeleteParallelism(2)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
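
For reference, a hedged sketch of the wiring exercised by the test changes above: whether meta fields are populated is now driven by the write config alone, instead of an extra boolean on the HoodieRowCreateHandle constructor (base path and port below are placeholders).

    HoodieWriteConfig config = SparkDatasetTestUtils.getConfigBuilder("/tmp/hudi-table", 0)
        .withPopulateMetaFields(false)
        .build();
    // The handle reads populateMetaFields from `config`; no separate constructor flag is passed.
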
diff --git a/hudi-client/hudi-spark-client/src/test/scala/org/apache/hudi/keygen/TestRowGeneratorHelper.scala b/hudi-client/hudi-spark-client/src/test/scala/org/apache/hudi/keygen/TestRowGeneratorHelper.scala
deleted file mode 100644
index cd55e381e2..0000000000
--- a/hudi-client/hudi-spark-client/src/test/scala/org/apache/hudi/keygen/TestRowGeneratorHelper.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.keygen
-
-import java.sql.Timestamp
-import org.apache.spark.sql.Row
-import org.apache.hudi.keygen.RowKeyGeneratorHelper._
-import org.apache.spark.sql.types.{DataType, DataTypes}
-import org.junit.jupiter.api.{Assertions, Test}
-
-import scala.collection.JavaConverters._
-
-class TestRowGeneratorHelper {
-
-  @Test
-  def testGetPartitionPathFromRow(): Unit = {
-
-    /** single plain partition */
-    val row1 = Row.fromSeq(Seq(1, "z3", 10.0, "20220108"))
-    val ptField1 = List("dt").asJava
-    val mapValue = org.apache.hudi.common.util.collection.Pair.of(List(new Integer(3)).asJava, DataTypes.LongType)
-    val ptPos1 = Map("dt" -> mapValue).asJava
-
-    Assertions.assertEquals("20220108",
-      getPartitionPathFromRow(row1, ptField1, false, ptPos1))
-    Assertions.assertEquals("dt=20220108",
-      getPartitionPathFromRow(row1, ptField1, true, ptPos1))
-
-    /** multiple plain partitions */
-    val row2 = Row.fromSeq(Seq(1, "z3", 10.0, "2022", "01", "08"))
-    val ptField2 = List("year", "month", "day").asJava
-    val ptPos2 = Map("year" -> org.apache.hudi.common.util.collection.Pair.of(List(new Integer(3)).asJava, DataTypes.StringType),
-      "month" -> org.apache.hudi.common.util.collection.Pair.of(List(new Integer(4)).asJava, DataTypes.StringType),
-      "day" -> org.apache.hudi.common.util.collection.Pair.of(List(new Integer(5)).asJava, DataTypes.StringType)
-    ).asJava
-    Assertions.assertEquals("2022/01/08",
-      getPartitionPathFromRow(row2, ptField2, false, ptPos2))
-    Assertions.assertEquals("year=2022/month=01/day=08",
-      getPartitionPathFromRow(row2, ptField2, true, ptPos2))
-
-    /** multiple partitions which contains TimeStamp type or Instant type */
-    val timestamp = Timestamp.valueOf("2020-01-08 10:00:00")
-    val instant = timestamp.toInstant
-    val ptField3 = List("event", "event_time").asJava
-    val ptPos3 = Map("event" -> org.apache.hudi.common.util.collection.Pair.of(List(new Integer(3)).asJava, DataTypes.StringType),
-      "event_time" -> org.apache.hudi.common.util.collection.Pair.of(List(new Integer(4)).asJava, DataTypes.TimestampType)
-    ).asJava
-
-    // with timeStamp type
-    val row2_ts = Row.fromSeq(Seq(1, "z3", 10.0, "click", timestamp))
-    Assertions.assertEquals("click/2020-01-08 10:00:00.0",
-      getPartitionPathFromRow(row2_ts, ptField3, false, ptPos3))
-    Assertions.assertEquals("event=click/event_time=2020-01-08 10:00:00.0",
-      getPartitionPathFromRow(row2_ts, ptField3, true, ptPos3))
-
-    // with instant type
-    val row2_instant = Row.fromSeq(Seq(1, "z3", 10.0, "click", instant))
-    Assertions.assertEquals("click/2020-01-08 10:00:00.0",
-      getPartitionPathFromRow(row2_instant, ptField3, false, ptPos3))
-    Assertions.assertEquals("event=click/event_time=2020-01-08 10:00:00.0",
-      getPartitionPathFromRow(row2_instant, ptField3, true, ptPos3))
-
-    /** mixed case with plain and nested partitions */
-    val nestedRow4 = Row.fromSeq(Seq(instant, "ad"))
-    val ptField4 = List("event_time").asJava
-    val ptPos4 = Map("event_time" -> org.apache.hudi.common.util.collection.Pair.of(List(new Integer(3), new Integer(0)).asJava, DataTypes.TimestampType)).asJava
-    // with instant type
-    val row4 = Row.fromSeq(Seq(1, "z3", 10.0, nestedRow4, "click"))
-    Assertions.assertEquals("2020-01-08 10:00:00.0",
-      getPartitionPathFromRow(row4, ptField4, false, ptPos4))
-    Assertions.assertEquals("event_time=2020-01-08 10:00:00.0",
-      getPartitionPathFromRow(row4, ptField4, true, ptPos4))
-
-    val nestedRow5 = Row.fromSeq(Seq(timestamp, "ad"))
-    val ptField5 = List("event", "event_time").asJava
-    val ptPos5 = Map(
-      "event_time" -> org.apache.hudi.common.util.collection.Pair.of(List(new Integer(3), new Integer(0)).asJava, DataTypes.TimestampType),
-      "event" -> org.apache.hudi.common.util.collection.Pair.of(List(new Integer(4)).asJava, DataTypes.StringType)
-    ).asJava
-    val row5 = Row.fromSeq(Seq(1, "z3", 10.0, nestedRow5, "click"))
-    Assertions.assertEquals("click/2020-01-08 10:00:00.0",
-      getPartitionPathFromRow(row5, ptField5, false, ptPos5))
-    Assertions.assertEquals("event=click/event_time=2020-01-08 10:00:00.0",
-      getPartitionPathFromRow(row5, ptField5, true, ptPos5))
-  }
-}
diff --git a/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/TestHoodieUnsafeRowUtils.scala b/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/TestHoodieUnsafeRowUtils.scala
index b051a9b507..c23bbab99b 100644
--- a/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/TestHoodieUnsafeRowUtils.scala
+++ b/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/TestHoodieUnsafeRowUtils.scala
@@ -41,7 +41,7 @@ class TestHoodieUnsafeRowUtils {
 
     assertEquals(
       Seq((1, schema(1)), (0, schema(1).dataType.asInstanceOf[StructType](0))),
-      composeNestedFieldPath(schema, "bar.baz").toSeq)
+      composeNestedFieldPath(schema, "bar.baz").parts.toSeq)
 
     assertThrows(classOf[IllegalArgumentException]) { () =>
       composeNestedFieldPath(schema, "foo.baz")
@@ -148,6 +148,7 @@ class TestHoodieUnsafeRowUtils {
     }
   }
 
+  // TODO rebase on ScalaAssertionSupport
   private def assertThrows[T <: Throwable](expectedExceptionClass: Class[T])(f: () => Unit): T = {
     try {
       f.apply()
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index fa65461bfd..1daf638f2b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -388,7 +388,7 @@ public class HoodieAvroUtils {
       copyOldValueOrSetDefault(genericRecord, newRecord, f);
     }
     // do not preserve FILENAME_METADATA_FIELD
-    newRecord.put(HoodieRecord.FILENAME_META_FIELD_POS, fileName);
+    newRecord.put(HoodieRecord.FILENAME_META_FIELD_ORD, fileName);
     if (!GenericData.get().validate(newSchema, newRecord)) {
       throw new SchemaCompatibilityException(
           "Unable to validate the rewritten record " + genericRecord + " against schema " + newSchema);
@@ -400,7 +400,7 @@ public class HoodieAvroUtils {
   public static GenericRecord rewriteEvolutionRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) {
     GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, new HashMap<>());
     // do not preserve FILENAME_METADATA_FIELD
-    newRecord.put(HoodieRecord.FILENAME_META_FIELD_POS, fileName);
+    newRecord.put(HoodieRecord.FILENAME_META_FIELD_ORD, fileName);
     return newRecord;
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index e504b7b87d..c7ef08a162 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -57,9 +57,9 @@ public abstract class HoodieRecord<T> implements Serializable {
       IntStream.range(0, HOODIE_META_COLUMNS.size()).mapToObj(idx -> Pair.of(HOODIE_META_COLUMNS.get(idx), idx))
           .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
 
-  public static int RECORD_KEY_META_FIELD_POS = HOODIE_META_COLUMNS_NAME_TO_POS.get(RECORD_KEY_METADATA_FIELD);
-  public static int PARTITION_PATH_META_FIELD_POS = HOODIE_META_COLUMNS_NAME_TO_POS.get(PARTITION_PATH_METADATA_FIELD);
-  public static int FILENAME_META_FIELD_POS = HOODIE_META_COLUMNS_NAME_TO_POS.get(FILENAME_METADATA_FIELD);
+  public static int RECORD_KEY_META_FIELD_ORD = HOODIE_META_COLUMNS_NAME_TO_POS.get(RECORD_KEY_METADATA_FIELD);
+  public static int PARTITION_PATH_META_FIELD_ORD = HOODIE_META_COLUMNS_NAME_TO_POS.get(PARTITION_PATH_METADATA_FIELD);
+  public static int FILENAME_META_FIELD_ORD = HOODIE_META_COLUMNS_NAME_TO_POS.get(FILENAME_METADATA_FIELD);
 
   /**
    * Identifies the record across the table.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 9945eb0650..529b0e8c99 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -402,13 +402,13 @@ public class HoodieTableMetaClient implements Serializable {
       throw new HoodieException(HoodieTableConfig.POPULATE_META_FIELDS.key() + " already disabled for the table. Can't be re-enabled back");
     }
 
-    // meta fields can be disabled only with SimpleKeyGenerator, NonPartitioned and ComplexKeyGen.
+    // Meta fields can be disabled only when one of {@code SimpleKeyGenerator}, {@code ComplexKeyGenerator} or {@code NonpartitionedKeyGenerator} is used
     if (!getTableConfig().populateMetaFields()) {
       String keyGenClass = properties.getProperty(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator");
-      if (!keyGenClass.equals("org.apache.hudi.keygen.SimpleKeyGenerator") && !keyGenClass.equals("org.apache.hudi.keygen.NonpartitionedKeyGenerator")
+      if (!keyGenClass.equals("org.apache.hudi.keygen.SimpleKeyGenerator")
+          && !keyGenClass.equals("org.apache.hudi.keygen.NonpartitionedKeyGenerator")
           && !keyGenClass.equals("org.apache.hudi.keygen.ComplexKeyGenerator")) {
-        throw new HoodieException("Only simple, non partitioned and complex key generator is supported when meta fields are disabled. KeyGenerator used : "
-            + properties.getProperty(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key()));
+        throw new HoodieException("Only simple, non-partitioned or complex key generator are supported when meta-fields are disabled. Used: " + keyGenClass);
       }
     }
   }
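
A hedged illustration of the constraint enforced above (config constants are the ones referenced in the check; the key-generator value is illustrative). Assuming a table created with meta fields disabled:

    Properties updatedProps = new Properties();
    updatedProps.setProperty(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(),
        "org.apache.hudi.keygen.TimestampBasedKeyGenerator");
    // Running the validation above with such properties fails with HoodieException, since only
    // SimpleKeyGenerator, NonpartitionedKeyGenerator and ComplexKeyGenerator may be used
    // when meta fields are off.
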
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
index bb1ef72bea..8036995fab 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
@@ -40,6 +40,8 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
 public class CollectionUtils {
 
   public static final Properties EMPTY_PROPERTIES = new Properties();
@@ -52,6 +54,14 @@ public class CollectionUtils {
     return !isNullOrEmpty(c);
   }
 
+  /**
+   * Returns the last element of the given array of {@code T}
+   */
+  public static <T> T tail(T[] ts) {
+    checkArgument(ts.length > 0);
+    return ts[ts.length - 1];
+  }
+
   /**
    * Collects provided {@link Iterator} to a {@link Stream}
    */
@@ -143,7 +153,7 @@ public class CollectionUtils {
   }
 
   public static <E> Stream<List<E>> batchesAsStream(List<E> list, int batchSize) {
-    ValidationUtils.checkArgument(batchSize > 0, "batch size must be positive.");
+    checkArgument(batchSize > 0, "batch size must be positive.");
     int total = list.size();
     if (total <= 0) {
       return Stream.empty();
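
A tiny usage sketch of the new tail helper (array contents are illustrative):

    String[] fieldRefParts = new String[] {"driver", "license", "id"};
    String leaf = CollectionUtils.tail(fieldRefParts);   // "id"
    // An empty array trips the checkArgument precondition and throws IllegalArgumentException.
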
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java
index ce14f6c91c..a0a8ca0867 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java
@@ -32,11 +32,15 @@ public class HoodieTimer {
   // Ordered stack of TimeInfo's to make sure stopping the timer returns the correct elapsed time
   private final Deque<TimeInfo> timeInfoDeque = new ArrayDeque<>();
 
+  /**
+   * @deprecated please use either the {@link HoodieTimer#start} or the {@link HoodieTimer#create} API instead
+   */
+  @Deprecated
   public HoodieTimer() {
     this(false);
   }
 
-  public HoodieTimer(boolean shouldStart) {
+  private HoodieTimer(boolean shouldStart) {
     if (shouldStart) {
       startTimer();
     }
@@ -79,4 +83,12 @@ public class HoodieTimer {
     }
     return timeInfoDeque.pop().stop();
   }
+
+  public static HoodieTimer start() {
+    return new HoodieTimer(true);
+  }
+
+  public static HoodieTimer create() {
+    return new HoodieTimer(false);
+  }
 }
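
A short sketch of the new factory-style API (endTimer() is the pre-existing counterpart and is assumed here, since it is not shown in this hunk):

    HoodieTimer timer = HoodieTimer.start();   // same as create() followed by startTimer()
    // ... timed work ...
    long elapsedMs = timer.endTimer();
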
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index c779a3269a..ddd28fc4ea 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -168,7 +168,7 @@ public class ParquetUtils extends BaseFileUtils {
       conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
       Schema readSchema = keyGeneratorOpt.map(keyGenerator -> {
         List<String> fields = new ArrayList<>();
-        fields.addAll(keyGenerator.getRecordKeyFields());
+        fields.addAll(keyGenerator.getRecordKeyFieldNames());
         fields.addAll(keyGenerator.getPartitionPathFields());
         return HoodieAvroUtils.getSchemaForFields(readAvroSchema(conf, filePath), fields);
       })
diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
index de4e0c3ccb..a09101dedf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
@@ -24,7 +24,6 @@ import org.apache.hudi.exception.HoodieKeyException;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import java.util.List;
-import java.util.stream.Collectors;
 
 public abstract class BaseKeyGenerator extends KeyGenerator {
 
@@ -59,23 +58,14 @@ public abstract class BaseKeyGenerator extends KeyGenerator {
    */
   @Override
   public final HoodieKey getKey(GenericRecord record) {
-    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+    if (getRecordKeyFieldNames() == null || getPartitionPathFields() == null) {
       throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
     }
     return new HoodieKey(getRecordKey(record), getPartitionPath(record));
   }
 
   @Override
-  public final List<String> getRecordKeyFieldNames() {
-    // For nested columns, pick top level column name
-    // TODO materialize
-    return getRecordKeyFields().stream().map(k -> {
-      int idx = k.indexOf('.');
-      return idx > 0 ? k.substring(0, idx) : k;
-    }).collect(Collectors.toList());
-  }
-
-  public List<String> getRecordKeyFields() {
+  public List<String> getRecordKeyFieldNames() {
     return recordKeyFields;
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/KeyGenerator.java b/hudi-common/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
index 8c3f794ee6..691b1f4d55 100644
--- a/hudi-common/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
@@ -34,7 +34,7 @@ import java.util.List;
 @PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
 public abstract class KeyGenerator implements KeyGeneratorInterface {
 
-  protected TypedProperties config;
+  protected final TypedProperties config;
 
   protected KeyGenerator(TypedProperties config) {
     this.config = config;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
index 7bef847712..d5b769190c 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
@@ -259,7 +259,7 @@ public class TestParquetUtils extends HoodieCommonTestHarness {
     }
 
     @Override
-    public List<String> getRecordKeyFields() {
+    public List<String> getRecordKeyFieldNames() {
       return Arrays.asList(new String[]{recordKeyField});
     }
 
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
index 20e12e9030..e643d9f9d0 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
@@ -83,7 +83,7 @@ public class DeltaGenerator implements Serializable {
     this.jsc = jsc;
     this.sparkSession = sparkSession;
     this.schemaStr = schemaStr;
-    this.recordRowKeyFieldNames = keyGenerator.getRecordKeyFields();
+    this.recordRowKeyFieldNames = keyGenerator.getRecordKeyFieldNames();
     this.partitionPathFieldNames = keyGenerator.getPartitionPathFields();
   }
 
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
index db87f5dce0..8115d50a78 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
@@ -235,9 +235,8 @@ public abstract class ITTestBase {
     }
     int exitCode = dockerClient.inspectExecCmd(createCmdResponse.getId()).exec().getExitCode();
     LOG.info("Exit code for command : " + exitCode);
-    if (exitCode != 0) {
-      LOG.error("\n\n ###### Stdout #######\n" + callback.getStdout().toString());
-    }
+
+    LOG.error("\n\n ###### Stdout #######\n" + callback.getStdout().toString());
     LOG.error("\n\n ###### Stderr #######\n" + callback.getStderr().toString());
 
     if (checkIfSucceed) {
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
index cc37de2f29..d62c9a768c 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.connect.utils;
 
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -37,9 +39,6 @@ import org.apache.hudi.keygen.CustomAvroKeyGenerator;
 import org.apache.hudi.keygen.CustomKeyGenerator;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
-
-import com.google.protobuf.ByteString;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.TopicDescription;
@@ -49,14 +48,14 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
 import java.nio.file.FileVisitOption;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
index 516c6c5fc7..f7918cf3fd 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
@@ -23,7 +23,6 @@ import org.apache.hudi.client.HoodieInternalWriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.row.HoodieRowCreateHandle;
@@ -44,6 +43,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -74,7 +74,7 @@ public class BulkInsertDataInternalWriterHelper {
    * NOTE: This is stored as Catalyst's internal {@link UTF8String} to avoid
    *       conversion (deserialization) b/w {@link UTF8String} and {@link String}
    */
-  private String lastKnownPartitionPath = null;
+  private UTF8String lastKnownPartitionPath = null;
   private HoodieRowCreateHandle handle;
   private int numFilesWritten = 0;
 
@@ -133,11 +133,13 @@ public class BulkInsertDataInternalWriterHelper {
 
   public void write(InternalRow row) throws IOException {
     try {
-      String partitionPath = extractPartitionPath(row);
-      if (lastKnownPartitionPath == null || !lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) {
+      UTF8String partitionPath = extractPartitionPath(row);
+      if (lastKnownPartitionPath == null || !Objects.equals(lastKnownPartitionPath, partitionPath) || !handle.canWrite()) {
         LOG.info("Creating new file for partition path " + partitionPath);
-        handle = getRowCreateHandle(partitionPath);
-        lastKnownPartitionPath = partitionPath;
+        handle = getRowCreateHandle(partitionPath.toString());
+        // NOTE: It's crucial to make a copy here, since [[UTF8String]] could be pointing into
+        //       a mutable underlying buffer
+        lastKnownPartitionPath = partitionPath.clone();
       }
 
       handle.write(row);
@@ -162,31 +164,19 @@ public class BulkInsertDataInternalWriterHelper {
     handle = null;
   }
 
-  private String extractPartitionPath(InternalRow row) {
-    String partitionPath;
+  private UTF8String extractPartitionPath(InternalRow row) {
     if (populateMetaFields) {
       // In case meta-fields are materialized w/in the table itself, we can just simply extract
       // partition path from there
       //
       // NOTE: Helper keeps track of [[lastKnownPartitionPath]] as [[UTF8String]] to avoid
       //       conversion from Catalyst internal representation into a [[String]]
-      partitionPath = row.getString(HoodieRecord.PARTITION_PATH_META_FIELD_POS);
+      return row.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_ORD);
     } else if (keyGeneratorOpt.isPresent()) {
-      // TODO(HUDI-4039) this should be handled by the SimpleKeyGenerator itself
-      if (simpleKeyGen) {
-        String partitionPathValue = row.get(simplePartitionFieldIndex, simplePartitionFieldDataType).toString();
-        partitionPath = partitionPathValue != null ? partitionPathValue : PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
-        if (writeConfig.isHiveStylePartitioningEnabled()) {
-          partitionPath = (keyGeneratorOpt.get()).getPartitionPathFields().get(0) + "=" + partitionPath;
-        }
-      } else {
-        // only BuiltIn key generators are supported if meta fields are disabled.
-        partitionPath = keyGeneratorOpt.get().getPartitionPath(row, structType);
-      }
+      return keyGeneratorOpt.get().getPartitionPath(row, structType);
     } else {
-      partitionPath = "";
+      return UTF8String.EMPTY_UTF8;
     }
-    return partitionPath;
   }
 
   private HoodieRowCreateHandle getRowCreateHandle(String partitionPath) throws IOException {
@@ -209,7 +199,7 @@ public class BulkInsertDataInternalWriterHelper {
 
   private HoodieRowCreateHandle createHandle(String partitionPath) {
     return new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
-        instantTime, taskPartitionId, taskId, taskEpochId, structType, populateMetaFields);
+        instantTime, taskPartitionId, taskId, taskEpochId, structType);
   }
 
   private String getNextFileId() {
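
A hedged sketch of why the clone above matters: a UTF8String obtained from an InternalRow may point into a buffer that Spark reuses for the next row, so only a detached copy is safe to keep across rows (`row` is assumed to be the InternalRow currently being written).

    UTF8String current = row.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_ORD);
    UTF8String retained = current.clone();   // detached copy, safe to cache as lastKnownPartitionPath
    // Holding on to `current` itself could later observe bytes from a different row.
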
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 168cc6b265..dad7c17650 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.util.ReflectionUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.index.SparkHoodieIndexFactory
-import org.apache.hudi.keygen.BuiltinKeyGenerator
+import org.apache.hudi.keygen.{BuiltinKeyGenerator, SparkKeyGeneratorInterface}
 import org.apache.hudi.table.BulkInsertPartitioner
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
@@ -63,13 +63,12 @@ object HoodieDatasetBulkInsertHelper extends Logging {
       df.queryExecution.toRdd.mapPartitions { iter =>
         val keyGenerator =
           ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps))
-            .asInstanceOf[BuiltinKeyGenerator]
+            .asInstanceOf[SparkKeyGeneratorInterface]
 
         iter.map { row =>
           val (recordKey, partitionPath) =
             if (populateMetaFields) {
-              (UTF8String.fromString(keyGenerator.getRecordKey(row, schema)),
-                UTF8String.fromString(keyGenerator.getPartitionPath(row, schema)))
+              (keyGenerator.getRecordKey(row, schema), keyGenerator.getPartitionPath(row, schema))
             } else {
               (UTF8String.EMPTY_UTF8, UTF8String.EMPTY_UTF8)
             }
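
A hedged Java sketch of the contract relied upon above: the Spark-specific key-generator API now returns Catalyst-native UTF8String values directly, so the bulk-insert path no longer round-trips through java.lang.String (the helper method itself is purely illustrative).

    static UTF8String[] recordKeyAndPartitionPath(SparkKeyGeneratorInterface keyGen,
                                                  InternalRow row, StructType schema) {
      // Both calls yield UTF8String; no UTF8String.fromString(...) wrapping is needed anymore
      return new UTF8String[] {keyGen.getRecordKey(row, schema), keyGen.getPartitionPath(row, schema)};
    }
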
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
index 6719c2a3d6..caed61249a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
@@ -28,6 +28,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -83,7 +84,6 @@ public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
   public void testWrongRecordKeyField() {
     ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getWrongRecordKeyFieldProps());
     Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
-    Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldSchemaInfoIfNeeded(KeyGeneratorTestUtilities.structType));
   }
 
   @Test
@@ -98,7 +98,7 @@ public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
     Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686/ts_ms=2020-03-21");
 
     InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
-    Assertions.assertEquals(keyGenerator.getPartitionPath(internalRow, row.schema()), "timestamp=4357686/ts_ms=2020-03-21");
+    Assertions.assertEquals(UTF8String.fromString("timestamp=4357686/ts_ms=2020-03-21"), keyGenerator.getPartitionPath(internalRow, row.schema()));
   }
 
   @Test
@@ -107,7 +107,7 @@ public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
     properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
     properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "timestamp");
     ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
-    assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 1);
+    assertEquals(compositeKeyGenerator.getRecordKeyFieldNames().size(), 1);
     assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 1);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
@@ -119,9 +119,9 @@ public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
 
     Row row = KeyGeneratorTestUtilities.getRow(record, HoodieTestDataGenerator.AVRO_SCHEMA,
         AvroConversionUtils.convertAvroSchemaToStructType(HoodieTestDataGenerator.AVRO_SCHEMA));
-    Assertions.assertEquals(compositeKeyGenerator.getPartitionPath(row), partitionPath);
+    Assertions.assertEquals(partitionPath, compositeKeyGenerator.getPartitionPath(row));
     InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
-    Assertions.assertEquals(compositeKeyGenerator.getPartitionPath(internalRow, row.schema()), partitionPath);
+    Assertions.assertEquals(UTF8String.fromString(partitionPath), compositeKeyGenerator.getPartitionPath(internalRow, row.schema()));
   }
 
   @Test
@@ -130,7 +130,7 @@ public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
     properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key,timestamp");
     properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "rider,driver");
     ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
-    assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 2);
+    assertEquals(compositeKeyGenerator.getRecordKeyFieldNames().size(), 2);
     assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 2);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
@@ -144,10 +144,10 @@ public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
 
     Row row = KeyGeneratorTestUtilities.getRow(record, HoodieTestDataGenerator.AVRO_SCHEMA,
         AvroConversionUtils.convertAvroSchemaToStructType(HoodieTestDataGenerator.AVRO_SCHEMA));
-    Assertions.assertEquals(compositeKeyGenerator.getPartitionPath(row), partitionPath);
+    Assertions.assertEquals(partitionPath, compositeKeyGenerator.getPartitionPath(row));
 
     InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
-    Assertions.assertEquals(compositeKeyGenerator.getPartitionPath(internalRow, row.schema()), partitionPath);
+    Assertions.assertEquals(UTF8String.fromString(partitionPath), compositeKeyGenerator.getPartitionPath(internalRow, row.schema()));
   }
 
   @Test
@@ -156,7 +156,7 @@ public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
     properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key,timestamp");
     properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "");
     ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
-    assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 2);
+    assertEquals(compositeKeyGenerator.getRecordKeyFieldNames().size(), 2);
     assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 0);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
@@ -170,9 +170,9 @@ public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
 
     Row row = KeyGeneratorTestUtilities.getRow(record, HoodieTestDataGenerator.AVRO_SCHEMA,
         AvroConversionUtils.convertAvroSchemaToStructType(HoodieTestDataGenerator.AVRO_SCHEMA));
-    Assertions.assertEquals(compositeKeyGenerator.getPartitionPath(row), partitionPath);
+    Assertions.assertEquals(partitionPath, compositeKeyGenerator.getPartitionPath(row));
 
     InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
-    Assertions.assertEquals(compositeKeyGenerator.getPartitionPath(internalRow, row.schema()), partitionPath);
+    Assertions.assertEquals(UTF8String.fromString(partitionPath), compositeKeyGenerator.getPartitionPath(internalRow, row.schema()));
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
index 26a2b439ab..311356a0f7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
@@ -28,6 +28,7 @@ import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -145,13 +146,13 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
         (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
     GenericRecord record = getRecord();
     HoodieKey key = keyGenerator.getKey(record);
-    Assertions.assertEquals(key.getRecordKey(), "key1");
-    Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686");
+    Assertions.assertEquals("key1", key.getRecordKey());
+    Assertions.assertEquals("timestamp=4357686", key.getPartitionPath());
     Row row = KeyGeneratorTestUtilities.getRow(record);
-    Assertions.assertEquals(keyGenerator.getRecordKey(row), "key1");
-    Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686");
+    Assertions.assertEquals("key1", keyGenerator.getRecordKey(row));
+    Assertions.assertEquals("timestamp=4357686", keyGenerator.getPartitionPath(row));
     InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
-    Assertions.assertEquals(keyGenerator.getPartitionPath(internalRow, row.schema()), "timestamp=4357686");
+    Assertions.assertEquals(UTF8String.fromString("timestamp=4357686"), keyGenerator.getPartitionPath(internalRow, row.schema()));
   }
 
   @Test
@@ -170,13 +171,13 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
 
     GenericRecord record = getRecord();
     HoodieKey key = keyGenerator.getKey(record);
-    Assertions.assertEquals(key.getRecordKey(), "key1");
-    Assertions.assertEquals(key.getPartitionPath(), "ts_ms=20200321");
+    Assertions.assertEquals("key1", key.getRecordKey());
+    Assertions.assertEquals("ts_ms=20200321", key.getPartitionPath());
     Row row = KeyGeneratorTestUtilities.getRow(record);
-    Assertions.assertEquals(keyGenerator.getRecordKey(row), "key1");
-    Assertions.assertEquals(keyGenerator.getPartitionPath(row), "ts_ms=20200321");
+    Assertions.assertEquals("key1", keyGenerator.getRecordKey(row));
+    Assertions.assertEquals("ts_ms=20200321", keyGenerator.getPartitionPath(row));
     InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
-    Assertions.assertEquals(keyGenerator.getPartitionPath(internalRow, row.schema()), "ts_ms=20200321");
+    Assertions.assertEquals(UTF8String.fromString("ts_ms=20200321"), keyGenerator.getPartitionPath(internalRow, row.schema()));
   }
 
   @Test
@@ -202,7 +203,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
     Assertions.assertTrue(keyGenerator.getPartitionPath(row).isEmpty());
 
     InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
-    Assertions.assertTrue(keyGenerator.getPartitionPath(internalRow, row.schema()).isEmpty());
+    Assertions.assertEquals(0, keyGenerator.getPartitionPath(internalRow, row.schema()).numBytes());
   }
 
   @Test
@@ -345,15 +346,15 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
 
     GenericRecord record = getRecord();
     HoodieKey key = keyGenerator.getKey(record);
-    Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
-    Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686");
+    Assertions.assertEquals("_row_key:key1,pii_col:pi", key.getRecordKey());
+    Assertions.assertEquals("timestamp=4357686", key.getPartitionPath());
 
     Row row = KeyGeneratorTestUtilities.getRow(record);
-    Assertions.assertEquals(keyGenerator.getRecordKey(row), "_row_key:key1,pii_col:pi");
-    Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686");
+    Assertions.assertEquals("_row_key:key1,pii_col:pi", keyGenerator.getRecordKey(row));
+    Assertions.assertEquals("timestamp=4357686", keyGenerator.getPartitionPath(row));
 
     InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
-    Assertions.assertEquals(keyGenerator.getPartitionPath(internalRow, row.schema()), "timestamp=4357686");
+    Assertions.assertEquals(UTF8String.fromString("timestamp=4357686"), keyGenerator.getPartitionPath(internalRow, row.schema()));
   }
 
   @Test
@@ -372,14 +373,14 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
 
     GenericRecord record = getRecord();
     HoodieKey key = keyGenerator.getKey(record);
-    Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
-    Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686/ts_ms=20200321");
+    Assertions.assertEquals("_row_key:key1,pii_col:pi", key.getRecordKey());
+    Assertions.assertEquals("timestamp=4357686/ts_ms=20200321", key.getPartitionPath());
 
     Row row = KeyGeneratorTestUtilities.getRow(record);
-    Assertions.assertEquals(keyGenerator.getRecordKey(row), "_row_key:key1,pii_col:pi");
-    Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686/ts_ms=20200321");
+    Assertions.assertEquals("_row_key:key1,pii_col:pi", keyGenerator.getRecordKey(row));
+    Assertions.assertEquals("timestamp=4357686/ts_ms=20200321", keyGenerator.getPartitionPath(row));
 
     InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
-    Assertions.assertEquals(keyGenerator.getPartitionPath(internalRow, row.schema()), "timestamp=4357686/ts_ms=20200321");
+    Assertions.assertEquals(UTF8String.fromString("timestamp=4357686/ts_ms=20200321"), keyGenerator.getPartitionPath(internalRow, row.schema()));
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteRecordGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteRecordGenerator.java
index f6c4c8a8b5..1b25ce6505 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteRecordGenerator.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteRecordGenerator.java
@@ -68,7 +68,6 @@ public class TestGlobalDeleteRecordGenerator extends KeyGeneratorTestUtilities {
   public void testWrongRecordKeyField() {
     GlobalDeleteKeyGenerator keyGenerator = new GlobalDeleteKeyGenerator(getWrongRecordKeyFieldProps());
     Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
-    Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldSchemaInfoIfNeeded(KeyGeneratorTestUtilities.structType));
   }
 
   @Test
@@ -78,7 +77,6 @@ public class TestGlobalDeleteRecordGenerator extends KeyGeneratorTestUtilities {
     HoodieKey key = keyGenerator.getKey(record);
     Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
     Assertions.assertEquals(key.getPartitionPath(), "");
-    keyGenerator.buildFieldSchemaInfoIfNeeded(KeyGeneratorTestUtilities.structType);
     Row row = KeyGeneratorTestUtilities.getRow(record);
     Assertions.assertEquals(keyGenerator.getRecordKey(row), "_row_key:key1,pii_col:pi");
     Assertions.assertEquals(keyGenerator.getPartitionPath(row), "");
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java
index 75d9b7da74..fd299f179c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java
@@ -94,7 +94,6 @@ public class TestNonpartitionedKeyGenerator extends KeyGeneratorTestUtilities {
   public void testWrongRecordKeyField() {
     NonpartitionedKeyGenerator keyGenerator = new NonpartitionedKeyGenerator(getWrongRecordKeyFieldProps());
     Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
-    Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldSchemaInfoIfNeeded(KeyGeneratorTestUtilities.structType));
   }
 
   @Test
@@ -103,7 +102,7 @@ public class TestNonpartitionedKeyGenerator extends KeyGeneratorTestUtilities {
     properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "timestamp");
     properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "");
     NonpartitionedKeyGenerator keyGenerator = new NonpartitionedKeyGenerator(properties);
-    assertEquals(keyGenerator.getRecordKeyFields().size(), 1);
+    assertEquals(keyGenerator.getRecordKeyFieldNames().size(), 1);
     assertEquals(keyGenerator.getPartitionPathFields().size(), 0);
 
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
@@ -120,7 +119,7 @@ public class TestNonpartitionedKeyGenerator extends KeyGeneratorTestUtilities {
     properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "timestamp,driver");
     properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "");
     NonpartitionedKeyGenerator keyGenerator = new NonpartitionedKeyGenerator(properties);
-    assertEquals(keyGenerator.getRecordKeyFields().size(), 2);
+    assertEquals(keyGenerator.getRecordKeyFieldNames().size(), 2);
     assertEquals(keyGenerator.getPartitionPathFields().size(), 0);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
index 17cff3505e..32f372a2d7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
@@ -21,11 +21,13 @@ package org.apache.hudi.keygen;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieKeyException;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -34,6 +36,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities {
   private TypedProperties getCommonProps() {
@@ -88,35 +91,37 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities {
 
   @Test
   public void testNullPartitionPathFields() {
-    Assertions.assertThrows(IllegalArgumentException.class, () -> new SimpleKeyGenerator(getPropertiesWithoutPartitionPathProp()));
+    assertThrows(IllegalArgumentException.class, () -> new SimpleKeyGenerator(getPropertiesWithoutPartitionPathProp()));
   }
 
   @Test
   public void testNullRecordKeyFields() {
-    Assertions.assertThrows(IllegalArgumentException.class, () -> new SimpleKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+    assertThrows(IllegalArgumentException.class, () -> new SimpleKeyGenerator(getPropertiesWithoutRecordKeyProp()));
   }
 
   @Test
   public void testWrongRecordKeyField() {
     SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getWrongRecordKeyFieldProps());
-    Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
-    Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldSchemaInfoIfNeeded(KeyGeneratorTestUtilities.structType));
+    assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
   }
 
   @Test
   public void testWrongPartitionPathField() {
     SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getWrongPartitionPathFieldProps());
     GenericRecord record = getRecord();
-    Assertions.assertEquals(keyGenerator.getPartitionPath(record), KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH);
-    Assertions.assertEquals(keyGenerator.getPartitionPath(KeyGeneratorTestUtilities.getRow(record)),
-        KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH);
+    // TODO this should throw as well
+    //assertThrows(HoodieException.class, () -> {
+    //  keyGenerator.getPartitionPath(record);
+    //});
+    assertThrows(HoodieException.class, () -> {
+      keyGenerator.getPartitionPath(KeyGeneratorTestUtilities.getRow(record));
+    });
   }
 
   @Test
   public void testComplexRecordKeyField() {
     SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getComplexRecordKeyProp());
-    Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
-    Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldSchemaInfoIfNeeded(KeyGeneratorTestUtilities.structType));
+    assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
   }
 
   @Test
@@ -124,15 +129,15 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities {
     SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getProps());
     GenericRecord record = getRecord();
     HoodieKey key = keyGenerator.getKey(getRecord());
-    Assertions.assertEquals(key.getRecordKey(), "key1");
-    Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686");
+    Assertions.assertEquals("key1", key.getRecordKey());
+    Assertions.assertEquals("timestamp=4357686", key.getPartitionPath());
 
     Row row = KeyGeneratorTestUtilities.getRow(record);
-    Assertions.assertEquals(keyGenerator.getRecordKey(row), "key1");
-    Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686");
+    Assertions.assertEquals("key1", keyGenerator.getRecordKey(row));
+    Assertions.assertEquals("timestamp=4357686", keyGenerator.getPartitionPath(row));
 
     InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
-    Assertions.assertEquals(keyGenerator.getPartitionPath(internalRow, row.schema()), "timestamp=4357686");
+    Assertions.assertEquals(UTF8String.fromString("timestamp=4357686"), keyGenerator.getPartitionPath(internalRow, row.schema()));
   }
 
   private static Stream<GenericRecord> nestedColTestRecords() {
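The assertions above show the return-type split introduced by this change: the Row-based key-generator API keeps returning String, while the InternalRow-based API returns UTF8String. A small sketch of that shape in Scala, assuming the example schema fields used by these tests (the field names and the expected value are taken from the assertions above):

    import org.apache.hudi.common.config.TypedProperties
    import org.apache.hudi.keygen.SimpleKeyGenerator
    import org.apache.hudi.keygen.constant.KeyGeneratorOptions
    import org.apache.spark.unsafe.types.UTF8String

    object SimpleKeyGenApiSketch extends App {
      val props = new TypedProperties()
      props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key")
      props.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "timestamp")
      val keyGen = new SimpleKeyGenerator(props)

      // Row-based calls still return java.lang.String:
      //   keyGen.getRecordKey(row); keyGen.getPartitionPath(row)
      // InternalRow-based calls now return UTF8String:
      //   keyGen.getPartitionPath(internalRow, row.schema())
      // so expected values in assertions are wrapped with UTF8String.fromString:
      val expectedPartition: UTF8String = UTF8String.fromString("timestamp=4357686")
    }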
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
index 1fc4b9f1ef..8cfd7b0450 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
@@ -34,6 +34,7 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -138,7 +139,7 @@ public class TestTimestampBasedKeyGenerator {
     baseRow = genericRecordToRow(baseRecord);
     assertEquals("2020-01-06 12", keyGen.getPartitionPath(baseRow));
     internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
-    assertEquals("2020-01-06 12", keyGen.getPartitionPath(internalRow, baseRow.schema()));
+    assertEquals(UTF8String.fromString("2020-01-06 12"), keyGen.getPartitionPath(internalRow, baseRow.schema()));
 
     // timezone is GMT+8:00, createTime is BigDecimal
     BigDecimal decimal = new BigDecimal("1578283932000.0001");
@@ -186,7 +187,7 @@ public class TestTimestampBasedKeyGenerator {
     baseRow = genericRecordToRow(baseRecord);
     assertEquals("1970-01-01 08", keyGen.getPartitionPath(baseRow));
     internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
-    assertEquals("1970-01-01 08", keyGen.getPartitionPath(internalRow, baseRow.schema()));
+    assertEquals(UTF8String.fromString("1970-01-01 08"), keyGen.getPartitionPath(internalRow, baseRow.schema()));
 
     // timestamp is DATE_STRING, timezone is GMT, createTime is null
     baseRecord.put("createTimeString", null);
@@ -198,7 +199,7 @@ public class TestTimestampBasedKeyGenerator {
     baseRow = genericRecordToRow(baseRecord);
     assertEquals("1970-01-01 12:00:00", keyGen.getPartitionPath(baseRow));
     internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
-    assertEquals("1970-01-01 12:00:00", keyGen.getPartitionPath(internalRow, baseRow.schema()));
+    assertEquals(UTF8String.fromString("1970-01-01 12:00:00"), keyGen.getPartitionPath(internalRow, baseRow.schema()));
   }
 
   @Test
@@ -216,7 +217,7 @@ public class TestTimestampBasedKeyGenerator {
     baseRow = genericRecordToRow(baseRecord);
     assertEquals("2024-10-04 12", keyGen.getPartitionPath(baseRow));
     internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
-    assertEquals("2024-10-04 12", keyGen.getPartitionPath(internalRow, baseRow.schema()));
+    assertEquals(UTF8String.fromString("2024-10-04 12"), keyGen.getPartitionPath(internalRow, baseRow.schema()));
 
     // timezone is GMT, createTime is null
     baseRecord.put("createTime", null);
@@ -229,7 +230,7 @@ public class TestTimestampBasedKeyGenerator {
     baseRow = genericRecordToRow(baseRecord);
     assertEquals("1970-01-02 12", keyGen.getPartitionPath(baseRow));
     internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
-    assertEquals("1970-01-02 12", keyGen.getPartitionPath(internalRow, baseRow.schema()));
+    assertEquals(UTF8String.fromString("1970-01-02 12"), keyGen.getPartitionPath(internalRow, baseRow.schema()));
 
     // timezone is GMT. number of days store integer in mysql
     baseRecord.put("createTime", 18736L);
@@ -260,7 +261,7 @@ public class TestTimestampBasedKeyGenerator {
     baseRow = genericRecordToRow(baseRecord);
     assertEquals("2021/12/03", keyGen.getPartitionPath(baseRow));
     internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
-    assertEquals("2021/12/03", keyGen.getPartitionPath(internalRow, baseRow.schema()));
+    assertEquals(UTF8String.fromString("2021/12/03"), keyGen.getPartitionPath(internalRow, baseRow.schema()));
 
     // timezone is GMT, createTime is null
     baseRecord.put("createTime", null);
@@ -274,7 +275,7 @@ public class TestTimestampBasedKeyGenerator {
     baseRow = genericRecordToRow(baseRecord);
     assertEquals("1970/01/01", keyGen.getPartitionPath(baseRow));
     internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
-    assertEquals("1970/01/01", keyGen.getPartitionPath(internalRow, baseRow.schema()));
+    assertEquals(UTF8String.fromString("1970/01/01"), keyGen.getPartitionPath(internalRow, baseRow.schema()));
   }
 
   @Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java
index c2256f40c6..e1f8f9f610 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java
@@ -43,7 +43,7 @@ import java.util.stream.Collectors;
 public class KeyGeneratorTestUtilities {
 
   public static final String NESTED_COL_SCHEMA = "{\"type\":\"record\", \"name\":\"nested_col\",\"fields\": ["
-      + "{\"name\": \"prop1\",\"type\": \"string\"},{\"name\": \"prop2\", \"type\": \"long\"}]}";
+      + "{\"name\": \"prop1\",\"type\": [\"null\", \"string\"]},{\"name\": \"prop2\", \"type\": \"long\"}]}";
   public static final String EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
       + "{\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
       + "{\"name\": \"ts_ms\", \"type\": \"string\"},"
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/ScalaAssertionSupport.scala
similarity index 56%
copy from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
copy to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/ScalaAssertionSupport.scala
index bbceaf900b..c26adb4bc0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/ScalaAssertionSupport.scala
@@ -16,22 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.keygen;
+package org.apache.hudi
 
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.Assertions.fail
 
-/**
- * Spark key generator interface.
- */
-public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface {
-
-  String getRecordKey(Row row);
-
-  String getRecordKey(InternalRow row, StructType schema);
+class ScalaAssertionSupport {
 
-  String getPartitionPath(Row row);
+  def assertThrows[T <: Throwable, R](expectedExceptionClass: Class[T])(f: => R): T = {
+    try {
+      f
+    } catch {
+      case t: Throwable if expectedExceptionClass.isAssignableFrom(t.getClass) =>
+        // scalastyle:off return
+        return t.asInstanceOf[T]
+      // scalastyle:on return
+      case ot @ _ =>
+        fail(s"Expected exception of class $expectedExceptionClass, but ${ot.getClass} has been thrown")
+    }
 
-  String getPartitionPath(InternalRow internalRow, StructType structType);
+    fail(s"Expected exception of class $expectedExceptionClass, but nothing has been thrown")
+  }
 }
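ScalaAssertionSupport above replaces the repeated try/catch/fail blocks in the Scala tests. A minimal usage sketch mirroring the assertions added to TestDataSourceDefaults below (the class and method names here are illustrative only):

    import org.apache.hudi.{DataSourceWriteOptions, ScalaAssertionSupport}
    import org.apache.hudi.common.config.TypedProperties
    import org.apache.hudi.keygen.SimpleKeyGenerator
    import org.junit.jupiter.api.Test

    class KeyGenFailureExample extends ScalaAssertionSupport {
      @Test def rejectsMissingRecordKeyField(): Unit = {
        // No record key field configured, so construction is expected to fail fast
        val props = new TypedProperties()
        props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "partitionField")

        assertThrows(classOf[IllegalArgumentException]) {
          new SimpleKeyGenerator(props)
        }
      }
    }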
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
index 968b4039f8..50f7f58793 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
@@ -31,19 +31,20 @@ import org.apache.hudi.testutils.KeyGeneratorTestUtilities
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{BeforeEach, Test}
-import org.scalatest.Assertions.fail
 
 /**
  * Tests on the default key generator, payload classes.
  */
-class TestDataSourceDefaults {
+class TestDataSourceDefaults extends ScalaAssertionSupport {
 
   val schema = SchemaTestUtil.getComplexEvolvedSchema
   val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
   var baseRecord: GenericRecord = _
   var baseRow: Row = _
+  var internalRow: InternalRow = _
   val testStructName = "testStructName"
   val testNamespace = "testNamespace"
 
@@ -51,6 +52,7 @@ class TestDataSourceDefaults {
     baseRecord = SchemaTestUtil
       .generateAvroRecordFromJson(schema, 1, "001", "f1")
     baseRow = KeyGeneratorTestUtilities.getRow(baseRecord, schema, structType)
+    internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow)
   }
 
   private def getKeyConfig(recordKeyFieldName: String, partitionPathField: String, hiveStylePartitioning: String): TypedProperties = {
@@ -61,174 +63,136 @@ class TestDataSourceDefaults {
     props
   }
 
-  @Test def testSimpleKeyGenerator() = {
+  @Test def testSimpleKeyGenerator(): Unit = {
 
-    // top level, valid fields
-    var keyGen = new SimpleKeyGenerator(getKeyConfig("field1", "name", "false"))
-    val hk1 = keyGen.getKey(baseRecord)
-    assertEquals("field1", hk1.getRecordKey)
-    assertEquals("name1", hk1.getPartitionPath)
+    {
+      // Top level, valid fields
+      val keyGen = new SimpleKeyGenerator(getKeyConfig("field1", "name", "false"))
 
-    assertEquals("field1", keyGen.getRecordKey(baseRow))
-    assertEquals("name1", keyGen.getPartitionPath(baseRow))
+      val expectedKey = new HoodieKey("field1", "name1")
+      assertEquals(expectedKey, keyGen.getKey(baseRecord))
 
-    // partition path field not specified
-    try {
-      val props = new TypedProperties()
-      props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, "field1")
-      new SimpleKeyGenerator(props).getKey(baseRecord)
-      fail("Should have errored out")
-    } catch {
-      case e: IllegalArgumentException =>
-      // do nothing
+      assertEquals(expectedKey.getRecordKey, keyGen.getRecordKey(baseRow))
+      assertEquals(expectedKey.getPartitionPath, keyGen.getPartitionPath(baseRow))
+      assertEquals(UTF8String.fromString(expectedKey.getRecordKey), keyGen.getRecordKey(internalRow, structType))
+      assertEquals(UTF8String.fromString(expectedKey.getPartitionPath), keyGen.getPartitionPath(internalRow, structType))
     }
 
-    // partition path field not specified using Row
-    try {
+    {
+      // Partition path field not specified
       val props = new TypedProperties()
       props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, "field1")
-      val keyGen = new SimpleKeyGenerator(props)
-      keyGen.getRecordKey(baseRow)
-      fail("Should have errored out")
-    } catch {
-      case e: IllegalArgumentException =>
-      // do nothing
+
+      assertThrows(classOf[IllegalArgumentException]) {
+        new SimpleKeyGenerator(props)
+      }
     }
 
-    // recordkey field not specified
-    try {
+    {
+      // Record key field not specified
       val props = new TypedProperties()
       props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "partitionField")
-      new SimpleKeyGenerator(props).getKey(baseRecord)
-      fail("Should have errored out")
-    } catch {
-      case e: IllegalArgumentException =>
-      // do nothing
-    }
 
-    // recordkey field not specified using Row
-    try {
-      val props = new TypedProperties()
-      props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "partitionField")
-      val keyGen = new SimpleKeyGenerator(props)
-      keyGen.getPartitionPath(baseRow)
-      fail("Should have errored out")
-    } catch {
-      case e: IllegalArgumentException =>
-      // do nothing
+      assertThrows(classOf[IllegalArgumentException]) {
+        new SimpleKeyGenerator(props)
+      }
     }
 
-    // nested field as record key and partition path
-    val hk2 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.isAdmin", "false"))
-      .getKey(baseRecord)
-    assertEquals("UserId1@001", hk2.getRecordKey)
-    assertEquals("false", hk2.getPartitionPath)
-
-    // Nested record key not found
-    try {
-      new SimpleKeyGenerator(getKeyConfig("testNestedRecord.NotThere", "testNestedRecord.isAdmin", "false"))
-        .getKey(baseRecord)
-      fail("Should have errored out")
-    } catch {
-      case e: HoodieException =>
-      // do nothing
+    {
+      // Nested field as record key and partition path
+      val keyGen = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.isAdmin", "false"))
+
+      assertEquals(new HoodieKey("UserId1@001", "false"), keyGen.getKey(baseRecord))
     }
 
-    // if partition path can't be found, return default partition path
-    val hk3 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere", "false"))
-      .getKey(baseRecord)
-    assertEquals("default", hk3.getPartitionPath)
-
-    // if partition path can't be found, return default partition path using row
-    keyGen = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere", "false"))
-    val hk3_row = keyGen.getPartitionPath(baseRow)
-    assertEquals("default", hk3_row)
-
-    // if enable hive style partitioning
-    val hk4 = new SimpleKeyGenerator(getKeyConfig("field1", "name", "true")).getKey(baseRecord)
-    assertEquals("name=name1", hk4.getPartitionPath)
-
-    // if enable hive style partitioning using row
-    keyGen = new SimpleKeyGenerator(getKeyConfig("field1", "name", "true"))
-    val hk4_row = keyGen.getPartitionPath(baseRow)
-    assertEquals("name=name1", hk4_row)
-
-    // if partition is null, return default partition path
-    baseRecord.put("name", "")
-    val hk5 = new SimpleKeyGenerator(getKeyConfig("field1", "name", "false"))
-      .getKey(baseRecord)
-    assertEquals("default", hk5.getPartitionPath)
-
-    // if partition is null, return default partition path using Row
-    keyGen = new SimpleKeyGenerator(getKeyConfig("field1", "name", "false"))
-    baseRow = KeyGeneratorTestUtilities.getRow(baseRecord, schema, structType)
-    val hk5_row = keyGen.getPartitionPath(baseRow)
-    assertEquals("default", hk5_row)
+    {
+      // Nested record key not found
+      val keyGen = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.NotThere", "testNestedRecord.isAdmin", "false"))
 
-    // if partition is empty, return default partition path
-    baseRecord.put("name", null)
-    val hk6 = new SimpleKeyGenerator(getKeyConfig("field1", "name", "false"))
-      .getKey(baseRecord)
-    assertEquals("default", hk6.getPartitionPath)
+      assertThrows(classOf[HoodieException]) {
+        keyGen.getKey(baseRecord)
+      }
+    }
 
-    // if partition is empty, return default partition path using Row
-    keyGen = new SimpleKeyGenerator(getKeyConfig("field1", "name", "false"))
-    baseRow = KeyGeneratorTestUtilities.getRow(baseRecord, schema, structType)
-    val hk6_row = keyGen.getPartitionPath(baseRow)
-    assertEquals("default", hk6_row)
+    {
+      // Fail in case partition path can't be found in schema
+      val keyGen = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere", "false"))
+
+      // TODO this should throw
+      //assertThrows(classOf[HoodieException]) {
+      //  keyGen.getKey(baseRecord)
+      //}
+      assertThrows(classOf[HoodieException]) {
+        keyGen.getPartitionPath(baseRow)
+      }
+      assertThrows(classOf[HoodieException]) {
+        keyGen.getPartitionPath(internalRow, structType)
+      }
+    }
 
-    // if record key is empty, throw error
-    try {
-      baseRecord.put("field1", "")
-      val props = new TypedProperties()
-      props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, "field1")
-      props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "name")
-      new SimpleKeyGenerator(props).getKey(baseRecord)
-      fail("Should have errored out")
-    } catch {
-      case e: HoodieKeyException =>
-      // do nothing
+    {
+      val keyGen = new SimpleKeyGenerator(getKeyConfig("field1", "name", "true"))
+
+      assertEquals("name=name1", keyGen.getKey(baseRecord).getPartitionPath)
+      assertEquals("name=name1", keyGen.getPartitionPath(baseRow))
+      assertEquals(UTF8String.fromString("name=name1"), keyGen.getPartitionPath(internalRow, structType))
     }
 
-    // if record key is empty, throw error. Using Row
-    try {
-      val props = new TypedProperties()
-      props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, "field1")
-      props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "name")
-      keyGen = new SimpleKeyGenerator(props)
+    {
+      // If partition is null/empty, return default partition path
+      val keyGen = new SimpleKeyGenerator(getKeyConfig("field1", "name", "false"))
+
+      baseRecord.put("name", "")
       baseRow = KeyGeneratorTestUtilities.getRow(baseRecord, schema, structType)
-      keyGen.getRecordKey(baseRow)
-      fail("Should have errored out")
-    } catch {
-      case e: HoodieKeyException =>
-      // do nothing
-    }
+      internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow)
 
-    // if record key is null, throw error
-    try {
-      baseRecord.put("field1", null)
-      val props = new TypedProperties()
-      props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, "field1")
-      props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "name")
-      new SimpleKeyGenerator(props).getKey(baseRecord)
-      fail("Should have errored out")
-    } catch {
-      case e: HoodieKeyException =>
-      // do nothing
+      assertEquals("default", keyGen.getKey(baseRecord).getPartitionPath)
+      assertEquals("default", keyGen.getPartitionPath(baseRow))
+      assertEquals(UTF8String.fromString("default"), keyGen.getPartitionPath(internalRow, structType))
+
+      baseRecord.put("name", null)
+      baseRow = KeyGeneratorTestUtilities.getRow(baseRecord, schema, structType)
+      internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow)
+
+      assertEquals("default", keyGen.getKey(baseRecord).getPartitionPath)
+      assertEquals("default", keyGen.getPartitionPath(baseRow))
+      assertEquals(UTF8String.fromString("default"), keyGen.getPartitionPath(internalRow, structType))
     }
 
-    // if record key is null, throw error. Using Row
-    try {
+    {
+      // If record key is null/empty, throw error
       val props = new TypedProperties()
       props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, "field1")
       props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "name")
-      keyGen = new SimpleKeyGenerator(props)
+      val keyGen = new SimpleKeyGenerator(props)
+
+      baseRecord.put("field1", "")
       baseRow = KeyGeneratorTestUtilities.getRow(baseRecord, schema, structType)
-      keyGen.getRecordKey(baseRow)
-      fail("Should have errored out")
-    } catch {
-      case e: HoodieKeyException =>
-      // do nothing
+      internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow)
+
+      assertThrows(classOf[HoodieKeyException]) {
+        keyGen.getKey(baseRecord)
+      }
+      assertThrows(classOf[HoodieKeyException]) {
+        keyGen.getRecordKey(baseRow)
+      }
+      assertThrows(classOf[HoodieKeyException]) {
+        keyGen.getRecordKey(internalRow, structType)
+      }
+
+      baseRecord.put("field1", null)
+      baseRow = KeyGeneratorTestUtilities.getRow(baseRecord, schema, structType)
+      internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow)
+
+      assertThrows(classOf[HoodieKeyException]) {
+        keyGen.getKey(baseRecord)
+      }
+      assertThrows(classOf[HoodieKeyException]) {
+        keyGen.getRecordKey(baseRow)
+      }
+      assertThrows(classOf[HoodieKeyException]) {
+        keyGen.getRecordKey(internalRow, structType)
+      }
     }
   }
 
@@ -256,7 +220,7 @@ class TestDataSourceDefaults {
       getKey(genericRecord).getRecordKey
     }
 
-    override def getRecordKey(row: InternalRow, schema: StructType): String = null
+    override def getRecordKey(row: InternalRow, schema: StructType): UTF8String = null
 
     override def getPartitionPath(row: Row): String = {
       if (null == converterFn) converterFn = AvroConversionUtils.createConverterToAvro(row.schema, STRUCT_NAME, NAMESPACE)
@@ -264,301 +228,312 @@ class TestDataSourceDefaults {
       getKey(genericRecord).getPartitionPath
     }
 
-    override def getPartitionPath(internalRow: InternalRow, structType: StructType): String = null
+    override def getPartitionPath(internalRow: InternalRow, structType: StructType): UTF8String = null
   }
 
-  @Test def testComplexKeyGenerator() = {
-    // top level, valid fields
-    var keyGen = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name", "false"))
-    val hk1 = keyGen.getKey(baseRecord)
-    assertEquals("field1:field1,name:name1", hk1.getRecordKey)
-    assertEquals("field1/name1", hk1.getPartitionPath)
+  @Test def testComplexKeyGenerator(): Unit = {
 
-    // top level, valid fields with Row
-    assertEquals("field1:field1,name:name1", keyGen.getRecordKey(baseRow))
-    assertEquals("field1/name1", keyGen.getPartitionPath(baseRow))
+    {
+      // Top level, valid fields
+      val keyGen = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name", "false"))
+      val expectedKey = new HoodieKey("field1:field1,name:name1", "field1/name1")
 
-    // partition path field not specified
-    try {
-      val props = new TypedProperties()
-      props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, "field1")
-      new ComplexKeyGenerator(props).getKey(baseRecord)
-      fail("Should have errored out")
-    } catch {
-      case e: IllegalArgumentException =>
-      // do nothing
+      assertEquals(expectedKey, keyGen.getKey(baseRecord))
+
+      assertEquals(expectedKey.getRecordKey, keyGen.getRecordKey(baseRow))
+      assertEquals(expectedKey.getPartitionPath, keyGen.getPartitionPath(baseRow))
+      assertEquals(UTF8String.fromString(expectedKey.getRecordKey), keyGen.getRecordKey(internalRow, structType))
+      assertEquals(UTF8String.fromString(expectedKey.getPartitionPath), keyGen.getPartitionPath(internalRow, structType))
     }
 
-    // partition path field not specified using Row
-    try {
+    // Partition path field not specified
+    assertThrows(classOf[IllegalArgumentException]) {
       val props = new TypedProperties()
       props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, "field1")
       val keyGen = new ComplexKeyGenerator(props)
-      keyGen.getRecordKey(baseRow)
-      fail("Should have errored out")
-    } catch {
-      case e: IllegalArgumentException =>
-      // do nothing
-    }
 
-    // recordkey field not specified
-    try {
-      val props = new TypedProperties()
-      props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "partitionField")
-      new ComplexKeyGenerator(props).getKey(baseRecord)
-      fail("Should have errored out")
-    } catch {
-      case e: IllegalArgumentException =>
-      // do nothing
+      keyGen.getKey(baseRecord)
+      keyGen.getRecordKey(baseRow)
+      keyGen.getRecordKey(internalRow, structType)
     }
 
-    // recordkey field not specified
-    try {
+    // Record key field not specified
+    assertThrows(classOf[IllegalArgumentException]) {
       val props = new TypedProperties()
       props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "partitionField")
       val keyGen = new ComplexKeyGenerator(props)
+
+      keyGen.getKey(baseRecord)
       keyGen.getPartitionPath(baseRow)
-      fail("Should have errored out")
-    } catch {
-      case e: IllegalArgumentException =>
-      // do nothing
+      keyGen.getPartitionPath(internalRow, structType)
     }
 
-    // nested field as record key and partition path
-    keyGen = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId,testNestedRecord.isAdmin", "testNestedRecord.userId,testNestedRecord.isAdmin", "false"))
-    val hk2 = keyGen.getKey(baseRecord)
-    assertEquals("testNestedRecord.userId:UserId1@001,testNestedRecord.isAdmin:false", hk2.getRecordKey)
-    assertEquals("UserId1@001/false", hk2.getPartitionPath)
-
-    // nested field as record key and partition path
-    assertEquals("testNestedRecord.userId:UserId1@001,testNestedRecord.isAdmin:false", keyGen.getRecordKey(baseRow))
-    assertEquals("UserId1@001/false", keyGen.getPartitionPath(baseRow))
-
-    // Nested record key not found
-    try {
-      new ComplexKeyGenerator(getKeyConfig("testNestedRecord.NotThere", "testNestedRecord.isAdmin", "false"))
-        .getKey(baseRecord)
-      fail("Should have errored out")
-    } catch {
-      case e: HoodieException =>
-      // do nothing
+    {
+      // Nested field as record key and partition path
+      val keyGen = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId,testNestedRecord.isAdmin", "testNestedRecord.userId,testNestedRecord.isAdmin", "false"))
+
+      val expectedKey = new HoodieKey("testNestedRecord.userId:UserId1@001,testNestedRecord.isAdmin:false", "UserId1@001/false")
+
+      assertEquals(expectedKey, keyGen.getKey(baseRecord))
+
+      assertEquals(expectedKey.getRecordKey, keyGen.getRecordKey(baseRow))
+      assertEquals(expectedKey.getPartitionPath, keyGen.getPartitionPath(baseRow))
+      assertEquals(UTF8String.fromString(expectedKey.getRecordKey), keyGen.getRecordKey(internalRow, structType))
+      assertEquals(UTF8String.fromString(expectedKey.getPartitionPath), keyGen.getPartitionPath(internalRow, structType))
     }
 
-    // Nested record key not found
-    try {
+    {
+      // Nested record key not found
       val keyGen = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.NotThere", "testNestedRecord.isAdmin", "false"))
-      keyGen.getRecordKey(baseRow)
-      fail("Should have errored out")
-    } catch {
-      case e: HoodieException =>
-      // do nothing
+
+      assertThrows(classOf[HoodieException]) {
+        keyGen.getKey(baseRecord)
+      }
+      assertThrows(classOf[HoodieException]) {
+        keyGen.getRecordKey(baseRow)
+      }
+      assertThrows(classOf[HoodieException]) {
+        keyGen.getRecordKey(internalRow, structType)
+      }
     }
 
-    // if partition path can't be found, return default partition path
-    keyGen = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere", "false"))
-    val hk3 = keyGen.getKey(baseRecord)
-    assertEquals("default", hk3.getPartitionPath)
+    {
+      // Fail in case partition path can't be found in the schema
+      val keyGen = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere", "false"))
+
+      // TODO this should throw
+      //assertThrows(classOf[HoodieException]) {
+      //  keyGen.getKey(baseRecord)
+      //}
+      assertThrows(classOf[HoodieException]) {
+        keyGen.getPartitionPath(baseRow)
+      }
+      assertThrows(classOf[HoodieException]) {
+        keyGen.getPartitionPath(internalRow, structType)
+      }
+    }
 
-    assertEquals("default", keyGen.getPartitionPath(baseRow))
+    {
+      // If hive-style partitioning is enabled
+      val keyGen = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name", "true"))
 
-    // if enable hive style partitioning
-    keyGen = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name", "true"))
-    val hk4 = keyGen.getKey(baseRecord)
-    assertEquals("field1:field1,name:name1", hk4.getRecordKey)
-    assertEquals("field1=field1/name=name1", hk4.getPartitionPath)
+      val expectedKey = new HoodieKey("field1:field1,name:name1", "field1=field1/name=name1")
 
-    assertEquals("field1:field1,name:name1", keyGen.getRecordKey(baseRow))
-    assertEquals("field1=field1/name=name1", keyGen.getPartitionPath(baseRow))
+      assertEquals(expectedKey, keyGen.getKey(baseRecord))
 
-    // if one part of the record key is empty, replace with "__empty__"
-    baseRecord.put("name", "")
-    keyGen = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name", "false"))
-    val hk5 = keyGen.getKey(baseRecord)
-    assertEquals("field1:field1,name:__empty__", hk5.getRecordKey)
-    assertEquals("field1/default", hk5.getPartitionPath)
+      assertEquals(expectedKey.getRecordKey, keyGen.getRecordKey(baseRow))
+      assertEquals(expectedKey.getPartitionPath, keyGen.getPartitionPath(baseRow))
+      assertEquals(UTF8String.fromString(expectedKey.getRecordKey), keyGen.getRecordKey(internalRow, structType))
+      assertEquals(UTF8String.fromString(expectedKey.getPartitionPath), keyGen.getPartitionPath(internalRow, structType))
+    }
 
-    baseRow = KeyGeneratorTestUtilities.getRow(baseRecord, schema, structType)
-    assertEquals("field1:field1,name:__empty__", keyGen.getRecordKey(baseRow))
-    assertEquals("field1/default", keyGen.getPartitionPath(baseRow))
+    {
+      // If one part of the record key is empty, replace with "__empty__"
+      val keyGen = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name", "false"))
 
-    // if one part of the record key is null, replace with "__null__"
-    baseRecord.put("name", null)
-    keyGen = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name", "false"))
-    val hk6 = keyGen.getKey(baseRecord)
-    assertEquals("field1:field1,name:__null__", hk6.getRecordKey)
-    assertEquals("field1/default", hk6.getPartitionPath)
+      baseRecord.put("name", "")
+      baseRow = KeyGeneratorTestUtilities.getRow(baseRecord, schema, structType)
+      internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow)
 
-    baseRow = KeyGeneratorTestUtilities.getRow(baseRecord, schema, structType)
-    assertEquals("field1:field1,name:__null__", keyGen.getRecordKey(baseRow))
-    assertEquals("field1/default", keyGen.getPartitionPath(baseRow))
+      val expectedKey = new HoodieKey("field1:field1,name:__empty__", "field1/default")
 
-    // if all parts of the composite record key are null/empty, throw error
-    try {
-      baseRecord.put("name", "")
-      baseRecord.put("field1", null)
+      assertEquals(expectedKey, keyGen.getKey(baseRecord))
+
+      assertEquals(expectedKey.getRecordKey, keyGen.getRecordKey(baseRow))
+      assertEquals(expectedKey.getPartitionPath, keyGen.getPartitionPath(baseRow))
+      assertEquals(UTF8String.fromString(expectedKey.getRecordKey), keyGen.getRecordKey(internalRow, structType))
+      assertEquals(UTF8String.fromString(expectedKey.getPartitionPath), keyGen.getPartitionPath(internalRow, structType))
+    }
+
+    {
+      // If one part of the record key is null, replace with "__null__"
+      val keyGen = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name", "false"))
+
+      baseRecord.put("name", null)
+      baseRow = KeyGeneratorTestUtilities.getRow(baseRecord, schema, structType)
+      internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow)
+
+      val expectedKey = new HoodieKey("field1:field1,name:__null__", "field1/default")
+
+      assertEquals(expectedKey, keyGen.getKey(baseRecord))
+
+      assertEquals(expectedKey.getRecordKey, keyGen.getRecordKey(baseRow))
+      assertEquals(expectedKey.getPartitionPath, keyGen.getPartitionPath(baseRow))
+      assertEquals(UTF8String.fromString(expectedKey.getRecordKey), keyGen.getRecordKey(internalRow, structType))
+      assertEquals(UTF8String.fromString(expectedKey.getPartitionPath), keyGen.getPartitionPath(internalRow, structType))
+    }
+
+    {
+      // If all parts of the composite record key are null/empty, throw error
       val props = new TypedProperties()
       props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, "field1,name")
       props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "field1,name")
-      new ComplexKeyGenerator(props).getKey(baseRecord)
-      fail("Should have errored out")
-    } catch {
-      case e: HoodieKeyException =>
-      // do nothing
-    }
+      val keyGen = new ComplexKeyGenerator(props)
 
-    // if all parts of the composite record key are null/empty, throw error
-    try {
       baseRecord.put("name", "")
       baseRecord.put("field1", null)
-      val props = new TypedProperties()
-      props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, "field1,name")
-      props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "field1,name")
-      keyGen = new ComplexKeyGenerator(props)
+
       baseRow = KeyGeneratorTestUtilities.getRow(baseRecord, schema, structType)
-      keyGen.getRecordKey(baseRow)
-      fail("Should have errored out")
-    } catch {
-      case e: HoodieKeyException =>
-      // do nothing
+      internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow)
+
+      assertThrows(classOf[HoodieKeyException]) {
+        keyGen.getKey(baseRecord)
+      }
+      assertThrows(classOf[HoodieKeyException]) {
+        keyGen.getRecordKey(baseRow)
+      }
+      assertThrows(classOf[HoodieKeyException]) {
+        keyGen.getRecordKey(internalRow, structType)
+      }
     }
 
-    // reset name and field1 values.
-    baseRecord.put("name", "name1")
-    baseRecord.put("field1", "field1")
-    keyGen = new ComplexKeyGenerator(getKeyConfig("field1, name", "field1, name", "false"))
-    val hk7 = keyGen.getKey(baseRecord)
-    assertEquals("field1:field1,name:name1", hk7.getRecordKey)
-    assertEquals("field1/name1", hk7.getPartitionPath)
+    {
+      // Reset name and field1 values.
+      val keyGen = new ComplexKeyGenerator(getKeyConfig("field1, name", "field1, name", "false"))
 
-    baseRow = KeyGeneratorTestUtilities.getRow(baseRecord, schema, structType)
-    assertEquals("field1:field1,name:name1", keyGen.getRecordKey(baseRow))
-    assertEquals("field1/name1", keyGen.getPartitionPath(baseRow))
+      baseRecord.put("name", "name1")
+      baseRecord.put("field1", "field1")
 
-    keyGen = new ComplexKeyGenerator(getKeyConfig("field1,", "field1,", "false"))
-    val hk8 = keyGen.getKey(baseRecord)
-    assertEquals("field1:field1", hk8.getRecordKey)
-    assertEquals("field1", hk8.getPartitionPath)
+      baseRow = KeyGeneratorTestUtilities.getRow(baseRecord, schema, structType)
+      internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow)
 
-    assertEquals("field1:field1", keyGen.getRecordKey(baseRow))
-    assertEquals("field1", keyGen.getPartitionPath(baseRow))
-  }
+      val expectedKey = new HoodieKey("field1:field1,name:name1", "field1/name1")
 
-  @Test def testGlobalDeleteKeyGenerator() = {
-    // top level, partition value included but not actually used
-    var keyGen = new GlobalDeleteKeyGenerator(getKeyConfig("field1,name", "field1,name", "false"))
-    val hk1 = keyGen.getKey(baseRecord)
-    assertEquals("field1:field1,name:name1", hk1.getRecordKey)
-    assertEquals("", hk1.getPartitionPath)
+      assertEquals(expectedKey, keyGen.getKey(baseRecord))
 
-    assertEquals("field1:field1,name:name1", keyGen.getRecordKey(baseRow))
-    assertEquals("", keyGen.getPartitionPath(baseRow))
+      assertEquals(expectedKey.getRecordKey, keyGen.getRecordKey(baseRow))
+      assertEquals(expectedKey.getPartitionPath, keyGen.getPartitionPath(baseRow))
+      assertEquals(UTF8String.fromString(expectedKey.getRecordKey), keyGen.getRecordKey(internalRow, structType))
+      assertEquals(UTF8String.fromString(expectedKey.getPartitionPath), keyGen.getPartitionPath(internalRow, structType))
+    }
 
-    // top level, partition value not included
-    val props = new TypedProperties()
-    props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, "field1,name")
-    keyGen = new GlobalDeleteKeyGenerator(props)
-    val hk2 = keyGen.getKey(baseRecord)
-    assertEquals("field1:field1,name:name1", hk2.getRecordKey)
-    assertEquals("", hk2.getPartitionPath)
-
-    assertEquals("field1:field1,name:name1", keyGen.getRecordKey(baseRow))
-    assertEquals("", keyGen.getPartitionPath(baseRow))
-
-    // if one part of the record key is empty, replace with "__empty__"
-    baseRecord.put("name", "")
-    keyGen = new GlobalDeleteKeyGenerator(getKeyConfig("field1,name", "field1,name", "false"))
-    val hk3 = keyGen.getKey(baseRecord)
-    assertEquals("field1:field1,name:__empty__", hk3.getRecordKey)
-    assertEquals("", hk3.getPartitionPath)
+    {
+      val keyGen = new ComplexKeyGenerator(getKeyConfig("field1,", "field1,", "false"))
 
-    baseRow = KeyGeneratorTestUtilities.getRow(baseRecord, schema, structType)
-    assertEquals("field1:field1,name:__empty__", keyGen.getRecordKey(baseRow))
-    assertEquals("", keyGen.getPartitionPath(baseRow))
+      val expectedKey = new HoodieKey("field1:field1", "field1")
 
-    // if one part of the record key is null, replace with "__null__"
-    baseRecord.put("name", null)
-    keyGen = new GlobalDeleteKeyGenerator(getKeyConfig("field1,name", "field1,name", "false"))
-    val hk4 = keyGen.getKey(baseRecord)
-    assertEquals("field1:field1,name:__null__", hk4.getRecordKey)
-    assertEquals("", hk4.getPartitionPath)
+      assertEquals(expectedKey, keyGen.getKey(baseRecord))
 
-    baseRow = KeyGeneratorTestUtilities.getRow(baseRecord, schema, structType)
-    assertEquals("field1:field1,name:__null__", keyGen.getRecordKey(baseRow))
-    assertEquals("", keyGen.getPartitionPath(baseRow))
+      assertEquals(expectedKey.getRecordKey, keyGen.getRecordKey(baseRow))
+      assertEquals(expectedKey.getPartitionPath, keyGen.getPartitionPath(baseRow))
+      assertEquals(UTF8String.fromString(expectedKey.getRecordKey), keyGen.getRecordKey(internalRow, structType))
+      assertEquals(UTF8String.fromString(expectedKey.getPartitionPath), keyGen.getPartitionPath(internalRow, structType))
+    }
+  }
 
-    // recordkey field not specified
-    try {
-      val props = new TypedProperties()
-      props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "partitionField")
-      new GlobalDeleteKeyGenerator(props).getKey(baseRecord)
-      fail("Should have errored out")
-    } catch {
-      case e: IllegalArgumentException =>
-      // do nothing
+  @Test def testGlobalDeleteKeyGenerator(): Unit = {
+    {
+      // Top level, partition value included but not actually used
+      val keyGen = new GlobalDeleteKeyGenerator(getKeyConfig("field1,name", "field1,name", "false"))
+
+      val expectedKey = new HoodieKey("field1:field1,name:name1", "")
+
+      assertEquals(expectedKey, keyGen.getKey(baseRecord))
+
+      assertEquals(expectedKey.getRecordKey, keyGen.getRecordKey(baseRow))
+      assertEquals(expectedKey.getPartitionPath, keyGen.getPartitionPath(baseRow))
+      assertEquals(UTF8String.fromString(expectedKey.getRecordKey), keyGen.getRecordKey(internalRow, structType))
+      assertEquals(UTF8String.fromString(expectedKey.getPartitionPath), keyGen.getPartitionPath(internalRow, structType))
     }
 
-    // recordkey field not specified
-    try {
+    {
+      // Top level, partition value not included
       val props = new TypedProperties()
-      props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "partitionField")
+      props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, "field1,name")
+
       val keyGen = new GlobalDeleteKeyGenerator(props)
-      keyGen.getRecordKey(baseRow)
-      fail("Should have errored out")
-    } catch {
-      case e: IllegalArgumentException =>
-      // do nothing
+
+      val expectedKey = new HoodieKey("field1:field1,name:name1", "")
+
+      assertEquals(expectedKey, keyGen.getKey(baseRecord))
+
+      assertEquals(expectedKey.getRecordKey, keyGen.getRecordKey(baseRow))
+      assertEquals(expectedKey.getPartitionPath, keyGen.getPartitionPath(baseRow))
+      assertEquals(UTF8String.fromString(expectedKey.getRecordKey), keyGen.getRecordKey(internalRow, structType))
+      assertEquals(UTF8String.fromString(expectedKey.getPartitionPath), keyGen.getPartitionPath(internalRow, structType))
+    }
+
+    {
+      // If one part of the record key is empty, replace with "__empty__"
+      baseRecord.put("name", "")
+      baseRow = KeyGeneratorTestUtilities.getRow(baseRecord, schema, structType)
+      internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow)
+
+      val keyGen = new GlobalDeleteKeyGenerator(getKeyConfig("field1,name", "field1,name", "false"))
+
+      val expectedKey = new HoodieKey("field1:field1,name:__empty__", "")
+
+      assertEquals(expectedKey.getRecordKey, keyGen.getRecordKey(baseRow))
+      assertEquals(expectedKey.getPartitionPath, keyGen.getPartitionPath(baseRow))
+      assertEquals(UTF8String.fromString(expectedKey.getRecordKey), keyGen.getRecordKey(internalRow, structType))
+      assertEquals(UTF8String.fromString(expectedKey.getPartitionPath), keyGen.getPartitionPath(internalRow, structType))
+    }
+
+    {
+      // If one part of the record key is null, replace with "__null__"
+      baseRecord.put("name", null)
+      baseRow = KeyGeneratorTestUtilities.getRow(baseRecord, schema, structType)
+      internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow)
+
+      val keyGen = new GlobalDeleteKeyGenerator(getKeyConfig("field1,name", "field1,name", "false"))
+
+      val expectedKey = new HoodieKey("field1:field1,name:__null__", "")
+
+      assertEquals(expectedKey.getRecordKey, keyGen.getRecordKey(baseRow))
+      assertEquals(expectedKey.getPartitionPath, keyGen.getPartitionPath(baseRow))
+      assertEquals(UTF8String.fromString(expectedKey.getRecordKey), keyGen.getRecordKey(internalRow, structType))
+      assertEquals(UTF8String.fromString(expectedKey.getPartitionPath), keyGen.getPartitionPath(internalRow, structType))
     }
 
-    // Nested record key not found
-    try {
-      new GlobalDeleteKeyGenerator(getKeyConfig("testNestedRecord.NotThere", "testNestedRecord.isAdmin", "false"))
-        .getKey(baseRecord)
-      fail("Should have errored out")
-    } catch {
-      case e: HoodieException =>
-      // do nothing
+    {
+      // Record key field not specified
+      val props = new TypedProperties()
+      props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "partitionField")
+
+      assertThrows(classOf[IllegalArgumentException]) {
+        new GlobalDeleteKeyGenerator(props)
+      }
     }
 
-    // Nested record key not found
-    try {
+    {
+      // Nested record key not found
       val keyGen = new GlobalDeleteKeyGenerator(getKeyConfig("testNestedRecord.NotThere", "testNestedRecord.isAdmin", "false"))
-      keyGen.getRecordKey(baseRow)
-      fail("Should have errored out")
-    } catch {
-      case e: HoodieException =>
-      // do nothing
+
+      assertThrows(classOf[HoodieException]) {
+        keyGen.getKey(baseRecord)
+      }
+      assertThrows(classOf[HoodieException]) {
+        keyGen.getRecordKey(baseRow)
+      }
+      assertThrows(classOf[HoodieException]) {
+        keyGen.getRecordKey(internalRow, structType)
+      }
     }
 
-    // if all parts of the composite record key are null/empty, throw error
-    try {
-      baseRecord.put("name", "")
-      baseRecord.put("field1", null)
+    {
+      // If all parts of the composite record key are null/empty, throw error
       val props = new TypedProperties()
       props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, "field1,name")
-      new GlobalDeleteKeyGenerator(props).getKey(baseRecord)
-      fail("Should have errored out")
-    } catch {
-      case e: HoodieKeyException =>
-      // do nothing
-    }
+      val keyGen = new GlobalDeleteKeyGenerator(props)
 
-    // if all parts of the composite record key are null/empty, throw error
-    try {
       baseRecord.put("name", "")
       baseRecord.put("field1", null)
       baseRow = KeyGeneratorTestUtilities.getRow(baseRecord, schema, structType)
-      val props = new TypedProperties()
-      props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, "field1,name")
-      val keyGen = new GlobalDeleteKeyGenerator(props)
-      keyGen.getRecordKey(baseRow)
-      fail("Should have errored out")
-    } catch {
-      case e: HoodieKeyException =>
-      // do nothing
+      internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow)
+
+      assertThrows(classOf[HoodieKeyException]) {
+        keyGen.getKey(baseRecord)
+      }
+      assertThrows(classOf[HoodieKeyException]) {
+        keyGen.getRecordKey(baseRow)
+      }
+      assertThrows(classOf[HoodieKeyException]) {
+        keyGen.getRecordKey(internalRow, structType)
+      }
     }
   }
 
-  @Test def testOverwriteWithLatestAvroPayload() = {
+  @Test def testOverwriteWithLatestAvroPayload(): Unit = {
     val overWritePayload1 = new OverwriteWithLatestAvroPayload(baseRecord, 1)
     val laterRecord = SchemaTestUtil
       .generateAvroRecordFromJson(schema, 2, "001", "f1")
@@ -575,7 +550,7 @@ class TestDataSourceDefaults {
     assertEquals("field2", combinedGR21.get("field1").toString)
   }
 
-  @Test def testOverwriteWithLatestAvroPayloadCombineAndGetUpdateValue() = {
+  @Test def testOverwriteWithLatestAvroPayloadCombineAndGetUpdateValue(): Unit = {
     val baseOrderingVal: Object = baseRecord.get("favoriteIntNumber")
     val fieldSchema: Schema = baseRecord.getSchema().getField("favoriteIntNumber").schema()
     val props = new TypedProperties()
@@ -594,7 +569,7 @@ class TestDataSourceDefaults {
     assertEquals("field2", precombinedGR.get("field1").toString)
   }
 
-  @Test def testDefaultHoodieRecordPayloadCombineAndGetUpdateValue() = {
+  @Test def testDefaultHoodieRecordPayloadCombineAndGetUpdateValue(): Unit = {
     val fieldSchema: Schema = baseRecord.getSchema().getField("favoriteIntNumber").schema()
     val props = HoodiePayloadConfig.newBuilder()
       .withPayloadOrderingField("favoriteIntNumber").build().getProps;
@@ -630,7 +605,7 @@ class TestDataSourceDefaults {
     assertEquals(laterOrderingVal, laterWithEarlierGR.get("favoriteIntNumber"))
   }
 
-  @Test def testEmptyHoodieRecordPayload() = {
+  @Test def testEmptyHoodieRecordPayload(): Unit = {
     val emptyPayload1 = new EmptyHoodieRecordPayload(baseRecord, 1)
     val laterRecord = SchemaTestUtil
       .generateAvroRecordFromJson(schema, 2, "001", "f1")