Posted to commits@hudi.apache.org by yi...@apache.org on 2022/07/21 13:20:55 UTC

[hudi] branch master updated: [HUDI-3993] Replacing UDF in Bulk Insert w/ RDD transformation (#5470)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a33bdd32e3 [HUDI-3993] Replacing UDF in Bulk Insert w/ RDD transformation (#5470)
a33bdd32e3 is described below

commit a33bdd32e37a0a7859256f9db32d2bef0a44c1fa
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Thu Jul 21 06:20:47 2022 -0700

    [HUDI-3993] Replacing UDF in Bulk Insert w/ RDD transformation (#5470)
---
 .../hudi/client/HoodieInternalWriteStatus.java     |   4 +
 .../keygen/NonpartitionedAvroKeyGenerator.java     |   2 +-
 .../hudi/client/model/HoodieInternalRow.java       | 251 +++++++++++----------
 .../storage/row/HoodieInternalRowFileWriter.java   |   3 +-
 .../row/HoodieInternalRowParquetWriter.java        |   3 +-
 .../hudi/io/storage/row/HoodieRowCreateHandle.java | 123 +++++++---
 .../HoodieRowCreateHandleWithoutMetaFields.java    |  64 ------
 .../storage/row/HoodieRowParquetWriteSupport.java  |  34 +--
 .../apache/hudi/keygen/BuiltinKeyGenerator.java    |  26 ++-
 .../apache/hudi/keygen/ComplexKeyGenerator.java    |   6 +
 .../hudi/keygen/GlobalDeleteKeyGenerator.java      |   6 +
 .../org/apache/hudi/keygen/RowKeyGenUtils.java     |  59 +++++
 .../apache/hudi/keygen/RowKeyGeneratorHelper.java  |  69 +++++-
 .../hudi/keygen/SparkKeyGeneratorInterface.java    |   2 +
 .../java/org/apache/hudi/util/DataTypeUtils.java   |  95 --------
 .../apache/spark/sql/HoodieUnsafeRowUtils.scala    | 120 ++++++++++
 .../hudi/client/model/TestHoodieInternalRow.java   |  35 ++-
 .../io/storage/row/TestHoodieRowCreateHandle.java  |  41 ++--
 .../spark/sql/TestHoodieUnsafeRowUtils.scala       | 166 ++++++++++++++
 .../src/main/java/org/apache/hudi/TypeUtils.java   |  35 ---
 .../org/apache/hudi/common/bloom/BloomFilter.java  |   9 +-
 .../bloom/HoodieDynamicBoundedBloomFilter.java     |   7 +-
 .../hudi/common/bloom/SimpleBloomFilter.java       |   9 +-
 .../java/org/apache/hudi/common/util/Either.java   |   2 +-
 .../org/apache/hudi/common/util/HoodieTimer.java   |  12 +-
 .../org/apache/hudi/common/util/TypeUtils.java     |  12 +
 .../org/apache/hudi/keygen/BaseKeyGenerator.java   |   1 +
 .../hudi/metadata/HoodieMetadataPayload.java       |   2 +-
 .../common/testutils/HoodieTestDataGenerator.java  |   2 +-
 .../utils/HoodieRealtimeInputFormatUtils.java      |   2 +-
 .../main/java/org/apache/hudi/DataSourceUtils.java |   9 +-
 .../apache/hudi/HoodieDatasetBulkInsertHelper.java | 189 ----------------
 .../java/org/apache/hudi/SparkRowWriteHelper.java  |  72 ------
 .../BulkInsertDataInternalWriterHelper.java        | 120 ++++++----
 .../hudi/HoodieDatasetBulkInsertHelper.scala       | 158 +++++++++++++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  13 +-
 .../TestHoodieDatasetBulkInsertHelper.java         |  98 ++++----
 .../apache/hudi/testutils/DataSourceTestUtils.java |  14 +-
 .../org/apache/hudi/TestDataSourceDefaults.scala   |   2 +
 .../hudi/TestGenericRecordAndRowConsistency.scala  |   3 +-
 hudi-spark-datasource/hudi-spark3/pom.xml          |  56 +++--
 41 files changed, 1123 insertions(+), 813 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java
index b1675dfd88..815ef4892e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java
@@ -55,6 +55,10 @@ public class HoodieInternalWriteStatus implements Serializable {
     this.random = new Random(RANDOM_SEED);
   }
 
+  public boolean isTrackingSuccessfulWrites() {
+    return trackSuccessRecords;
+  }
+
   public void markSuccess(String recordKey) {
     if (trackSuccessRecords) {
       this.successRecordKeys.add(recordKey);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
index 3f6aaadf6e..db7596993d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
@@ -56,7 +56,7 @@ public class NonpartitionedAvroKeyGenerator extends BaseKeyGenerator {
     // for backward compatibility, we need to use the right format according to the number of record key fields
     // 1. if there is only one record key field, the format of record key is just "<value>"
     // 2. if there are multiple record key fields, the format is "<field1>:<value1>,<field2>:<value2>,..."
-    if (getRecordKeyFieldNames().size() == 1) {
+    if (getRecordKeyFields().size() == 1) {
       return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0), isConsistentLogicalTimestampEnabled());
     }
     return KeyGenUtils.getRecordKey(record, getRecordKeyFields(), isConsistentLogicalTimestampEnabled());
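
The comment in the hunk above describes the two record-key formats the key generators emit. As a minimal, self-contained sketch (plain Java with illustrative field and value names, not Hudi's actual KeyGenUtils implementation), the formatting rule boils down to:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    public class RecordKeyFormatSketch {

      // "<value>" for a single record-key field,
      // "<field1>:<value1>,<field2>:<value2>,..." when there are multiple fields
      static String recordKey(List<String> fields, List<String> values) {
        if (fields.size() == 1) {
          return values.get(0);
        }
        return IntStream.range(0, fields.size())
            .mapToObj(i -> fields.get(i) + ":" + values.get(i))
            .collect(Collectors.joining(","));
      }

      public static void main(String[] args) {
        System.out.println(recordKey(Arrays.asList("uuid"), Arrays.asList("k1")));            // k1
        System.out.println(recordKey(Arrays.asList("uuid", "ts"), Arrays.asList("k1", "7"))); // uuid:k1,ts:7
      }
    }
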
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
index 5fcd1dcbfb..c2f86bd6b8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
@@ -24,31 +24,66 @@ import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.MapData;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.StringType$;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
+import java.util.Arrays;
+
 /**
- * Internal Row implementation for Hoodie Row. It wraps an {@link InternalRow} and keeps meta columns locally. But the {@link InternalRow}
- * does include the meta columns as well just that {@link HoodieInternalRow} will intercept queries for meta columns and serve from its
- * copy rather than fetching from {@link InternalRow}.
+ * Hudi-internal implementation of {@link InternalRow} that wraps an arbitrary
+ * {@link InternalRow} and overlays the Hudi meta-fields on top of it.
+ *
+ * Capable of overlaying meta-fields regardless of whether the original {@link #row} contains
+ * meta columns or not. This makes it possible to handle the following use-cases without any
+ * manipulation (reshuffling) of the source row, simply by creating a new instance
+ * of {@link HoodieInternalRow} with all the meta-values provided
+ *
+ * <ul>
+ *   <li>When meta-fields need to be prepended to the source {@link InternalRow}</li>
+ *   <li>When meta-fields need to be updated w/in the source {@link InternalRow}
+ *   ({@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} currently does not
+ *   allow in-place updates due to its memory layout)</li>
+ * </ul>
  */
 public class HoodieInternalRow extends InternalRow {
 
-  private String commitTime;
-  private String commitSeqNumber;
-  private String recordKey;
-  private String partitionPath;
-  private String fileName;
-  private InternalRow row;
-
-  public HoodieInternalRow(String commitTime, String commitSeqNumber, String recordKey, String partitionPath,
-      String fileName, InternalRow row) {
-    this.commitTime = commitTime;
-    this.commitSeqNumber = commitSeqNumber;
-    this.recordKey = recordKey;
-    this.partitionPath = partitionPath;
-    this.fileName = fileName;
+  /**
+   * Collection of meta-fields as defined by {@link HoodieRecord#HOODIE_META_COLUMNS}
+   */
+  private final UTF8String[] metaFields;
+  private final InternalRow row;
+
+  /**
+   * Specifies whether source {@link #row} contains meta-fields
+   */
+  private final boolean containsMetaFields;
+
+  public HoodieInternalRow(UTF8String commitTime,
+                           UTF8String commitSeqNumber,
+                           UTF8String recordKey,
+                           UTF8String partitionPath,
+                           UTF8String fileName,
+                           InternalRow row,
+                           boolean containsMetaFields) {
+    this.metaFields = new UTF8String[] {
+        commitTime,
+        commitSeqNumber,
+        recordKey,
+        partitionPath,
+        fileName
+    };
+
     this.row = row;
+    this.containsMetaFields = containsMetaFields;
+  }
+
+  private HoodieInternalRow(UTF8String[] metaFields,
+                           InternalRow row,
+                           boolean containsMetaFields) {
+    this.metaFields = metaFields;
+    this.row = row;
+    this.containsMetaFields = containsMetaFields;
   }
 
   @Override
@@ -57,187 +92,153 @@ public class HoodieInternalRow extends InternalRow {
   }
 
   @Override
-  public void setNullAt(int i) {
-    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      switch (i) {
-        case 0: {
-          this.commitTime = null;
-          break;
-        }
-        case 1: {
-          this.commitSeqNumber = null;
-          break;
-        }
-        case 2: {
-          this.recordKey = null;
-          break;
-        }
-        case 3: {
-          this.partitionPath = null;
-          break;
-        }
-        case 4: {
-          this.fileName = null;
-          break;
-        }
-        default: throw new IllegalArgumentException("Not expected");
-      }
+  public void setNullAt(int ordinal) {
+    if (ordinal < metaFields.length) {
+      metaFields[ordinal] = null;
     } else {
-      row.setNullAt(i);
+      row.setNullAt(rebaseOrdinal(ordinal));
     }
   }
 
   @Override
-  public void update(int i, Object value) {
-    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      switch (i) {
-        case 0: {
-          this.commitTime = value.toString();
-          break;
-        }
-        case 1: {
-          this.commitSeqNumber = value.toString();
-          break;
-        }
-        case 2: {
-          this.recordKey = value.toString();
-          break;
-        }
-        case 3: {
-          this.partitionPath = value.toString();
-          break;
-        }
-        case 4: {
-          this.fileName = value.toString();
-          break;
-        }
-        default: throw new IllegalArgumentException("Not expected");
+  public void update(int ordinal, Object value) {
+    if (ordinal < metaFields.length) {
+      if (value instanceof UTF8String) {
+        metaFields[ordinal] = (UTF8String) value;
+      } else if (value instanceof String) {
+        metaFields[ordinal] = UTF8String.fromString((String) value);
+      } else {
+        throw new IllegalArgumentException(
+            String.format("Could not update the row at (%d) with value of type (%s), either UTF8String or String are expected", ordinal, value.getClass().getSimpleName()));
       }
     } else {
-      row.update(i, value);
+      row.update(rebaseOrdinal(ordinal), value);
     }
   }
 
-  private String getMetaColumnVal(int ordinal) {
-    switch (ordinal) {
-      case 0: {
-        return commitTime;
-      }
-      case 1: {
-        return commitSeqNumber;
-      }
-      case 2: {
-        return recordKey;
-      }
-      case 3: {
-        return partitionPath;
-      }
-      case 4: {
-        return fileName;
-      }
-      default: throw new IllegalArgumentException("Not expected");
+  @Override
+  public boolean isNullAt(int ordinal) {
+    if (ordinal < metaFields.length) {
+      return metaFields[ordinal] == null;
     }
+    return row.isNullAt(rebaseOrdinal(ordinal));
   }
 
   @Override
-  public boolean isNullAt(int ordinal) {
+  public UTF8String getUTF8String(int ordinal) {
+    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+      return metaFields[ordinal];
+    }
+    return row.getUTF8String(rebaseOrdinal(ordinal));
+  }
+
+  @Override
+  public Object get(int ordinal, DataType dataType) {
     if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return null == getMetaColumnVal(ordinal);
+      validateMetaFieldDataType(dataType);
+      return metaFields[ordinal];
     }
-    return row.isNullAt(ordinal);
+    return row.get(rebaseOrdinal(ordinal), dataType);
   }
 
   @Override
   public boolean getBoolean(int ordinal) {
-    return row.getBoolean(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Boolean.class);
+    return row.getBoolean(rebaseOrdinal(ordinal));
   }
 
   @Override
   public byte getByte(int ordinal) {
-    return row.getByte(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Byte.class);
+    return row.getByte(rebaseOrdinal(ordinal));
   }
 
   @Override
   public short getShort(int ordinal) {
-    return row.getShort(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Short.class);
+    return row.getShort(rebaseOrdinal(ordinal));
   }
 
   @Override
   public int getInt(int ordinal) {
-    return row.getInt(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Integer.class);
+    return row.getInt(rebaseOrdinal(ordinal));
   }
 
   @Override
   public long getLong(int ordinal) {
-    return row.getLong(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Long.class);
+    return row.getLong(rebaseOrdinal(ordinal));
   }
 
   @Override
   public float getFloat(int ordinal) {
-    return row.getFloat(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Float.class);
+    return row.getFloat(rebaseOrdinal(ordinal));
   }
 
   @Override
   public double getDouble(int ordinal) {
-    return row.getDouble(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Double.class);
+    return row.getDouble(rebaseOrdinal(ordinal));
   }
 
   @Override
   public Decimal getDecimal(int ordinal, int precision, int scale) {
-    return row.getDecimal(ordinal, precision, scale);
-  }
-
-  @Override
-  public UTF8String getUTF8String(int ordinal) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return UTF8String.fromBytes(getMetaColumnVal(ordinal).getBytes());
-    }
-    return row.getUTF8String(ordinal);
-  }
-
-  @Override
-  public String getString(int ordinal) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return new String(getMetaColumnVal(ordinal).getBytes());
-    }
-    return row.getString(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Decimal.class);
+    return row.getDecimal(rebaseOrdinal(ordinal), precision, scale);
   }
 
   @Override
   public byte[] getBinary(int ordinal) {
-    return row.getBinary(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Byte[].class);
+    return row.getBinary(rebaseOrdinal(ordinal));
   }
 
   @Override
   public CalendarInterval getInterval(int ordinal) {
-    return row.getInterval(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, CalendarInterval.class);
+    return row.getInterval(rebaseOrdinal(ordinal));
   }
 
   @Override
   public InternalRow getStruct(int ordinal, int numFields) {
-    return row.getStruct(ordinal, numFields);
+    ruleOutMetaFieldsAccess(ordinal, InternalRow.class);
+    return row.getStruct(rebaseOrdinal(ordinal), numFields);
   }
 
   @Override
   public ArrayData getArray(int ordinal) {
-    return row.getArray(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, ArrayData.class);
+    return row.getArray(rebaseOrdinal(ordinal));
   }
 
   @Override
   public MapData getMap(int ordinal) {
-    return row.getMap(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, MapData.class);
+    return row.getMap(rebaseOrdinal(ordinal));
   }
 
   @Override
-  public Object get(int ordinal, DataType dataType) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return UTF8String.fromBytes(getMetaColumnVal(ordinal).getBytes());
+  public InternalRow copy() {
+    return new HoodieInternalRow(Arrays.copyOf(metaFields, metaFields.length), row.copy(), containsMetaFields);
+  }
+
+  private int rebaseOrdinal(int ordinal) {
+    // NOTE: In cases when the source row does not contain meta fields, we have to
+    //       rebase the ordinal onto the source row's own indexes
+    return containsMetaFields ? ordinal : ordinal - metaFields.length;
+  }
+
+  private void validateMetaFieldDataType(DataType dataType) {
+    if (!dataType.sameType(StringType$.MODULE$)) {
+      throw new ClassCastException(String.format("Can not cast meta-field of type UTF8String to %s", dataType.simpleString()));
     }
-    return row.get(ordinal, dataType);
   }
 
-  @Override
-  public InternalRow copy() {
-    return new HoodieInternalRow(commitTime, commitSeqNumber, recordKey, partitionPath, fileName, row.copy());
+  private void ruleOutMetaFieldsAccess(int ordinal, Class<?> expectedDataType) {
+    if (ordinal < metaFields.length) {
+      throw new ClassCastException(String.format("Can not cast meta-field of type UTF8String at (%d) as %s", ordinal, expectedDataType.getName()));
+    }
   }
 }
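
The re-worked HoodieInternalRow above overlays the five meta-field values on ordinals 0-4 and rebases all higher ordinals onto the wrapped row when that row carries no meta columns. A minimal sketch of that behavior (assuming hudi-spark-client and Spark catalyst on the classpath; class name and all values are made up):

    import org.apache.hudi.client.model.HoodieInternalRow;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    import org.apache.spark.unsafe.types.UTF8String;

    public class HoodieInternalRowOverlaySketch {
      public static void main(String[] args) {
        // Source row holding only data columns (no meta columns)
        InternalRow dataRow = new GenericInternalRow(new Object[] {
            UTF8String.fromString("some-value"), 42L});

        InternalRow overlaid = new HoodieInternalRow(
            UTF8String.fromString("20220721132047"),      // commit time
            UTF8String.fromString("20220721132047_0_1"),  // commit seq no
            UTF8String.fromString("key-1"),               // record key
            UTF8String.fromString("2022/07/21"),          // partition path
            UTF8String.fromString("file-1.parquet"),      // file name
            dataRow,
            false);                                       // source row has no meta columns

        // Ordinal 2 (record key) is served from the overlay...
        System.out.println(overlaid.getUTF8String(2));    // key-1
        // ...while ordinal 5 is rebased onto ordinal 0 of the wrapped data row
        System.out.println(overlaid.getUTF8String(5));    // some-value
      }
    }
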
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriter.java
index 61cf2f13cc..0d1012fbf6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriter.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.io.storage.row;
 
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.IOException;
 
@@ -37,7 +38,7 @@ public interface HoodieInternalRowFileWriter {
    *
    * @throws IOException on any exception while writing.
    */
-  void writeRow(String key, InternalRow row) throws IOException;
+  void writeRow(UTF8String key, InternalRow row) throws IOException;
 
   /**
    * Writes an {@link InternalRow} to the HoodieInternalRowFileWriter.
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java
index 1d11529352..a7cacd055a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.io.storage.HoodieBaseParquetWriter;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.IOException;
 
@@ -41,7 +42,7 @@ public class HoodieInternalRowParquetWriter extends HoodieBaseParquetWriter<Inte
   }
 
   @Override
-  public void writeRow(String key, InternalRow row) throws IOException {
+  public void writeRow(UTF8String key, InternalRow row) throws IOException {
     super.write(row);
     writeSupport.add(key);
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
index 916b31d293..310afd4f14 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
@@ -25,11 +25,11 @@ import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.hadoop.CachingPath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 
@@ -39,10 +39,12 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 
 /**
  * Create handle with InternalRow for datasource implementation of bulk insert.
@@ -50,38 +52,61 @@ import java.util.concurrent.atomic.AtomicLong;
 public class HoodieRowCreateHandle implements Serializable {
 
   private static final long serialVersionUID = 1L;
+
   private static final Logger LOG = LogManager.getLogger(HoodieRowCreateHandle.class);
-  private static final AtomicLong SEQGEN = new AtomicLong(1);
+  private static final AtomicLong GLOBAL_SEQ_NO = new AtomicLong(1);
+
+  private static final Integer RECORD_KEY_META_FIELD_ORD =
+      HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+  private static final Integer PARTITION_PATH_META_FIELD_ORD =
+      HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
 
-  private final String instantTime;
-  private final int taskPartitionId;
-  private final long taskId;
-  private final long taskEpochId;
   private final HoodieTable table;
   private final HoodieWriteConfig writeConfig;
-  protected final HoodieInternalRowFileWriter fileWriter;
+
   private final String partitionPath;
   private final Path path;
   private final String fileId;
-  private final FileSystem fs;
-  protected final HoodieInternalWriteStatus writeStatus;
+
+  private final boolean populateMetaFields;
+
+  private final UTF8String fileName;
+  private final UTF8String commitTime;
+  private final Function<Long, String> seqIdGenerator;
+
   private final HoodieTimer currTimer;
 
-  public HoodieRowCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId,
-                               String instantTime, int taskPartitionId, long taskId, long taskEpochId,
-                               StructType structType) {
+  protected final HoodieInternalRowFileWriter fileWriter;
+  protected final HoodieInternalWriteStatus writeStatus;
+
+  public HoodieRowCreateHandle(HoodieTable table,
+                               HoodieWriteConfig writeConfig,
+                               String partitionPath,
+                               String fileId,
+                               String instantTime,
+                               int taskPartitionId,
+                               long taskId,
+                               long taskEpochId,
+                               StructType structType,
+                               boolean populateMetaFields) {
     this.partitionPath = partitionPath;
     this.table = table;
     this.writeConfig = writeConfig;
-    this.instantTime = instantTime;
-    this.taskPartitionId = taskPartitionId;
-    this.taskId = taskId;
-    this.taskEpochId = taskEpochId;
     this.fileId = fileId;
-    this.currTimer = new HoodieTimer();
-    this.currTimer.startTimer();
-    this.fs = table.getMetaClient().getFs();
-    this.path = makeNewPath(partitionPath);
+
+    this.currTimer = new HoodieTimer(true);
+
+    FileSystem fs = table.getMetaClient().getFs();
+
+    String writeToken = getWriteToken(taskPartitionId, taskId, taskEpochId);
+    String fileName = FSUtils.makeBaseFileName(instantTime, writeToken, this.fileId, table.getBaseFileExtension());
+    this.path = makeNewPath(fs, partitionPath, fileName, writeConfig);
+
+    this.populateMetaFields = populateMetaFields;
+    this.fileName = UTF8String.fromString(path.getName());
+    this.commitTime = UTF8String.fromString(instantTime);
+    this.seqIdGenerator = (id) -> HoodieRecord.generateSequenceId(instantTime, taskPartitionId, id);
+
     this.writeStatus = new HoodieInternalWriteStatus(!table.getIndex().isImplicitWithStorage(),
         writeConfig.getWriteStatusFailureFraction());
     writeStatus.setPartitionPath(partitionPath);
@@ -96,7 +121,7 @@ public class HoodieRowCreateHandle implements Serializable {
               FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath),
               table.getPartitionMetafileFormat());
       partitionMetadata.trySave(taskPartitionId);
-      createMarkerFile(partitionPath, FSUtils.makeBaseFileName(this.instantTime, getWriteToken(), this.fileId, table.getBaseFileExtension()));
+      createMarkerFile(partitionPath, fileName, instantTime, table, writeConfig);
       this.fileWriter = createNewFileWriter(path, table, writeConfig, structType);
     } catch (IOException e) {
       throw new HoodieInsertException("Failed to initialize file writer for path " + path, e);
@@ -108,21 +133,42 @@ public class HoodieRowCreateHandle implements Serializable {
    * Writes an {@link InternalRow} to the underlying HoodieInternalRowFileWriter. Before writing, value for meta columns are computed as required
    * and wrapped in {@link HoodieInternalRow}. {@link HoodieInternalRow} is what gets written to HoodieInternalRowFileWriter.
    *
-   * @param record instance of {@link InternalRow} that needs to be written to the fileWriter.
+   * @param row instance of {@link InternalRow} that needs to be written to the fileWriter.
    * @throws IOException
    */
-  public void write(InternalRow record) throws IOException {
+  public void write(InternalRow row) throws IOException {
     try {
-      final String partitionPath = String.valueOf(record.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_POS));
-      final String seqId = HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
-      final String recordKey = String.valueOf(record.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_POS));
-      HoodieInternalRow internalRow = new HoodieInternalRow(instantTime, seqId, recordKey, partitionPath, path.getName(),
-          record);
+      // NOTE: PLEASE READ THIS CAREFULLY BEFORE MODIFYING
+      //       This code lies on the hot path, and substantial caution should be
+      //       exercised when making changes to it, so as to minimize:
+      //          - Conversions b/w Spark internal (low-level) types and JVM native ones (like
+      //            [[UTF8String]] and [[String]])
+      //          - Repeated computations (for ex, converting the file path to [[UTF8String]] over and
+      //            over again)
+      UTF8String recordKey = row.getUTF8String(RECORD_KEY_META_FIELD_ORD);
+
+      InternalRow updatedRow;
+      // In cases when no meta-fields need to be added we simply relay provided row to
+      // the writer as is
+      if (!populateMetaFields) {
+        updatedRow = row;
+      } else {
+        UTF8String partitionPath = row.getUTF8String(PARTITION_PATH_META_FIELD_ORD);
+        // This is the only meta-field that is generated dynamically, hence conversion b/w
+        // [[String]] and [[UTF8String]] is unavoidable
+        UTF8String seqId = UTF8String.fromString(seqIdGenerator.apply(GLOBAL_SEQ_NO.getAndIncrement()));
+
+        updatedRow = new HoodieInternalRow(commitTime, seqId, recordKey,
+            partitionPath, fileName, row, true);
+      }
+
       try {
-        fileWriter.writeRow(recordKey, internalRow);
-        writeStatus.markSuccess(recordKey);
+        fileWriter.writeRow(recordKey, updatedRow);
+        // NOTE: To avoid conversion on the hot-path we only convert [[UTF8String]] into [[String]]
+        //       in cases when successful records' writes are being tracked
+        writeStatus.markSuccess(writeStatus.isTrackingSuccessfulWrites() ? recordKey.toString() : null);
       } catch (Throwable t) {
-        writeStatus.markFailure(recordKey, t);
+        writeStatus.markFailure(recordKey.toString(), t);
       }
     } catch (Throwable ge) {
       writeStatus.setGlobalError(ge);
@@ -168,7 +214,7 @@ public class HoodieRowCreateHandle implements Serializable {
     return path.getName();
   }
 
-  private Path makeNewPath(String partitionPath) {
+  private static Path makeNewPath(FileSystem fs, String partitionPath, String fileName, HoodieWriteConfig writeConfig) {
     Path path = FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath);
     try {
       if (!fs.exists(path)) {
@@ -177,9 +223,7 @@ public class HoodieRowCreateHandle implements Serializable {
     } catch (IOException e) {
       throw new HoodieIOException("Failed to make dir " + path, e);
     }
-    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
-    return new Path(path.toString(), FSUtils.makeBaseFileName(instantTime, getWriteToken(), fileId,
-        tableConfig.getBaseFileFormat().getFileExtension()));
+    return new CachingPath(path.toString(), fileName);
   }
 
   /**
@@ -187,12 +231,17 @@ public class HoodieRowCreateHandle implements Serializable {
    *
    * @param partitionPath Partition path
    */
-  private void createMarkerFile(String partitionPath, String dataFileName) {
+  private static void createMarkerFile(String partitionPath,
+                                       String dataFileName,
+                                       String instantTime,
+                                       HoodieTable<?, ?, ?, ?> table,
+                                       HoodieWriteConfig writeConfig) {
     WriteMarkersFactory.get(writeConfig.getMarkersType(), table, instantTime)
         .create(partitionPath, dataFileName, IOType.CREATE);
   }
 
-  private String getWriteToken() {
+  // TODO extract to utils
+  private static String getWriteToken(int taskPartitionId, long taskId, long taskEpochId) {
     return taskPartitionId + "-" + taskId + "-" + taskEpochId;
   }
 
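
The handle above pre-binds everything that is constant for a file (instant time, task partition id) into seqIdGenerator, so the per-record hot path only pulls the next value from the shared counter. A minimal sketch of that pattern (plain Java; the exact formatting produced by HoodieRecord.generateSequenceId may differ, this only mirrors its shape):

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.Function;

    public class SeqIdGeneratorSketch {
      private static final AtomicLong GLOBAL_SEQ_NO = new AtomicLong(1);

      public static void main(String[] args) {
        String instantTime = "20220721132047";  // hypothetical commit instant
        int taskPartitionId = 3;

        // Constants are captured once; the hot path only formats the incrementing id
        Function<Long, String> seqIdGenerator =
            id -> instantTime + "_" + taskPartitionId + "_" + id;

        System.out.println(seqIdGenerator.apply(GLOBAL_SEQ_NO.getAndIncrement())); // 20220721132047_3_1
        System.out.println(seqIdGenerator.apply(GLOBAL_SEQ_NO.getAndIncrement())); // 20220721132047_3_2
      }
    }
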
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandleWithoutMetaFields.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandleWithoutMetaFields.java
deleted file mode 100644
index 444ad5dbda..0000000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandleWithoutMetaFields.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.io.storage.row;
-
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.table.HoodieTable;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructType;
-
-import java.io.IOException;
-
-/**
- * RowCreateHandle to be used when meta fields are disabled.
- */
-public class HoodieRowCreateHandleWithoutMetaFields extends HoodieRowCreateHandle {
-
-  public HoodieRowCreateHandleWithoutMetaFields(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId, String instantTime,
-                                                int taskPartitionId, long taskId, long taskEpochId, StructType structType) {
-    super(table, writeConfig, partitionPath, fileId, instantTime, taskPartitionId, taskId, taskEpochId, structType);
-  }
-
-  /**
-   * Write the incoming InternalRow as is.
-   *
-   * @param record instance of {@link InternalRow} that needs to be written to the fileWriter.
-   * @throws IOException
-   */
-  @Override
-  public void write(InternalRow record) throws IOException {
-    try {
-      fileWriter.writeRow(record);
-      writeStatus.markSuccess();
-    } catch (Throwable ge) {
-      writeStatus.setGlobalError(ge);
-      throw new HoodieException("Exception thrown while writing spark InternalRows to file ", ge);
-    }
-  }
-
-  protected HoodieInternalRowFileWriter createNewFileWriter(
-      Path path, HoodieTable hoodieTable, HoodieWriteConfig config, StructType schema)
-      throws IOException {
-    return HoodieInternalRowFileWriterFactory.getInternalRowFileWriterWithoutMetaFields(
-        path, hoodieTable, config, schema);
-  }
-}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
index f7fe50776d..46c2a6d835 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
@@ -25,6 +25,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 
 import java.util.HashMap;
 
@@ -38,10 +39,11 @@ import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_
  */
 public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
 
-  private Configuration hadoopConf;
-  private BloomFilter bloomFilter;
-  private String minRecordKey;
-  private String maxRecordKey;
+  private final Configuration hadoopConf;
+  private final BloomFilter bloomFilter;
+
+  private UTF8String minRecordKey;
+  private UTF8String maxRecordKey;
 
   public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, BloomFilter bloomFilter, HoodieWriteConfig writeConfig) {
     super();
@@ -63,8 +65,8 @@ public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
     if (bloomFilter != null) {
       extraMetaData.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString());
       if (minRecordKey != null && maxRecordKey != null) {
-        extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey);
-        extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey);
+        extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey.toString());
+        extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey.toString());
       }
       if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
         extraMetaData.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name());
@@ -73,18 +75,18 @@ public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
     return new WriteSupport.FinalizedWriteContext(extraMetaData);
   }
 
-  public void add(String recordKey) {
-    this.bloomFilter.add(recordKey);
-    if (minRecordKey != null) {
-      minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey;
-    } else {
-      minRecordKey = recordKey;
+  public void add(UTF8String recordKey) {
+    this.bloomFilter.add(recordKey.getBytes());
+
+    if (minRecordKey == null || minRecordKey.compareTo(recordKey) < 0) {
+      // NOTE: [[clone]] is performed here (rather than [[copy]]) so that the underlying buffer is copied
+      //       only when the [[UTF8String]] points into a buffer storing the whole containing record,
+      //       and is simply passed through when it holds an (immutable) buffer holding just the string
+      minRecordKey =  recordKey.clone();
     }
 
-    if (maxRecordKey != null) {
-      maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey;
-    } else {
-      maxRecordKey = recordKey;
+    if (maxRecordKey == null || maxRecordKey.compareTo(recordKey) > 0) {
+      maxRecordKey = recordKey.clone();
     }
   }
 }
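
Per file, the write support keeps two kinds of record-key statistics: a bloom filter over all keys, and a running min/max key pair that ends up in the file footer for key-range pruning. Stripped of the Hudi and Spark types, the bookkeeping is the usual running-extremes pattern; a sketch over plain Strings, with the bloom-filter call left as a comment:

    import java.util.Arrays;
    import java.util.List;

    public class KeyStatsSketch {
      public static void main(String[] args) {
        List<String> keys = Arrays.asList("key-007", "key-001", "key-042");

        String minKey = null;
        String maxKey = null;
        for (String key : keys) {
          // bloomFilter.add(key.getBytes()) would go here in the real write support
          if (minKey == null || key.compareTo(minKey) < 0) {
            minKey = key;
          }
          if (maxKey == null || key.compareTo(maxKey) > 0) {
            maxKey = key;
          }
        }
        System.out.println(minKey + " .. " + maxKey);  // key-001 .. key-042
      }
    }
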
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
index 0642a85c5f..b7cdcf851a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
@@ -18,26 +18,24 @@
 
 package org.apache.hudi.keygen;
 
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructType;
+import scala.Function1;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import scala.Function1;
-
 /**
  * Base class for the built-in key generators. Contains methods structured for
  * code reuse amongst them.
@@ -66,18 +64,32 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements Sp
   @Override
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getRecordKey(Row row) {
+    // TODO avoid conversion to avro
+    //      since converterFn is transient this will be repeatedly initialized over and over again
     if (null == converterFn) {
       converterFn = AvroConversionUtils.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
     }
     return getKey(converterFn.apply(row)).getRecordKey();
   }
 
+  @Override
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public String getRecordKey(InternalRow internalRow, StructType schema) {
+    try {
+      // TODO fix
+      buildFieldSchemaInfoIfNeeded(schema);
+      return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow, getRecordKeyFields(), recordKeySchemaInfo, false);
+    } catch (Exception e) {
+      throw new HoodieException("Conversion of InternalRow to Row failed with exception", e);
+    }
+  }
   /**
    * Fetch partition path from {@link Row}.
    *
    * @param row instance of {@link Row} from which partition path is requested
    * @return the partition path of interest from {@link Row}.
    */
+
   @Override
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getPartitionPath(Row row) {
@@ -102,12 +114,13 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements Sp
       return RowKeyGeneratorHelper.getPartitionPathFromInternalRow(internalRow, getPartitionPathFields(),
           hiveStylePartitioning, partitionPathSchemaInfo);
     } catch (Exception e) {
-      throw new HoodieIOException("Conversion of InternalRow to Row failed with exception " + e);
+      throw new HoodieException("Conversion of InternalRow to Row failed with exception", e);
     }
   }
 
   void buildFieldSchemaInfoIfNeeded(StructType structType) {
     if (this.structType == null) {
+      this.structType = structType;
       getRecordKeyFields()
           .stream().filter(f -> !f.isEmpty())
           .forEach(f -> recordKeySchemaInfo.put(f, RowKeyGeneratorHelper.getFieldSchemaInfo(structType, f, true)));
@@ -115,7 +128,6 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements Sp
         getPartitionPathFields().stream().filter(f -> !f.isEmpty())
             .forEach(f -> partitionPathSchemaInfo.put(f, RowKeyGeneratorHelper.getFieldSchemaInfo(structType, f, false)));
       }
-      this.structType = structType;
     }
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
index 9ba3fb8760..b09ff0755a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
@@ -64,6 +64,12 @@ public class ComplexKeyGenerator extends BuiltinKeyGenerator {
     return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, true);
   }
 
+  @Override
+  public String getRecordKey(InternalRow internalRow, StructType schema) {
+    buildFieldSchemaInfoIfNeeded(schema);
+    return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow, getRecordKeyFields(), recordKeySchemaInfo, true);
+  }
+
   @Override
   public String getPartitionPath(Row row) {
     buildFieldSchemaInfoIfNeeded(row.schema());
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
index 77eec748c7..6cf674f18e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
@@ -64,6 +64,12 @@ public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator {
     return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, true);
   }
 
+  @Override
+  public String getRecordKey(InternalRow internalRow, StructType schema) {
+    buildFieldSchemaInfoIfNeeded(schema);
+    return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow, getRecordKeyFields(), recordKeySchemaInfo, true);
+  }
+
   @Override
   public String getPartitionPath(Row row) {
     return globalAvroDeleteKeyGenerator.getEmptyPartition();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGenUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGenUtils.java
new file mode 100644
index 0000000000..9616212378
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGenUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.TimestampType;
+
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+
+public class RowKeyGenUtils {
+
+  /**
+   * Converts provided (raw) value extracted from the {@link InternalRow} object into a deserialized,
+   * JVM native format (for ex, converting {@code Long} into {@link Instant},
+   * {@code Integer} to {@link LocalDate}, etc)
+   *
+   * This method allows to avoid a costly full-row deserialization. Note that this method
+   * should be kept in sync w/
+   *
+   * <ol>
+   *   <li>{@code RowEncoder#deserializerFor}, as well as</li>
+   *   <li>{@code HoodieAvroUtils#convertValueForAvroLogicalTypes}</li>
+   * </ol>
+   *
+   * @param dataType target data-type of the given value
+   * @param value target value to be converted
+   */
+  public static Object convertToLogicalDataType(DataType dataType, Object value) {
+    if (dataType instanceof TimestampType) {
+      // The provided value has to be a [[Long]] in this case, representing micros since epoch
+      return new Timestamp((Long) value / 1000);
+    } else if (dataType instanceof DateType) {
+      // The provided value has to be an [[Int]] in this case, representing days since epoch
+      return LocalDate.ofEpochDay((Integer) value);
+    }
+
+    return value;
+  }
+}
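
convertToLogicalDataType above relies on Spark's internal encodings: timestamps are stored as microseconds since the epoch (a Long) and dates as days since the epoch (an Integer), so converting them into JVM-native values is plain arithmetic. A minimal standalone sketch (constants are illustrative):

    import java.sql.Timestamp;
    import java.time.LocalDate;

    public class LogicalTypeConversionSketch {
      public static void main(String[] args) {
        long micros = 1_658_409_647_000_000L;  // 2022-07-21T13:20:47Z as microseconds since epoch
        int epochDays = 19_194;                // 2022-07-21 as days since 1970-01-01

        Timestamp ts = new Timestamp(micros / 1000);       // micros -> millis
        LocalDate date = LocalDate.ofEpochDay(epochDays);  // days -> LocalDate

        System.out.println(ts);
        System.out.println(date);  // 2022-07-21
      }
    }
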
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
index c0e10e6f9b..c79481bd2b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
@@ -39,18 +39,56 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import org.apache.spark.sql.types.StructType$;
 import scala.Option;
 
 import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
 import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
 import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
 import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.RowKeyGenUtils.convertToLogicalDataType;
 
 /**
  * Helper class to fetch fields from Row.
+ *
+ * TODO cleanup
  */
+@Deprecated
 public class RowKeyGeneratorHelper {
 
+  public static String getRecordKeyFromInternalRow(InternalRow internalRow, List<String> recordKeyFields,
+                                                   Map<String, Pair<List<Integer>, DataType>> recordKeyPositions, boolean prefixFieldName) {
+    AtomicBoolean keyIsNullOrEmpty = new AtomicBoolean(true);
+    String toReturn = recordKeyFields.stream().map(field -> {
+      String val = null;
+      List<Integer> fieldPositions = recordKeyPositions.get(field).getKey();
+      if (fieldPositions.size() == 1) { // simple field
+        Integer fieldPos = fieldPositions.get(0);
+        if (internalRow.isNullAt(fieldPos)) {
+          val = NULL_RECORDKEY_PLACEHOLDER;
+        } else {
+          DataType dataType = recordKeyPositions.get(field).getValue();
+          val = convertToLogicalDataType(dataType, internalRow.get(fieldPos, dataType)).toString();
+          if (val.isEmpty()) {
+            val = EMPTY_RECORDKEY_PLACEHOLDER;
+          } else {
+            keyIsNullOrEmpty.set(false);
+          }
+        }
+      } else { // nested fields
+        val = getNestedFieldVal(internalRow, recordKeyPositions.get(field)).toString();
+        if (!val.contains(NULL_RECORDKEY_PLACEHOLDER) && !val.contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
+          keyIsNullOrEmpty.set(false);
+        }
+      }
+      return prefixFieldName ? (field + ":" + val) : val;
+    }).collect(Collectors.joining(","));
+    if (keyIsNullOrEmpty.get()) {
+      throw new HoodieKeyException("recordKey value: \"" + toReturn + "\" for fields: \"" + Arrays.toString(recordKeyFields.toArray()) + "\" cannot be null or empty.");
+    }
+    return toReturn;
+  }
+
   /**
    * Generates record key for the corresponding {@link Row}.
    *
@@ -146,7 +184,7 @@ public class RowKeyGeneratorHelper {
         if (fieldPos == -1 || internalRow.isNullAt(fieldPos)) {
           val = HUDI_DEFAULT_PARTITION_PATH;
         } else {
-          Object value = internalRow.get(fieldPos, dataType);
+          Object value = convertToLogicalDataType(dataType, internalRow.get(fieldPos, dataType));
           if (value == null || value.toString().isEmpty()) {
             val = HUDI_DEFAULT_PARTITION_PATH;
           } else {
@@ -231,6 +269,35 @@ public class RowKeyGeneratorHelper {
     return toReturn;
   }
 
+  public static Object getNestedFieldVal(InternalRow internalRow, Pair<List<Integer>, DataType> positionsAndType) {
+    if (positionsAndType.getKey().size() == 1 && positionsAndType.getKey().get(0) == -1) {
+      return HUDI_DEFAULT_PARTITION_PATH;
+    }
+    int index = 0;
+    int totalCount = positionsAndType.getKey().size();
+    InternalRow valueToProcess = internalRow;
+    Object toReturn = null;
+
+    while (index < totalCount) {
+      if (valueToProcess.isNullAt(positionsAndType.getKey().get(index))) {
+        toReturn = NULL_RECORDKEY_PLACEHOLDER;
+        break;
+      }
+
+      if (index < totalCount - 1) {
+        valueToProcess = (InternalRow) valueToProcess.get(positionsAndType.getKey().get(index), StructType$.MODULE$.defaultConcreteType());
+      } else { // last index
+        if (valueToProcess.get(positionsAndType.getKey().get(index), positionsAndType.getValue()).toString().isEmpty()) {
+          toReturn = EMPTY_RECORDKEY_PLACEHOLDER;
+          break;
+        }
+        toReturn = valueToProcess.get(positionsAndType.getKey().get(index), positionsAndType.getValue());
+      }
+      index++;
+    }
+    return toReturn;
+  }
+
   /**
    * Generate the tree style positions for the field requested for as per the defined struct type.
    *
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
index 2d5d52bec8..bbceaf900b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
@@ -29,6 +29,8 @@ public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface {
 
   String getRecordKey(Row row);
 
+  String getRecordKey(InternalRow row, StructType schema);
+
   String getPartitionPath(Row row);
 
   String getPartitionPath(InternalRow internalRow, StructType structType);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/DataTypeUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/DataTypeUtils.java
index 3ff7e1055c..57b7a9d7bb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/DataTypeUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/DataTypeUtils.java
@@ -19,112 +19,17 @@
 package org.apache.hudi.util;
 
 import org.apache.spark.sql.types.ArrayType;
-import org.apache.spark.sql.types.ByteType$;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.DoubleType$;
-import org.apache.spark.sql.types.FloatType$;
-import org.apache.spark.sql.types.IntegerType$;
-import org.apache.spark.sql.types.LongType$;
 import org.apache.spark.sql.types.MapType;
-import org.apache.spark.sql.types.ShortType$;
-import org.apache.spark.sql.types.StringType$;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.types.VarcharType$;
 
-import javax.annotation.Nonnull;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 
 public class DataTypeUtils {
 
-  private static Map<Class<?>, Set<Class<?>>> sparkPrimitiveTypesCompatibilityMap =
-      new HashMap<Class<?>, Set<Class<?>>>() {{
-
-        // Integral types
-        put(ShortType$.class,
-            newHashSet(ByteType$.class, ShortType$.class));
-        put(IntegerType$.class,
-            newHashSet(ByteType$.class, ShortType$.class, IntegerType$.class));
-        put(LongType$.class,
-            newHashSet(ByteType$.class, ShortType$.class, IntegerType$.class, LongType$.class));
-
-        // Float types
-        put(DoubleType$.class,
-            newHashSet(FloatType$.class, DoubleType$.class));
-
-        // String types
-        put(StringType$.class,
-            newHashSet(VarcharType$.class, StringType$.class));
-      }
-  };
-
-  /**
-   * Validates whether one {@link StructType} is compatible w/ the other one.
-   * Compatibility rules are defined like following: types A and B are considered
-   * compatible iff
-   *
-   * <ol>
-   *   <li>A and B are identical</li>
-   *   <li>All values comprising A domain are contained w/in B domain (for ex, {@code ShortType}
-   *   in this sense is compatible w/ {@code IntegerType})</li>
-   * </ol>
-   *
-   * @param left operand
-   * @param right operand
-   * @return true if {@code left} instance of {@link StructType} is compatible w/ the {@code right}
-   */
-  public static boolean areCompatible(@Nonnull DataType left, @Nonnull DataType right) {
-    // First, check if types are equal
-    if (Objects.equals(left, right)) {
-      return true;
-    }
-
-    // If not, check whether both are instances of {@code StructType} that
-    // should be matched structurally
-    if (left instanceof StructType && right instanceof StructType) {
-      return areCompatible((StructType) left, (StructType) right);
-    }
-
-    // If not, simply check if those data-types constitute compatibility
-    // relationship outlined above; otherwise return false
-    return sparkPrimitiveTypesCompatibilityMap.getOrDefault(left.getClass(), Collections.emptySet())
-        .contains(right.getClass());
-  }
-
-  private static boolean areCompatible(@Nonnull StructType left, @Nonnull StructType right) {
-    StructField[] oneSchemaFields = left.fields();
-    StructField[] anotherSchemaFields = right.fields();
-
-    if (oneSchemaFields.length != anotherSchemaFields.length) {
-      return false;
-    }
-
-    for (int i = 0; i < oneSchemaFields.length; ++i) {
-      StructField oneField = oneSchemaFields[i];
-      StructField anotherField = anotherSchemaFields[i];
-      // NOTE: Metadata is deliberately omitted from comparison
-      if (!Objects.equals(oneField.name(), anotherField.name())
-          || !areCompatible(oneField.dataType(), anotherField.dataType())
-          || oneField.nullable() != anotherField.nullable()) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  private static <T> HashSet<T> newHashSet(T... ts) {
-    return new HashSet<>(Arrays.asList(ts));
-  }
-
   /**
    * Checks whether provided {@link DataType} contains {@link DecimalType} whose scale is less than
    * {@link Decimal#MAX_LONG_DIGITS()}
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala
new file mode 100644
index 0000000000..10d6a2276e
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieUnsafeRowUtils {
+
+  /**
+   * Fetches the (nested) value w/in the provided [[Row]], uniquely identified by the nested-field path
+   * previously composed by [[composeNestedFieldPath]]
+   */
+  def getNestedRowValue(row: Row, nestedFieldPath: Array[(Int, StructField)]): Any = {
+    var curRow = row
+    for (idx <- nestedFieldPath.indices) {
+      val (ord, f) = nestedFieldPath(idx)
+      if (curRow.isNullAt(ord)) {
+        // scalastyle:off return
+        if (f.nullable) return null
+        else throw new IllegalArgumentException(s"Found null value for the field that is declared as non-nullable: $f")
+        // scalastyle:on return
+      } else if (idx == nestedFieldPath.length - 1) {
+        // scalastyle:off return
+        return curRow.get(ord)
+        // scalastyle:on return
+      } else {
+        curRow = f.dataType match {
+          case _: StructType =>
+            curRow.getStruct(ord)
+          case dt@_ =>
+            throw new IllegalArgumentException(s"Invalid nested-field path: expected StructType, but was $dt")
+        }
+      }
+    }
+  }
+
+  /**
+   * Fetches the (nested) value w/in the provided [[InternalRow]], uniquely identified by the nested-field path
+   * previously composed by [[composeNestedFieldPath]]
+   */
+  def getNestedInternalRowValue(row: InternalRow, nestedFieldPath: Array[(Int, StructField)]): Any = {
+    if (nestedFieldPath.length == 0) {
+      throw new IllegalArgumentException("Nested field-path could not be empty")
+    }
+
+    var curRow = row
+    var idx = 0
+    while (idx < nestedFieldPath.length) {
+      val (ord, f) = nestedFieldPath(idx)
+      if (curRow.isNullAt(ord)) {
+        // scalastyle:off return
+        if (f.nullable) return null
+        else throw new IllegalArgumentException(s"Found null value for the field that is declared as non-nullable: $f")
+        // scalastyle:on return
+      } else if (idx == nestedFieldPath.length - 1) {
+        // scalastyle:off return
+        return curRow.get(ord, f.dataType)
+        // scalastyle:on return
+      } else {
+        curRow = f.dataType match {
+          case st: StructType =>
+            curRow.getStruct(ord, st.fields.length)
+          case dt@_ =>
+            throw new IllegalArgumentException(s"Invalid nested-field path: expected StructType, but was $dt")
+        }
+      }
+      idx += 1
+    }
+  }
+
+  /**
+   * For the provided [[nestedFieldRef]] (of the form "a.b.c") and [[schema]], produces a nested-field path composed
+   * of (ordinal, [[StructField]]) tuples of the respective fields w/in the provided schema.
+   *
+   * The resulting nested-field path is subsequently used by [[getNestedInternalRowValue]] and [[getNestedRowValue]]
+   */
+  def composeNestedFieldPath(schema: StructType, nestedFieldRef: String): Array[(Int, StructField)] = {
+    val fieldRefParts = nestedFieldRef.split('.')
+    val ordSeq = ArrayBuffer[(Int, StructField)]()
+    var curSchema = schema
+    var idx = 0
+    while (idx < fieldRefParts.length) {
+      val fieldRefPart = fieldRefParts(idx)
+      val ord = curSchema.fieldIndex(fieldRefPart)
+      val field = curSchema(ord)
+      // Append current field's (ordinal, data-type)
+      ordSeq.append((ord, field))
+      // Update current schema, unless terminal field-ref part
+      if (idx < fieldRefParts.length - 1) {
+        curSchema = field.dataType match {
+          case st: StructType => st
+          case dt@_ =>
+            throw new IllegalArgumentException(s"Invalid nested field reference ${fieldRefParts.drop(idx).mkString(".")} into $dt")
+        }
+      }
+      idx += 1
+    }
+
+    ordSeq.toArray
+  }
+}
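
For reference, a minimal Scala sketch of how the utility added above is meant to be used; the schema and row values are made up for illustration. The nested-field path is composed once per schema and then reused for every row, which is what removes per-row field-name resolution from the hot path.

    import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types._
    import org.apache.spark.unsafe.types.UTF8String

    val schema = StructType(Seq(
      StructField("key", StringType),
      StructField("nested", StructType(Seq(StructField("ts", LongType))))))

    // Resolve "nested.ts" against the schema once ...
    val tsPath = composeNestedFieldPath(schema, "nested.ts")

    // ... then reuse the resolved (ordinal, StructField) path for every row
    val row = InternalRow(UTF8String.fromString("k1"), InternalRow(42L))
    val ts = getNestedInternalRowValue(row, tsPath)  // 42L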
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/model/TestHoodieInternalRow.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/model/TestHoodieInternalRow.java
index bfcb012c37..5624305b80 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/model/TestHoodieInternalRow.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/model/TestHoodieInternalRow.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client.model;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.unsafe.types.UTF8String;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
@@ -64,7 +65,13 @@ public class TestHoodieInternalRow {
     Object[] values = getRandomValue(true);
 
     InternalRow row = new GenericInternalRow(values);
-    HoodieInternalRow hoodieInternalRow = new HoodieInternalRow("commitTime", "commitSeqNo", "recordKey", "partitionPath", "fileName", row);
+    HoodieInternalRow hoodieInternalRow = new HoodieInternalRow(UTF8String.fromString("commitTime"),
+        UTF8String.fromString("commitSeqNo"),
+        UTF8String.fromString("recordKey"),
+        UTF8String.fromString("partitionPath"),
+        UTF8String.fromString("fileName"),
+        row,
+        true);
 
     assertValues(hoodieInternalRow, "commitTime", "commitSeqNo", "recordKey", "partitionPath",
         "fileName", values, nullIndices);
@@ -74,7 +81,13 @@ public class TestHoodieInternalRow {
   public void testUpdate() {
     Object[] values = getRandomValue(true);
     InternalRow row = new GenericInternalRow(values);
-    HoodieInternalRow hoodieInternalRow = new HoodieInternalRow("commitTime", "commitSeqNo", "recordKey", "partitionPath", "fileName", row);
+    HoodieInternalRow hoodieInternalRow = new HoodieInternalRow(UTF8String.fromString("commitTime"),
+        UTF8String.fromString("commitSeqNo"),
+        UTF8String.fromString("recordKey"),
+        UTF8String.fromString("partitionPath"),
+        UTF8String.fromString("fileName"),
+        row,
+        true);
 
     hoodieInternalRow.update(0, "commitTime_updated");
     hoodieInternalRow.update(1, "commitSeqNo_updated");
@@ -106,7 +119,13 @@ public class TestHoodieInternalRow {
       Object[] values = getRandomValue(true);
 
       InternalRow row = new GenericInternalRow(values);
-      HoodieInternalRow hoodieInternalRow = new HoodieInternalRow("commitTime", "commitSeqNo", "recordKey", "partitionPath", "fileName", row);
+      HoodieInternalRow hoodieInternalRow = new HoodieInternalRow(UTF8String.fromString("commitTime"),
+          UTF8String.fromString("commitSeqNo"),
+          UTF8String.fromString("recordKey"),
+          UTF8String.fromString("partitionPath"),
+          UTF8String.fromString("fileName"),
+          row,
+          true);
 
       hoodieInternalRow.setNullAt(i);
       nullIndices.clear();
@@ -129,7 +148,13 @@ public class TestHoodieInternalRow {
 
       Object[] values = getRandomValue(true);
       InternalRow row = new GenericInternalRow(values);
-      HoodieInternalRow hoodieInternalRow = new HoodieInternalRow("commitTime", "commitSeqNo", "recordKey", "partitionPath", "fileName", row);
+      HoodieInternalRow hoodieInternalRow = new HoodieInternalRow(UTF8String.fromString("commitTime"),
+          UTF8String.fromString("commitSeqNo"),
+          UTF8String.fromString("recordKey"),
+          UTF8String.fromString("partitionPath"),
+          UTF8String.fromString("fileName"),
+          row,
+          true);
 
       nullIndices.clear();
 
@@ -173,7 +198,7 @@ public class TestHoodieInternalRow {
   }
 
   private void assertValues(HoodieInternalRow hoodieInternalRow, String commitTime, String commitSeqNo, String recordKey, String partitionPath, String filename, Object[] values,
-      List<Integer> nullIndexes) {
+                            List<Integer> nullIndexes) {
     for (Integer index : nullIndexes) {
       assertTrue(hoodieInternalRow.isNullAt(index));
     }
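
The constructor now takes meta-field values as UTF8String instead of String. A hedged Scala sketch mirroring the tests above; the values are made up, and the trailing boolean is assumed to indicate whether the wrapped row already reserves the meta columns.

    import org.apache.hudi.client.model.HoodieInternalRow
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.unsafe.types.UTF8String

    val sourceRow: InternalRow = InternalRow(UTF8String.fromString("a"), 1L)

    // Meta-field values stay in Catalyst's UTF8String form, avoiding a per-record String round-trip
    val hoodieRow = new HoodieInternalRow(
      UTF8String.fromString("20220721132000"),      // commit time
      UTF8String.fromString("20220721132000_0_1"),  // commit seq no
      UTF8String.fromString("key-1"),               // record key
      UTF8String.fromString("2022/07/21"),          // partition path
      UTF8String.fromString("file-1.parquet"),      // file name
      sourceRow,
      true)                                         // assumed flag, mirroring the tests above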
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
index 5a19f0afe9..f6e65adad7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.exception.TableNotFoundException;
@@ -75,8 +76,9 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
     cleanupResources();
   }
 
-  @Test
-  public void testRowCreateHandle() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  public void testRowCreateHandle(boolean populateMetaFields) throws Exception {
     // init config and table
     HoodieWriteConfig cfg =
         SparkDatasetTestUtils.getConfigBuilder(basePath, timelineServicePort).build();
@@ -93,7 +95,8 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
       String fileId = UUID.randomUUID().toString();
       String instantTime = "000";
 
-      HoodieRowCreateHandle handle = new HoodieRowCreateHandle(table, cfg, partitionPath, fileId, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE);
+      HoodieRowCreateHandle handle = new HoodieRowCreateHandle(table, cfg, partitionPath, fileId, instantTime,
+          RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE, populateMetaFields);
       int size = 10 + RANDOM.nextInt(1000);
       // Generate inputs
       Dataset<Row> inputRows = SparkDatasetTestUtils.getRandomRows(sqlContext, size, partitionPath, false);
@@ -109,7 +112,7 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
       fileAbsPaths.add(basePath + "/" + writeStatus.getStat().getPath());
       fileNames.add(handle.getFileName());
       // verify output
-      assertOutput(writeStatus, size, fileId, partitionPath, instantTime, totalInputRows, fileNames, fileAbsPaths);
+      assertOutput(writeStatus, size, fileId, partitionPath, instantTime, totalInputRows, fileNames, fileAbsPaths, populateMetaFields);
     }
   }
 
@@ -130,7 +133,7 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
     String instantTime = "000";
 
     HoodieRowCreateHandle handle =
-        new HoodieRowCreateHandle(table, cfg, partitionPath, fileId, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE);
+        new HoodieRowCreateHandle(table, cfg, partitionPath, fileId, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE, true);
     int size = 10 + RANDOM.nextInt(1000);
     int totalFailures = 5;
     // Generate first batch of valid rows
@@ -169,7 +172,7 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
     // verify rows
     Dataset<Row> result = sqlContext.read().parquet(basePath + "/" + partitionPath);
     // passing only first batch of inputRows since after first batch global error would have been thrown
-    assertRows(inputRows, result, instantTime, fileNames);
+    assertRows(inputRows, result, instantTime, fileNames, true);
   }
 
   @ParameterizedTest
@@ -183,7 +186,7 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
 
     try {
       HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
-      new HoodieRowCreateHandle(table, cfg, " def", UUID.randomUUID().toString(), "001", RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE);
+      new HoodieRowCreateHandle(table, cfg, " def", UUID.randomUUID().toString(), "001", RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE, true);
       fail("Should have thrown exception");
     } catch (HoodieInsertException ioe) {
       // expected without metadata table
@@ -209,8 +212,8 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
     return handle.close();
   }
 
-  private void assertOutput(HoodieInternalWriteStatus writeStatus, int size, String fileId, String partitionPath, String instantTime, Dataset<Row> inputRows, List<String> filenames,
-      List<String> fileAbsPaths) {
+  private void assertOutput(HoodieInternalWriteStatus writeStatus, int size, String fileId, String partitionPath,
+                            String instantTime, Dataset<Row> inputRows, List<String> filenames, List<String> fileAbsPaths, boolean populateMetaFields) {
     assertEquals(writeStatus.getPartitionPath(), partitionPath);
     assertEquals(writeStatus.getTotalRecords(), size);
     assertEquals(writeStatus.getFailedRowsSize(), 0);
@@ -229,15 +232,25 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
 
     // verify rows
     Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.toArray(new String[0]));
-    assertRows(inputRows, result, instantTime, filenames);
+    assertRows(inputRows, result, instantTime, filenames, populateMetaFields);
   }
 
-  private void assertRows(Dataset<Row> expectedRows, Dataset<Row> actualRows, String instantTime, List<String> filenames) {
+  private void assertRows(Dataset<Row> expectedRows, Dataset<Row> actualRows, String instantTime, List<String> filenames, boolean populateMetaFields) {
     // verify 3 meta fields that are filled in within create handle
     actualRows.collectAsList().forEach(entry -> {
-      assertEquals(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).toString(), instantTime);
-      assertTrue(filenames.contains(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD)).toString()));
-      assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)));
+      String commitTime = entry.getString(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
+      String fileName = entry.getString(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD));
+      String seqId = entry.getString(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD));
+
+      if (populateMetaFields) {
+        assertEquals(instantTime, commitTime);
+        assertFalse(StringUtils.isNullOrEmpty(seqId));
+        assertTrue(filenames.contains(fileName));
+      } else {
+        assertEquals("", commitTime);
+        assertEquals("", seqId);
+        assertEquals("", fileName);
+      }
     });
 
     // after trimming 2 of the meta fields, rest of the fields should match
diff --git a/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/TestHoodieUnsafeRowUtils.scala b/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/TestHoodieUnsafeRowUtils.scala
new file mode 100644
index 0000000000..b051a9b507
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/TestHoodieUnsafeRowUtils.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue, getNestedRowValue}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.Test
+
+class TestHoodieUnsafeRowUtils {
+
+  @Test
+  def testComposeNestedFieldPath(): Unit = {
+    val schema = StructType(Seq(
+      StructField("foo", StringType),
+      StructField(
+        name = "bar",
+        dataType = StructType(Seq(
+          StructField("baz", DateType),
+          StructField("bor", LongType)
+        ))
+      )
+    ))
+
+    assertEquals(
+      Seq((1, schema(1)), (0, schema(1).dataType.asInstanceOf[StructType](0))),
+      composeNestedFieldPath(schema, "bar.baz").toSeq)
+
+    assertThrows(classOf[IllegalArgumentException]) { () =>
+      composeNestedFieldPath(schema, "foo.baz")
+    }
+  }
+
+  @Test
+  def testGetNestedInternalRowValue(): Unit = {
+    val schema = StructType(Seq(
+      StructField("foo", StringType, nullable = false),
+      StructField(
+        name = "bar",
+        dataType = StructType(Seq(
+          StructField("baz", DateType),
+          StructField("bor", LongType)
+        ))
+      )
+    ))
+
+    val row = InternalRow("str", InternalRow(123, 456L))
+
+    assertEquals(
+      123,
+      getNestedInternalRowValue(row, composeNestedFieldPath(schema, "bar.baz"))
+    )
+    assertEquals(
+      456L,
+      getNestedInternalRowValue(row, composeNestedFieldPath(schema, "bar.bor"))
+    )
+    assertEquals(
+      "str",
+      getNestedInternalRowValue(row, composeNestedFieldPath(schema, "foo"))
+    )
+    assertEquals(
+      row.getStruct(1, 2),
+      getNestedInternalRowValue(row, composeNestedFieldPath(schema, "bar"))
+    )
+
+    val rowProperNullable = InternalRow("str", null)
+
+    assertEquals(
+      null,
+      getNestedInternalRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar.baz"))
+    )
+    assertEquals(
+      null,
+      getNestedInternalRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar"))
+    )
+
+    val rowInvalidNullable = InternalRow(null, InternalRow(123, 456L))
+
+    assertThrows(classOf[IllegalArgumentException]) { () =>
+      getNestedInternalRowValue(rowInvalidNullable, composeNestedFieldPath(schema, "foo"))
+    }
+  }
+
+  @Test
+  def testGetNestedRowValue(): Unit = {
+    val schema = StructType(Seq(
+      StructField("foo", StringType, nullable = false),
+      StructField(
+        name = "bar",
+        dataType = StructType(Seq(
+          StructField("baz", DateType),
+          StructField("bor", LongType)
+        ))
+      )
+    ))
+
+    val row = Row("str", Row(123, 456L))
+
+    assertEquals(
+      123,
+      getNestedRowValue(row, composeNestedFieldPath(schema, "bar.baz"))
+    )
+    assertEquals(
+      456L,
+      getNestedRowValue(row, composeNestedFieldPath(schema, "bar.bor"))
+    )
+    assertEquals(
+      "str",
+      getNestedRowValue(row, composeNestedFieldPath(schema, "foo"))
+    )
+    assertEquals(
+      row.getStruct(1),
+      getNestedRowValue(row, composeNestedFieldPath(schema, "bar"))
+    )
+
+    val rowProperNullable = Row("str", null)
+
+    assertEquals(
+      null,
+      getNestedRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar.baz"))
+    )
+    assertEquals(
+      null,
+      getNestedRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar"))
+    )
+
+    val rowInvalidNullable = Row(null, Row(123, 456L))
+
+    assertThrows(classOf[IllegalArgumentException]) { () =>
+      getNestedRowValue(rowInvalidNullable, composeNestedFieldPath(schema, "foo"))
+    }
+  }
+
+  private def assertThrows[T <: Throwable](expectedExceptionClass: Class[T])(f: () => Unit): T = {
+    try {
+      f.apply()
+    } catch {
+      case t: Throwable if expectedExceptionClass.isAssignableFrom(t.getClass) =>
+        // scalastyle:off return
+        return t.asInstanceOf[T]
+        // scalastyle:on return
+      case ot @ _ =>
+        fail(s"Expected exception of class $expectedExceptionClass, but ${ot.getClass} has been thrown")
+    }
+
+    fail(s"Expected exception of class $expectedExceptionClass, but nothing has been thrown")
+  }
+
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/TypeUtils.java b/hudi-common/src/main/java/org/apache/hudi/TypeUtils.java
deleted file mode 100644
index 6e7d2c8745..0000000000
--- a/hudi-common/src/main/java/org/apache/hudi/TypeUtils.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi;
-
-public class TypeUtils {
-
-  /**
-   * This utility abstracts unsafe type-casting in a way that allows to
-   * <ul>
-   *   <li>Search for such type-casts more easily (just searching for usages of this method)</li>
-   *   <li>Avoid type-cast warnings from the compiler</li>
-   * </ul>
-   */
-  @SuppressWarnings("unchecked")
-  public static <T> T unsafeCast(Object o) {
-    return (T) o;
-  }
-
-}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bloom/BloomFilter.java b/hudi-common/src/main/java/org/apache/hudi/common/bloom/BloomFilter.java
index 7997da159b..fbc46827de 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/bloom/BloomFilter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/bloom/BloomFilter.java
@@ -24,12 +24,19 @@ package org.apache.hudi.common.bloom;
 public interface BloomFilter {
 
   /**
-   * Add a key to the {@link BloomFilter}.
+   * Add a key represented by a {@link String} to the {@link BloomFilter}.
    *
    * @param key the key to be added to the {@link BloomFilter}
    */
   void add(String key);
 
+  /**
+   * Add a key's bytes, representing a UTF-8 encoded string, to the {@link BloomFilter}.
+   *
+   * @param key the key bytes to be added to the {@link BloomFilter}
+   */
+  void add(byte[] key);
+
   /**
    * Tests for key membership.
    *
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bloom/HoodieDynamicBoundedBloomFilter.java b/hudi-common/src/main/java/org/apache/hudi/common/bloom/HoodieDynamicBoundedBloomFilter.java
index d4bc287c55..32093fc9c5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/bloom/HoodieDynamicBoundedBloomFilter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/bloom/HoodieDynamicBoundedBloomFilter.java
@@ -78,7 +78,12 @@ public class HoodieDynamicBoundedBloomFilter implements BloomFilter {
 
   @Override
   public void add(String key) {
-    internalDynamicBloomFilter.add(new Key(key.getBytes(StandardCharsets.UTF_8)));
+    add(key.getBytes(StandardCharsets.UTF_8));
+  }
+
+  @Override
+  public void add(byte[] keyBytes) {
+    internalDynamicBloomFilter.add(new Key(keyBytes));
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bloom/SimpleBloomFilter.java b/hudi-common/src/main/java/org/apache/hudi/common/bloom/SimpleBloomFilter.java
index 2403ffd995..43b19a1953 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/bloom/SimpleBloomFilter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/bloom/SimpleBloomFilter.java
@@ -77,10 +77,15 @@ public class SimpleBloomFilter implements BloomFilter {
 
   @Override
   public void add(String key) {
-    if (key == null) {
+    add(key.getBytes(StandardCharsets.UTF_8));
+  }
+
+  @Override
+  public void add(byte[] keyBytes) {
+    if (keyBytes == null) {
       throw new NullPointerException("Key cannot be null");
     }
-    filter.add(new Key(key.getBytes(StandardCharsets.UTF_8)));
+    filter.add(new Key(keyBytes));
   }
 
   @Override
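
The new add(byte[]) overload lets callers that already hold UTF-8 bytes, such as Catalyst's UTF8String, skip the intermediate String. A small Scala sketch under that assumption, usable with any BloomFilter implementation such as the SimpleBloomFilter above:

    import org.apache.hudi.common.bloom.BloomFilter
    import org.apache.spark.unsafe.types.UTF8String

    // UTF8String already holds UTF-8 encoded bytes, so the record key can be added
    // to the filter without materializing a java.lang.String first
    def addRecordKey(filter: BloomFilter, recordKey: UTF8String): Unit =
      filter.add(recordKey.getBytes)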
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/Either.java b/hudi-common/src/main/java/org/apache/hudi/common/util/Either.java
index 93accc4b75..fb624c6075 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/Either.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/Either.java
@@ -20,7 +20,7 @@ package org.apache.hudi.common.util;
 
 import javax.annotation.Nonnull;
 
-import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 
 /**
  * Utility that could hold exclusively only either of (hence the name):
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java
index 0ccc7ca110..ce14f6c91c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java
@@ -30,7 +30,17 @@ import java.util.Deque;
 public class HoodieTimer {
 
   // Ordered stack of TimeInfo's to make sure stopping the timer returns the correct elapsed time
-  Deque<TimeInfo> timeInfoDeque = new ArrayDeque<>();
+  private final Deque<TimeInfo> timeInfoDeque = new ArrayDeque<>();
+
+  public HoodieTimer() {
+    this(false);
+  }
+
+  public HoodieTimer(boolean shouldStart) {
+    if (shouldStart) {
+      startTimer();
+    }
+  }
 
   static class TimeInfo {
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/TypeUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/TypeUtils.java
index d713b183a6..87ce471baa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/TypeUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/TypeUtils.java
@@ -39,4 +39,16 @@ public final class TypeUtils {
         .collect(Collectors.toMap(valueMapper, Function.identity()));
   }
 
+  /**
+   * This utility abstracts unsafe type-casting in a way that allows to
+   * <ul>
+   *   <li>Search for such type-casts more easily (just searching for usages of this method)</li>
+   *   <li>Avoid type-cast warnings from the compiler</li>
+   * </ul>
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T unsafeCast(Object o) {
+    return (T) o;
+  }
+
 }
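
unsafeCast now lives alongside the other common type utilities; usage is unchanged. A trivial Scala sketch with a made-up value:

    import org.apache.hudi.common.util.TypeUtils.unsafeCast

    // Centralizes the unchecked cast so such call sites stay easy to find
    val raw: AnyRef = java.lang.Long.valueOf(42L)
    val asLong: java.lang.Long = unsafeCast(raw)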
diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
index 2afd7df3fb..de4e0c3ccb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
@@ -68,6 +68,7 @@ public abstract class BaseKeyGenerator extends KeyGenerator {
   @Override
   public final List<String> getRecordKeyFieldNames() {
     // For nested columns, pick top level column name
+    // TODO materialize
     return getRecordKeyFields().stream().map(k -> {
       int idx = k.indexOf('.');
       return idx > 0 ? k.substring(0, idx) : k;
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index c9bdc59da9..df138cd124 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -75,9 +75,9 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.TypeUtils.unsafeCast;
 import static org.apache.hudi.common.util.DateTimeUtils.instantToMicros;
 import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index e05d5f6f3e..6e4d50e75f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -898,7 +898,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
     return anchorTs + r.nextLong() % 259200000L;
   }
 
-  private static UUID genPseudoRandomUUID(Random r) {
+  public static UUID genPseudoRandomUUID(Random r) {
     byte[] bytes = new byte[16];
     r.nextBytes(bytes);
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index 42038e61f6..4b351d1205 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -31,7 +31,7 @@ import org.apache.hudi.hadoop.realtime.RealtimeSplit;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 80be97ebef..5d3e0bc3eb 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -18,6 +18,9 @@
 
 package org.apache.hudi;
 
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDWriteClient;
@@ -41,10 +44,6 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.util.DataTypeUtils;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -293,7 +292,7 @@ public class DataSourceUtils {
       //    - {@code HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED} has not been explicitly
       //    set by the writer
       //
-      // If both of these conditions are true, than we override the default value of {@code
+      // If both of these conditions are true, then we override the default value of {@code
       // HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED} and set it to "true"
       LOG.warn("Small Decimal Type found in the persisted schema, reverting default value of 'hoodie.parquet.writelegacyformat.enabled' to true");
       properties.put(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key(), "true");
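
The comment above describes when the writer falls back to the legacy Parquet format: the persisted schema carries a small decimal and the user has not set the flag explicitly. A hedged Scala sketch of such a schema check; this is an illustration, not the DataTypeUtils helper Hudi actually calls:

    import org.apache.spark.sql.types._

    // True if the (possibly nested) schema contains a DecimalType small enough to be
    // written as a long-backed decimal (illustrative threshold; Decimal.MAX_LONG_DIGITS == 18)
    def containsSmallDecimal(dataType: DataType): Boolean = dataType match {
      case d: DecimalType => d.precision <= Decimal.MAX_LONG_DIGITS
      case s: StructType  => s.fields.exists(f => containsSmallDecimal(f.dataType))
      case a: ArrayType   => containsSmallDecimal(a.elementType)
      case m: MapType     => containsSmallDecimal(m.keyType) || containsSmallDecimal(m.valueType)
      case _              => false
    }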
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
deleted file mode 100644
index bc1172f387..0000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi;
-
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.keygen.BuiltinKeyGenerator;
-import org.apache.hudi.keygen.ComplexKeyGenerator;
-import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
-import org.apache.hudi.keygen.SimpleKeyGenerator;
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.sql.Column;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.api.java.UDF1;
-import org.apache.spark.sql.functions;
-import org.apache.spark.sql.types.DataTypes;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import scala.collection.JavaConverters;
-
-import static org.apache.spark.sql.functions.callUDF;
-
-/**
- * Helper class to assist in preparing {@link Dataset<Row>}s for bulk insert with datasource implementation.
- */
-public class HoodieDatasetBulkInsertHelper {
-
-  private static final Logger LOG = LogManager.getLogger(HoodieDatasetBulkInsertHelper.class);
-
-  private static final String RECORD_KEY_UDF_FN = "hudi_recordkey_gen_function_";
-  private static final String PARTITION_PATH_UDF_FN = "hudi_partition_gen_function_";
-
-  /**
-   * Prepares input hoodie spark dataset for bulk insert. It does the following steps.
-   * 1. Uses KeyGenerator to generate hoodie record keys and partition path.
-   * 2. Add hoodie columns to input spark dataset.
-   * 3. Reorders input dataset columns so that hoodie columns appear in the beginning.
-   * 4. Sorts input dataset by hoodie partition path and record key
-   *
-   * @param sqlContext SQL Context
-   * @param config     Hoodie Write Config
-   * @param rows       Spark Input dataset
-   * @return hoodie dataset which is ready for bulk insert.
-   */
-  public static Dataset<Row> prepareHoodieDatasetForBulkInsert(SQLContext sqlContext,
-                                                               HoodieWriteConfig config, Dataset<Row> rows, String structName, String recordNamespace,
-                                                               BulkInsertPartitioner<Dataset<Row>> bulkInsertPartitionerRows,
-                                                               boolean isGlobalIndex, boolean dropPartitionColumns) {
-    List<Column> originalFields =
-        Arrays.stream(rows.schema().fields()).map(f -> new Column(f.name())).collect(Collectors.toList());
-
-    TypedProperties properties = new TypedProperties();
-    properties.putAll(config.getProps());
-    String keyGeneratorClass = properties.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key());
-    String recordKeyFields = properties.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
-    String partitionPathFields = properties.containsKey(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
-        ? properties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()) : "";
-    BuiltinKeyGenerator keyGenerator = (BuiltinKeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, properties);
-
-    Dataset<Row> rowDatasetWithRecordKeysAndPartitionPath;
-    if (keyGeneratorClass.equals(NonpartitionedKeyGenerator.class.getName())) {
-      // for non partitioned, set partition path to empty.
-      rowDatasetWithRecordKeysAndPartitionPath = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, functions.col(recordKeyFields))
-          .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, functions.lit("").cast(DataTypes.StringType));
-    } else if (keyGeneratorClass.equals(SimpleKeyGenerator.class.getName())
-        || (keyGeneratorClass.equals(ComplexKeyGenerator.class.getName()) && !recordKeyFields.contains(",") && !partitionPathFields.contains(",")
-        && (!partitionPathFields.contains("timestamp")))) { // incase of ComplexKeyGen, check partition path type.
-      // simple fields for both record key and partition path: can directly use withColumn
-      String partitionPathField = keyGeneratorClass.equals(SimpleKeyGenerator.class.getName()) ? partitionPathFields :
-          partitionPathFields.substring(partitionPathFields.indexOf(":") + 1);
-      rowDatasetWithRecordKeysAndPartitionPath = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, functions.col(recordKeyFields).cast(DataTypes.StringType))
-          .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, functions.col(partitionPathField).cast(DataTypes.StringType));
-    } else {
-      // use udf
-      String tableName = properties.getString(HoodieWriteConfig.TBL_NAME.key());
-      String recordKeyUdfFn = RECORD_KEY_UDF_FN + tableName;
-      String partitionPathUdfFn = PARTITION_PATH_UDF_FN + tableName;
-      sqlContext.udf().register(recordKeyUdfFn, (UDF1<Row, String>) keyGenerator::getRecordKey, DataTypes.StringType);
-      sqlContext.udf().register(partitionPathUdfFn, (UDF1<Row, String>) keyGenerator::getPartitionPath, DataTypes.StringType);
-
-      final Dataset<Row> rowDatasetWithRecordKeys = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD,
-          callUDF(recordKeyUdfFn, org.apache.spark.sql.functions.struct(
-              JavaConverters.collectionAsScalaIterableConverter(originalFields).asScala().toSeq())));
-      rowDatasetWithRecordKeysAndPartitionPath =
-          rowDatasetWithRecordKeys.withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD,
-              callUDF(partitionPathUdfFn,
-                  org.apache.spark.sql.functions.struct(
-                      JavaConverters.collectionAsScalaIterableConverter(originalFields).asScala().toSeq())));
-    }
-
-    // Add other empty hoodie fields which will be populated before writing to parquet.
-    Dataset<Row> rowDatasetWithHoodieColumns =
-        rowDatasetWithRecordKeysAndPartitionPath.withColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-                functions.lit("").cast(DataTypes.StringType))
-            .withColumn(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
-                functions.lit("").cast(DataTypes.StringType))
-            .withColumn(HoodieRecord.FILENAME_METADATA_FIELD,
-                functions.lit("").cast(DataTypes.StringType));
-
-    Dataset<Row> processedDf = rowDatasetWithHoodieColumns;
-    if (dropPartitionColumns) {
-      String partitionColumns = String.join(",", keyGenerator.getPartitionPathFields());
-      for (String partitionField : keyGenerator.getPartitionPathFields()) {
-        originalFields.remove(new Column(partitionField));
-      }
-      processedDf = rowDatasetWithHoodieColumns.drop(partitionColumns);
-    }
-    Dataset<Row> dedupedDf = processedDf;
-    if (config.shouldCombineBeforeInsert()) {
-      dedupedDf = SparkRowWriteHelper.newInstance().deduplicateRows(processedDf, config.getPreCombineField(), isGlobalIndex);
-    }
-
-    List<Column> orderedFields = Stream.concat(HoodieRecord.HOODIE_META_COLUMNS.stream().map(Column::new),
-        originalFields.stream()).collect(Collectors.toList());
-    Dataset<Row> colOrderedDataset = dedupedDf.select(
-        JavaConverters.collectionAsScalaIterableConverter(orderedFields).asScala().toSeq());
-
-    return bulkInsertPartitionerRows.repartitionRecords(colOrderedDataset, config.getBulkInsertShuffleParallelism());
-  }
-
-  /**
-   * Add empty meta fields and reorder such that meta fields are at the beginning.
-   *
-   * @param rows
-   * @return
-   */
-  public static Dataset<Row> prepareHoodieDatasetForBulkInsertWithoutMetaFields(Dataset<Row> rows) {
-    // add empty meta cols.
-    Dataset<Row> rowsWithMetaCols = rows
-        .withColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-            functions.lit("").cast(DataTypes.StringType))
-        .withColumn(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
-            functions.lit("").cast(DataTypes.StringType))
-        .withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD,
-            functions.lit("").cast(DataTypes.StringType))
-        .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD,
-            functions.lit("").cast(DataTypes.StringType))
-        .withColumn(HoodieRecord.FILENAME_METADATA_FIELD,
-            functions.lit("").cast(DataTypes.StringType));
-
-    List<Column> originalFields =
-        Arrays.stream(rowsWithMetaCols.schema().fields())
-            .filter(field -> !HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(field.name()))
-            .map(f -> new Column(f.name())).collect(Collectors.toList());
-
-    List<Column> metaFields =
-        Arrays.stream(rowsWithMetaCols.schema().fields())
-            .filter(field -> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(field.name()))
-            .map(f -> new Column(f.name())).collect(Collectors.toList());
-
-    // reorder such that all meta columns are at the beginning followed by original columns
-    List<Column> allCols = new ArrayList<>();
-    allCols.addAll(metaFields);
-    allCols.addAll(originalFields);
-
-    return rowsWithMetaCols.select(
-        JavaConverters.collectionAsScalaIterableConverter(allCols).asScala().toSeq());
-  }
-
-}
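
The removed helper worked purely at the Dataset/Column level: register key-gen UDFs, append empty meta columns, reorder, dedupe, repartition. For reference, a compact Scala sketch of just the append-empty-meta-columns-and-reorder step, using only standard Spark API and a hypothetical helper name:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, lit}
    import org.apache.spark.sql.types.StringType

    // Append empty string meta columns and reorder them ahead of the data columns
    def withEmptyMetaColumns(df: DataFrame, metaColumns: Seq[String]): DataFrame = {
      val withMeta = metaColumns.foldLeft(df)((d, c) => d.withColumn(c, lit("").cast(StringType)))
      val dataColumns = df.columns.filterNot(metaColumns.contains)
      withMeta.select((metaColumns ++ dataColumns).map(col): _*)
    }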
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/SparkRowWriteHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/SparkRowWriteHelper.java
deleted file mode 100644
index ea9c9b2c03..0000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/SparkRowWriteHelper.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi;
-
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.ReduceFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
-import org.apache.spark.sql.catalyst.encoders.RowEncoder;
-import org.apache.spark.sql.catalyst.expressions.Attribute;
-import org.apache.spark.sql.types.StructType;
-import scala.Tuple2;
-import scala.collection.JavaConversions;
-import scala.collection.JavaConverters;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Helper class to assist in deduplicating Rows for BulkInsert with Rows.
- */
-public class SparkRowWriteHelper {
-
-  private SparkRowWriteHelper() {
-  }
-
-  private static class WriteHelperHolder {
-    private static final SparkRowWriteHelper SPARK_WRITE_HELPER = new SparkRowWriteHelper();
-  }
-
-  public static SparkRowWriteHelper newInstance() {
-    return SparkRowWriteHelper.WriteHelperHolder.SPARK_WRITE_HELPER;
-  }
-
-  public Dataset<Row> deduplicateRows(Dataset<Row> inputDf, String preCombineField, boolean isGlobalIndex) {
-    return inputDf.groupByKey((MapFunction<Row, String>) value ->
-            isGlobalIndex
-                ? (value.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD))
-                : (value.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + value.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD)), Encoders.STRING())
-        .reduceGroups((ReduceFunction<Row>) (v1, v2) ->
-            ((Comparable) v1.getAs(preCombineField)).compareTo(v2.getAs(preCombineField)) >= 0 ? v1 : v2)
-        .map((MapFunction<Tuple2<String, Row>, Row>) value -> value._2, getEncoder(inputDf.schema()));
-  }
-
-  private ExpressionEncoder getEncoder(StructType schema) {
-    List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream()
-        .map(Attribute::toAttribute).collect(Collectors.toList());
-    return RowEncoder.apply(schema)
-        .resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(),
-            SimpleAnalyzer$.MODULE$);
-  }
-}
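
The deleted deduplicateRows kept, per record key (or partition path plus key for non-global indexes), the row with the largest precombine value. An equivalent hedged sketch expressed over the DataFrame's RDD; the helper and its parameters are hypothetical:

    import org.apache.spark.sql.{DataFrame, Row}

    // Keep exactly one row per key: the one with the greatest precombine value
    def dedupeRows(df: DataFrame, keyField: String, preCombineField: String): DataFrame = {
      val schema = df.schema
      val deduped = df.rdd
        .keyBy(row => row.getAs[String](keyField))
        .reduceByKey { (left, right) =>
          val l = left.getAs[AnyRef](preCombineField).asInstanceOf[Comparable[AnyRef]]
          val r = right.getAs[AnyRef](preCombineField)
          if (l.compareTo(r) >= 0) left else right
        }
        .values
      df.sparkSession.createDataFrame(deduped, schema)
    }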
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
index c9404afe61..516c6c5fc7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
@@ -27,18 +27,17 @@ import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.row.HoodieRowCreateHandle;
-import org.apache.hudi.io.storage.row.HoodieRowCreateHandleWithoutMetaFields;
 import org.apache.hudi.keygen.BuiltinKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -64,16 +63,20 @@ public class BulkInsertDataInternalWriterHelper {
   private final StructType structType;
   private final Boolean arePartitionRecordsSorted;
   private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
-  private HoodieRowCreateHandle handle;
+  private final String fileIdPrefix;
+  private final Map<String, HoodieRowCreateHandle> handles = new HashMap<>();
+  private final boolean populateMetaFields;
+  private final Option<BuiltinKeyGenerator> keyGeneratorOpt;
+  private final boolean simpleKeyGen;
+  private final int simplePartitionFieldIndex;
+  private final DataType simplePartitionFieldDataType;
+  /**
+   * NOTE: This is stored as Catalyst's internal {@link UTF8String} to avoid
+   *       conversion (deserialization) b/w {@link UTF8String} and {@link String}
+   */
   private String lastKnownPartitionPath = null;
-  private String fileIdPrefix;
+  private HoodieRowCreateHandle handle;
   private int numFilesWritten = 0;
-  private Map<String, HoodieRowCreateHandle> handles = new HashMap<>();
-  private final boolean populateMetaFields;
-  private Option<BuiltinKeyGenerator> keyGeneratorOpt = null;
-  private boolean simpleKeyGen = false;
-  private int simplePartitionFieldIndex = -1;
-  private DataType simplePartitionFieldDataType;
 
   public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
                                             String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType,
@@ -88,13 +91,21 @@ public class BulkInsertDataInternalWriterHelper {
     this.populateMetaFields = populateMetaFields;
     this.arePartitionRecordsSorted = arePartitionRecordsSorted;
     this.fileIdPrefix = UUID.randomUUID().toString();
+
     if (!populateMetaFields) {
       this.keyGeneratorOpt = getKeyGenerator(writeConfig.getProps());
-      if (keyGeneratorOpt.isPresent() && keyGeneratorOpt.get() instanceof SimpleKeyGenerator) {
-        simpleKeyGen = true;
-        simplePartitionFieldIndex = (Integer) structType.getFieldIndex((keyGeneratorOpt.get()).getPartitionPathFields().get(0)).get();
-        simplePartitionFieldDataType = structType.fields()[simplePartitionFieldIndex].dataType();
-      }
+    } else {
+      this.keyGeneratorOpt = Option.empty();
+    }
+
+    if (keyGeneratorOpt.isPresent() && keyGeneratorOpt.get() instanceof SimpleKeyGenerator) {
+      this.simpleKeyGen = true;
+      this.simplePartitionFieldIndex = (Integer) structType.getFieldIndex(keyGeneratorOpt.get().getPartitionPathFields().get(0)).get();
+      this.simplePartitionFieldDataType = structType.fields()[simplePartitionFieldIndex].dataType();
+    } else {
+      this.simpleKeyGen = false;
+      this.simplePartitionFieldIndex = -1;
+      this.simplePartitionFieldDataType = null;
     }
   }
 
@@ -120,32 +131,16 @@ public class BulkInsertDataInternalWriterHelper {
     }
   }
 
-  public void write(InternalRow record) throws IOException {
+  public void write(InternalRow row) throws IOException {
     try {
-      String partitionPath = null;
-      if (populateMetaFields) { // usual path where meta fields are pre populated in prep step.
-        partitionPath = String.valueOf(record.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_POS));
-      } else { // if meta columns are disabled.
-        if (!keyGeneratorOpt.isPresent()) { // NoPartitionerKeyGen
-          partitionPath = "";
-        } else if (simpleKeyGen) { // SimpleKeyGen
-          Object parititionPathValue = record.get(simplePartitionFieldIndex, simplePartitionFieldDataType);
-          partitionPath = parititionPathValue != null ? parititionPathValue.toString() : PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
-          if (writeConfig.isHiveStylePartitioningEnabled()) {
-            partitionPath = (keyGeneratorOpt.get()).getPartitionPathFields().get(0) + "=" + partitionPath;
-          }
-        } else {
-          // only BuiltIn key generators are supported if meta fields are disabled.
-          partitionPath = keyGeneratorOpt.get().getPartitionPath(record, structType);
-        }
-      }
-
-      if ((lastKnownPartitionPath == null) || !lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) {
+      String partitionPath = extractPartitionPath(row);
+      if (lastKnownPartitionPath == null || !lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) {
         LOG.info("Creating new file for partition path " + partitionPath);
         handle = getRowCreateHandle(partitionPath);
         lastKnownPartitionPath = partitionPath;
       }
-      handle.write(record);
+
+      handle.write(row);
     } catch (Throwable t) {
       LOG.error("Global error thrown while trying to write records in HoodieRowCreateHandle ", t);
       throw t;
@@ -157,7 +152,41 @@ public class BulkInsertDataInternalWriterHelper {
     return writeStatusList;
   }
 
-  public void abort() {
+  public void abort() {}
+
+  public void close() throws IOException {
+    for (HoodieRowCreateHandle rowCreateHandle : handles.values()) {
+      writeStatusList.add(rowCreateHandle.close());
+    }
+    handles.clear();
+    handle = null;
+  }
+
+  private String extractPartitionPath(InternalRow row) {
+    String partitionPath;
+    if (populateMetaFields) {
+      // In case meta-fields are materialized w/in the table itself, we can just simply extract
+      // partition path from there
+      //
+      // NOTE: Helper keeps track of [[lastKnownPartitionPath]] as [[UTF8String]] to avoid
+      //       conversion from Catalyst internal representation into a [[String]]
+      partitionPath = row.getString(HoodieRecord.PARTITION_PATH_META_FIELD_POS);
+    } else if (keyGeneratorOpt.isPresent()) {
+      // TODO(HUDI-4039) this should be handled by the SimpleKeyGenerator itself
+      if (simpleKeyGen) {
+        Object partitionPathValue = row.get(simplePartitionFieldIndex, simplePartitionFieldDataType);
+        partitionPath = partitionPathValue != null ? partitionPathValue.toString() : PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
+        if (writeConfig.isHiveStylePartitioningEnabled()) {
+          partitionPath = (keyGeneratorOpt.get()).getPartitionPathFields().get(0) + "=" + partitionPath;
+        }
+      } else {
+        // only BuiltIn key generators are supported if meta fields are disabled.
+        partitionPath = keyGeneratorOpt.get().getPartitionPath(row, structType);
+      }
+    } else {
+      partitionPath = "";
+    }
+    return partitionPath;
   }
 
   private HoodieRowCreateHandle getRowCreateHandle(String partitionPath) throws IOException {
@@ -166,28 +195,21 @@ public class BulkInsertDataInternalWriterHelper {
       if (arePartitionRecordsSorted) {
         close();
       }
-      HoodieRowCreateHandle rowCreateHandle = populateMetaFields ? new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
-          instantTime, taskPartitionId, taskId, taskEpochId, structType) : new HoodieRowCreateHandleWithoutMetaFields(hoodieTable, writeConfig, partitionPath, getNextFileId(),
-          instantTime, taskPartitionId, taskId, taskEpochId, structType);
+      HoodieRowCreateHandle rowCreateHandle = createHandle(partitionPath);
       handles.put(partitionPath, rowCreateHandle);
     } else if (!handles.get(partitionPath).canWrite()) {
       // even if there is a handle to the partition path, it could have reached its max size threshold. So, we close the handle here and
       // create a new one.
       writeStatusList.add(handles.remove(partitionPath).close());
-      HoodieRowCreateHandle rowCreateHandle = populateMetaFields ? new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
-          instantTime, taskPartitionId, taskId, taskEpochId, structType) : new HoodieRowCreateHandleWithoutMetaFields(hoodieTable, writeConfig, partitionPath, getNextFileId(),
-          instantTime, taskPartitionId, taskId, taskEpochId, structType);
+      HoodieRowCreateHandle rowCreateHandle = createHandle(partitionPath);
       handles.put(partitionPath, rowCreateHandle);
     }
     return handles.get(partitionPath);
   }
 
-  public void close() throws IOException {
-    for (HoodieRowCreateHandle rowCreateHandle : handles.values()) {
-      writeStatusList.add(rowCreateHandle.close());
-    }
-    handles.clear();
-    handle = null;
+  private HoodieRowCreateHandle createHandle(String partitionPath) {
+    return new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
+        instantTime, taskPartitionId, taskId, taskEpochId, structType, populateMetaFields);
   }
 
   private String getNextFileId() {
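
A note on the handle lifecycle above: the two previous handle classes (HoodieRowCreateHandle and
HoodieRowCreateHandleWithoutMetaFields) are collapsed into a single HoodieRowCreateHandle that takes a
populateMetaFields flag, and the helper caches one open handle per partition path, rolling a handle over
once canWrite() returns false. Below is a minimal, self-contained sketch of that caching pattern; the
RowHandle and HandleCache names are illustrative stand-ins, not Hudi classes.

    import scala.collection.mutable

    trait RowHandle {
      def canWrite: Boolean
      def close(): String // stands in for the write status returned by the real handle
    }

    class HandleCache(newHandle: String => RowHandle) {
      private val handles = mutable.Map.empty[String, RowHandle]
      private val closedStatuses = mutable.ListBuffer.empty[String]

      def handleFor(partitionPath: String): RowHandle =
        handles.get(partitionPath) match {
          case Some(h) if h.canWrite => h  // reuse the open handle for this partition
          case Some(h) =>                  // handle hit its size threshold: close it and replace
            closedStatuses += h.close()
            open(partitionPath)
          case None =>                     // first row seen for this partition
            open(partitionPath)
        }

      private def open(partitionPath: String): RowHandle = {
        val fresh = newHandle(partitionPath)
        handles.update(partitionPath, fresh)
        fresh
      }

      def closeAll(): Seq[String] = {      // mirrors close(): collect statuses of all open handles
        handles.values.foreach(h => closedStatuses += h.close())
        handles.clear()
        closedStatuses.toList
      }
    }

When arePartitionRecordsSorted is set, the real helper additionally closes all open handles before opening
one for a new partition, since sorted input guarantees an earlier partition will not be seen again.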
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
new file mode 100644
index 0000000000..168cc6b265
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.client.model.HoodieInternalRow
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.util.ReflectionUtils
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.index.SparkHoodieIndexFactory
+import org.apache.hudi.keygen.BuiltinKeyGenerator
+import org.apache.hudi.table.BulkInsertPartitioner
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.HoodieUnsafeRDDUtils.createDataFrame
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeRDDUtils, Row}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+import scala.collection.mutable
+
+object HoodieDatasetBulkInsertHelper extends Logging {
+
+  /**
+   * Prepares [[DataFrame]] for bulk-insert into Hudi table, taking following steps:
+   *
+   * <ol>
+   *   <li>Invokes the configured [[KeyGenerator]] to produce the record key, as well as the partition-path value</li>
+   *   <li>Prepends Hudi meta-fields to every row in the dataset</li>
+   *   <li>Dedupes rows (if necessary)</li>
+   *   <li>Partitions dataset using provided [[partitioner]]</li>
+   * </ol>
+   */
+  def prepareForBulkInsert(df: DataFrame,
+                           config: HoodieWriteConfig,
+                           partitioner: BulkInsertPartitioner[Dataset[Row]],
+                           shouldDropPartitionColumns: Boolean): Dataset[Row] = {
+    val populateMetaFields = config.populateMetaFields()
+    val schema = df.schema
+
+    val keyGeneratorClassName = config.getStringOrThrow(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME,
+      "Key-generator class name is required")
+
+    val prependedRdd: RDD[InternalRow] =
+      df.queryExecution.toRdd.mapPartitions { iter =>
+        val keyGenerator =
+          ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps))
+            .asInstanceOf[BuiltinKeyGenerator]
+
+        iter.map { row =>
+          val (recordKey, partitionPath) =
+            if (populateMetaFields) {
+              (UTF8String.fromString(keyGenerator.getRecordKey(row, schema)),
+                UTF8String.fromString(keyGenerator.getPartitionPath(row, schema)))
+            } else {
+              (UTF8String.EMPTY_UTF8, UTF8String.EMPTY_UTF8)
+            }
+          val commitTimestamp = UTF8String.EMPTY_UTF8
+          val commitSeqNo = UTF8String.EMPTY_UTF8
+          val filename = UTF8String.EMPTY_UTF8
+
+          // TODO use mutable row, avoid re-allocating
+          new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, partitionPath, filename, row, false)
+        }
+      }
+
+    val metaFields = Seq(
+      StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType))
+
+    val updatedSchema = StructType(metaFields ++ schema.fields)
+
+    val updatedDF = if (populateMetaFields && config.shouldCombineBeforeInsert) {
+      val dedupedRdd = dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config))
+      HoodieUnsafeRDDUtils.createDataFrame(df.sparkSession, dedupedRdd, updatedSchema)
+    } else {
+      HoodieUnsafeRDDUtils.createDataFrame(df.sparkSession, prependedRdd, updatedSchema)
+    }
+
+    val trimmedDF = if (shouldDropPartitionColumns) {
+      dropPartitionColumns(updatedDF, config)
+    } else {
+      updatedDF
+    }
+
+    partitioner.repartitionRecords(trimmedDF, config.getBulkInsertShuffleParallelism)
+  }
+
+  private def dedupeRows(rdd: RDD[InternalRow], schema: StructType, preCombineFieldRef: String, isGlobalIndex: Boolean): RDD[InternalRow] = {
+    val recordKeyMetaFieldOrd = schema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+    val partitionPathMetaFieldOrd = schema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
+    // NOTE: Pre-combine field could be a nested field
+    val preCombineFieldPath = composeNestedFieldPath(schema, preCombineFieldRef)
+
+    rdd.map { row =>
+        val rowKey = if (isGlobalIndex) {
+          row.getString(recordKeyMetaFieldOrd)
+        } else {
+          val partitionPath = row.getString(partitionPathMetaFieldOrd)
+          val recordKey = row.getString(recordKeyMetaFieldOrd)
+          s"$partitionPath:$recordKey"
+        }
+        // NOTE: It's critical to copy the row whenever we hold on to it, since Spark might be
+        //       re-using a single mutable row instance (updated as the iteration advances)
+        (rowKey, row.copy())
+      }
+      .reduceByKey {
+        (oneRow, otherRow) =>
+          val onePreCombineVal = getNestedInternalRowValue(oneRow, preCombineFieldPath).asInstanceOf[Comparable[AnyRef]]
+          val otherPreCombineVal = getNestedInternalRowValue(otherRow, preCombineFieldPath).asInstanceOf[Comparable[AnyRef]]
+          if (onePreCombineVal.compareTo(otherPreCombineVal.asInstanceOf[AnyRef]) >= 0) {
+            oneRow
+          } else {
+            otherRow
+          }
+      }
+      .values
+  }
+
+  private def dropPartitionColumns(df: DataFrame, config: HoodieWriteConfig): DataFrame = {
+    val partitionPathFields = getPartitionPathFields(config).toSet
+    val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))
+    if (nestedPartitionPathFields.nonEmpty) {
+      logWarning(s"Can not drop nested partition path fields: $nestedPartitionPathFields")
+    }
+
+    val partitionPathCols = (partitionPathFields -- nestedPartitionPathFields).toSeq
+
+    df.drop(partitionPathCols: _*)
+  }
+
+  private def getPartitionPathFields(config: HoodieWriteConfig): Seq[String] = {
+    val keyGeneratorClassName = config.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)
+    val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
+    keyGenerator.getPartitionPathFields.asScala
+  }
+}
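
The Scala helper above is now the single entry point for preparing a DataFrame for bulk insert, whether or
not meta fields are populated. A hedged usage sketch follows, mirroring the call site in
HoodieSparkSqlWriter further below; the NonSortPartitionerWithRows partitioner and the literal arguments
are illustrative choices for the sketch, not defaults introduced by this patch.

    import org.apache.hudi.HoodieDatasetBulkInsertHelper
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.hudi.execution.bulkinsert.NonSortPartitionerWithRows
    import org.apache.spark.sql.{DataFrame, Dataset, Row}

    def prepare(df: DataFrame, writeConfig: HoodieWriteConfig): Dataset[Row] = {
      // Key generation, meta-field prepending, optional de-duplication and repartitioning
      // all happen inside this one call, for both populateMetaFields = true and false.
      HoodieDatasetBulkInsertHelper.prepareForBulkInsert(
        df,
        writeConfig,
        new NonSortPartitionerWithRows(), // any BulkInsertPartitioner[Dataset[Row]]
        false)                            // shouldDropPartitionColumns
    }

When meta fields are populated and combine-before-insert is enabled, the helper also dedupes rows by
record key (prefixed with the partition path unless the index is global), keeping the row with the larger
pre-combine value via reduceByKey.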
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index a90e6b8e8e..13d87c048f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -515,8 +515,8 @@ object HoodieSparkSqlWriter {
                       instantTime: String,
                       partitionColumns: String): (Boolean, common.util.Option[String]) = {
     val sparkContext = sqlContext.sparkContext
-    val populateMetaFields = java.lang.Boolean.parseBoolean((parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
-      String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))))
+    val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
+      String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())))
     val dropPartitionColumns = parameters.get(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key()).map(_.toBoolean)
       .getOrElse(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.defaultValue())
     // register classes & schemas
@@ -556,12 +556,9 @@ object HoodieSparkSqlWriter {
     } else {
       false
     }
-    val hoodieDF = if (populateMetaFields) {
-      HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace,
-        bulkInsertPartitionerRows, isGlobalIndex, dropPartitionColumns)
-    } else {
-      HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsertWithoutMetaFields(df)
-    }
+
+    val hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(df, writeConfig, bulkInsertPartitionerRows, dropPartitionColumns)
+
     if (HoodieSparkUtils.isSpark2) {
       hoodieDF.write.format("org.apache.hudi.internal")
         .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
index 6b617ca208..373d187ea6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
@@ -17,6 +17,7 @@
 
 package org.apache.hudi.functional;
 
+import org.apache.avro.Schema;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.HoodieDatasetBulkInsertHelper;
@@ -27,10 +28,9 @@ import org.apache.hudi.execution.bulkinsert.NonSortPartitionerWithRows;
 import org.apache.hudi.keygen.ComplexKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.testutils.DataSourceTestUtils;
 import org.apache.hudi.testutils.HoodieClientTestBase;
-
-import org.apache.avro.Schema;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.ReduceFunction;
 import org.apache.spark.sql.Dataset;
@@ -46,6 +46,9 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -56,10 +59,6 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
-import scala.Tuple2;
-import scala.collection.JavaConversions;
-import scala.collection.JavaConverters;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -117,36 +116,42 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
     testBulkInsertHelperFor(keyGenClass, "_row_key");
   }
 
-  private void testBulkInsertHelperFor(String keyGenClass, String recordKey) {
+  private void testBulkInsertHelperFor(String keyGenClass, String recordKeyField) {
     Map<String, String> props = null;
     if (keyGenClass.equals(SimpleKeyGenerator.class.getName())) {
-      props = getPropsAllSet(recordKey);
+      props = getPropsAllSet(recordKeyField);
     } else if (keyGenClass.equals(ComplexKeyGenerator.class.getName())) {
-      props = getPropsForComplexKeyGen(recordKey);
+      props = getPropsForComplexKeyGen(recordKeyField);
     } else { // NonPartitioned key gen
-      props = getPropsForNonPartitionedKeyGen(recordKey);
+      props = getPropsForNonPartitionedKeyGen(recordKeyField);
     }
     HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(props).combineInput(false, false).build();
     List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
-    Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
-        "testNamespace", new NonSortPartitionerWithRows(), false, false);
+    Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
+        new NonSortPartitionerWithRows(), false);
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), 10);
     assertEquals(resultSchema.fieldNames().length, structType.fieldNames().length + HoodieRecord.HOODIE_META_COLUMNS.size());
 
     for (Map.Entry<String, Integer> entry : HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.entrySet()) {
-      assertTrue(resultSchema.fieldIndex(entry.getKey()) == entry.getValue());
+      assertEquals(entry.getValue(), resultSchema.fieldIndex(entry.getKey()));
     }
 
-    boolean isNonPartitioned = keyGenClass.equals(NonpartitionedKeyGenerator.class.getName());
+    boolean isNonPartitionedKeyGen = keyGenClass.equals(NonpartitionedKeyGenerator.class.getName());
+    boolean isComplexKeyGen = keyGenClass.equals(ComplexKeyGenerator.class.getName());
+
     result.toJavaRDD().foreach(entry -> {
-      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)).equals(entry.getAs(recordKey).toString()));
-      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).equals(isNonPartitioned ? "" : entry.getAs("partition")));
-      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)).equals(""));
-      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).equals(""));
-      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD)).equals(""));
+      String recordKey = isComplexKeyGen ? String.format("%s:%s", recordKeyField, entry.getAs(recordKeyField)) : entry.getAs(recordKeyField).toString();
+      assertEquals(recordKey, entry.get(resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)));
+
+      String partitionPath = isNonPartitionedKeyGen ? HoodieTableMetadata.EMPTY_PARTITION_NAME : entry.getAs("partition").toString();
+      assertEquals(partitionPath, entry.get(resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)));
+
+      assertEquals("", entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)));
+      assertEquals("", entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD)));
+      assertEquals("", entry.get(resultSchema.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD)));
     });
 
     Dataset<Row> trimmedOutput = result.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
@@ -157,8 +162,13 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
   @Test
   public void testBulkInsertHelperNoMetaFields() {
     List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
+    HoodieWriteConfig config = getConfigBuilder(schemaStr)
+        .withProps(getPropsAllSet("_row_key"))
+        .withPopulateMetaFields(false)
+        .build();
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
-    Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsertWithoutMetaFields(dataset);
+    Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
+        new NonSortPartitionerWithRows(), false);
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), 10);
@@ -194,8 +204,8 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
     rows.addAll(inserts);
     rows.addAll(updates);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
-    Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
-        "testNamespace", new NonSortPartitionerWithRows(), false, false);
+    Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
+        new NonSortPartitionerWithRows(), false);
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), enablePreCombine ? 10 : 15);
@@ -211,13 +221,15 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
     int metadataCommitSeqNoIndex = resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD);
     int metadataFilenameIndex = resultSchema.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD);
 
-    result.toJavaRDD().foreach(entry -> {
-      assertTrue(entry.get(metadataRecordKeyIndex).equals(entry.getAs("_row_key")));
-      assertTrue(entry.get(metadataPartitionPathIndex).equals(entry.getAs("partition")));
-      assertTrue(entry.get(metadataCommitSeqNoIndex).equals(""));
-      assertTrue(entry.get(metadataCommitTimeIndex).equals(""));
-      assertTrue(entry.get(metadataFilenameIndex).equals(""));
-    });
+    result.toJavaRDD()
+        .collect()
+        .forEach(entry -> {
+          assertTrue(entry.get(metadataRecordKeyIndex).equals(entry.getAs("_row_key")));
+          assertTrue(entry.get(metadataPartitionPathIndex).equals(entry.getAs("partition")));
+          assertTrue(entry.get(metadataCommitSeqNoIndex).equals(""));
+          assertTrue(entry.get(metadataCommitTimeIndex).equals(""));
+          assertTrue(entry.get(metadataFilenameIndex).equals(""));
+        });
 
     Dataset<Row> trimmedOutput = result.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
         .drop(HoodieRecord.FILENAME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
@@ -226,7 +238,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
     ExpressionEncoder encoder = getEncoder(dataset.schema());
     if (enablePreCombine) {
       Dataset<Row> inputSnapshotDf = dataset.groupByKey(
-          (MapFunction<Row, String>) value -> value.getAs("partition") + "+" + value.getAs("_row_key"), Encoders.STRING())
+          (MapFunction<Row, String>) value -> value.getAs("partition") + ":" + value.getAs("_row_key"), Encoders.STRING())
           .reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
             long ts1 = v1.getAs("ts");
             long ts2 = v2.getAs("ts");
@@ -238,9 +250,9 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
           })
           .map((MapFunction<Tuple2<String, Row>, Row>) value -> value._2, encoder);
 
-      assertTrue(inputSnapshotDf.except(trimmedOutput).count() == 0);
+      assertEquals(0, inputSnapshotDf.except(trimmedOutput).count());
     } else {
-      assertTrue(dataset.except(trimmedOutput).count() == 0);
+      assertEquals(0, dataset.except(trimmedOutput).count());
     }
   }
 
@@ -277,7 +289,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
     Map<String, String> props = new HashMap<>();
     props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), ComplexKeyGenerator.class.getName());
     props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKey);
-    props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "simple:partition");
+    props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition");
     props.put(HoodieWriteConfig.TBL_NAME.key(), recordKey + "_table");
     return props;
   }
@@ -296,8 +308,9 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
     List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     try {
-      HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
-          "testNamespace", new NonSortPartitionerWithRows(), false, false);
+      Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
+          new NonSortPartitionerWithRows(), false);
+      preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
       // ignore
@@ -307,8 +320,9 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
     rows = DataSourceTestUtils.generateRandomRows(10);
     dataset = sqlContext.createDataFrame(rows, structType);
     try {
-      HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
-          "testNamespace", new NonSortPartitionerWithRows(), false, false);
+      Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
+          new NonSortPartitionerWithRows(), false);
+      preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
       // ignore
@@ -318,8 +332,9 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
     rows = DataSourceTestUtils.generateRandomRows(10);
     dataset = sqlContext.createDataFrame(rows, structType);
     try {
-      HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
-          "testNamespace", new NonSortPartitionerWithRows(), false, false);
+      Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
+          new NonSortPartitionerWithRows(), false);
+      preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
       // ignore
@@ -329,8 +344,9 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
     rows = DataSourceTestUtils.generateRandomRows(10);
     dataset = sqlContext.createDataFrame(rows, structType);
     try {
-      HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
-          "testNamespace", new NonSortPartitionerWithRows(), false, false);
+      Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
+          new NonSortPartitionerWithRows(), false);
+      preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
       // ignore
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
index ecc48b8aec..4a93245dc8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.FileIOUtils;
 
 import org.apache.avro.Schema;
@@ -48,6 +49,8 @@ import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_T
  */
 public class DataSourceTestUtils {
 
+  private static final Random RANDOM = new Random(0xDAADDEED);
+
   public static Schema getStructTypeExampleSchema() throws IOException {
     return new Schema.Parser().parse(FileIOUtils.readAsUTFString(DataSourceTestUtils.class.getResourceAsStream("/exampleSchema.txt")));
   }
@@ -57,13 +60,12 @@ public class DataSourceTestUtils {
   }
 
   public static List<Row> generateRandomRows(int count) {
-    Random random = new Random();
     List<Row> toReturn = new ArrayList<>();
     List<String> partitions = Arrays.asList(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH});
     for (int i = 0; i < count; i++) {
       Object[] values = new Object[3];
-      values[0] = UUID.randomUUID().toString();
-      values[1] = partitions.get(random.nextInt(3));
+      values[0] = HoodieTestDataGenerator.genPseudoRandomUUID(RANDOM).toString();
+      values[1] = partitions.get(RANDOM.nextInt(3));
       values[2] = new Date().getTime();
       toReturn.add(RowFactory.create(values));
     }
@@ -97,13 +99,12 @@ public class DataSourceTestUtils {
   }
 
   public static List<Row> generateRandomRowsEvolvedSchema(int count) {
-    Random random = new Random();
     List<Row> toReturn = new ArrayList<>();
     List<String> partitions = Arrays.asList(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH});
     for (int i = 0; i < count; i++) {
       Object[] values = new Object[4];
       values[0] = UUID.randomUUID().toString();
-      values[1] = partitions.get(random.nextInt(3));
+      values[1] = partitions.get(RANDOM.nextInt(3));
       values[2] = new Date().getTime();
       values[3] = UUID.randomUUID().toString();
       toReturn.add(RowFactory.create(values));
@@ -112,14 +113,13 @@ public class DataSourceTestUtils {
   }
 
   public static List<Row> updateRowsWithHigherTs(Dataset<Row> inputDf) {
-    Random random = new Random();
     List<Row> input = inputDf.collectAsList();
     List<Row> rows = new ArrayList<>();
     for (Row row : input) {
       Object[] values = new Object[3];
       values[0] = row.getAs("_row_key");
       values[1] = row.getAs("partition");
-      values[2] = ((Long) row.getAs("ts")) + random.nextInt(1000);
+      values[2] = ((Long) row.getAs("ts")) + RANDOM.nextInt(1000);
       rows.add(RowFactory.create(values));
     }
     return rows;
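
DataSourceTestUtils now draws all pseudo-random values from a single Random seeded with a constant, so the
generated test rows are reproducible across runs. A minimal sketch of the same pattern, with an arbitrary
illustrative seed rather than the one used in the patch:

    import java.util.Random

    object DeterministicTestData {
      // Fixed seed: every run sees the same sequence of partition picks and timestamp bumps.
      private val Rng = new Random(42L)

      def pickPartition(partitions: Seq[String]): String =
        partitions(Rng.nextInt(partitions.size))
    }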
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
index 7fc7d318d3..968b4039f8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
@@ -256,6 +256,8 @@ class TestDataSourceDefaults {
       getKey(genericRecord).getRecordKey
     }
 
+    override def getRecordKey(row: InternalRow, schema: StructType): String = null
+
     override def getPartitionPath(row: Row): String = {
       if (null == converterFn) converterFn = AvroConversionUtils.createConverterToAvro(row.schema, STRUCT_NAME, NAMESPACE)
       val genericRecord = converterFn.apply(row).asInstanceOf[GenericRecord]
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala
index e86c540133..a3e4a8c830 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala
@@ -20,6 +20,7 @@ package org.apache.hudi
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
 import java.sql.{Date, Timestamp}
@@ -113,6 +114,6 @@ class TestGenericRecordAndRowConsistency extends HoodieClientTestBase {
       .select("_hoodie_record_key")
       .map(_.toString()).collect().sorted
 
-    assert(data1 sameElements data2)
+    assertEquals(data1.toSeq, data2.toSeq)
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark3/pom.xml b/hudi-spark-datasource/hudi-spark3/pom.xml
index 921dd01411..b0f55c7718 100644
--- a/hudi-spark-datasource/hudi-spark3/pom.xml
+++ b/hudi-spark-datasource/hudi-spark3/pom.xml
@@ -228,24 +228,12 @@
       <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-spark-client</artifactId>
       <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
 
     <dependency>
       <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-spark-common_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
 
     <dependency>
@@ -264,14 +252,10 @@
       <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-spark3-common</artifactId>
       <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
+
     <!-- Hoodie - Test -->
+
     <dependency>
       <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-client-common</artifactId>
@@ -288,12 +272,6 @@
       <classifier>tests</classifier>
       <type>test-jar</type>
       <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
 
     <dependency>
@@ -312,18 +290,13 @@
       <classifier>tests</classifier>
       <type>test-jar</type>
       <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
 
     <dependency>
       <groupId>org.junit.jupiter</groupId>
       <artifactId>junit-jupiter-api</artifactId>
       <scope>test</scope>
+
     </dependency>
 
     <dependency>
@@ -331,6 +304,29 @@
       <artifactId>junit-jupiter-params</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <!-- Need these exclusions to make sure JavaSparkContext can be set up. https://issues.apache.org/jira/browse/SPARK-1693 -->
+      <exclusions>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet.jsp</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
   </dependencies>
 
 </project>