You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2022/09/16 01:06:50 UTC

[hudi] branch master updated: [HUDI-4785] Fix partition discovery in bootstrap operation (#6673)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 488f58d770 [HUDI-4785] Fix partition discovery in bootstrap operation (#6673)
488f58d770 is described below

commit 488f58d770137057532196065f2f69eea1a15db8
Author: Sagar Sumit <sa...@gmail.com>
AuthorDate: Fri Sep 16 06:36:44 2022 +0530

    [HUDI-4785] Fix partition discovery in bootstrap operation (#6673)
    
    Co-authored-by: Y Ethan Guo <et...@gmail.com>
---
 .../apache/hudi/config/HoodieBootstrapConfig.java  | 20 +++++----
 .../SparkBootstrapCommitActionExecutor.java        | 47 ++++++++++++++++------
 .../SparkBootstrapDeltaCommitActionExecutor.java   | 12 ++++--
 .../hudi/common/table/TableSchemaResolver.java     | 34 ++++++++--------
 .../hudi/common/table/TestTableSchemaResolver.java | 12 +++---
 .../org/apache/hudi/HoodieBootstrapRelation.scala  |  7 ++--
 .../SparkFullBootstrapDataProviderBase.java        |  4 +-
 .../functional/TestDataSourceForBootstrap.scala    | 18 +++++++--
 8 files changed, 99 insertions(+), 55 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
index 94bb7830cc..0b9116b01c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
@@ -34,6 +34,9 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.util.Properties;
 
+import static org.apache.hudi.client.bootstrap.BootstrapMode.FULL_RECORD;
+import static org.apache.hudi.client.bootstrap.BootstrapMode.METADATA_ONLY;
+
 /**
  * Bootstrap specific configs.
  */
@@ -50,6 +53,15 @@ public class HoodieBootstrapConfig extends HoodieConfig {
       .sinceVersion("0.6.0")
       .withDocumentation("Base path of the dataset that needs to be bootstrapped as a Hudi table");
 
+  public static final ConfigProperty<String> PARTITION_SELECTOR_REGEX_MODE = ConfigProperty
+      .key("hoodie.bootstrap.mode.selector.regex.mode")
+      .defaultValue(METADATA_ONLY.name())
+      .sinceVersion("0.6.0")
+      .withValidValues(METADATA_ONLY.name(), FULL_RECORD.name())
+      .withDocumentation("Bootstrap mode to apply for partition paths, that match regex above. "
+          + "METADATA_ONLY will generate just skeleton base files with keys/footers, avoiding full cost of rewriting the dataset. "
+          + "FULL_RECORD will perform a full copy/rewrite of the data as a Hudi table.");
+
   public static final ConfigProperty<String> MODE_SELECTOR_CLASS_NAME = ConfigProperty
       .key("hoodie.bootstrap.mode.selector")
       .defaultValue(MetadataOnlyBootstrapModeSelector.class.getCanonicalName())
@@ -92,14 +104,6 @@ public class HoodieBootstrapConfig extends HoodieConfig {
       .sinceVersion("0.6.0")
       .withDocumentation("Matches each bootstrap dataset partition against this regex and applies the mode below to it.");
 
-  public static final ConfigProperty<String> PARTITION_SELECTOR_REGEX_MODE = ConfigProperty
-      .key("hoodie.bootstrap.mode.selector.regex.mode")
-      .defaultValue(BootstrapMode.METADATA_ONLY.name())
-      .sinceVersion("0.6.0")
-      .withDocumentation("Bootstrap mode to apply for partition paths, that match regex above. "
-          + "METADATA_ONLY will generate just skeleton base files with keys/footers, avoiding full cost of rewriting the dataset. "
-          + "FULL_RECORD will perform a full copy/rewrite of the data as a Hudi table.");
-
   public static final ConfigProperty<String> INDEX_CLASS_NAME = ConfigProperty
       .key("hoodie.bootstrap.index.class")
       .defaultValue(HFileBootstrapIndex.class.getName())
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index a2ee384940..88f6a54e0d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -26,6 +26,7 @@ import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
 import org.apache.hudi.client.bootstrap.HoodieBootstrapSchemaProvider;
 import org.apache.hudi.client.bootstrap.HoodieSparkBootstrapSchemaProvider;
 import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
+import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
 import org.apache.hudi.client.bootstrap.translator.BootstrapPartitionPathTranslator;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.utils.SparkValidatorUtils;
@@ -47,7 +48,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
@@ -74,11 +74,15 @@ import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.client.bootstrap.BootstrapMode.FULL_RECORD;
+import static org.apache.hudi.client.bootstrap.BootstrapMode.METADATA_ONLY;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;
 import static org.apache.hudi.table.action.bootstrap.MetadataBootstrapHandlerFactory.getMetadataHandler;
 
@@ -93,19 +97,29 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
                                             HoodieWriteConfig config,
                                             HoodieTable table,
                                             Option<Map<String, String>> extraMetadata) {
-    super(context, new HoodieWriteConfig.Builder().withProps(config.getProps())
-        .withAutoCommit(true).withWriteStatusClass(BootstrapWriteStatus.class)
-        .withBulkInsertParallelism(config.getBootstrapParallelism())
-        .build(), table, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, WriteOperationType.BOOTSTRAP,
+    super(
+        context,
+        new HoodieWriteConfig.Builder()
+            .withProps(config.getProps())
+            .withAutoCommit(true)
+            .withWriteStatusClass(BootstrapWriteStatus.class)
+            .withBulkInsertParallelism(config.getBootstrapParallelism()).build(),
+        table,
+        HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
+        WriteOperationType.BOOTSTRAP,
         extraMetadata);
     bootstrapSourceFileSystem = FSUtils.getFs(config.getBootstrapSourceBasePath(), hadoopConf);
   }
 
   private void validate() {
-    ValidationUtils.checkArgument(config.getBootstrapSourceBasePath() != null,
+    checkArgument(config.getBootstrapSourceBasePath() != null,
         "Ensure Bootstrap Source Path is set");
-    ValidationUtils.checkArgument(config.getBootstrapModeSelectorClass() != null,
+    checkArgument(config.getBootstrapModeSelectorClass() != null,
         "Ensure Bootstrap Partition Selector is set");
+    if (METADATA_ONLY.name().equals(config.getBootstrapModeSelectorRegex())) {
+      checkArgument(!config.getBootstrapModeSelectorClass().equals(FullRecordBootstrapModeSelector.class.getCanonicalName()),
+          "FullRecordBootstrapModeSelector cannot be used with METADATA_ONLY bootstrap mode");
+    }
   }
 
   @Override
@@ -115,15 +129,15 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
       HoodieTableMetaClient metaClient = table.getMetaClient();
       Option<HoodieInstant> completedInstant =
           metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
-      ValidationUtils.checkArgument(!completedInstant.isPresent(),
+      checkArgument(!completedInstant.isPresent(),
           "Active Timeline is expected to be empty for bootstrap to be performed. "
               + "If you want to re-bootstrap, please rollback bootstrap first !!");
       Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> partitionSelections = listAndProcessSourcePartitions();
 
       // First run metadata bootstrap which will auto commit
-      Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
+      Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult = metadataBootstrap(partitionSelections.get(METADATA_ONLY));
       // if there are full bootstrap to be performed, perform that too
-      Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));
+      Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult = fullBootstrap(partitionSelections.get(FULL_RECORD));
 
       return new HoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult);
     } catch (IOException ioe) {
@@ -307,12 +321,21 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
     BootstrapModeSelector selector =
         (BootstrapModeSelector) ReflectionUtils.loadClass(config.getBootstrapModeSelectorClass(), config);
 
-    Map<BootstrapMode, List<String>> result = selector.select(folders);
+    Map<BootstrapMode, List<String>> result = new HashMap<>();
+    // for FULL_RECORD mode, original record along with metadata fields are needed
+    if (FULL_RECORD.equals(config.getBootstrapModeForRegexMatch())) {
+      if (!(selector instanceof FullRecordBootstrapModeSelector)) {
+        FullRecordBootstrapModeSelector fullRecordBootstrapModeSelector = new FullRecordBootstrapModeSelector(config);
+        result.putAll(fullRecordBootstrapModeSelector.select(folders));
+      }
+    } else {
+      result = selector.select(folders);
+    }
     Map<String, List<HoodieFileStatus>> partitionToFiles = folders.stream().collect(
         Collectors.toMap(Pair::getKey, Pair::getValue));
 
     // Ensure all partitions are accounted for
-    ValidationUtils.checkArgument(partitionToFiles.keySet().equals(
+    checkArgument(partitionToFiles.keySet().equals(
         result.values().stream().flatMap(Collection::stream).collect(Collectors.toSet())));
 
     return result.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue().stream()
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java
index d712ca430b..0d2ac6ceef 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java
@@ -42,8 +42,14 @@ public class SparkBootstrapDeltaCommitActionExecutor<T extends HoodieRecordPaylo
 
   @Override
   protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(HoodieData<HoodieRecord> inputRecordsRDD) {
-    return new SparkBulkInsertDeltaCommitActionExecutor((HoodieSparkEngineContext) context, new HoodieWriteConfig.Builder().withProps(config.getProps())
-        .withSchema(bootstrapSchema).build(), table, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
-        inputRecordsRDD, extraMetadata);
+    return new SparkBulkInsertDeltaCommitActionExecutor(
+        (HoodieSparkEngineContext) context,
+        new HoodieWriteConfig.Builder()
+            .withProps(config.getProps())
+            .withSchema(bootstrapSchema).build(),
+        table,
+        HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
+        inputRecordsRDD,
+        extraMetadata);
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index f825fd6b99..657ac57c63 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -18,17 +18,6 @@
 
 package org.apache.hudi.common.table;
 
-import org.apache.avro.JsonProperties;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.SchemaCompatibility;
-import org.apache.avro.generic.IndexedRecord;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -56,9 +45,17 @@ import org.apache.hudi.io.storage.HoodieHFileReader;
 import org.apache.hudi.io.storage.HoodieOrcReader;
 import org.apache.hudi.util.Lazy;
 
+import org.apache.avro.JsonProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -66,6 +63,7 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
 
 import javax.annotation.concurrent.ThreadSafe;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -208,7 +206,7 @@ public class TableSchemaResolver {
     // TODO partition columns have to be appended in all read-paths
     if (metaClient.getTableConfig().shouldDropPartitionColumns()) {
       return metaClient.getTableConfig().getPartitionFields()
-          .map(partitionFields -> appendPartitionColumns(schema, partitionFields))
+          .map(partitionFields -> appendPartitionColumns(schema, Option.ofNullable(partitionFields)))
           .orElse(schema);
     }
 
@@ -650,18 +648,18 @@ public class TableSchemaResolver {
     }
   }
 
-  static Schema appendPartitionColumns(Schema dataSchema, String[] partitionFields) {
+  public static Schema appendPartitionColumns(Schema dataSchema, Option<String[]> partitionFields) {
     // In cases when {@link DROP_PARTITION_COLUMNS} config is set true, partition columns
     // won't be persisted w/in the data files, and therefore we need to append such columns
     // when schema is parsed from data files
     //
     // Here we append partition columns with {@code StringType} as the data type
-    if (partitionFields.length == 0) {
+    if (!partitionFields.isPresent() || partitionFields.get().length == 0) {
       return dataSchema;
     }
 
-    boolean hasPartitionColNotInSchema = Arrays.stream(partitionFields).anyMatch(pf -> !containsFieldInSchema(dataSchema, pf));
-    boolean hasPartitionColInSchema = Arrays.stream(partitionFields).anyMatch(pf -> containsFieldInSchema(dataSchema, pf));
+    boolean hasPartitionColNotInSchema = Arrays.stream(partitionFields.get()).anyMatch(pf -> !containsFieldInSchema(dataSchema, pf));
+    boolean hasPartitionColInSchema = Arrays.stream(partitionFields.get()).anyMatch(pf -> containsFieldInSchema(dataSchema, pf));
     if (hasPartitionColNotInSchema && hasPartitionColInSchema) {
       throw new HoodieIncompatibleSchemaException("Partition columns could not be partially contained w/in the data schema");
     }
@@ -670,7 +668,7 @@ public class TableSchemaResolver {
       // when hasPartitionColNotInSchema is true and hasPartitionColInSchema is false, all partition columns
       // are not in originSchema. So we create and add them.
       List<Field> newFields = new ArrayList<>();
-      for (String partitionField: partitionFields) {
+      for (String partitionField: partitionFields.get()) {
         newFields.add(new Schema.Field(
             partitionField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE));
       }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index 51d5c5212f..5d949431e4 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -18,10 +18,12 @@
 
 package org.apache.hudi.common.table;
 
-import org.apache.avro.Schema;
 import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIncompatibleSchemaException;
+
+import org.apache.avro.Schema;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -36,17 +38,17 @@ public class TestTableSchemaResolver {
 
     // case2
     String[] pts1 = new String[0];
-    Schema s2 = TableSchemaResolver.appendPartitionColumns(originSchema, pts1);
+    Schema s2 = TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts1));
     assertEquals(originSchema, s2);
 
     // case3: partition_path is in originSchema
     String[] pts2 = {"partition_path"};
-    Schema s3 = TableSchemaResolver.appendPartitionColumns(originSchema, pts2);
+    Schema s3 = TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts2));
     assertEquals(originSchema, s3);
 
     // case4: user_partition is not in originSchema
     String[] pts3 = {"user_partition"};
-    Schema s4 = TableSchemaResolver.appendPartitionColumns(originSchema, pts3);
+    Schema s4 = TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts3));
     assertNotEquals(originSchema, s4);
     assertTrue(s4.getFields().stream().anyMatch(f -> f.name().equals("user_partition")));
     Schema.Field f = s4.getField("user_partition");
@@ -55,7 +57,7 @@ public class TestTableSchemaResolver {
     // case5: user_partition is in originSchema, but partition_path is in originSchema
     String[] pts4 = {"user_partition", "partition_path"};
     try {
-      TableSchemaResolver.appendPartitionColumns(originSchema, pts3);
+      TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts3));
     } catch (HoodieIncompatibleSchemaException e) {
       assertTrue(e.getMessage().contains("Partial partition fields are still in the schema"));
     }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
index 00de2f756e..4ec7f65913 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -20,18 +20,17 @@ package org.apache.hudi
 
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.common.model.HoodieBaseFile
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.exception.HoodieException
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Row, SQLContext}
 
 import scala.collection.JavaConverters._
 
@@ -147,7 +146,7 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
     if (fullSchema == null) {
       logInfo("Inferring schema..")
       val schemaResolver = new TableSchemaResolver(metaClient)
-      val tableSchema = schemaResolver.getTableAvroSchemaWithoutMetadataFields
+      val tableSchema = TableSchemaResolver.appendPartitionColumns(schemaResolver.getTableAvroSchemaWithoutMetadataFields, metaClient.getTableConfig.getPartitionFields)
       dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
       fullSchema = StructType(skeletonSchema.fields ++ dataSchema.fields)
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
index a0204b256e..bc732a1401 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
@@ -60,7 +60,9 @@ public abstract class SparkFullBootstrapDataProviderBase extends FullRecordBoots
         .flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toString()))
         .toArray(String[]::new);
 
-    Dataset inputDataset = sparkSession.read().format(getFormat()).load(filePaths);
+    // NOTE: "basePath" option is required for spark to discover the partition column
+    // More details at https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery
+    Dataset inputDataset = sparkSession.read().format(getFormat()).option("basePath", sourceBasePath).load(filePaths);
     try {
       KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
       String structName = tableName + "_record";
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 23b21b315f..a2b9d29009 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -31,6 +31,8 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.io.TempDir
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import java.time.Instant
 import java.util.Collections
@@ -148,8 +150,9 @@ class TestDataSourceForBootstrap {
     verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = false, isHiveStylePartitioned = false)
   }
 
-  @Test
-  def testMetadataBootstrapCOWHiveStylePartitioned(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("METADATA_ONLY", "FULL_RECORD"))
+  def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapMode: String): Unit = {
     val timestamp = Instant.now.toEpochMilli
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
 
@@ -166,7 +169,10 @@ class TestDataSourceForBootstrap {
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
-      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") ++ Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"),
+      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") ++
+        Map(
+          DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
+          HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.key -> bootstrapMode),
       classOf[SimpleKeyGenerator].getName)
 
     // check marked directory clean up
@@ -520,7 +526,11 @@ class TestDataSourceForBootstrap {
       .save(basePath)
 
     val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
-    assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
+    val expectedBootstrapInstant =
+      if ("FULL_RECORD".equals(extraOpts.getOrElse(HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.key, HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.defaultValue)))
+        HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS
+      else HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS
+    assertEquals(expectedBootstrapInstant, commitInstantTime1)
     commitInstantTime1
   }