You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/04 15:08:28 UTC

[hudi] branch master updated: [HUDI-3290] Different file formats for the partition metadata file. (#5179)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b28f0d6ceb [HUDI-3290] Different file formats for the partition metadata file. (#5179)
b28f0d6ceb is described below

commit b28f0d6ceb7750075be82b7bd4160a4475801159
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Mon Apr 4 08:08:20 2022 -0700

    [HUDI-3290] Different file formats for the partition metadata file. (#5179)
    
    * [HUDI-3290] Different file formats for the partition metadata file.
    
    Partition metadata files are stored in each partition to help identify the base path of a table. These files are saved in the properties file format. Some query engines do not work when non Parquet/ORC files are found in a partition.
    
    Added a new table config 'hoodie.partition.metafile.use.data.format' which when enabled (default false for backward compatibility) ensures that partition metafiles will be saved in the same format as the base files of a dataset.
    
    For new datasets, the config can be set via hudi-cli. Deltastreamer has a new parameter --partition-metafile-use-data-format which will create a table with this setting.
    
    * Code review comments
    
    - Adding a new command to migrate from text to base file formats for meta file.
    - Reimplementing readFromFS() to first read the text format, then base format
    - Avoid extra exists() checks in readFromFS()
    - Added unit tests, enabled parquet format across hoodie-hadoop-mr
    - Code cleanup, restructuring, naming consistency.
    
    * Wiring in all the other Spark code paths to respect this config
    
     - Turned on parquet meta format for COW data source tests
     - Removed the deltastreamer command line to keep it shorter
    
    * populate HoodiePartitionMetadata#format after readFromFS()
    
    Co-authored-by: Vinoth Chandar <vi...@apache.org>
    Co-authored-by: Raymond Xu <27...@users.noreply.github.com>
---
 .../apache/hudi/cli/HoodieTableHeaderFields.java   |   2 +
 .../apache/hudi/cli/commands/RepairsCommand.java   |  66 ++++++-
 .../org/apache/hudi/io/HoodieAppendHandle.java     |   3 +-
 .../org/apache/hudi/io/HoodieCreateHandle.java     |   3 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |   3 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |   2 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |   4 +
 .../io/storage/row/HoodieRowDataCreateHandle.java  |   3 +-
 .../hudi/io/storage/row/HoodieRowCreateHandle.java |   3 +-
 .../TestBulkInsertInternalPartitioner.java         |   6 +
 .../commit/TestCopyOnWriteActionExecutor.java      |  52 ++++++
 .../apache/hudi/avro/HoodieAvroWriteSupport.java   |  17 +-
 .../java/org/apache/hudi/common/fs/FSUtils.java    |   2 +-
 .../hudi/common/model/HoodiePartitionMetadata.java | 196 ++++++++++++++++++---
 .../hudi/common/table/HoodieTableConfig.java       |  16 ++
 .../hudi/common/table/HoodieTableMetaClient.java   |  12 ++
 .../org/apache/hudi/common/util/BaseFileUtils.java |  23 ++-
 .../java/org/apache/hudi/common/util/OrcUtils.java |  39 ++--
 .../org/apache/hudi/common/util/ParquetUtils.java  |   6 +
 .../metadata/FileSystemBackedTableMetadata.java    |   2 +-
 .../common/model/TestHoodiePartitionMetadata.java  |  93 ++++++++++
 .../hudi/common/testutils/FileCreateUtils.java     |   4 +-
 .../common/testutils/HoodieTestDataGenerator.java  |   2 +-
 .../hudi/common/testutils/HoodieTestTable.java     |   2 +-
 .../hudi/common/util/TestTablePathUtils.java       |  29 ++-
 .../hudi/hadoop/testutils/InputFormatTestUtil.java |   7 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  13 +-
 .../apache/hudi/functional/TestCOWDataSource.scala |   3 +-
 .../hudi/utilities/HoodieSnapshotCopier.java       |   4 +-
 .../hudi/utilities/HoodieSnapshotExporter.java     |   4 +-
 .../utilities/deltastreamer/BootstrapExecutor.java |   1 +
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   4 +
 .../deltastreamer/HoodieDeltaStreamer.java         |   2 +-
 33 files changed, 539 insertions(+), 89 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
index e317d5a4f5..a4a8e46dfd 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
@@ -83,6 +83,8 @@ public class HoodieTableHeaderFields {
   public static final String HEADER_HOODIE_PROPERTY = "Property";
   public static final String HEADER_OLD_VALUE = "Old Value";
   public static final String HEADER_NEW_VALUE = "New Value";
+  public static final String HEADER_TEXT_METAFILE_PRESENT = "Text Metafile present ?";
+  public static final String HEADER_BASE_METAFILE_PRESENT = "Base Metafile present ?";
 
   /**
    * Fields of Savepoints.
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index 6c068c898b..ac17019157 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -24,6 +24,7 @@ import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.HoodieTableHeaderFields;
 import org.apache.hudi.cli.utils.InputStreamConsumer;
 import org.apache.hudi.cli.utils.SparkUtil;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -31,11 +32,13 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.avro.AvroRuntimeException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.util.StringUtils;
+
 import org.apache.log4j.Logger;
 import org.apache.spark.launcher.SparkLauncher;
 import org.apache.spark.util.Utils;
@@ -133,7 +136,8 @@ public class RepairsCommand implements CommandMarker {
         row[1] = "No";
         if (!dryRun) {
           HoodiePartitionMetadata partitionMetadata =
-              new HoodiePartitionMetadata(HoodieCLI.fs, latestCommit, basePath, partitionPath);
+              new HoodiePartitionMetadata(HoodieCLI.fs, latestCommit, basePath, partitionPath,
+                  client.getTableConfig().getPartitionMetafileFormat());
           partitionMetadata.trySave(0);
           row[2] = "Repaired";
         }
@@ -199,4 +203,64 @@ public class RepairsCommand implements CommandMarker {
       }
     });
   }
+
+  @CliCommand(value = "repair migrate-partition-meta", help = "Migrate all partition meta file currently stored in text format "
+      + "to be stored in base file format. See HoodieTableConfig#PARTITION_METAFILE_USE_DATA_FORMAT.")
+  public String migratePartitionMeta(
+      @CliOption(key = {"dryrun"}, help = "dry run without modifying anything.", unspecifiedDefaultValue = "true") final boolean dryRun)
+      throws IOException {
+
+    HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(HoodieCLI.conf);
+    HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(engineContext, client.getBasePath(), false, false);
+    Path basePath = new Path(client.getBasePath());
+
+    String[][] rows = new String[partitionPaths.size()][];
+    int ind = 0;
+    for (String partitionPath : partitionPaths) {
+      Path partition = FSUtils.getPartitionPath(client.getBasePath(), partitionPath);
+      Option<Path> textFormatFile = HoodiePartitionMetadata.textFormatMetaPathIfExists(HoodieCLI.fs, partition);
+      Option<Path> baseFormatFile = HoodiePartitionMetadata.baseFormatMetaPathIfExists(HoodieCLI.fs, partition);
+      String latestCommit = client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp();
+
+      String[] row = new String[] {
+          partitionPath,
+          String.valueOf(textFormatFile.isPresent()),
+          String.valueOf(baseFormatFile.isPresent()),
+          textFormatFile.isPresent() ? "MIGRATE" : "NONE"
+      };
+
+      if (!dryRun) {
+        if (!baseFormatFile.isPresent()) {
+          HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(HoodieCLI.fs, latestCommit, basePath, partition,
+              Option.of(client.getTableConfig().getBaseFileFormat()));
+          partitionMetadata.trySave(0);
+        }
+
+        // delete it, in case we failed midway last time.
+        textFormatFile.ifPresent(path -> {
+          try {
+            HoodieCLI.fs.delete(path, false);
+          } catch (IOException e) {
+            throw new HoodieIOException(e.getMessage(), e);
+          }
+        });
+
+        row[3] = "MIGRATED";
+      }
+
+      rows[ind++] = row;
+    }
+
+    Properties props = new Properties();
+    props.setProperty(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), "true");
+    HoodieTableConfig.update(HoodieCLI.fs, new Path(client.getMetaPath()), props);
+
+    return HoodiePrintHelper.print(new String[] {
+        HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_TEXT_METAFILE_PRESENT,
+        HoodieTableHeaderFields.HEADER_BASE_METAFILE_PRESENT,
+        HoodieTableHeaderFields.HEADER_ACTION
+    }, rows);
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 4ab4be3c08..01c16a57ba 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -173,7 +173,8 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
       try {
         // Save hoodie partition meta in the partition path
         HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, baseInstantTime,
-            new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
+            new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
+            hoodieTable.getPartitionMetafileFormat());
         partitionMetadata.trySave(getPartitionId());
 
         // Since the actual log file written to can be different based on when rollover happens, we use the
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 0bc3491250..91a7622bf8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -94,7 +94,8 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
 
     try {
       HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
-          new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
+          new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
+          hoodieTable.getPartitionMetafileFormat());
       partitionMetadata.trySave(getPartitionId());
       createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
       this.fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 7e6f1b36a3..567ae63e1e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -166,7 +166,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
       writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath));
 
       HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
-          new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
+          new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
+          hoodieTable.getPartitionMetafileFormat());
       partitionMetadata.trySave(getPartitionId());
 
       String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index b64d8ec090..d09fd2cda0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1074,7 +1074,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
           if (!status.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
             this.subDirectories.add(status.getPath());
           }
-        } else if (status.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
+        } else if (status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
           // Presence of partition meta file implies this is a HUDI partition
           this.isHoodiePartition = true;
         } else if (FSUtils.isDataFile(status.getPath())) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 49c289d2bb..3d8109e6f3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -756,6 +756,10 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
     return metaClient.getTableConfig().getLogFileFormat();
   }
 
+  public Option<HoodieFileFormat> getPartitionMetafileFormat() {
+    return metaClient.getTableConfig().getPartitionMetafileFormat();
+  }
+
   public String getBaseFileExtension() {
     return getBaseFileFormat().getFileExtension();
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index bbd9b882db..486a5cc54b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -94,7 +94,8 @@ public class HoodieRowDataCreateHandle implements Serializable {
               fs,
               instantTime,
               new Path(writeConfig.getBasePath()),
-              FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath));
+              FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath),
+              table.getPartitionMetafileFormat());
       partitionMetadata.trySave(taskPartitionId);
       createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, getWriteToken(), this.fileId, table.getBaseFileExtension()));
       this.fileWriter = createNewFileWriter(path, table, writeConfig, rowType);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
index 5cdb2ff68f..ce3cd6f097 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
@@ -93,7 +93,8 @@ public class HoodieRowCreateHandle implements Serializable {
               fs,
               instantTime,
               new Path(writeConfig.getBasePath()),
-              FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath));
+              FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath),
+              table.getPartitionMetafileFormat());
       partitionMetadata.trySave(taskPartitionId);
       createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, getWriteToken(), this.fileId, table.getBaseFileExtension()));
       this.fileWriter = createNewFileWriter(path, table, writeConfig, structType);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
index 712f40568a..4d2f5e0c5e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
@@ -60,6 +60,12 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
     return jsc.parallelize(records1, 1).union(jsc.parallelize(records2, 1));
   }
 
+  public static JavaRDD<HoodieRecord> generateTestRecordsForBulkInsert(JavaSparkContext jsc, int count) {
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    List<HoodieRecord> records = dataGenerator.generateInserts("0", count);
+    return jsc.parallelize(records, 1);
+  }
+
   public static Map<String, Long> generateExpectedPartitionNumRecords(JavaRDD<HoodieRecord> records) {
     return records.map(record -> record.getPartitionPath()).countByValue();
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index 4ae8456360..8114daa30f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -25,10 +25,13 @@ import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
 import org.apache.hudi.common.testutils.Transformations;
@@ -87,6 +90,7 @@ import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResou
 import static org.apache.hudi.execution.bulkinsert.TestBulkInsertInternalPartitioner.generateExpectedPartitionNumRecords;
 import static org.apache.hudi.execution.bulkinsert.TestBulkInsertInternalPartitioner.generateTestRecordsForBulkInsert;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -498,4 +502,52 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
   public void testBulkInsertRecordsWithGlobalSort(String bulkInsertMode) throws Exception {
     testBulkInsertRecords(bulkInsertMode);
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testPartitionMetafileFormat(boolean partitionMetafileUseBaseFormat) throws Exception {
+    // By default there is no format specified for partition metafile
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).build();
+    HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, metaClient);
+    assertFalse(table.getPartitionMetafileFormat().isPresent());
+
+    if (partitionMetafileUseBaseFormat) {
+      // Add the setting to use datafile format
+      Properties properties = new Properties();
+      properties.setProperty(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), "true");
+      initMetaClient(HoodieTableType.COPY_ON_WRITE, properties);
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      assertTrue(metaClient.getTableConfig().getPartitionMetafileFormat().isPresent());
+      table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, metaClient);
+      assertTrue(table.getPartitionMetafileFormat().isPresent());
+    }
+
+    String instantTime = makeNewCommitTime();
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
+    writeClient.startCommitWithTime(instantTime);
+
+    // Insert new records
+    final JavaRDD<HoodieRecord> inputRecords = generateTestRecordsForBulkInsert(jsc, 10);
+    writeClient.bulkInsert(inputRecords, instantTime);
+
+    // Partition metafile should be created
+    Path partitionPath = new Path(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+    assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, partitionPath));
+    Option<Path> metafilePath = HoodiePartitionMetadata.getPartitionMetafilePath(fs, partitionPath);
+    if (partitionMetafileUseBaseFormat) {
+      // Extension should be the same as the data file format of the table
+      assertTrue(metafilePath.get().toString().endsWith(table.getBaseFileFormat().getFileExtension()));
+    } else {
+      // No extension as it is in properties file format
+      assertTrue(metafilePath.get().toString().endsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX));
+    }
+
+    // Validate contents of the partition metafile
+    HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, partitionPath);
+    partitionMetadata.readFromFS();
+    assertTrue(partitionMetadata.getPartitionDepth() == 3);
+    assertTrue(partitionMetadata.readPartitionCreatedCommitTime().get().equals(instantTime));
+  }
+
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
index 020fcc26b7..c3920211ae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.schema.MessageType;
 
 import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Wrap AvroWriterSupport for plugging in the bloom filter.
@@ -36,6 +37,7 @@ public class HoodieAvroWriteSupport extends AvroWriteSupport {
   private Option<BloomFilter> bloomFilterOpt;
   private String minRecordKey;
   private String maxRecordKey;
+  private Map<String, String> footerMetadata = new HashMap<>();
 
   public static final String OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY = "com.uber.hoodie.bloomfilter";
   public static final String HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY = "org.apache.hudi.bloomfilter";
@@ -50,18 +52,17 @@ public class HoodieAvroWriteSupport extends AvroWriteSupport {
 
   @Override
   public WriteSupport.FinalizedWriteContext finalizeWrite() {
-    HashMap<String, String> extraMetaData = new HashMap<>();
     if (bloomFilterOpt.isPresent()) {
-      extraMetaData.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilterOpt.get().serializeToString());
+      footerMetadata.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilterOpt.get().serializeToString());
       if (minRecordKey != null && maxRecordKey != null) {
-        extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey);
-        extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey);
+        footerMetadata.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey);
+        footerMetadata.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey);
       }
       if (bloomFilterOpt.get().getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
-        extraMetaData.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilterOpt.get().getBloomFilterTypeCode().name());
+        footerMetadata.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilterOpt.get().getBloomFilterTypeCode().name());
       }
     }
-    return new WriteSupport.FinalizedWriteContext(extraMetaData);
+    return new WriteSupport.FinalizedWriteContext(footerMetadata);
   }
 
   public void add(String recordKey) {
@@ -80,4 +81,8 @@ public class HoodieAvroWriteSupport extends AvroWriteSupport {
       }
     }
   }
+
+  public void addFooterMetadata(String key, String value) {
+    footerMetadata.put(key, value);
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 86bb320491..57fa4acc90 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -246,7 +246,7 @@ public class FSUtils {
     final List<String> partitions = new ArrayList<>();
     processFiles(fs, basePathStr, (locatedFileStatus) -> {
       Path filePath = locatedFileStatus.getPath();
-      if (filePath.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
+      if (filePath.getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
         partitions.add(getRelativePartitionPath(basePath, filePath.getParent()));
       }
       return true;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index 3a19c187f7..93e9ea5d34 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -18,27 +18,47 @@
 
 package org.apache.hudi.common.model;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.util.AvroOrcUtils;
+import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 
+import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Writer;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * The metadata that goes into the meta file in each partition.
  */
 public class HoodiePartitionMetadata {
 
-  public static final String HOODIE_PARTITION_METAFILE = ".hoodie_partition_metadata";
-  public static final String PARTITION_DEPTH_KEY = "partitionDepth";
+  public static final String HOODIE_PARTITION_METAFILE_PREFIX = ".hoodie_partition_metadata";
   public static final String COMMIT_TIME_KEY = "commitTime";
+  private static final String PARTITION_DEPTH_KEY = "partitionDepth";
+  private static final Logger LOG = LogManager.getLogger(HoodiePartitionMetadata.class);
 
   /**
    * Contents of the metadata.
@@ -52,7 +72,8 @@ public class HoodiePartitionMetadata {
 
   private final FileSystem fs;
 
-  private static final Logger LOG = LogManager.getLogger(HoodiePartitionMetadata.class);
+  // The format in which to write the partition metadata
+  private Option<HoodieFileFormat> format;
 
   /**
    * Construct metadata from existing partition.
@@ -61,13 +82,15 @@ public class HoodiePartitionMetadata {
     this.fs = fs;
     this.props = new Properties();
     this.partitionPath = partitionPath;
+    this.format = Option.empty();
   }
 
   /**
    * Construct metadata object to be written out.
    */
-  public HoodiePartitionMetadata(FileSystem fs, String instantTime, Path basePath, Path partitionPath) {
+  public HoodiePartitionMetadata(FileSystem fs, String instantTime, Path basePath, Path partitionPath, Option<HoodieFileFormat> format) {
     this(fs, partitionPath);
+    this.format = format;
     props.setProperty(COMMIT_TIME_KEY, instantTime);
     props.setProperty(PARTITION_DEPTH_KEY, String.valueOf(partitionPath.depth() - basePath.depth()));
   }
@@ -83,21 +106,17 @@ public class HoodiePartitionMetadata {
    * Write the metadata safely into partition atomically.
    */
   public void trySave(int taskPartitionId) {
+    String extension = getMetafileExtension();
     Path tmpMetaPath =
-        new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE + "_" + taskPartitionId);
-    Path metaPath = new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
+        new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX + "_" + taskPartitionId + extension);
+    Path metaPath = new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX + extension);
     boolean metafileExists = false;
 
     try {
       metafileExists = fs.exists(metaPath);
       if (!metafileExists) {
         // write to temporary file
-        FSDataOutputStream os = fs.create(tmpMetaPath, true);
-        props.store(os, "partition metadata");
-        os.hsync();
-        os.hflush();
-        os.close();
-
+        writeMetafile(tmpMetaPath);
         // move to actual path
         fs.rename(tmpMetaPath, metaPath);
       }
@@ -118,22 +137,103 @@ public class HoodiePartitionMetadata {
     }
   }
 
+  private String getMetafileExtension() {
+    // To be backwards compatible, there is no extension to the properties file base partition metafile
+    return format.isPresent() ? format.get().getFileExtension() : StringUtils.EMPTY_STRING;
+  }
+
+  /**
+   * Write the partition metadata in the correct format in the given file path.
+   *
+   * @param filePath Path of the file to write
+   * @throws IOException
+   */
+  private void writeMetafile(Path filePath) throws IOException {
+    if (format.isPresent()) {
+      Schema schema = HoodieAvroUtils.getRecordKeySchema();
+
+      switch (format.get()) {
+        case PARQUET:
+          // Since we are only interested in saving metadata to the footer, the schema, blocksizes and other
+          // parameters are not important.
+          MessageType type = Types.buildMessage().optional(PrimitiveTypeName.INT64).named("dummyint").named("dummy");
+          HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(type, schema, Option.empty());
+          try (ParquetWriter writer = new ParquetWriter(filePath, writeSupport, CompressionCodecName.UNCOMPRESSED, 1024, 1024)) {
+            for (String key : props.stringPropertyNames()) {
+              writeSupport.addFooterMetadata(key, props.getProperty(key));
+            }
+          }
+          break;
+        case ORC:
+          // Since we are only interested in saving metadata to the footer, the schema, blocksizes and other
+          // parameters are not important.
+          OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(fs.getConf()).fileSystem(fs)
+              .setSchema(AvroOrcUtils.createOrcSchema(schema));
+          try (Writer writer = OrcFile.createWriter(filePath, writerOptions)) {
+            for (String key : props.stringPropertyNames()) {
+              writer.addUserMetadata(key, ByteBuffer.wrap(props.getProperty(key).getBytes()));
+            }
+          }
+          break;
+        default:
+          throw new HoodieException("Unsupported format for partition metafiles: " + format.get());
+      }
+    } else {
+      // Backwards compatible properties file format
+      FSDataOutputStream os = fs.create(filePath, true);
+      props.store(os, "partition metadata");
+      os.hsync();
+      os.hflush();
+      os.close();
+    }
+  }
+
   /**
    * Read out the metadata for this partition.
    */
   public void readFromFS() throws IOException {
-    FSDataInputStream is = null;
-    try {
-      Path metaFile = new Path(partitionPath, HOODIE_PARTITION_METAFILE);
-      is = fs.open(metaFile);
+    // first try reading the text format (legacy, currently widespread)
+    boolean readFile = readTextFormatMetaFile();
+    if (!readFile) {
+      // now try reading the base file formats.
+      readFile = readBaseFormatMetaFile();
+    }
+
+    // throw exception.
+    if (!readFile) {
+      throw new HoodieException("Unable to read any partition meta file to locate the table timeline.");
+    }
+  }
+
+  private boolean readTextFormatMetaFile() {
+    // Properties file format
+    Path metafilePath = textFormatMetaFilePath(partitionPath);
+    try (FSDataInputStream is = fs.open(metafilePath)) {
       props.load(is);
-    } catch (IOException ioe) {
-      throw new HoodieException("Error reading Hoodie partition metadata for " + partitionPath, ioe);
-    } finally {
-      if (is != null) {
-        is.close();
+      format = Option.empty();
+      return true;
+    } catch (Throwable t) {
+      LOG.warn("Unable to read partition meta properties file for partition " + partitionPath, t);
+      return false;
+    }
+  }
+
+  private boolean readBaseFormatMetaFile() {
+    for (Path metafilePath : baseFormatMetaFilePaths(partitionPath)) {
+      try {
+        BaseFileUtils reader = BaseFileUtils.getInstance(metafilePath.toString());
+        // Data file format
+        Map<String, String> metadata = reader.readFooter(fs.getConf(), true, metafilePath, PARTITION_DEPTH_KEY, COMMIT_TIME_KEY);
+        props.clear();
+        props.putAll(metadata);
+        format = Option.of(reader.getFormat());
+        return true;
+      } catch (Throwable t) {
+        // any error, log, check the next base format
+        LOG.warn("Unable to read partition metadata " + metafilePath.getName() + " for partition " + partitionPath, t);
       }
     }
+    return false;
   }
 
   /**
@@ -141,12 +241,10 @@ public class HoodiePartitionMetadata {
    */
   public Option<String> readPartitionCreatedCommitTime() {
     try {
-      if (props.containsKey(COMMIT_TIME_KEY)) {
-        return Option.of(props.getProperty(COMMIT_TIME_KEY));
-      } else {
+      if (!props.containsKey(COMMIT_TIME_KEY)) {
         readFromFS();
-        return Option.of(props.getProperty(COMMIT_TIME_KEY));
       }
+      return Option.of(props.getProperty(COMMIT_TIME_KEY));
     } catch (IOException ioe) {
       LOG.warn("Error fetch Hoodie partition metadata for " + partitionPath, ioe);
       return Option.empty();
@@ -156,9 +254,55 @@ public class HoodiePartitionMetadata {
   // methods related to partition meta data
   public static boolean hasPartitionMetadata(FileSystem fs, Path partitionPath) {
     try {
-      return fs.exists(new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
+      return textFormatMetaPathIfExists(fs, partitionPath).isPresent()
+          || baseFormatMetaPathIfExists(fs, partitionPath).isPresent();
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error checking presence of partition meta file for " + partitionPath, ioe);
+    }
+  }
+
+  /**
+   * Returns the name of the partition metadata.
+   *
+   * @return Name of the partition metafile or empty option
+   */
+  public static Option<Path> getPartitionMetafilePath(FileSystem fs, Path partitionPath) {
+    // The partition listing is a costly operation so instead we are searching for existence of the files instead.
+    // This is in expected order as properties file based partition metafiles should be the most common.
+    try {
+      Option<Path> textFormatPath = textFormatMetaPathIfExists(fs, partitionPath);
+      if (textFormatPath.isPresent()) {
+        return textFormatPath;
+      } else {
+        return baseFormatMetaPathIfExists(fs, partitionPath);
+      }
     } catch (IOException ioe) {
       throw new HoodieException("Error checking Hoodie partition metadata for " + partitionPath, ioe);
     }
   }
+
+  public static Option<Path> baseFormatMetaPathIfExists(FileSystem fs, Path partitionPath) throws IOException {
+    // Parquet should be more common than ORC so check it first
+    for (Path metafilePath : baseFormatMetaFilePaths(partitionPath)) {
+      if (fs.exists(metafilePath)) {
+        return Option.of(metafilePath);
+      }
+    }
+    return Option.empty();
+  }
+
+  public static Option<Path> textFormatMetaPathIfExists(FileSystem fs, Path partitionPath) throws IOException {
+    Path path = textFormatMetaFilePath(partitionPath);
+    return Option.ofNullable(fs.exists(path) ? path : null);
+  }
+
+  static Path textFormatMetaFilePath(Path partitionPath) {
+    return new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX);
+  }
+
+  static List<Path> baseFormatMetaFilePaths(Path partitionPath) {
+    return Stream.of(HoodieFileFormat.PARQUET.getFileExtension(), HoodieFileFormat.ORC.getFileExtension())
+        .map(ext -> new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX + ext))
+        .collect(Collectors.toList());
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index cfb0df3b88..bfcec84ce3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -190,6 +190,12 @@ public class HoodieTableConfig extends HoodieConfig {
       .defaultValue(HoodieTimelineTimeZone.LOCAL)
       .withDocumentation("User can set hoodie commit timeline timezone, such as utc, local and so on. local is default");
 
+  public static final ConfigProperty<Boolean> PARTITION_METAFILE_USE_BASE_FORMAT = ConfigProperty
+      .key("hoodie.partition.metafile.use.base.format")
+      .defaultValue(false)
+      .withDocumentation("If true, partition metafiles are saved in the same format as basefiles for this dataset (e.g. Parquet / ORC). "
+          + "If false (default) partition metafiles are saved as properties files.");
+
   public static final ConfigProperty<String> URL_ENCODE_PARTITIONING = KeyGeneratorOptions.URL_ENCODE_PARTITIONING;
   public static final ConfigProperty<String> HIVE_STYLE_PARTITIONING_ENABLE = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;
 
@@ -608,6 +614,16 @@ public class HoodieTableConfig extends HoodieConfig {
     return getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING);
   }
   
+  /**
+   * Returns the format to use for partition meta files.
+   */
+  public Option<HoodieFileFormat> getPartitionMetafileFormat() {
+    if (getBooleanOrDefault(PARTITION_METAFILE_USE_BASE_FORMAT)) {
+      return Option.of(getBaseFileFormat());
+    }
+    return Option.empty();
+  }
+
   public Map<String, String> propsMap() {
     return props.entrySet().stream()
         .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 847244d7c7..4e04ad9db0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -699,6 +699,7 @@ public class HoodieTableMetaClient implements Serializable {
     private Boolean hiveStylePartitioningEnable;
     private Boolean urlEncodePartitioning;
     private HoodieTimelineTimeZone commitTimeZone;
+    private Boolean partitionMetafileUseBaseFormat;
 
     /**
      * Persist the configs that is written at the first time, and should not be changed.
@@ -813,6 +814,11 @@ public class HoodieTableMetaClient implements Serializable {
       return this;
     }
 
+    public PropertyBuilder setPartitionMetafileUseBaseFormat(Boolean useBaseFormat) {
+      this.partitionMetafileUseBaseFormat = useBaseFormat;
+      return this;
+    }
+
     public PropertyBuilder set(String key, Object value) {
       if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) {
         this.others.put(key, value);
@@ -908,6 +914,9 @@ public class HoodieTableMetaClient implements Serializable {
       if (hoodieConfig.contains(HoodieTableConfig.URL_ENCODE_PARTITIONING)) {
         setUrlEncodePartitioning(hoodieConfig.getBoolean(HoodieTableConfig.URL_ENCODE_PARTITIONING));
       }
+      if (hoodieConfig.contains(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT)) {
+        setPartitionMetafileUseBaseFormat(hoodieConfig.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT));
+      }
       return this;
     }
 
@@ -986,6 +995,9 @@ public class HoodieTableMetaClient implements Serializable {
       if (null != commitTimeZone) {
         tableConfig.setValue(HoodieTableConfig.TIMELINE_TIMEZONE, commitTimeZone.toString());
       }
+      if (null != partitionMetafileUseBaseFormat) {
+        tableConfig.setValue(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT, partitionMetafileUseBaseFormat.toString());
+      }
       return tableConfig.getProps();
     }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
index 7ec6110d72..d6391d178e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
@@ -18,14 +18,6 @@
 
 package org.apache.hudi.common.util;
 
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
@@ -36,6 +28,16 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 public abstract class BaseFileUtils {
 
   public static BaseFileUtils getInstance(String path) {
@@ -204,4 +206,9 @@ public abstract class BaseFileUtils {
    * @return The Avro schema of the data file
    */
   public abstract Schema readAvroSchema(Configuration configuration, Path filePath);
+
+  /**
+   * @return The subclass's {@link HoodieFileFormat}.
+   */
+  public abstract HoodieFileFormat getFormat();
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
index 88c28d7520..0cc4059197 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
@@ -18,37 +18,39 @@
 
 package org.apache.hudi.common.util;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
-import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.MetadataNotFoundException;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto.UserMetadataItem;
 import org.apache.orc.Reader;
 import org.apache.orc.Reader.Options;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Utility functions for ORC files.
@@ -248,6 +250,11 @@ public class OrcUtils extends BaseFileUtils {
     }
   }
 
+  @Override
+  public HoodieFileFormat getFormat() {
+    return HoodieFileFormat.ORC;
+  }
+
   @Override
   public long getRowCount(Configuration conf, Path orcFilePath) {
     try (Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index c0f7aabde6..c779a3269a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.util;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.exception.HoodieIOException;
@@ -228,6 +229,11 @@ public class ParquetUtils extends BaseFileUtils {
     return new AvroSchemaConverter(configuration).convert(parquetSchema);
   }
 
+  @Override
+  public HoodieFileFormat getFormat() {
+    return HoodieFileFormat.PARQUET;
+  }
+
   /**
    * NOTE: This literally reads the entire file contents, thus should be used with caution.
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 1bb18bad16..b4a76a7282 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -96,7 +96,7 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
               } else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
                 pathsToList.add(fileStatus.getPath());
               }
-            } else if (fileStatus.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
+            } else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
               String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent());
               partitionPaths.add(partitionName);
             }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodiePartitionMetadata.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodiePartitionMetadata.java
new file mode 100644
index 0000000000..3ec15d4f65
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodiePartitionMetadata.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodiePartitionMetadata extends HoodieCommonTestHarness {
+
+  FileSystem fs;
+
+  @BeforeEach
+  public void setupTest() throws IOException {
+    initMetaClient();
+    fs = metaClient.getFs();
+  }
+
+  static Stream<Arguments> formatProviderFn() {
+    return Stream.of(
+        Arguments.arguments(Option.empty()),
+        Arguments.arguments(Option.of(HoodieFileFormat.PARQUET)),
+        Arguments.arguments(Option.of(HoodieFileFormat.ORC))
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("formatProviderFn")
+  public void testTextFormatMetaFile(Option<HoodieFileFormat> format) throws IOException {
+    // given
+    final Path partitionPath = new Path(basePath, "a/b/"
+        + format.map(Enum::name).orElse("text"));
+    fs.mkdirs(partitionPath);
+    final String commitTime = "000000000001";
+    HoodiePartitionMetadata writtenMetadata = new HoodiePartitionMetadata(metaClient.getFs(), commitTime, new Path(basePath), partitionPath, format);
+    writtenMetadata.trySave(0);
+
+    // when
+    HoodiePartitionMetadata readMetadata = new HoodiePartitionMetadata(metaClient.getFs(), new Path(metaClient.getBasePath(), partitionPath));
+
+    // then
+    assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, partitionPath));
+    assertEquals(Option.of(commitTime), readMetadata.readPartitionCreatedCommitTime());
+    assertEquals(3, readMetadata.getPartitionDepth());
+  }
+
+  @Test
+  public void testErrorIfAbsent() throws IOException {
+    final Path partitionPath = new Path(basePath, "a/b/not-a-partition");
+    fs.mkdirs(partitionPath);
+    HoodiePartitionMetadata readMetadata = new HoodiePartitionMetadata(metaClient.getFs(), new Path(metaClient.getBasePath(), partitionPath));
+    assertThrows(HoodieException.class, readMetadata::readPartitionCreatedCommitTime);
+  }
+
+  @Test
+  public void testFileNames() {
+    assertEquals(new Path("/a/b/c/.hoodie_partition_metadata"), HoodiePartitionMetadata.textFormatMetaFilePath(new Path("/a/b/c")));
+    assertEquals(Arrays.asList(new Path("/a/b/c/.hoodie_partition_metadata.parquet"),
+        new Path("/a/b/c/.hoodie_partition_metadata.orc")), HoodiePartitionMetadata.baseFormatMetaFilePaths(new Path("/a/b/c")));
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index c80c5e28ff..06f0ac49b6 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -280,7 +280,7 @@ public class FileCreateUtils {
   public static void createPartitionMetaFile(String basePath, String partitionPath) throws IOException {
     Path parentPath = Paths.get(basePath, partitionPath);
     Files.createDirectories(parentPath);
-    Path metaFilePath = parentPath.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
+    Path metaFilePath = parentPath.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX);
     if (Files.notExists(metaFilePath)) {
       Files.createFile(metaFilePath);
     }
@@ -397,7 +397,7 @@ public class FileCreateUtils {
     }
     return Files.list(basePath).filter(entry -> (!entry.getFileName().toString().equals(HoodieTableMetaClient.METAFOLDER_NAME)
         && !entry.getFileName().toString().contains("parquet") && !entry.getFileName().toString().contains("log"))
-        && !entry.getFileName().toString().endsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)).collect(Collectors.toList());
+        && !entry.getFileName().toString().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)).collect(Collectors.toList());
   }
 
   /**
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 3e147b7fdd..cb4f557074 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -205,7 +205,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
    */
   public void writePartitionMetadata(FileSystem fs, String[] partitionPaths, String basePath) {
     for (String partitionPath : partitionPaths) {
-      new HoodiePartitionMetadata(fs, "000", new Path(basePath), new Path(basePath, partitionPath)).trySave(0);
+      new HoodiePartitionMetadata(fs, "000", new Path(basePath), new Path(basePath, partitionPath), Option.empty()).trySave(0);
     }
   }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 881197ecad..f0aae0a69d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -680,7 +680,7 @@ public class HoodieTestTable {
           boolean toReturn = true;
           String filePath = entry.getPath().toString();
           String fileName = entry.getPath().getName();
-          if (fileName.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE) || (!fileName.contains("log") && !fileName.contains("parquet"))
+          if (fileName.startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX) || (!fileName.contains("log") && !fileName.contains("parquet"))
               || filePath.contains("metadata")) {
             toReturn = false;
           } else {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java
index 056f2121ce..eae1cdce83 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.hudi.common.util;
 
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -24,9 +25,10 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -41,7 +43,7 @@ public final class TestTablePathUtils {
   private static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
 
   @TempDir
-  static File tempDir;
+  public File tempDir;
   private static FileSystem fs;
   private static Path tablePath;
   private static Path partitionPath1;
@@ -49,9 +51,12 @@ public final class TestTablePathUtils {
   private static Path filePath1;
   private static Path filePath2;
 
-  @BeforeAll
-  static void setup() throws IOException {
-    URI tablePathURI = Paths.get(tempDir.getAbsolutePath(),"test_table").toUri();
+  private void setup() throws IOException {
+    setup(Option.empty());
+  }
+
+  private void setup(Option<HoodieFileFormat> partitionMetafileFormat) throws IOException {
+    URI tablePathURI = Paths.get(tempDir.getAbsolutePath(), "test_table").toUri();
     tablePath = new Path(tablePathURI);
     fs = tablePath.getFileSystem(new Configuration());
 
@@ -69,10 +74,10 @@ public final class TestTablePathUtils {
     assertTrue(new File(partitionPathURI2).mkdirs());
 
     HoodiePartitionMetadata partitionMetadata1 = new HoodiePartitionMetadata(fs, Instant.now().toString(), tablePath,
-                                                                             partitionPath1);
+                                                                             partitionPath1, partitionMetafileFormat);
     partitionMetadata1.trySave(1);
     HoodiePartitionMetadata partitionMetadata2 = new HoodiePartitionMetadata(fs, Instant.now().toString(), tablePath,
-                                                                             partitionPath2);
+                                                                             partitionPath2, partitionMetafileFormat);
     partitionMetadata2.trySave(2);
 
     // Create files
@@ -87,12 +92,14 @@ public final class TestTablePathUtils {
 
   @Test
   void getTablePathFromTablePath() throws IOException {
+    setup();
     Option<Path> inferredTablePath = TablePathUtils.getTablePath(fs, tablePath);
     assertEquals(tablePath, inferredTablePath.get());
   }
 
   @Test
   void getTablePathFromMetadataFolderPath() throws IOException {
+    setup();
     Path metaFolder = new Path(tablePath, HoodieTableMetaClient.METAFOLDER_NAME);
     Option<Path> inferredTablePath = TablePathUtils.getTablePath(fs, metaFolder);
     assertEquals(tablePath, inferredTablePath.get());
@@ -100,6 +107,7 @@ public final class TestTablePathUtils {
 
   @Test
   void getTablePathFromMetadataSubFolderPath() throws IOException {
+    setup();
     Path auxFolder = new Path(tablePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME);
     assertEquals(tablePath, TablePathUtils.getTablePath(fs, auxFolder).get());
 
@@ -117,8 +125,10 @@ public final class TestTablePathUtils {
     assertEquals(metadataTableFolder, TablePathUtils.getTablePath(fs, metadataTablePartitionFolder).get());
   }
 
-  @Test
-  void getTablePathFromPartitionFolderPath() throws IOException {
+  @ParameterizedTest
+  @EnumSource(value = HoodieFileFormat.class, names = {"PARQUET", "ORC"})
+  void getTablePathFromPartitionFolderPath(HoodieFileFormat partitionMetafileFormat) throws IOException {
+    setup(Option.of(partitionMetafileFormat));
     Option<Path> inferredTablePath = TablePathUtils.getTablePath(fs, partitionPath1);
     assertEquals(tablePath, inferredTablePath.get());
 
@@ -128,6 +138,7 @@ public final class TestTablePathUtils {
 
   @Test
   void getTablePathFromFilePath() throws IOException {
+    setup();
     Option<Path> inferredTablePath = TablePathUtils.getTablePath(fs, filePath1);
     assertEquals(tablePath, inferredTablePath.get());
 
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index 7557938711..ccd85d3829 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 
 import org.apache.avro.Schema;
@@ -185,7 +186,7 @@ public class InputFormatTestUtil {
   public static void setupSnapshotScanMode(JobConf jobConf) {
     setupSnapshotScanMode(jobConf, false);
   }
-  
+
   private static void setupSnapshotScanMode(JobConf jobConf, boolean includePending) {
     setUpScanMode(jobConf);
     String includePendingCommitsName =
@@ -467,8 +468,8 @@ public class InputFormatTestUtil {
             new LocalFileSystem(lfs),
             "0",
             new Path(basePath.toAbsolutePath().toString()),
-            new Path(partitionPath.toAbsolutePath().toString())
-        );
+            new Path(partitionPath.toAbsolutePath().toString()),
+            Option.of(HoodieFileFormat.PARQUET));
 
     partitionMetadata.trySave((int) (Math.random() * 1000));
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 6bccf9d7fb..7dbc9997cf 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -141,6 +141,7 @@ object HoodieSparkSqlWriter {
         val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
         val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
         val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
+        val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT);
 
         val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(tableType)
@@ -158,6 +159,7 @@ object HoodieSparkSqlWriter {
           .set(timestampKeyGeneratorConfigs)
           .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
           .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
+          .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
           .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
           .initTable(sparkContext.hadoopConfiguration, path)
         tableConfig = tableMetaClient.getTableConfig
@@ -437,9 +439,15 @@ object HoodieSparkSqlWriter {
         val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
         val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
         val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
-        val populateMetaFields = java.lang.Boolean.parseBoolean((parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
-          String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))))
+        val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(
+          HoodieTableConfig.POPULATE_META_FIELDS.key(),
+          String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())
+        ))
         val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
+        val useBaseFormatMetaFile = java.lang.Boolean.parseBoolean(parameters.getOrElse(
+          HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
+          String.valueOf(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())
+        ))
 
         HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(HoodieTableType.valueOf(tableType))
@@ -457,6 +465,7 @@ object HoodieSparkSqlWriter {
           .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
           .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
           .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
+          .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
           .initTable(sparkContext.hadoopConfiguration, path)
       }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 0776a3116a..8c9e9daf89 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -21,7 +21,7 @@ import org.apache.hadoop.fs.FileSystem
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.timeline.HoodieInstant
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
 import org.apache.hudi.config.HoodieWriteConfig
@@ -56,6 +56,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     "hoodie.upsert.shuffle.parallelism" -> "4",
     "hoodie.bulkinsert.shuffle.parallelism" -> "2",
     "hoodie.delete.shuffle.parallelism" -> "1",
+    HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key() -> "true",
     DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
     DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
     DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
index 43e58d531e..a2717a3561 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
@@ -117,8 +117,8 @@ public class HoodieSnapshotCopier implements Serializable {
         dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile.getPath())));
 
         // also need to copy over partition metadata
-        Path partitionMetaFile =
-            new Path(FSUtils.getPartitionPath(baseDir, partition), HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
+        Path partitionMetaFile = HoodiePartitionMetadata.getPartitionMetafilePath(fs1,
+            FSUtils.getPartitionPath(baseDir, partition)).get();
         if (fs1.exists(partitionMetaFile)) {
           filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString()));
         }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index c2cfa390d0..255393b232 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -206,9 +206,9 @@ public class HoodieSnapshotExporter {
       Stream<HoodieBaseFile> dataFiles = fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp);
       dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile.getPath())));
       // also need to copy over partition metadata
-      Path partitionMetaFile =
-          new Path(FSUtils.getPartitionPath(cfg.sourceBasePath, partition), HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
       FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
+      Path partitionMetaFile = HoodiePartitionMetadata.getPartitionMetafilePath(fs,
+          FSUtils.getPartitionPath(cfg.sourceBasePath, partition)).get();
       if (fs.exists(partitionMetaFile)) {
         filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString()));
       }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
index 84b7933767..7e605dbd36 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
@@ -185,6 +185,7 @@ public class BootstrapExecutor implements Serializable {
       }
     }
     HoodieTableMetaClient.withPropertyBuilder()
+        .fromProperties(props)
         .setTableType(cfg.tableType)
         .setTableName(cfg.targetTableName)
         .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 50338e5510..0e57bd379a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -278,6 +278,8 @@ public class DeltaSync implements Serializable {
           .setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
               SimpleKeyGenerator.class.getName()))
           .setPreCombineField(cfg.sourceOrderingField)
+          .setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
+              HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
           .initTable(new Configuration(jssc.hadoopConfiguration()),
             cfg.targetBasePath);
     }
@@ -371,6 +373,8 @@ public class DeltaSync implements Serializable {
               HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))
           .setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
               SimpleKeyGenerator.class.getName()))
+          .setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
+              HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
           .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
     }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 4b0f148355..56124b82af 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -468,7 +468,7 @@ public class HoodieDeltaStreamer implements Serializable {
               compactSchedulingMinShare, clusterSchedulingMinShare, forceDisableCompaction, checkpoint,
               initialCheckpointProvider, help);
     }
-  
+
     @Override
     public String toString() {
       return "Config{"