Posted to commits@hudi.apache.org by co...@apache.org on 2022/09/09 03:09:07 UTC

[hudi] branch master updated: [HUDI-4465] Optimizing file-listing sequence of Metadata Table (#6016)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4af60dcfba [HUDI-4465] Optimizing file-listing sequence of Metadata Table (#6016)
4af60dcfba is described below

commit 4af60dcfbaeea5e79bc4b9457477e9ee0f9cdb79
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Thu Sep 8 20:09:00 2022 -0700

    [HUDI-4465] Optimizing file-listing sequence of Metadata Table (#6016)
    
    Optimizes the file-listing sequence of the Metadata Table to make sure it's on par with or better than FS-based file listing
    
    Change log:
    
    - Cleaned up avoidable instantiations of Hadoop's Path
    - Replaced new Path with createPathUnsafe where possible
    - Cached TimestampFormatter and DateFormatter per timezone
    - Avoided loading defaults into the Hadoop conf when initializing the HFile reader
    - Avoided re-instantiating BaseTableMetadata twice within BaseHoodieTableFileIndex
    - Avoided looking up the FileSystem for every partition when listing a partitioned table; it is now looked up just once (see the illustrative sketch below)
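    
    Illustrative sketch (not part of the committed diff): the last bullet describes hoisting
    the FileSystem lookup out of the per-partition loop. A minimal, hypothetical Java example
    of that pattern using only standard Hadoop APIs; the class, method, and variable names
    below are made up for illustration:
    
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileStatus;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
    
        import java.io.IOException;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
    
        class SingleFsLookupSketch {
          static Map<String, FileStatus[]> listPartitions(Configuration conf, Path basePath,
                                                          List<String> relativePartitions) throws IOException {
            // Resolve the FileSystem once for the table's base path and reuse it below
            FileSystem fs = basePath.getFileSystem(conf);
            Map<String, FileStatus[]> files = new HashMap<>();
            for (String partition : relativePartitions) {
              Path partitionPath = partition.isEmpty() ? basePath : new Path(basePath, partition);
              // Reuses the cached FileSystem instead of looking it up per partition
              files.put(partition, fs.listStatus(partitionPath));
            }
            return files;
          }
        }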
---
 docker/demo/config/dfs-source.properties           |   4 +
 .../metadata/HoodieBackedTableMetadataWriter.java  |   6 +-
 .../apache/hudi/keygen/BuiltinKeyGenerator.java    |   1 -
 .../org/apache/hudi/keygen/SimpleKeyGenerator.java |   7 ++
 .../datasources/SparkParsePartitionUtil.scala      |  12 +-
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |   2 +-
 .../functional/TestConsistentBucketIndex.java      |   2 +-
 .../hudi/table/TestHoodieMergeOnReadTable.java     |  26 +++-
 .../TestHoodieSparkMergeOnReadTableCompaction.java |  19 ++-
 ...HoodieSparkMergeOnReadTableIncrementalRead.java |   2 +-
 .../TestHoodieSparkMergeOnReadTableRollback.java   |   9 +-
 .../hudi/testutils/HoodieClientTestHarness.java    |   6 +-
 .../SparkClientFunctionalTestHarness.java          |  12 +-
 .../org/apache/hudi/BaseHoodieTableFileIndex.java  |  87 +++++++++----
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  13 +-
 .../org/apache/hudi/common/model/BaseFile.java     |   9 ++
 .../hudi/common/table/HoodieTableMetaClient.java   |   2 +-
 .../table/log/block/HoodieHFileDataBlock.java      |  26 ++--
 .../table/view/AbstractTableFileSystemView.java    |  13 +-
 .../apache/hudi/common/util/CollectionUtils.java   |   9 ++
 .../java/org/apache/hudi/hadoop/CachingPath.java   |  59 ++++++++-
 .../org/apache/hudi/hadoop/SerializablePath.java   |   9 +-
 .../apache/hudi/io/storage/HoodieHFileUtils.java   |   4 +-
 .../apache/hudi/metadata/BaseTableMetadata.java    | 135 ++++++++++++---------
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  11 +-
 .../hudi/metadata/HoodieMetadataPayload.java       |  18 ++-
 .../apache/hudi/metadata/HoodieTableMetadata.java  |  21 +++-
 .../hudi/common/testutils/HoodieTestUtils.java     |  35 +++---
 .../hudi/hadoop/testutils/InputFormatTestUtil.java |   5 +-
 .../apache/hudi/SparkHoodieTableFileIndex.scala    |  28 +++--
 .../hudi/functional/TestTimeTravelQuery.scala      |   6 +-
 .../apache/spark/sql/adapter/Spark2Adapter.scala   |   2 +-
 .../datasources/Spark2ParsePartitionUtil.scala     |  14 +--
 .../apache/hudi/spark3/internal/ReflectUtil.java   |   4 +-
 .../spark/sql/adapter/BaseSpark3Adapter.scala      |   4 +-
 .../datasources/Spark3ParsePartitionUtil.scala     |  48 ++++----
 .../functional/TestHoodieDeltaStreamer.java        |  10 +-
 .../utilities/sources/TestHoodieIncrSource.java    |   4 +-
 38 files changed, 451 insertions(+), 233 deletions(-)

diff --git a/docker/demo/config/dfs-source.properties b/docker/demo/config/dfs-source.properties
index ac7080e141..a90629ef8e 100644
--- a/docker/demo/config/dfs-source.properties
+++ b/docker/demo/config/dfs-source.properties
@@ -19,6 +19,10 @@ include=base.properties
 # Key fields, for kafka example
 hoodie.datasource.write.recordkey.field=key
 hoodie.datasource.write.partitionpath.field=date
+# NOTE: We have to duplicate configuration since this is being used
+#       w/ both Spark and DeltaStreamer
+hoodie.table.recordkey.fields=key
+hoodie.table.partition.fields=date
 # Schema provider props (change to absolute path based on your installation)
 hoodie.deltastreamer.schemaprovider.source.schema.file=/var/demo/config/schema.avsc
 hoodie.deltastreamer.schemaprovider.target.schema.file=/var/demo/config/schema.avsc
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index c7cc50967a..962875fb92 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1006,21 +1006,21 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     // finish off any pending compactions if any from previous attempt.
     writeClient.runAnyPendingCompactions();
 
-    String latestDeltacommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
+    String latestDeltaCommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
         .get().getTimestamp();
     List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
         .findInstantsBefore(instantTime).getInstants().collect(Collectors.toList());
 
     if (!pendingInstants.isEmpty()) {
       LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
-          pendingInstants.size(), latestDeltacommitTime, Arrays.toString(pendingInstants.toArray())));
+          pendingInstants.size(), latestDeltaCommitTime, Arrays.toString(pendingInstants.toArray())));
       return;
     }
 
     // Trigger compaction with suffixes based on the same instant time. This ensures that any future
     // delta commits synced over will not have an instant time lesser than the last completed instant on the
     // metadata table.
-    final String compactionInstantTime = latestDeltacommitTime + "001";
+    final String compactionInstantTime = latestDeltaCommitTime + "001";
     if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
       writeClient.compact(compactionInstantTime);
     }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
index d7561cc126..ad71b17ce7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
@@ -77,7 +77,6 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements Sp
   protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
   protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER);
 
-
   protected transient volatile SparkRowConverter rowConverter;
   protected transient volatile SparkRowAccessor rowAccessor;
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
index dcffdf3cdb..8c43e19baa 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.keygen;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -46,6 +47,12 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator {
 
   SimpleKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) {
     super(props);
+    // Make sure key-generator is configured properly
+    ValidationUtils.checkArgument(recordKeyField == null || !recordKeyField.isEmpty(),
+        "Record key field has to be non-empty!");
+    ValidationUtils.checkArgument(partitionPathField == null || !partitionPathField.isEmpty(),
+        "Partition path field has to be non-empty!");
+
     this.recordKeyFields = recordKeyField == null ? Collections.emptyList() : Collections.singletonList(recordKeyField);
     this.partitionPathFields = partitionPathField == null ? Collections.emptyList() : Collections.singletonList(partitionPathField);
     this.simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, recordKeyField, partitionPathField);
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala
index 626b3c6ef0..2279e5a13f 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala
@@ -26,10 +26,10 @@ import org.apache.spark.sql.types.DataType
 
 trait SparkParsePartitionUtil extends Serializable {
 
-  def parsePartition(
-    path: Path,
-    typeInference: Boolean,
-    basePaths: Set[Path],
-    userSpecifiedDataTypes: Map[String, DataType],
-    timeZone: TimeZone): InternalRow
+  def parsePartition(path: Path,
+                     typeInference: Boolean,
+                     basePaths: Set[Path],
+                     userSpecifiedDataTypes: Map[String, DataType],
+                     timeZone: TimeZone,
+                     validatePartitionValues: Boolean = false): InternalRow
 }
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index ab7d0164ea..eaad4d471e 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -85,7 +85,7 @@ trait SparkAdapter extends Serializable {
   /**
    * Create the SparkParsePartitionUtil.
    */
-  def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil
+  def getSparkParsePartitionUtil: SparkParsePartitionUtil
 
   /**
    * ParserInterface#parseMultipartIdentifier is supported since spark3, for spark2 this should not be called.
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
index e0bc22f70d..0561730193 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
@@ -97,7 +97,7 @@ public class TestConsistentBucketIndex extends HoodieClientTestHarness {
       initTestDataGenerator(new String[] {""});
     }
     initFileSystem();
-    Properties props = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
+    Properties props = getPropertiesForKeyGen(populateMetaFields);
     props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
     metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, props);
     config = getConfigBuilder()
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 0ce6ca0ee9..0b80d20b39 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.Transformations;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -61,7 +62,6 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.storage.StorageLevel;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -87,9 +87,8 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
   private HoodieTableMetaClient metaClient;
   private HoodieTestDataGenerator dataGen;
 
-  @BeforeEach
-  void setUp() throws IOException {
-    Properties properties = new Properties();
+  void setUp(Properties props) throws IOException {
+    Properties properties = CollectionUtils.copy(props);
     properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
     metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
     dataGen = new HoodieTestDataGenerator();
@@ -99,6 +98,9 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
   @Test
   public void testMetadataAggregateFromWriteStatus() throws Exception {
     HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();
+
+    setUp(cfg.getProps());
+
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
 
       String newCommitTime = "001";
@@ -125,6 +127,9 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
     HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true);
     addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
     HoodieWriteConfig cfg = cfgBuilder.build();
+
+    setUp(cfg.getProps());
+
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
 
       /**
@@ -213,6 +218,8 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
     addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
     HoodieWriteConfig config = cfgBuilder.build();
 
+    setUp(config.getProps());
+
     try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
@@ -302,6 +309,8 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
     HoodieWriteConfig cfg = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY)
         .withAutoCommit(false).build();
 
+    setUp(cfg.getProps());
+
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
       HoodieTable table = HoodieSparkTable.create(cfg, context(), metaClient);
 
@@ -381,6 +390,9 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
   @Test
   public void testRollingStatsWithSmallFileHandling() throws Exception {
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
+
+    setUp(cfg.getProps());
+
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
       Map<String, Long> fileIdToInsertsMap = new HashMap<>();
       Map<String, Long> fileIdToUpsertsMap = new HashMap<>();
@@ -497,6 +509,9 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
   @Test
   public void testHandleUpdateWithMultiplePartitions() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
+
+    setUp(cfg.getProps());
+
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
 
       /**
@@ -578,6 +593,9 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
     HoodieWriteConfig.Builder builder = getConfigBuilder(true);
     builder.withReleaseResourceEnabled(true);
     builder.withAutoCommit(false);
+
+    setUp(builder.build().getProps());
+
     /**
      * Write 1 (test when RELEASE_RESOURCE_ENABLE is true)
      */
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index 3b30c5b767..f959a8f0d9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -54,6 +54,7 @@ import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -98,8 +99,13 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
         .withLayoutConfig(HoodieLayoutConfig.newBuilder()
             .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
             .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).build();
-    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
+        .build();
+
+    Properties props = getPropertiesForKeyGen(true);
+    props.putAll(config.getProps());
+
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
     client = getHoodieWriteClient(config);
 
     // write data and commit
@@ -138,8 +144,13 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
         .withLayoutConfig(HoodieLayoutConfig.newBuilder()
             .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
             .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).build();
-    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
+        .build();
+
+    Properties props = getPropertiesForKeyGen(true);
+    props.putAll(config.getProps());
+
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
     client = getHoodieWriteClient(config);
 
     final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
index 5df7b4daec..275fd32ca7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
@@ -82,7 +82,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF
   public void testIncrementalReadsWithCompaction() throws Exception {
     final String partitionPath = "2020/02/20"; // use only one partition for this test
     final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[] { partitionPath });
-    Properties props = new Properties();
+    Properties props = getPropertiesForKeyGen(true);
     props.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.PARQUET.toString());
     HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
     HoodieWriteConfig cfg = getConfigBuilder(true).build();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index 2f0e585ec9..0a11425ec5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -41,6 +41,7 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCleanConfig;
@@ -155,7 +156,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
     addConfigsForPopulateMetaFields(cfgBuilder, true);
     HoodieWriteConfig cfg = cfgBuilder.build();
 
-    Properties properties = new Properties();
+    Properties properties = CollectionUtils.copy(cfg.getProps());
     properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
     HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
 
@@ -327,7 +328,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
     addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
     HoodieWriteConfig cfg = cfgBuilder.build();
 
-    Properties properties = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
+    Properties properties = getPropertiesForKeyGen(populateMetaFields);
     properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
     HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
 
@@ -606,8 +607,10 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
         .withMarkersType(MarkerType.DIRECT.name());
     HoodieWriteConfig cfg = cfgBuilder.build();
 
-    Properties properties = new Properties();
+    Properties properties = getPropertiesForKeyGen(true);
+    properties.putAll(cfg.getProps());
     properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
+
     HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
 
     try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 80d185f62b..468be5fb26 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -341,8 +341,12 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
   }
 
   protected Properties getPropertiesForKeyGen() {
+    return getPropertiesForKeyGen(false);
+  }
+
+  protected Properties getPropertiesForKeyGen(boolean populateMetaFields) {
     Properties properties = new Properties();
-    properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
+    properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields));
     properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
     properties.put("hoodie.datasource.write.partitionpath.field", "partition_path");
     properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index c58dd178dc..ba1afbebb2 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -160,7 +160,7 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
   }
 
   public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath) throws IOException {
-    return getHoodieMetaClient(hadoopConf, basePath, new Properties());
+    return getHoodieMetaClient(hadoopConf, basePath, getPropertiesForKeyGen(true));
   }
 
   @Override
@@ -310,8 +310,12 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
   }
 
   protected Properties getPropertiesForKeyGen() {
+    return getPropertiesForKeyGen(false);
+  }
+
+  protected Properties getPropertiesForKeyGen(boolean populateMetaFields) {
     Properties properties = new Properties();
-    properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
+    properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields));
     properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
     properties.put("hoodie.datasource.write.partitionpath.field", "partition_path");
     properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
@@ -321,9 +325,9 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
   }
 
   protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder configBuilder, boolean populateMetaFields) {
+    configBuilder.withProperties(getPropertiesForKeyGen(populateMetaFields));
     if (!populateMetaFields) {
-      configBuilder.withProperties(getPropertiesForKeyGen())
-          .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build());
+      configBuilder.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build());
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 0b2c34618e..5910de5f1d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -34,14 +36,14 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.hadoop.CachingPath;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -53,6 +55,8 @@ import java.util.Objects;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.hadoop.CachingPath.createPathUnsafe;
+
 /**
  * Common (engine-agnostic) File Index implementation enabling individual query engines to
  * list Hudi Table contents based on the
@@ -63,13 +67,12 @@ import java.util.stream.Collectors;
  *   <li>Query instant/range</li>
  * </ul>
  */
-public abstract class   BaseHoodieTableFileIndex {
+public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
 
   private static final Logger LOG = LogManager.getLogger(BaseHoodieTableFileIndex.class);
 
   private final String[] partitionColumns;
 
-  private final FileSystemViewStorageConfig fileSystemStorageConfig;
   protected final HoodieMetadataConfig metadataConfig;
 
   private final HoodieTableQueryType queryType;
@@ -80,7 +83,7 @@ public abstract class   BaseHoodieTableFileIndex {
   private final boolean shouldValidateInstant;
 
   private final HoodieTableType tableType;
-  protected final String basePath;
+  protected final Path basePath;
 
   private final HoodieTableMetaClient metaClient;
   private final HoodieEngineContext engineContext;
@@ -94,6 +97,8 @@ public abstract class   BaseHoodieTableFileIndex {
 
   private transient volatile HoodieTableFileSystemView fileSystemView = null;
 
+  private transient HoodieTableMetadata tableMetadata = null;
+
   /**
    * @param engineContext Hudi engine-specific context
    * @param metaClient Hudi table's meta-client
@@ -117,9 +122,6 @@ public abstract class   BaseHoodieTableFileIndex {
     this.partitionColumns = metaClient.getTableConfig().getPartitionFields()
         .orElse(new String[0]);
 
-    this.fileSystemStorageConfig = FileSystemViewStorageConfig.newBuilder()
-        .fromProperties(configProperties)
-        .build();
     this.metadataConfig = HoodieMetadataConfig.newBuilder()
         .fromProperties(configProperties)
         .build();
@@ -131,7 +133,7 @@ public abstract class   BaseHoodieTableFileIndex {
     this.shouldValidateInstant = shouldValidateInstant;
 
     this.tableType = metaClient.getTableType();
-    this.basePath = metaClient.getBasePath();
+    this.basePath = metaClient.getBasePathV2();
 
     this.metaClient = metaClient;
     this.engineContext = engineContext;
@@ -153,7 +155,7 @@ public abstract class   BaseHoodieTableFileIndex {
    * Returns table's base-path
    */
   public String getBasePath() {
-    return metaClient.getBasePath();
+    return basePath.toString();
   }
 
   /**
@@ -172,14 +174,19 @@ public abstract class   BaseHoodieTableFileIndex {
         .mapToInt(List::size).sum();
   }
 
+  @Override
+  public void close() throws Exception {
+    resetTableMetadata(null);
+  }
+
   protected List<PartitionPath> getAllQueryPartitionPaths() {
     List<String> queryRelativePartitionPaths = queryPaths.stream()
-        .map(path -> FSUtils.getRelativePartitionPath(new Path(basePath), path))
+        .map(path -> FSUtils.getRelativePartitionPath(basePath, path))
         .collect(Collectors.toList());
 
     // Load all the partition path from the basePath, and filter by the query partition path.
     // TODO load files from the queryRelativePartitionPaths directly.
-    List<String> matchedPartitionPaths = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath)
+    List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
         .stream()
         .filter(path -> queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
         .collect(Collectors.toList());
@@ -244,12 +251,7 @@ public abstract class   BaseHoodieTableFileIndex {
           );
 
       fetchedPartitionToFiles =
-          FSUtils.getFilesInPartitions(
-                  engineContext,
-                  metadataConfig,
-                  basePath,
-                  fullPartitionPathsMapToFetch.keySet().toArray(new String[0]),
-                  fileSystemStorageConfig.getSpillableDir())
+          getAllFilesInPartitionsUnchecked(fullPartitionPathsMapToFetch.keySet())
               .entrySet()
               .stream()
               .collect(Collectors.toMap(e -> fullPartitionPathsMapToFetch.get(e.getKey()), e -> e.getValue()));
@@ -267,6 +269,11 @@ public abstract class   BaseHoodieTableFileIndex {
   private void doRefresh() {
     long startTime = System.currentTimeMillis();
 
+    HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePath.toString(),
+        FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
+
+    resetTableMetadata(newTableMetadata);
+
     Map<PartitionPath, FileStatus[]> partitionFiles = loadPartitionPathFiles();
     FileStatus[] allFiles = partitionFiles.values().stream().flatMap(Arrays::stream).toArray(FileStatus[]::new);
 
@@ -278,7 +285,7 @@ public abstract class   BaseHoodieTableFileIndex {
     // TODO we can optimize the flow by:
     //  - First fetch list of files from instants of interest
     //  - Load FileStatus's
-    fileSystemView = new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
+    this.fileSystemView = new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
 
     Option<String> queryInstant =
         specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::getTimestamp));
@@ -324,6 +331,22 @@ public abstract class   BaseHoodieTableFileIndex {
     LOG.info(String.format("Refresh table %s, spent: %d ms", metaClient.getTableConfig().getTableName(), duration));
   }
 
+  private Map<String, FileStatus[]> getAllFilesInPartitionsUnchecked(Collection<String> fullPartitionPathsMapToFetch) {
+    try {
+      return tableMetadata.getAllFilesInPartitions(new ArrayList<>(fullPartitionPathsMapToFetch));
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to list partition paths for a table", e);
+    }
+  }
+
+  private List<String> getAllPartitionPathsUnchecked() {
+    try {
+      return isPartitionedTable() ? tableMetadata.getAllPartitionPaths() : Collections.singletonList("");
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to fetch partition paths for a table", e);
+    }
+  }
+
   private void validate(HoodieTimeline activeTimeline, Option<String> queryInstant) {
     if (shouldValidateInstant) {
       if (queryInstant.isPresent() && !activeTimeline.containsInstant(queryInstant.get())) {
@@ -340,7 +363,23 @@ public abstract class   BaseHoodieTableFileIndex {
     return fileSlice.getBaseFile().map(BaseFile::getFileLen).orElse(0L) + logFileSize;
   }
 
+  private void resetTableMetadata(HoodieTableMetadata newTableMetadata) {
+    if (tableMetadata != null) {
+      try {
+        tableMetadata.close();
+      } catch (Exception e) {
+        throw new HoodieException("Failed to close HoodieTableMetadata instance", e);
+      }
+    }
+    tableMetadata = newTableMetadata;
+  }
+
+  private boolean isPartitionedTable() {
+    return partitionColumns.length > 0 || HoodieTableMetadata.isMetadataTable(basePath.toString());
+  }
+
   public static final class PartitionPath {
+
     final String path;
     final Object[] values;
 
@@ -353,12 +392,14 @@ public abstract class   BaseHoodieTableFileIndex {
       return path;
     }
 
-    Path fullPartitionPath(String basePath) {
+    Path fullPartitionPath(Path basePath) {
       if (!path.isEmpty()) {
-        return new CachingPath(basePath, path);
+        // NOTE: Since we know that the path is a proper relative path that doesn't require
+        //       normalization, we create Hadoop's Path using the more performant unsafe variant
+        return new CachingPath(basePath, createPathUnsafe(path));
       }
 
-      return new CachingPath(basePath);
+      return basePath;
     }
 
     @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index d940f3bb45..fe697197f2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.InvalidHoodiePathException;
+import org.apache.hudi.hadoop.CachingPath;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 
 import org.apache.hadoop.conf.Configuration;
@@ -68,6 +69,8 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.hadoop.CachingPath.getPathWithoutSchemeAndAuthority;
+
 /**
  * Utility functions related to accessing the file storage.
  */
@@ -216,8 +219,8 @@ public class FSUtils {
    * Given a base partition and a partition path, return relative path of partition path to the base path.
    */
   public static String getRelativePartitionPath(Path basePath, Path fullPartitionPath) {
-    basePath = Path.getPathWithoutSchemeAndAuthority(basePath);
-    fullPartitionPath = Path.getPathWithoutSchemeAndAuthority(fullPartitionPath);
+    basePath = getPathWithoutSchemeAndAuthority(basePath);
+    fullPartitionPath = getPathWithoutSchemeAndAuthority(fullPartitionPath);
 
     String fullPartitionPathStr = fullPartitionPath.toString();
 
@@ -607,12 +610,12 @@ public class FSUtils {
     String properPartitionPath = partitionPath.startsWith("/")
         ? partitionPath.substring(1)
         : partitionPath;
-    return getPartitionPath(new Path(basePath), properPartitionPath);
+    return getPartitionPath(new CachingPath(basePath), properPartitionPath);
   }
 
   public static Path getPartitionPath(Path basePath, String partitionPath) {
-    // FOr non-partitioned table, return only base-path
-    return StringUtils.isNullOrEmpty(partitionPath) ? basePath : new Path(basePath, partitionPath);
+    // For non-partitioned table, return only base-path
+    return StringUtils.isNullOrEmpty(partitionPath) ? basePath : new CachingPath(basePath, partitionPath);
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java
index cd35861b74..fe9837e6c6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.model;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.hadoop.CachingPath;
 
 import java.io.Serializable;
 import java.util.Objects;
@@ -66,6 +67,14 @@ public class BaseFile implements Serializable {
     return fullPath;
   }
 
+  public Path getHadoopPath() {
+    if (fileStatus != null) {
+      return fileStatus.getPath();
+    }
+
+    return new CachingPath(fullPath);
+  }
+
   public String getFileName() {
     return fileName;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 7ed70fe0f1..caab37162c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -399,7 +399,7 @@ public class HoodieTableMetaClient implements Serializable {
   public void validateTableProperties(Properties properties) {
     // Once meta fields are disabled, it cant be re-enabled for a given table.
     if (!getTableConfig().populateMetaFields()
-        && Boolean.parseBoolean((String) properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))) {
+        && Boolean.parseBoolean((String) properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue().toString()))) {
       throw new HoodieException(HoodieTableConfig.POPULATE_META_FIELDS.key() + " already disabled for the table. Can't be re-enabled back");
     }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 72cb3a0ef3..d923c59270 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -18,19 +18,6 @@
 
 package org.apache.hudi.common.table.log.block;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.fs.inline.InLineFSUtils;
-import org.apache.hudi.common.fs.inline.InLineFileSystem;
-import org.apache.hudi.common.util.ClosableIterator;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
-import org.apache.hudi.io.storage.HoodieHFileReader;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
@@ -43,6 +30,16 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.fs.inline.InLineFSUtils;
+import org.apache.hudi.common.fs.inline.InLineFileSystem;
+import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
+import org.apache.hudi.io.storage.HoodieHFileReader;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -167,9 +164,8 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
     // Get schema from the header
     Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
 
-    FileSystem fs = FSUtils.getFs(pathForReader.toString(), new Configuration());
     // Read the content
-    HoodieHFileReader<IndexedRecord> reader = new HoodieHFileReader<>(fs, pathForReader, content, Option.of(writerSchema));
+    HoodieHFileReader<IndexedRecord> reader = new HoodieHFileReader<>(null, pathForReader, content, Option.of(writerSchema));
     Iterator<IndexedRecord> recordIterator = reader.getRecordIterator(readerSchema);
     return new ClosableIterator<IndexedRecord>() {
       @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index dc6fc47b58..5818636cae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -95,8 +95,8 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
 
   private BootstrapIndex bootstrapIndex;
 
-  private String getPartitionPathFromFilePath(String fullPath) {
-    return FSUtils.getRelativePartitionPath(metaClient.getBasePathV2(), new Path(fullPath).getParent());
+  private String getPartitionPathFor(HoodieBaseFile baseFile) {
+    return FSUtils.getRelativePartitionPath(metaClient.getBasePathV2(), baseFile.getHadoopPath().getParent());
   }
 
   /**
@@ -166,8 +166,8 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
   protected List<HoodieFileGroup> buildFileGroups(Stream<HoodieBaseFile> baseFileStream,
       Stream<HoodieLogFile> logFileStream, HoodieTimeline timeline, boolean addPendingCompactionFileSlice) {
     Map<Pair<String, String>, List<HoodieBaseFile>> baseFiles =
-        baseFileStream.collect(Collectors.groupingBy((baseFile) -> {
-          String partitionPathStr = getPartitionPathFromFilePath(baseFile.getPath());
+        baseFileStream.collect(Collectors.groupingBy(baseFile -> {
+          String partitionPathStr = getPartitionPathFor(baseFile);
           return Pair.of(partitionPathStr, baseFile.getFileId());
         }));
 
@@ -183,7 +183,8 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
     List<HoodieFileGroup> fileGroups = new ArrayList<>();
     fileIdSet.forEach(pair -> {
       String fileId = pair.getValue();
-      HoodieFileGroup group = new HoodieFileGroup(pair.getKey(), fileId, timeline);
+      String partitionPath = pair.getKey();
+      HoodieFileGroup group = new HoodieFileGroup(partitionPath, fileId, timeline);
       if (baseFiles.containsKey(pair)) {
         baseFiles.get(pair).forEach(group::addBaseFile);
       }
@@ -373,7 +374,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
    * @param baseFile base File
    */
   protected boolean isBaseFileDueToPendingCompaction(HoodieBaseFile baseFile) {
-    final String partitionPath = getPartitionPathFromFilePath(baseFile.getPath());
+    final String partitionPath = getPartitionPathFor(baseFile);
 
     Option<Pair<String, CompactionOperation>> compactionWithInstantTime =
         getPendingCompactionOperationWithInstant(new HoodieFileGroupId(partitionPath, baseFile.getFileId()));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
index 8036995fab..194d67cd0e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
@@ -54,6 +54,15 @@ public class CollectionUtils {
     return !isNullOrEmpty(c);
   }
 
+  /**
+   * Makes a copy of provided {@link Properties} object
+   */
+  public static Properties copy(Properties props) {
+    Properties copy = new Properties();
+    copy.putAll(props);
+    return copy;
+  }
+
   /**
    * Returns last element of the array of {@code T}
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java
index 01b3eb9d40..d6e35dbbdc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java
+++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java
@@ -19,10 +19,11 @@
 package org.apache.hudi.hadoop;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.exception.HoodieException;
 
 import javax.annotation.concurrent.ThreadSafe;
-import java.io.Serializable;
 import java.net.URI;
+import java.net.URISyntaxException;
 
 /**
  * This is an extension of the {@code Path} class allowing to avoid repetitive
@@ -32,11 +33,12 @@ import java.net.URI;
  * NOTE: This class is thread-safe
  */
 @ThreadSafe
-public class CachingPath extends Path implements Serializable {
+public class CachingPath extends Path {
 
   // NOTE: `volatile` keyword is redundant here and put mostly for reader notice, since all
   //       reads/writes to references are always atomic (including 64-bit JVMs)
   //       https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.7
+  private volatile Path parent;
   private volatile String fileName;
   private volatile String fullPathStr;
 
@@ -74,6 +76,17 @@ public class CachingPath extends Path implements Serializable {
     return fileName;
   }
 
+  @Override
+  public Path getParent() {
+    // This value could be overwritten concurrently and that's okay, since
+    // {@code Path} is immutable
+    if (parent == null) {
+      parent = super.getParent();
+    }
+
+    return parent;
+  }
+
   @Override
   public String toString() {
     // This value could be overwritten concurrently and that's okay, since
@@ -83,4 +96,46 @@ public class CachingPath extends Path implements Serializable {
     }
     return fullPathStr;
   }
+
+  public CachingPath subPath(String relativePath) {
+    return new CachingPath(this, createPathUnsafe(relativePath));
+  }
+
+  public static CachingPath wrap(Path path) {
+    if (path instanceof CachingPath) {
+      return (CachingPath) path;
+    }
+
+    return new CachingPath(path.toUri());
+  }
+
+  /**
+   * Creates path based on the provided *relative* path
+   *
+   * NOTE: This is an unsafe version that relies on the caller knowing what they are doing:
+   *       it will not work with paths that have a scheme (which requires parsing) and is
+   *       only meant to be used with relative paths in a few specific cases.
+   */
+  public static CachingPath createPathUnsafe(String relativePath) {
+    try {
+      // NOTE: {@code normalize} is going to be invoked by {@code Path} ctor, so there's no
+      //       point in invoking it here
+      URI uri = new URI(null, null, relativePath, null, null);
+      return new CachingPath(uri);
+    } catch (URISyntaxException e) {
+      throw new HoodieException("Failed to instantiate relative path", e);
+    }
+  }
+
+  /**
+   * This is {@link Path#getPathWithoutSchemeAndAuthority(Path)} counterpart, instantiating
+   * {@link CachingPath}
+   */
+  public static Path getPathWithoutSchemeAndAuthority(Path path) {
+    // This code depends on Path.toString() to remove the leading slash before
+    // the drive specification on Windows.
+    return path.isUriPathAbsolute()
+        ? createPathUnsafe(path.toUri().getPath())
+        : path;
+  }
 }
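
Illustrative usage (not part of the committed diff): the CachingPath helpers added above are
meant for plain relative paths; createPathUnsafe and subPath skip URI parsing, so they must
never be given a component carrying a scheme or authority. The paths below are made up:

    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.hadoop.CachingPath;

    class CachingPathUsageSketch {
      public static void main(String[] args) {
        // Base path is parsed/normalized once, up front
        CachingPath base = new CachingPath("s3a://warehouse/tables/trips");
        // Appending a known-relative component skips URI parsing for that component
        Path partition = base.subPath("americas/brazil/2022/09/08");
        // Same result as new Path(base, "americas/brazil/2022/09/08"), minus the parsing overhead
        Path stripped = CachingPath.getPathWithoutSchemeAndAuthority(partition);
        // stripped renders as /tables/trips/americas/brazil/2022/09/08
        System.out.println(partition + " -> " + stripped);
      }
    }
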
diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java
index 5ad2307ef8..796600a7e8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java
+++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.net.URI;
 import java.util.Objects;
 
 /**
@@ -42,12 +43,12 @@ public class SerializablePath implements Serializable {
   }
 
   private void writeObject(ObjectOutputStream out) throws IOException {
-    out.writeUTF(path.toString());
+    out.writeObject(path.toUri());
   }
 
-  private void readObject(ObjectInputStream in) throws IOException {
-    String pathStr = in.readUTF();
-    path = new CachingPath(pathStr);
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    URI uri = (URI) in.readObject();
+    path = new CachingPath(uri);
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java
index 3767ea1832..878a3c563b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java
@@ -67,7 +67,9 @@ public class HoodieHFileUtils {
    */
   public static HFile.Reader createHFileReader(
       FileSystem fs, Path dummyPath, byte[] content) throws IOException {
-    Configuration conf = new Configuration();
+    // Avoid loading the default configs from the FS, since this configuration is mostly
+    // used as a stub to initialize the HFile reader
+    Configuration conf = new Configuration(false);
     HoodieHFileReader.SeekableByteArrayInputStream bis = new HoodieHFileReader.SeekableByteArrayInputStream(content);
     FSDataInputStream fsdis = new FSDataInputStream(bis);
     FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fsdis);
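
For context (not part of the committed diff): Hadoop's Configuration(boolean loadDefaults)
constructor controls whether the default resources (core-default.xml, core-site.xml, ...)
are loaded; the hunk above passes false because this Configuration only serves as a stub for
the in-memory HFile reader. A minimal comparison, assuming nothing beyond the standard
Hadoop API:

    import org.apache.hadoop.conf.Configuration;

    class ConfStubSketch {
      public static void main(String[] args) {
        Configuration withDefaults = new Configuration();      // loads core-default.xml, core-site.xml, ...
        Configuration stubOnly = new Configuration(false);      // skips default resources; cheap throwaway stub
        System.out.println(withDefaults.size() + " vs " + stubOnly.size());
      }
    }
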
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 2036500ac6..37a209b0a8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hudi.avro.model.HoodieMetadataBloomFilter;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
 import org.apache.hudi.common.bloom.BloomFilter;
@@ -39,10 +40,13 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.hash.ColumnIndexID;
 import org.apache.hudi.common.util.hash.FileIndexID;
 import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.hadoop.CachingPath;
+import org.apache.hudi.hadoop.SerializablePath;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -55,8 +59,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public abstract class BaseTableMetadata implements HoodieTableMetadata {
@@ -68,7 +74,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
 
   protected final transient HoodieEngineContext engineContext;
   protected final SerializableConfiguration hadoopConf;
-  protected final String dataBasePath;
+  protected final SerializablePath dataBasePath;
   protected final HoodieTableMetaClient dataMetaClient;
   protected final Option<HoodieMetadataMetrics> metrics;
   protected final HoodieMetadataConfig metadataConfig;
@@ -83,7 +89,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
                               String dataBasePath, String spillableMapDirectory) {
     this.engineContext = engineContext;
     this.hadoopConf = new SerializableConfiguration(engineContext.getHadoopConf());
-    this.dataBasePath = dataBasePath;
+    this.dataBasePath = new SerializablePath(new CachingPath(dataBasePath));
     this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(dataBasePath).build();
     this.spillableMapDirectory = spillableMapDirectory;
     this.metadataConfig = metadataConfig;
@@ -113,7 +119,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
         throw new HoodieMetadataException("Failed to retrieve list of partition from metadata", e);
       }
     }
-    return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath,
+    return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath.toString(),
         metadataConfig.shouldAssumeDatePartitioning()).getAllPartitionPaths();
   }
 
@@ -138,13 +144,17 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
       }
     }
 
-    return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath, metadataConfig.shouldAssumeDatePartitioning())
+    return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath.toString(), metadataConfig.shouldAssumeDatePartitioning())
         .getAllFilesInPartition(partitionPath);
   }
 
   @Override
   public Map<String, FileStatus[]> getAllFilesInPartitions(List<String> partitions)
       throws IOException {
+    if (partitions.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
     if (isMetadataTableEnabled) {
       try {
         List<Path> partitionPaths = partitions.stream().map(Path::new).collect(Collectors.toList());
@@ -154,7 +164,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
       }
     }
 
-    return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath, metadataConfig.shouldAssumeDatePartitioning())
+    return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath.toString(), metadataConfig.shouldAssumeDatePartitioning())
         .getAllFilesInPartitions(partitions);
   }
 
@@ -278,20 +288,23 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
    */
   protected List<String> fetchAllPartitionPaths() {
     HoodieTimer timer = new HoodieTimer().startTimer();
-    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKey(RECORDKEY_PARTITION_LIST,
+    Option<HoodieRecord<HoodieMetadataPayload>> recordOpt = getRecordByKey(RECORDKEY_PARTITION_LIST,
         MetadataPartitionType.FILES.getPartitionPath());
     metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer()));
 
-    List<String> partitions = Collections.emptyList();
-    if (hoodieRecord.isPresent()) {
-      handleSpuriousDeletes(hoodieRecord, "\"all partitions\"");
-      partitions = hoodieRecord.get().getData().getFilenames();
-      // Partition-less tables have a single empty partition
-      if (partitions.contains(NON_PARTITIONED_NAME)) {
-        partitions.remove(NON_PARTITIONED_NAME);
-        partitions.add("");
+    List<String> partitions = recordOpt.map(record -> {
+      HoodieMetadataPayload metadataPayload = record.getData();
+      checkForSpuriousDeletes(metadataPayload, "\"all partitions\"");
+
+      List<String> relativePaths = metadataPayload.getFilenames();
+      // Non-partitioned tables have a single empty partition
+      if (relativePaths.size() == 1 && relativePaths.get(0).equals(NON_PARTITIONED_NAME)) {
+        return Collections.singletonList("");
+      } else {
+        return relativePaths;
       }
-    }
+    })
+        .orElse(Collections.emptyList());
 
     LOG.info("Listed partitions from metadata: #partitions=" + partitions.size());
     return partitions;
@@ -303,75 +316,81 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
    * @param partitionPath The absolute path of the partition
    */
   FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException {
-    String partitionName = FSUtils.getRelativePartitionPath(new Path(dataBasePath), partitionPath);
-    if (partitionName.isEmpty()) {
-      partitionName = NON_PARTITIONED_NAME;
-    }
+    String relativePartitionPath = FSUtils.getRelativePartitionPath(dataBasePath.get(), partitionPath);
+    String recordKey = relativePartitionPath.isEmpty() ? NON_PARTITIONED_NAME : relativePartitionPath;
 
     HoodieTimer timer = new HoodieTimer().startTimer();
-    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKey(partitionName,
+    Option<HoodieRecord<HoodieMetadataPayload>> recordOpt = getRecordByKey(recordKey,
         MetadataPartitionType.FILES.getPartitionPath());
     metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
 
-    FileStatus[] statuses = {};
-    if (hoodieRecord.isPresent()) {
-      handleSpuriousDeletes(hoodieRecord, partitionName);
-      statuses = hoodieRecord.get().getData().getFileStatuses(hadoopConf.get(), partitionPath);
-    }
+    FileStatus[] statuses = recordOpt.map(record -> {
+      HoodieMetadataPayload metadataPayload = record.getData();
+      checkForSpuriousDeletes(metadataPayload, recordKey);
+      try {
+        return metadataPayload.getFileStatuses(hadoopConf.get(), partitionPath);
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to extract file-statuses from the payload", e);
+      }
+    })
+        .orElse(new FileStatus[0]);
 
-    LOG.info("Listed file in partition from metadata: partition=" + partitionName + ", #files=" + statuses.length);
+    LOG.info("Listed file in partition from metadata: partition=" + relativePartitionPath + ", #files=" + statuses.length);
     return statuses;
   }
 
   Map<String, FileStatus[]> fetchAllFilesInPartitionPaths(List<Path> partitionPaths) throws IOException {
-    Map<String, Path> partitionInfo = new HashMap<>();
-    boolean foundNonPartitionedPath = false;
-    for (Path partitionPath: partitionPaths) {
-      String partitionName = FSUtils.getRelativePartitionPath(new Path(dataBasePath), partitionPath);
-      if (partitionName.isEmpty()) {
-        if (partitionInfo.size() > 1) {
-          throw new HoodieMetadataException("Found mix of partitioned and non partitioned paths while fetching data from metadata table");
-        }
-        partitionInfo.put(NON_PARTITIONED_NAME, partitionPath);
-        foundNonPartitionedPath = true;
-      } else {
-        if (foundNonPartitionedPath) {
-          throw new HoodieMetadataException("Found mix of partitioned and non partitioned paths while fetching data from metadata table");
-        }
-        partitionInfo.put(partitionName, partitionPath);
-      }
-    }
+    Map<String, Path> partitionIdToPathMap =
+        partitionPaths.parallelStream()
+            .collect(
+                Collectors.toMap(partitionPath -> {
+                  String partitionId = FSUtils.getRelativePartitionPath(dataBasePath.get(), partitionPath);
+                  return partitionId.isEmpty() ? NON_PARTITIONED_NAME : partitionId;
+                }, Function.identity())
+            );
 
     HoodieTimer timer = new HoodieTimer().startTimer();
-    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> partitionsFileStatus =
-        getRecordsByKeys(new ArrayList<>(partitionInfo.keySet()), MetadataPartitionType.FILES.getPartitionPath());
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> partitionIdRecordPairs =
+        getRecordsByKeys(new ArrayList<>(partitionIdToPathMap.keySet()), MetadataPartitionType.FILES.getPartitionPath());
     metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
-    Map<String, FileStatus[]> result = new HashMap<>();
 
-    for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry: partitionsFileStatus) {
-      if (entry.getValue().isPresent()) {
-        handleSpuriousDeletes(entry.getValue(), entry.getKey());
-        result.put(partitionInfo.get(entry.getKey()).toString(), entry.getValue().get().getData().getFileStatuses(hadoopConf.get(), partitionInfo.get(entry.getKey())));
-      }
-    }
+    FileSystem fs = partitionPaths.get(0).getFileSystem(hadoopConf.get());
+
+    Map<String, FileStatus[]> partitionPathToFilesMap = partitionIdRecordPairs.parallelStream()
+        .map(pair -> {
+          String partitionId = pair.getKey();
+          Option<HoodieRecord<HoodieMetadataPayload>> recordOpt = pair.getValue();
+
+          Path partitionPath = partitionIdToPathMap.get(partitionId);
+
+          return recordOpt.map(record -> {
+            HoodieMetadataPayload metadataPayload = record.getData();
+            checkForSpuriousDeletes(metadataPayload, partitionId);
+
+            FileStatus[] files = metadataPayload.getFileStatuses(fs, partitionPath);
+            return Pair.of(partitionPath.toString(), files);
+          })
+              .orElse(null);
+        })
+        .filter(Objects::nonNull)
+        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
 
     LOG.info("Listed files in partitions from metadata: partition list =" + Arrays.toString(partitionPaths.toArray()));
-    return result;
+
+    return partitionPathToFilesMap;
   }
 
   /**
    * Handle spurious deletes. Depending on config, throw an exception or log a warn msg.
-   * @param hoodieRecord instance of {@link HoodieRecord} of interest.
-   * @param partitionName partition name of interest.
    */
-  private void handleSpuriousDeletes(Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord, String partitionName) {
-    if (!hoodieRecord.get().getData().getDeletions().isEmpty()) {
+  private void checkForSpuriousDeletes(HoodieMetadataPayload metadataPayload, String partitionName) {
+    if (!metadataPayload.getDeletions().isEmpty()) {
       if (metadataConfig.ignoreSpuriousDeletes()) {
         LOG.warn("Metadata record for " + partitionName + " encountered some files to be deleted which was not added before. "
             + "Ignoring the spurious deletes as the `" + HoodieMetadataConfig.IGNORE_SPURIOUS_DELETES.key() + "` config is set to true");
       } else {
         throw new HoodieMetadataException("Metadata record for " + partitionName + " is inconsistent: "
-            + hoodieRecord.get().getData());
+            + metadataPayload);
       }
     }
   }
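
The reworked fetchAllFilesInPartitionPaths above resolves the FileSystem once for the whole batch and then maps metadata records to FileStatus arrays in parallel. Below is a minimal standalone sketch of that pattern; the lookup function stands in for the metadata-table read and is a placeholder, not a Hudi API, and file sizes are omitted for brevity.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    public class MetadataListingSketch {

      // 'filesByPartitionId' stands in for the metadata-table lookup (partition id -> file names).
      static Map<String, FileStatus[]> listPartitions(Configuration conf,
                                                      List<Path> partitionPaths,
                                                      Function<String, List<String>> filesByPartitionId)
          throws IOException {
        // Resolve the FileSystem once for the whole batch instead of once per partition.
        FileSystem fs = partitionPaths.get(0).getFileSystem(conf);

        return partitionPaths.parallelStream()
            .collect(Collectors.toMap(
                Path::toString,
                partitionPath -> {
                  long blockSize = fs.getDefaultBlockSize(partitionPath);
                  // Materialize FileStatus objects from cached file names -- no per-file RPCs.
                  return filesByPartitionId.apply(partitionPath.getName()).stream()
                      .map(fileName -> new FileStatus(0, false, 0, blockSize, 0, 0,
                          null, null, null, new Path(partitionPath, fileName)))
                      .toArray(FileStatus[]::new);
                }));
      }
    }
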
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index f8a0389da3..d4865875b1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -111,7 +111,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
   }
 
   private void initIfNeeded() {
-    this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(dataBasePath);
+    this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(dataBasePath.toString());
     if (!isMetadataTableEnabled) {
       if (!HoodieTableMetadata.isMetadataTable(metadataBasePath)) {
         LOG.info("Metadata table is disabled.");
@@ -303,8 +303,6 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
       }
     }
 
-    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
-
     HoodieTimer readTimer = new HoodieTimer();
     readTimer.startTimer();
 
@@ -413,7 +411,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
       // Open the log record scanner using the log files from the latest file slice
       List<HoodieLogFile> logFiles = slice.getLogFiles().collect(Collectors.toList());
       Pair<HoodieMetadataMergedLogRecordReader, Long> logRecordScannerOpenTimePair =
-          getLogRecordScanner(logFiles, partitionName);
+          getLogRecordScanner(logFiles, partitionName, Option.empty());
       HoodieMetadataMergedLogRecordReader logRecordScanner = logRecordScannerOpenTimePair.getKey();
       final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
 
@@ -465,11 +463,6 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     return validInstantTimestamps;
   }
 
-  public Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles,
-                                                                             String partitionName) {
-    return getLogRecordScanner(logFiles, partitionName, Option.empty());
-  }
-
   public Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles,
                                                                              String partitionName,
                                                                              Option<Boolean> allowFullScanOverride) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index df138cd124..0575177800 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -53,6 +53,7 @@ import org.apache.hudi.common.util.hash.ColumnIndexID;
 import org.apache.hudi.common.util.hash.FileIndexID;
 import org.apache.hudi.common.util.hash.PartitionIndexID;
 import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.hadoop.CachingPath;
 import org.apache.hudi.io.storage.HoodieHFileReader;
 import org.apache.hudi.util.Lazy;
 
@@ -80,6 +81,7 @@ import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.hadoop.CachingPath.createPathUnsafe;
 import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionIdentifier;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.tryUpcastDecimal;
@@ -473,10 +475,22 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
    */
   public FileStatus[] getFileStatuses(Configuration hadoopConf, Path partitionPath) throws IOException {
     FileSystem fs = partitionPath.getFileSystem(hadoopConf);
+    return getFileStatuses(fs, partitionPath);
+  }
+
+  /**
+   * Returns the files added as part of this record.
+   */
+  public FileStatus[] getFileStatuses(FileSystem fs, Path partitionPath) {
     long blockSize = fs.getDefaultBlockSize(partitionPath);
     return filterFileInfoEntries(false)
-        .map(e -> new FileStatus(e.getValue().getSize(), false, 0, blockSize, 0, 0,
-            null, null, null, new Path(partitionPath, e.getKey())))
+        .map(e -> {
+          // NOTE: Since we know that the Metadata Table's Payload is simply a file-name we're
+          //       creating Hadoop's Path using more performant unsafe variant
+          CachingPath filePath = new CachingPath(partitionPath, createPathUnsafe(e.getKey()));
+          return new FileStatus(e.getValue().getSize(), false, 0, blockSize, 0, 0,
+              null, null, null, filePath);
+        })
         .toArray(FileStatus[]::new);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index ae871e3be0..349c0efb48 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
-
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieMetadataException;
 
@@ -107,13 +106,27 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
   static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath,
                                     String spillableMapPath, boolean reuse) {
     if (metadataConfig.enabled()) {
-      return new HoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath, reuse);
+      return createHoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath, reuse);
     } else {
-      return new FileSystemBackedTableMetadata(engineContext, new SerializableConfiguration(engineContext.getHadoopConf()),
-          datasetBasePath, metadataConfig.shouldAssumeDatePartitioning());
+      return createFSBackedTableMetadata(engineContext, metadataConfig, datasetBasePath);
     }
   }
 
+  static FileSystemBackedTableMetadata createFSBackedTableMetadata(HoodieEngineContext engineContext,
+                                                                   HoodieMetadataConfig metadataConfig,
+                                                                   String datasetBasePath) {
+    return new FileSystemBackedTableMetadata(engineContext, new SerializableConfiguration(engineContext.getHadoopConf()),
+        datasetBasePath, metadataConfig.shouldAssumeDatePartitioning());
+  }
+
+  static HoodieBackedTableMetadata createHoodieBackedTableMetadata(HoodieEngineContext engineContext,
+                                                                   HoodieMetadataConfig metadataConfig,
+                                                                   String datasetBasePath,
+                                                                   String spillableMapPath,
+                                                                   boolean reuse) {
+    return new HoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath, reuse);
+  }
+
   /**
    * Fetch all the files at the given partition path, per the latest snapshot of the metadata.
    */
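
With the factory split above, callers can request a specific listing backend directly instead of relying solely on create(). A short usage sketch, assuming the engine context and metadata config are supplied by the caller:

    import org.apache.hudi.common.config.HoodieMetadataConfig;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.metadata.HoodieTableMetadata;

    public class TableMetadataFactoryUsage {

      // Pick a listing backend explicitly, or defer to create() which switches on metadataConfig.enabled().
      static HoodieTableMetadata open(HoodieEngineContext engineContext,
                                      HoodieMetadataConfig metadataConfig,
                                      String basePath,
                                      String spillableMapDir) {
        if (metadataConfig.enabled()) {
          return HoodieTableMetadata.createHoodieBackedTableMetadata(
              engineContext, metadataConfig, basePath, spillableMapDir, /* reuse */ true);
        }
        return HoodieTableMetadata.createFSBackedTableMetadata(engineContext, metadataConfig, basePath);
      }
    }
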
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 6a2bffd34d..d3c1de5677 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -40,6 +40,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -124,28 +125,28 @@ public class HoodieTestUtils {
   }
 
   public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
-                                           Properties properties)
-      throws IOException {
-    properties = HoodieTableMetaClient.withPropertyBuilder()
-      .setTableName(RAW_TRIPS_TEST_NAME)
-      .setTableType(tableType)
-      .setPayloadClass(HoodieAvroPayload.class)
-      .fromProperties(properties)
-      .build();
-    return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
+                                           Properties properties) throws IOException {
+    return init(hadoopConf, basePath, tableType, properties, null);
   }
 
   public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
                                            Properties properties, String databaseName)
       throws IOException {
-    properties = HoodieTableMetaClient.withPropertyBuilder()
-      .setDatabaseName(databaseName)
-      .setTableName(RAW_TRIPS_TEST_NAME)
-      .setTableType(tableType)
-      .setPayloadClass(HoodieAvroPayload.class)
-      .fromProperties(properties)
-      .build();
-    return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
+    HoodieTableMetaClient.PropertyBuilder builder =
+        HoodieTableMetaClient.withPropertyBuilder()
+            .setDatabaseName(databaseName)
+            .setTableName(RAW_TRIPS_TEST_NAME)
+            .setTableType(tableType)
+            .setPayloadClass(HoodieAvroPayload.class);
+
+    String keyGen = properties.getProperty("hoodie.datasource.write.keygenerator.class");
+    if (!Objects.equals(keyGen, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")) {
+      builder.setPartitionFields("some_nonexistent_field");
+    }
+
+    Properties processedProperties = builder.fromProperties(properties).build();
+
+    return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, processedProperties);
   }
 
   public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, HoodieFileFormat baseFileFormat) throws IOException {
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index db8002cd2d..a4471845c3 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -86,7 +86,10 @@ public class InputFormatTestUtil {
           baseFileFormat);
     }
 
-    java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01"));
+    java.nio.file.Path partitionPath = useNonPartitionedKeyGen
+        ? basePath
+        : basePath.resolve(Paths.get("2016", "05", "01"));
+
     setupPartition(basePath, partitionPath);
 
     if (injectData) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 4e70ebad75..a9a38f5f82 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -20,12 +20,15 @@ package org.apache.hudi
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
-import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap, toJavaOption}
+import org.apache.hudi.HoodieConversionUtils.toJavaOption
+import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap, shouldValidatePartitionColumns}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.bootstrap.index.BootstrapIndex
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.hadoop.CachingPath
+import org.apache.hudi.hadoop.CachingPath.createPathUnsafe
 import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
@@ -39,6 +42,7 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 import scala.collection.JavaConverters._
+import scala.language.implicitConversions
 
 /**
  * Implementation of the [[BaseHoodieTableFileIndex]] for Spark
@@ -79,7 +83,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
     AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
   })
 
-  private lazy val sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(spark.sessionState.conf)
+  private lazy val sparkParsePartitionUtil = sparkAdapter.getSparkParsePartitionUtil
 
   /**
    * Get the partition schema from the hoodie.properties.
@@ -254,7 +258,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
             }
         }.mkString("/")
 
-        val pathWithPartitionName = new Path(basePath, partitionWithName)
+        val pathWithPartitionName = new CachingPath(basePath, createPathUnsafe(partitionWithName))
         val partitionValues = parsePartitionPath(pathWithPartitionName, partitionSchema)
 
         partitionValues.map(_.asInstanceOf[Object]).toArray
@@ -269,22 +273,17 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
     sparkParsePartitionUtil.parsePartition(
       partitionPath,
       typeInference = false,
-      Set(new Path(basePath)),
+      Set(basePath),
       partitionDataTypes,
-      DateTimeUtils.getTimeZone(timeZoneId)
+      DateTimeUtils.getTimeZone(timeZoneId),
+      validatePartitionValues = shouldValidatePartitionColumns(spark)
     )
       .toSeq(partitionSchema)
   }
 }
 
-object SparkHoodieTableFileIndex {
 
-  implicit def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] =
-    if (opt.isDefined) {
-      org.apache.hudi.common.util.Option.of(opt.get)
-    } else {
-      org.apache.hudi.common.util.Option.empty()
-    }
+object SparkHoodieTableFileIndex {
 
   /**
    * This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding
@@ -341,4 +340,9 @@ object SparkHoodieTableFileIndex {
       override def invalidate(): Unit = cache.invalidateAll()
     }
   }
+
+  private def shouldValidatePartitionColumns(spark: SparkSession): Boolean = {
+    // NOTE: We can't use helper, method nor the config-entry to stay compatible w/ Spark 2.4
+    spark.sessionState.conf.getConfString("spark.sql.sources.validatePartitionColumns", "true").toBoolean
+  }
 }
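
The Scala file index above reads spark.sql.sources.validatePartitionColumns by its raw key rather than through a typed config entry, to stay source-compatible with Spark 2.4. A minimal sketch of the same idea from the Java side; the class name is illustrative and RuntimeConfig is used here as a near-equivalent access path.

    import org.apache.spark.sql.SparkSession;

    public class PartitionColumnValidationFlag {

      // Read the flag by its raw key so the same code compiles against Spark 2.4 and 3.x alike.
      static boolean shouldValidatePartitionColumns(SparkSession spark) {
        return Boolean.parseBoolean(
            spark.conf().get("spark.sql.sources.validatePartitionColumns", "true"));
      }
    }
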
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
index 890f8a9019..5a71f0e371 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
@@ -237,7 +237,7 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
     df1.write.format("hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "")
+      .option(PARTITIONPATH_FIELD.key, "name")
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
@@ -252,7 +252,7 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
     df2.write.format("hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "")
+      .option(PARTITIONPATH_FIELD.key, "name")
       .mode(SaveMode.Append)
       .save(basePath)
     metaClient.reloadActiveTimeline()
@@ -263,7 +263,7 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
     df3.write.format("hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "")
+      .option(PARTITIONPATH_FIELD.key, "name")
       .mode(SaveMode.Append)
       .save(basePath)
     metaClient.reloadActiveTimeline()
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index 7cd7271c6b..91ab71ef1c 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -67,7 +67,7 @@ class Spark2Adapter extends SparkAdapter {
     )
   }
 
-  override def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = new Spark2ParsePartitionUtil
+  override def getSparkParsePartitionUtil: SparkParsePartitionUtil = Spark2ParsePartitionUtil
 
   override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = {
     throw new IllegalStateException(s"Should not call ParserInterface#parseMultipartIdentifier for spark2")
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala
index c3cbcc4075..fe0bf50e69 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala
@@ -24,14 +24,14 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.catalyst.InternalRow
 
-class Spark2ParsePartitionUtil extends SparkParsePartitionUtil {
+object Spark2ParsePartitionUtil extends SparkParsePartitionUtil {
 
-  override def parsePartition(
-      path: Path,
-      typeInference: Boolean,
-      basePaths: Set[Path],
-      userSpecifiedDataTypes: Map[String, DataType],
-      timeZone: TimeZone): InternalRow = {
+  override def parsePartition(path: Path,
+                              typeInference: Boolean,
+                              basePaths: Set[Path],
+                              userSpecifiedDataTypes: Map[String, DataType],
+                              timeZone: TimeZone,
+                              validatePartitionValues: Boolean = false): InternalRow = {
     val (partitionValues, _) = PartitioningUtils.parsePartition(path, typeInference,
       basePaths, userSpecifiedDataTypes, timeZone)
 
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java
index 1157a68254..d7a9a1f122 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java
@@ -53,12 +53,12 @@ public class ReflectUtil {
     try {
       ClassLoader loader = Thread.currentThread().getContextClassLoader();
       if (HoodieSparkUtils.gteqSpark3_2()) {
-        Class clazz = loader.loadClass(DateFormatter.class.getName());
+        Class<?> clazz = loader.loadClass(DateFormatter.class.getName());
         Method applyMethod = clazz.getDeclaredMethod("apply");
         applyMethod.setAccessible(true);
         return (DateFormatter)applyMethod.invoke(null);
       } else {
-        Class clazz = loader.loadClass(DateFormatter.class.getName());
+        Class<?> clazz = loader.loadClass(DateFormatter.class.getName());
         Method applyMethod = clazz.getDeclaredMethod("apply", ZoneId.class);
         applyMethod.setAccessible(true);
         return (DateFormatter)applyMethod.invoke(null, zoneId);
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index 5ba976d362..77df665b98 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -50,9 +50,7 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
 
   override def getAvroSchemaConverters: HoodieAvroSchemaConverters = HoodieSparkAvroSchemaConverters
 
-  override def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = {
-    new Spark3ParsePartitionUtil(conf)
-  }
+  override def getSparkParsePartitionUtil: SparkParsePartitionUtil = Spark3ParsePartitionUtil
 
   override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = {
     parser.parseMultipartIdentifier(sqlText)
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala
index f0cbe0530f..ebe92a5a32 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala
@@ -17,57 +17,63 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.lang.{Boolean => JBoolean, Double => JDouble, Long => JLong}
-import java.math.{BigDecimal => JBigDecimal}
-import java.time.ZoneId
-import java.util.{Locale, TimeZone}
-
 import org.apache.hadoop.fs.Path
-
 import org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH
 import org.apache.hudi.spark3.internal.ReflectUtil
-
+import org.apache.hudi.util.JFunction
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.unescapePathName
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils.timestampPartitionPattern
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
+import java.lang.{Boolean => JBoolean, Double => JDouble, Long => JLong}
+import java.math.{BigDecimal => JBigDecimal}
+import java.time.ZoneId
+import java.util
+import java.util.concurrent.ConcurrentHashMap
+import java.util.{Locale, TimeZone}
+import scala.collection.convert.Wrappers.JConcurrentMapWrapper
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 import scala.util.control.NonFatal
 
-class Spark3ParsePartitionUtil(conf: SQLConf) extends SparkParsePartitionUtil {
+object Spark3ParsePartitionUtil extends SparkParsePartitionUtil {
+
+  private val cache = JConcurrentMapWrapper(
+    new ConcurrentHashMap[ZoneId, (DateFormatter, TimestampFormatter)](1))
 
   /**
    * The definition of PartitionValues has been changed by SPARK-34314 in Spark3.2.
    * To solve the compatibility between 3.1 and 3.2, copy some codes from PartitioningUtils in Spark3.2 here.
    * And this method will generate and return `InternalRow` directly instead of `PartitionValues`.
    */
-  override def parsePartition(
-      path: Path,
-      typeInference: Boolean,
-      basePaths: Set[Path],
-      userSpecifiedDataTypes: Map[String, DataType],
-      timeZone: TimeZone): InternalRow = {
-    val dateFormatter = ReflectUtil.getDateFormatter(timeZone.toZoneId)
-    val timestampFormatter = TimestampFormatter(timestampPartitionPattern,
-      timeZone.toZoneId, isParsing = true)
+  override def parsePartition(path: Path,
+                              typeInference: Boolean,
+                              basePaths: Set[Path],
+                              userSpecifiedDataTypes: Map[String, DataType],
+                              tz: TimeZone,
+                              validatePartitionValues: Boolean = false): InternalRow = {
+    val (dateFormatter, timestampFormatter) = cache.getOrElseUpdate(tz.toZoneId, {
+      val dateFormatter = ReflectUtil.getDateFormatter(tz.toZoneId)
+      val timestampFormatter = TimestampFormatter(timestampPartitionPattern, tz.toZoneId, isParsing = true)
+
+      (dateFormatter, timestampFormatter)
+    })
 
     val (partitionValues, _) = parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes,
-      conf.validatePartitionColumns, timeZone.toZoneId, dateFormatter, timestampFormatter)
+      validatePartitionValues, tz.toZoneId, dateFormatter, timestampFormatter)
 
     partitionValues.map {
       case PartitionValues(columnNames: Seq[String], typedValues: Seq[TypedPartValue]) =>
         val rowValues = columnNames.zip(typedValues).map { case (columnName, typedValue) =>
           try {
-            castPartValueToDesiredType(typedValue.dataType, typedValue.value, timeZone.toZoneId)
+            castPartValueToDesiredType(typedValue.dataType, typedValue.value, tz.toZoneId)
           } catch {
             case NonFatal(_) =>
-              if (conf.validatePartitionColumns) {
+              if (validatePartitionValues) {
                 throw new RuntimeException(s"Failed to cast value `${typedValue.value}` to " +
                   s"`${typedValue.dataType}` for partition column `$columnName`")
               } else null
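
Spark3ParsePartitionUtil above keeps one DateFormatter/TimestampFormatter pair per time zone in a ConcurrentHashMap instead of rebuilding them on every call. A minimal sketch of that caching pattern using java.time formatters; the pattern string is illustrative, whereas the real code reuses Spark's timestampPartitionPattern.

    import java.time.ZoneId;
    import java.time.format.DateTimeFormatter;
    import java.util.concurrent.ConcurrentHashMap;

    public class FormatterCache {

      // One formatter per time zone; computeIfAbsent keeps construction off the hot parsing path.
      private static final ConcurrentHashMap<ZoneId, DateTimeFormatter> TIMESTAMP_FORMATTERS =
          new ConcurrentHashMap<>(1);

      static DateTimeFormatter timestampFormatter(ZoneId zoneId) {
        return TIMESTAMP_FORMATTERS.computeIfAbsent(zoneId,
            zid -> DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss[.S]").withZone(zid));
      }
    }
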
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 741ff8010a..d94ff1477a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -1677,7 +1677,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     props.setProperty("include", "base.properties");
     props.setProperty("hoodie.embed.timeline.server", "false");
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
-    props.setProperty("hoodie.datasource.write.partitionpath.field", "");
+    props.setProperty("hoodie.datasource.write.partitionpath.field", "driver");
     props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
     props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
     props.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type", kafkaCheckpointType);
@@ -1701,7 +1701,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
 
     prepareParquetDFSSource(true, false, "source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
-        PARQUET_SOURCE_ROOT, false, "");
+        PARQUET_SOURCE_ROOT, false, "driver");
     // delta streamer w/ parquet source
     String tableBasePath = dfsBasePath + "/test_dfs_to_kafka" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
@@ -1828,8 +1828,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
   private void prepareCsvDFSSource(
       boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer) throws IOException {
     String sourceRoot = dfsBasePath + "/csvFiles";
-    String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : "_c0";
-    String partitionPath = (hasHeader || useSchemaProvider) ? "partition_path" : "";
+    String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : "_c1";
+    String partitionPath = (hasHeader || useSchemaProvider) ? "partition_path" : "_c2";
 
     // Properties used for testing delta-streamer with CSV source
     TypedProperties csvProps = new TypedProperties();
@@ -2101,7 +2101,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
   @Test
   public void testDeletePartitions() throws Exception {
     prepareParquetDFSSource(false, false, "source.avsc", "target.avsc",
-        PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "");
+        PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "partition_path");
     String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index df790cf115..b08438c5a7 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -143,9 +143,9 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
     if (expectedCount == 0) {
       assertFalse(batchCheckPoint.getKey().isPresent());
     } else {
-      assertEquals(batchCheckPoint.getKey().get().count(), expectedCount);
+      assertEquals(expectedCount, batchCheckPoint.getKey().get().count());
     }
-    Assertions.assertEquals(batchCheckPoint.getRight(), expectedCheckpoint);
+    Assertions.assertEquals(expectedCheckpoint, batchCheckPoint.getRight());
   }
 
   private Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient, boolean insert, List<HoodieRecord> insertRecords, String commit) throws IOException {