Posted to commits@hudi.apache.org by ak...@apache.org on 2023/02/24 16:44:00 UTC

[hudi] branch master updated: [HUDI-915][HUDI-5656] Rebased `HoodieBootstrapRelation` onto `HoodieBaseRelation` (#7804)

This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2770ff50714 [HUDI-915][HUDI-5656] Rebased `HoodieBootstrapRelation` onto `HoodieBaseRelation` (#7804)
2770ff50714 is described below

commit 2770ff507141f013f7500354595137b52a543e8b
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Fri Feb 24 08:43:49 2023 -0800

    [HUDI-915][HUDI-5656] Rebased `HoodieBootstrapRelation` onto `HoodieBaseRelation` (#7804)
    
    Currently `HoodieBootstrapRelation` treats partitioned tables improperly, resulting in an NPE while trying to read a bootstrapped table.
    
    To address that, `HoodieBootstrapRelation` has been rebased onto `HoodieBaseRelation`, sharing the core reading semantics (such as schema handling, file listing, etc.) with Hudi's other file-based Relation implementations for COW and MOR.
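
For reference, the read path this change fixes is a plain snapshot query of a bootstrapped,
partitioned table through the Hudi Spark datasource. A minimal sketch of that query is shown
below; it assumes the hudi-spark bundle is on the classpath, and the table path and the
"datestr" partition column are placeholders (mirroring the bootstrap tests touched by this
patch), not part of the commit itself.

    import org.apache.spark.sql.SparkSession

    object ReadBootstrappedTable {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("read-bootstrapped-table")
          .master("local[2]")
          .getOrCreate()

        // Snapshot-query a Hudi table that was bootstrapped over pre-existing parquet files.
        // Before this patch, HoodieBootstrapRelation could hit an NPE here for partitioned
        // tables because partition values were never populated for the bootstrap data files.
        val df = spark.read.format("hudi").load("/tmp/hoodie/bootstrap_table")

        // Partition columns (e.g. "datestr") are now parsed from the partition path and can
        // be projected and filtered like any other column.
        df.select("_hoodie_record_key", "datestr")
          .where("datestr = '2020-04-01'")
          .show()

        spark.stop()
      }
    }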
---
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  47 ++--
 .../scala/org/apache/hudi/HoodieBootstrapRDD.scala | 103 ++++----
 .../org/apache/hudi/HoodieBootstrapRelation.scala  | 259 +++++++++++----------
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |   2 +-
 .../spark/sql/hudi/HoodieSqlCommonUtils.scala      |  30 ++-
 .../functional/TestDataSourceForBootstrap.scala    | 166 +++++++------
 .../deltastreamer/TestHoodieDeltaStreamer.java     |   4 +
 7 files changed, 344 insertions(+), 267 deletions(-)
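
Among the changes below, the rewritten `HoodieBootstrapRDD` now stitches each skeleton-file
row (Hudi meta fields) together with its data-file row using Spark's `JoinedRow`, and then
projects the merged row into the requested schema, instead of copying field values into a
fresh `InternalRow`. The standalone sketch below (schemas and values are made up for
illustration) shows just the `JoinedRow` part; it only assumes Spark's catalyst classes on
the classpath.

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.JoinedRow
    import org.apache.spark.sql.types._
    import org.apache.spark.unsafe.types.UTF8String

    object JoinedRowMergeSketch {
      def main(args: Array[String]): Unit = {
        // Skeleton files carry only the Hudi meta columns; data files carry the original columns.
        val skeletonSchema = StructType(Seq(StructField("_hoodie_record_key", StringType)))
        val dataSchema = StructType(Seq(
          StructField("rider", StringType),
          StructField("timestamp", LongType)))

        val skeletonRow: InternalRow = InternalRow(UTF8String.fromString("key-1"))
        val dataRow: InternalRow = InternalRow(UTF8String.fromString("rider-A"), 1677254629000L)

        // JoinedRow exposes the two rows side by side (skeleton fields first) without copying;
        // this merged layout is what the RDD then projects into the caller's required schema.
        val mergedSchema = StructType(skeletonSchema.fields ++ dataSchema.fields)
        val merged = new JoinedRow(skeletonRow, dataRow)

        println(mergedSchema.fieldNames.mkString(", ")) // _hoodie_record_key, rider, timestamp
        println(merged.getUTF8String(0))                // key-1
        println(merged.getUTF8String(1))                // rider-A
        println(merged.getLong(2))                      // 1677254629000
      }
    }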

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 99b5b5c87ba..cb02c59a690 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -48,6 +48,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.HoodieCatalystExpressionUtils.{convertToCatalystExpression, generateUnsafeProjection}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.datasources._
@@ -66,7 +67,12 @@ import scala.util.{Failure, Success, Try}
 
 trait HoodieFileSplit {}
 
-case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: Option[InternalSchema] = None)
+case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: Option[InternalSchema] = None) {
+
+  def this(structTypeSchema: StructType) =
+    this(structTypeSchema, convertToAvroSchema(structTypeSchema).toString)
+
+}
 
 case class HoodieTableState(tablePath: String,
                             latestCommitTimestamp: Option[String],
@@ -98,6 +104,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
   protected val sparkSession: SparkSession = sqlContext.sparkSession
 
+  protected lazy val resolver: Resolver = sparkSession.sessionState.analyzer.resolver
+
   protected lazy val conf: Configuration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
   protected lazy val jobConf = new JobConf(conf)
 
@@ -174,8 +182,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
   protected lazy val tableStructSchema: StructType = {
     val converted = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
-
-    val resolver = sparkSession.sessionState.analyzer.resolver
     val metaFieldMetadata = sparkAdapter.createCatalystMetadataForMetaField
 
     // NOTE: Here we annotate meta-fields with corresponding metadata such that Spark (>= 3.2)
@@ -466,10 +472,15 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    * For enable hoodie.datasource.write.drop.partition.columns, need to create an InternalRow on partition values
    * and pass this reader on parquet file. So that, we can query the partition columns.
    */
-  protected def getPartitionColumnsAsInternalRow(file: FileStatus): InternalRow = {
+
+  protected def getPartitionColumnsAsInternalRow(file: FileStatus): InternalRow =
+    getPartitionColumnsAsInternalRowInternal(file, shouldExtractPartitionValuesFromPartitionPath)
+
+  protected def getPartitionColumnsAsInternalRowInternal(file: FileStatus,
+                                                         extractPartitionValuesFromPartitionPath: Boolean): InternalRow = {
     try {
       val tableConfig = metaClient.getTableConfig
-      if (shouldExtractPartitionValuesFromPartitionPath) {
+      if (extractPartitionValuesFromPartitionPath) {
         val relativePath = new URI(metaClient.getBasePath).relativize(new URI(file.getPath.getParent.toString)).toString
         val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean
         if (hiveStylePartitioningEnabled) {
@@ -514,7 +525,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
                                      requiredDataSchema: HoodieTableSchema,
                                      filters: Seq[Filter],
                                      options: Map[String, String],
-                                     hadoopConf: Configuration): BaseFileReader = {
+                                     hadoopConf: Configuration,
+                                     shouldAppendPartitionValuesOverride: Option[Boolean] = None): BaseFileReader = {
     val tableBaseFileFormat = tableConfig.getBaseFileFormat
 
     // NOTE: PLEASE READ CAREFULLY
@@ -535,7 +547,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
             hadoopConf = hadoopConf,
             // We're delegating to Spark to append partition values to every row only in cases
             // when these corresponding partition-values are not persisted w/in the data file itself
-            appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
+            appendPartitionValues = shouldAppendPartitionValuesOverride.getOrElse(shouldExtractPartitionValuesFromPartitionPath)
           )
           // Since partition values by default are omitted, and not persisted w/in data-files by Spark,
           // data-file readers (such as [[ParquetFileFormat]]) have to inject partition values while reading
@@ -589,6 +601,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
   protected def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
                                          requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = {
+    tryPrunePartitionColumnsInternal(tableSchema, requiredSchema, shouldExtractPartitionValuesFromPartitionPath)
+  }
+
+  protected def tryPrunePartitionColumnsInternal(tableSchema: HoodieTableSchema,
+                                                 requiredSchema: HoodieTableSchema,
+                                                 extractPartitionValuesFromPartitionPath: Boolean): (StructType, HoodieTableSchema, HoodieTableSchema) = {
     // Since schema requested by the caller might contain partition columns, we might need to
     // prune it, removing all partition columns from it in case these columns are not persisted
     // in the data files
@@ -598,21 +616,24 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     //       the partition path, and omitted from the data file, back into fetched rows;
     //       Note that, by default, partition columns are not omitted therefore specifying
     //       partition schema for reader is not required
-    if (shouldExtractPartitionValuesFromPartitionPath) {
-      val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType)))
+    if (extractPartitionValuesFromPartitionPath) {
+      val partitionSchema = filterInPartitionColumns(tableSchema.structTypeSchema)
       val prunedDataStructSchema = prunePartitionColumns(tableSchema.structTypeSchema)
       val prunedRequiredSchema = prunePartitionColumns(requiredSchema.structTypeSchema)
 
       (partitionSchema,
-        HoodieTableSchema(prunedDataStructSchema, convertToAvroSchema(prunedDataStructSchema).toString),
-        HoodieTableSchema(prunedRequiredSchema, convertToAvroSchema(prunedRequiredSchema).toString))
+        new HoodieTableSchema(prunedDataStructSchema),
+        new HoodieTableSchema(prunedRequiredSchema))
     } else {
       (StructType(Nil), tableSchema, requiredSchema)
     }
   }
 
-  private def prunePartitionColumns(dataStructSchema: StructType): StructType =
-    StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name)))
+  private def filterInPartitionColumns(structType: StructType): StructType =
+    StructType(structType.filter(f => partitionColumns.exists(col => resolver(f.name, col))))
+
+  private def prunePartitionColumns(structType: StructType): StructType =
+    StructType(structType.filterNot(f => partitionColumns.exists(pc => resolver(f.name, pc))))
 
   private def getConfigValue(config: ConfigProperty[String],
                              defaultValueOption: Option[String]=Option.empty): String = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala
index ea997c86acb..b72c41bbd66 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala
@@ -18,23 +18,22 @@
 
 package org.apache.hudi
 
-import org.apache.spark.{Partition, TaskContext}
+import org.apache.hudi.HoodieBaseRelation.BaseFileReader
+import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import org.apache.spark.sql.types.StructType
-
-import org.apache.hudi.HoodieDataSourceHelper._
+import org.apache.spark.{Partition, TaskContext}
 
 class HoodieBootstrapRDD(@transient spark: SparkSession,
-                        dataReadFunction: PartitionedFile => Iterator[InternalRow],
-                        skeletonReadFunction: PartitionedFile => Iterator[InternalRow],
-                        regularReadFunction: PartitionedFile => Iterator[InternalRow],
-                        dataSchema: StructType,
-                        skeletonSchema: StructType,
-                        requiredColumns: Array[String],
-                        tableState: HoodieBootstrapTableState)
+                         bootstrapDataFileReader: BaseFileReader,
+                         bootstrapSkeletonFileReader: BaseFileReader,
+                         regularFileReader: BaseFileReader,
+                         requiredSchema: HoodieTableSchema,
+                         @transient splits: Seq[HoodieBootstrapSplit])
   extends RDD[InternalRow](spark.sparkContext, Nil) {
 
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
@@ -51,59 +50,57 @@ class HoodieBootstrapRDD(@transient spark: SparkSession,
       }
     }
 
-    var partitionedFileIterator: Iterator[InternalRow] = null
+    bootstrapPartition.split.skeletonFile match {
+      case Some(skeletonFile) =>
+        // It is a bootstrap split. Check both skeleton and data files.
+        val (iterator, schema) = if (bootstrapDataFileReader.schema.isEmpty) {
+          // No data column to fetch, hence fetch only from skeleton file
+          (bootstrapSkeletonFileReader.read(skeletonFile), bootstrapSkeletonFileReader.schema)
+        } else if (bootstrapSkeletonFileReader.schema.isEmpty) {
+          // No metadata column to fetch, hence fetch only from data file
+          (bootstrapDataFileReader.read(bootstrapPartition.split.dataFile), bootstrapDataFileReader.schema)
+        } else {
+          // Fetch from both data and skeleton file, and merge
+          val dataFileIterator = bootstrapDataFileReader.read(bootstrapPartition.split.dataFile)
+          val skeletonFileIterator = bootstrapSkeletonFileReader.read(skeletonFile)
+          val mergedSchema = StructType(bootstrapSkeletonFileReader.schema.fields ++ bootstrapDataFileReader.schema.fields)
 
-    if (bootstrapPartition.split.skeletonFile.isDefined) {
-      // It is a bootstrap split. Check both skeleton and data files.
-      if (dataSchema.isEmpty) {
-        // No data column to fetch, hence fetch only from skeleton file
-        partitionedFileIterator = skeletonReadFunction(bootstrapPartition.split.skeletonFile.get)
-      } else if (skeletonSchema.isEmpty) {
-        // No metadata column to fetch, hence fetch only from data file
-        partitionedFileIterator = dataReadFunction(bootstrapPartition.split.dataFile)
-      } else {
-        // Fetch from both data and skeleton file, and merge
-        val dataFileIterator = dataReadFunction(bootstrapPartition.split.dataFile)
-        val skeletonFileIterator = skeletonReadFunction(bootstrapPartition.split.skeletonFile.get)
-        partitionedFileIterator = merge(skeletonFileIterator, dataFileIterator)
-      }
-    } else {
-      partitionedFileIterator = regularReadFunction(bootstrapPartition.split.dataFile)
+          (merge(skeletonFileIterator, dataFileIterator), mergedSchema)
+        }
+
+        // NOTE: Here we have to project the [[InternalRow]]s fetched into the expected target schema.
+        //       These could diverge for ex, when requested schema contains partition columns which might not be
+        //       persisted w/in the data file, but instead would be parsed from the partition path. In that case
+        //       output of the file-reader will have different ordering of the fields than the original required
+        //       schema (for more details please check out [[ParquetFileFormat]] implementation).
+        val unsafeProjection = generateUnsafeProjection(schema, requiredSchema.structTypeSchema)
+
+        iterator.map(unsafeProjection)
+
+      case _ =>
+        // NOTE: Regular file-reader is already projected into the required schema
+        regularFileReader.read(bootstrapPartition.split.dataFile)
     }
-    partitionedFileIterator
   }
 
-  def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator: Iterator[InternalRow])
-  : Iterator[InternalRow] = {
+  def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator: Iterator[InternalRow]): Iterator[InternalRow] = {
     new Iterator[InternalRow] {
-      override def hasNext: Boolean = dataFileIterator.hasNext && skeletonFileIterator.hasNext
-      override def next(): InternalRow = {
-        mergeInternalRow(skeletonFileIterator.next(), dataFileIterator.next())
-      }
-    }
-  }
+      private val combinedRow = new JoinedRow()
 
-  def mergeInternalRow(skeletonRow: InternalRow, dataRow: InternalRow): InternalRow = {
-    val skeletonArr  = skeletonRow.copy().toSeq(skeletonSchema)
-    val dataArr = dataRow.copy().toSeq(dataSchema)
-    // We need to return it in the order requested
-    val mergedArr = requiredColumns.map(col => {
-      if (skeletonSchema.fieldNames.contains(col)) {
-        val idx = skeletonSchema.fieldIndex(col)
-        skeletonArr(idx)
-      } else {
-        val idx = dataSchema.fieldIndex(col)
-        dataArr(idx)
+      override def hasNext: Boolean = {
+        checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
+          "Bootstrap data-file iterator and skeleton-file iterator have to be in-sync!")
+        dataFileIterator.hasNext && skeletonFileIterator.hasNext
       }
-    })
 
-    logDebug("Merged data and skeleton values => " + mergedArr.mkString(","))
-    val mergedRow = InternalRow.fromSeq(mergedArr)
-    mergedRow
+      override def next(): InternalRow = {
+        combinedRow(skeletonFileIterator.next(), dataFileIterator.next())
+      }
+    }
   }
 
   override protected def getPartitions: Array[Partition] = {
-    tableState.files.zipWithIndex.map(file => {
+    splits.zipWithIndex.map(file => {
       if (file._1.skeletonFile.isDefined) {
         logDebug("Forming partition with => Index: " + file._2 + ", Files: " + file._1.dataFile.filePath
           + "," + file._1.skeletonFile.get.filePath)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
index 0dd54237ef5..5c58c10493d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -19,20 +19,20 @@
 package org.apache.hudi
 
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.common.model.HoodieBaseFile
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.exception.HoodieException
-import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
-import org.apache.spark.internal.Logging
+import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader}
+import org.apache.hudi.HoodieBootstrapRelation.validate
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
-import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isMetaField, removeMetaFields}
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Row, SQLContext}
 
-import scala.collection.JavaConverters._
+case class HoodieBootstrapSplit(dataFile: PartitionedFile, skeletonFile: Option[PartitionedFile] = None) extends HoodieFileSplit
 
 /**
   * This is Spark relation that can be used for querying metadata/fully bootstrapped query hoodie tables, as well as
@@ -44,150 +44,161 @@ import scala.collection.JavaConverters._
   * bootstrapped files, because then the metadata file and data file can return different number of rows causing errors
   * merging.
   *
-  * @param _sqlContext Spark SQL Context
+  * @param sqlContext Spark SQL Context
   * @param userSchema User specified schema in the datasource query
   * @param globPaths  The global paths to query. If it not none, read from the globPaths,
   *                   else read data from tablePath using HoodiFileIndex.
   * @param metaClient Hoodie table meta client
   * @param optParams DataSource options passed by the user
   */
-class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
-                              val userSchema: Option[StructType],
-                              val globPaths: Seq[Path],
-                              val metaClient: HoodieTableMetaClient,
-                              val optParams: Map[String, String]) extends BaseRelation
-  with PrunedFilteredScan with Logging {
+case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
+                                   private val userSchema: Option[StructType],
+                                   private val globPaths: Seq[Path],
+                                   override val metaClient: HoodieTableMetaClient,
+                                   override val optParams: Map[String, String],
+                                   private val prunedDataSchema: Option[StructType] = None)
+  extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema, prunedDataSchema) {
 
-  val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema
-  var dataSchema: StructType = _
-  var fullSchema: StructType = _
+  override type FileSplit = HoodieBootstrapSplit
+  override type Relation = HoodieBootstrapRelation
 
-  val fileIndex: HoodieBootstrapFileIndex = buildFileIndex()
+  private lazy val skeletonSchema = HoodieSparkUtils.getMetaSchema
 
-  override def sqlContext: SQLContext = _sqlContext
+  override val mandatoryFields: Seq[String] = Seq.empty
 
-  override val needConversion: Boolean = false
+  protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit] = {
+    val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters)
+    fileSlices.map { fileSlice =>
+      val baseFile = fileSlice.getBaseFile.get()
 
-  override def schema: StructType = inferFullSchema()
+      if (baseFile.getBootstrapBaseFile.isPresent) {
+        val partitionValues =
+          getPartitionColumnsAsInternalRowInternal(baseFile.getFileStatus, extractPartitionValuesFromPartitionPath = true)
+        val dataFile = PartitionedFile(partitionValues, baseFile.getBootstrapBaseFile.get().getPath, 0, baseFile.getBootstrapBaseFile.get().getFileLen)
+        val skeletonFile = Option(PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen))
 
-  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
-    logInfo("Starting scan..")
-
-    // Compute splits
-    val bootstrapSplits = fileIndex.files.map(hoodieBaseFile => {
-      var skeletonFile: Option[PartitionedFile] = Option.empty
-      var dataFile: PartitionedFile = null
-
-      if (hoodieBaseFile.getBootstrapBaseFile.isPresent) {
-        skeletonFile = Option(PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen))
-        dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getBootstrapBaseFile.get().getPath, 0,
-          hoodieBaseFile.getBootstrapBaseFile.get().getFileLen)
-      } else {
-        dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen)
-      }
-      HoodieBootstrapSplit(dataFile, skeletonFile)
-    })
-    val tableState = HoodieBootstrapTableState(bootstrapSplits)
-
-    // Get required schemas for column pruning
-    var requiredDataSchema = StructType(Seq())
-    var requiredSkeletonSchema = StructType(Seq())
-    // requiredColsSchema is the schema of requiredColumns, note that requiredColumns is in a random order
-    // so requiredColsSchema is not always equal to (requiredSkeletonSchema.fields ++ requiredDataSchema.fields)
-    var requiredColsSchema = StructType(Seq())
-    requiredColumns.foreach(col => {
-      var field = dataSchema.find(_.name == col)
-      if (field.isDefined) {
-        requiredDataSchema = requiredDataSchema.add(field.get)
+        HoodieBootstrapSplit(dataFile, skeletonFile)
       } else {
-        field = skeletonSchema.find(_.name == col)
-        requiredSkeletonSchema = requiredSkeletonSchema.add(field.get)
+        val dataFile = PartitionedFile(getPartitionColumnsAsInternalRow(baseFile.getFileStatus), baseFile.getPath, 0, baseFile.getFileLen)
+        HoodieBootstrapSplit(dataFile)
       }
-      requiredColsSchema = requiredColsSchema.add(field.get)
-    })
+    }
+  }
 
-    // Prepare readers for reading data file and skeleton files
-    val dataReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader(
-      sparkSession = _sqlContext.sparkSession,
-      dataSchema = dataSchema,
-      partitionSchema = StructType(Seq.empty),
-      requiredSchema = requiredDataSchema,
-      filters = if (requiredSkeletonSchema.isEmpty) filters else Seq() ,
+  protected override def composeRDD(fileSplits: Seq[FileSplit],
+                                    tableSchema: HoodieTableSchema,
+                                    requiredSchema: HoodieTableSchema,
+                                    requestedColumns: Array[String],
+                                    filters: Array[Filter]): RDD[InternalRow] = {
+    val requiredSkeletonFileSchema =
+      StructType(skeletonSchema.filter(f => requestedColumns.exists(col => resolver(f.name, col))))
+
+    val (bootstrapDataFileReader, bootstrapSkeletonFileReader) =
+      createBootstrapFileReaders(tableSchema, requiredSchema, requiredSkeletonFileSchema, filters)
+
+    val regularFileReader = createRegularFileReader(tableSchema, requiredSchema, filters)
+
+    new HoodieBootstrapRDD(sqlContext.sparkSession, bootstrapDataFileReader, bootstrapSkeletonFileReader, regularFileReader,
+      requiredSchema, fileSplits)
+  }
+
+  private def createBootstrapFileReaders(tableSchema: HoodieTableSchema,
+                                         requiredSchema: HoodieTableSchema,
+                                         requiredSkeletonFileSchema: StructType,
+                                         filters: Array[Filter]): (BaseFileReader, BaseFileReader) = {
+    // NOTE: "Data" schema in here refers to the whole table's schema that doesn't include only partition
+    //       columns, as opposed to data file schema not including any meta-fields columns in case of
+    //       Bootstrap relation
+    val (partitionSchema, dataSchema, requiredDataSchema) =
+      tryPrunePartitionColumnsInternal(tableSchema, requiredSchema, extractPartitionValuesFromPartitionPath = true)
+
+    val bootstrapDataFileSchema = StructType(dataSchema.structTypeSchema.filterNot(sf => isMetaField(sf.name)))
+    val requiredBootstrapDataFileSchema = StructType(requiredDataSchema.structTypeSchema.filterNot(sf => isMetaField(sf.name)))
+
+    validate(requiredDataSchema, requiredBootstrapDataFileSchema, requiredSkeletonFileSchema)
+
+    val bootstrapDataFileReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      dataSchema = new HoodieTableSchema(bootstrapDataFileSchema),
+      partitionSchema = partitionSchema,
+      requiredDataSchema = new HoodieTableSchema(requiredBootstrapDataFileSchema),
+      // NOTE: For bootstrapped files we can't apply any filtering in case we'd need to merge it with
+      //       a skeleton-file as we rely on matching ordering of the records across bootstrap- and skeleton-files
+      filters = if (requiredSkeletonFileSchema.isEmpty) filters else Seq(),
       options = optParams,
-      hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
+      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf(),
+      // NOTE: Bootstrap relation have to always extract partition values from the partition-path as this is a
+      //       default Spark behavior: Spark by default strips partition-columns from the data schema and does
+      //       NOT persist them in the data files, instead parsing them from partition-paths (on the fly) whenever
+      //       table is queried
+      shouldAppendPartitionValuesOverride = Some(true)
     )
 
-    val skeletonReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader(
-      sparkSession = _sqlContext.sparkSession,
-      dataSchema = skeletonSchema,
+    val boostrapSkeletonFileReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      dataSchema = new HoodieTableSchema(skeletonSchema),
+      // NOTE: Here we specify partition-schema as empty since we don't need Spark to inject partition-values
+      //       parsed from the partition-path
       partitionSchema = StructType(Seq.empty),
-      requiredSchema = requiredSkeletonSchema,
-      filters = if (requiredDataSchema.isEmpty) filters else Seq(),
+      requiredDataSchema = new HoodieTableSchema(requiredSkeletonFileSchema),
+      // NOTE: For bootstrapped files we can't apply any filtering in case we'd need to merge it with
+      //       a skeleton-file as we rely on matching ordering of the records across bootstrap- and skeleton-files
+      filters = if (requiredBootstrapDataFileSchema.isEmpty) filters else Seq(),
       options = optParams,
-      hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
+      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf(),
+      // NOTE: We override Spark to avoid injecting partition values into the records read from
+      //       skeleton-file
+      shouldAppendPartitionValuesOverride = Some(false)
     )
 
-    val regularReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader(
-      sparkSession = _sqlContext.sparkSession,
-      dataSchema = fullSchema,
-      partitionSchema = StructType(Seq.empty),
-      requiredSchema = requiredColsSchema,
+    (bootstrapDataFileReader, boostrapSkeletonFileReader)
+  }
+
+  private def createRegularFileReader(tableSchema: HoodieTableSchema,
+                                     requiredSchema: HoodieTableSchema,
+                                     filters: Array[Filter]): BaseFileReader = {
+    // NOTE: "Data" schema in here refers to the whole table's schema that doesn't include only partition
+    //       columns, as opposed to data file schema not including any meta-fields columns in case of
+    //       Bootstrap relation
+    val (partitionSchema, dataSchema, requiredDataSchema) =
+      tryPrunePartitionColumns(tableSchema, requiredSchema)
+
+    // NOTE: Bootstrapped table allows Hudi created file-slices to be co-located w/ the "bootstrapped"
+    //       ones (ie persisted by Spark). Therefore to be able to read the data from Bootstrapped
+    //       table we also need to create regular file-reader to read file-slices created by Hudi
+    val regularFileReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      dataSchema = dataSchema,
+      partitionSchema = partitionSchema,
+      requiredDataSchema = requiredDataSchema,
       filters = filters,
       options = optParams,
-      hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
+      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
     )
 
-    val rdd = new HoodieBootstrapRDD(_sqlContext.sparkSession, dataReadFunction, skeletonReadFunction,
-      regularReadFunction, requiredDataSchema, requiredSkeletonSchema, requiredColumns, tableState)
-    rdd.asInstanceOf[RDD[Row]]
+    // NOTE: In some case schema of the reader's output (reader's schema) might not match the schema expected by the caller.
+    //       This could occur for ex, when requested schema contains partition columns which might not be persisted w/in the
+    //       data file, but instead would be parsed from the partition path. In that case output of the file-reader will have
+    //       different ordering of the fields than the original required schema (for more details please check out
+    //       [[ParquetFileFormat]] impl). In that case we have to project the rows from the file-reader's schema
+    //       back into the one expected by the caller
+    projectReader(regularFileReader, requiredSchema.structTypeSchema)
   }
 
-  def inferFullSchema(): StructType = {
-    if (fullSchema == null) {
-      logInfo("Inferring schema..")
-      val schemaResolver = new TableSchemaResolver(metaClient)
-      val tableSchema = schemaResolver.getTableAvroSchema(false)
-      dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
-      fullSchema = StructType(skeletonSchema.fields ++ dataSchema.fields)
-    }
-    fullSchema
-  }
-
-  def buildFileIndex(): HoodieBootstrapFileIndex = {
-    logInfo("Building file index..")
-    val fileStatuses  = if (globPaths.nonEmpty) {
-      // Load files from the global paths if it has defined to be compatible with the original mode
-      val inMemoryFileIndex = HoodieInMemoryFileIndex.create(_sqlContext.sparkSession, globPaths)
-      inMemoryFileIndex.allFiles()
-    } else { // Load files by the HoodieFileIndex.
-        HoodieFileIndex(sqlContext.sparkSession, metaClient, Some(schema), optParams,
-          FileStatusCache.getOrCreate(sqlContext.sparkSession)).allFiles
-    }
-    if (fileStatuses.isEmpty) {
-      throw new HoodieException("No files found for reading in user provided path.")
-    }
+  override def updatePrunedDataSchema(prunedSchema: StructType): HoodieBootstrapRelation =
+    this.copy(prunedDataSchema = Some(prunedSchema))
+}
 
-    val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitsTimeline
-      .filterCompletedInstants, fileStatuses.toArray)
-    val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList
-
-    if (log.isDebugEnabled) {
-      latestFiles.foreach(file => {
-        logDebug("Printing indexed files:")
-        if (file.getBootstrapBaseFile.isPresent) {
-          logDebug("Skeleton File: " + file.getPath + ", Data File: " + file.getBootstrapBaseFile.get().getPath)
-        } else {
-          logDebug("Regular Hoodie File: " + file.getPath)
-        }
-      })
-    }
 
-    HoodieBootstrapFileIndex(latestFiles)
-  }
-}
+object HoodieBootstrapRelation {
 
-case class HoodieBootstrapFileIndex(files: List[HoodieBaseFile])
+  private def validate(requiredDataSchema: HoodieTableSchema, requiredDataFileSchema: StructType, requiredSkeletonFileSchema: StructType): Unit = {
+    val requiredDataColumns: Seq[String] = requiredDataSchema.structTypeSchema.fieldNames.toSeq
+    val combinedColumns = (requiredSkeletonFileSchema.fieldNames ++ requiredDataFileSchema.fieldNames).toSeq
 
-case class HoodieBootstrapTableState(files: List[HoodieBootstrapSplit])
+    // NOTE: Here we validate that all required data columns are covered by the combination of the columns
+    //       from both skeleton file and the corresponding data file
+    checkState(combinedColumns.sorted == requiredDataColumns.sorted)
+  }
 
-case class HoodieBootstrapSplit(dataFile: PartitionedFile, skeletonFile: Option[PartitionedFile])
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index accfc8f2470..94168755cbf 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -190,7 +190,7 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext,
           StructType(requiredDataSchema.structTypeSchema.fields
             .filterNot(f => unusedMandatoryColumnNames.contains(f.name)))
 
-        HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema).toString)
+        new HoodieTableSchema(prunedStructSchema)
       }
 
       val requiredSchemaReaderSkipMerging = createBaseFileReader(
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index 8e589abbc18..54c58bace7c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -23,9 +23,11 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieMetadataConfig}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline.parseDateFromInstantTime
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
+import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper, DataSourceReadOptions, SparkAdapterSupport}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -42,6 +44,7 @@ import java.text.SimpleDateFormat
 import java.util.{Locale, Properties}
 import scala.collection.JavaConverters._
 import scala.collection.immutable.Map
+import scala.util.Try
 
 object HoodieSqlCommonUtils extends SparkAdapterSupport {
   // NOTE: {@code SimpleDataFormat} is NOT thread-safe
@@ -251,11 +254,13 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
    */
   def formatQueryInstant(queryInstant: String): String = {
     val instantLength = queryInstant.length
-    if (instantLength == 19 || instantLength == 23) { // for yyyy-MM-dd HH:mm:ss[.SSS]
+    if (instantLength == 19 || instantLength == 23) {
+      // Handle "yyyy-MM-dd HH:mm:ss[.SSS]" format
       HoodieInstantTimeGenerator.getInstantForDateString(queryInstant)
     } else if (instantLength == HoodieInstantTimeGenerator.SECS_INSTANT_ID_LENGTH
-      || instantLength  == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) { // for yyyyMMddHHmmss[SSS]
-      HoodieActiveTimeline.parseDateFromInstantTime(queryInstant) // validate the format
+      || instantLength  == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) {
+      // Handle already serialized "yyyyMMddHHmmss[SSS]" format
+      validateInstant(queryInstant)
       queryInstant
     } else if (instantLength == 10) { // for yyyy-MM-dd
       HoodieActiveTimeline.formatDate(defaultDateFormat.get().parse(queryInstant))
@@ -356,4 +361,21 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
     }.mkString(",")
     partitionsToDrop
   }
+
+  private def validateInstant(queryInstant: String): Unit = {
+    // Provided instant has to either
+    //  - Match one of the bootstrapping instants
+    //  - Be parse-able (as a date)
+    val valid = queryInstant match {
+      case HoodieTimeline.INIT_INSTANT_TS |
+           HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS |
+           HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS => true
+
+      case _ => Try(parseDateFromInstantTime(queryInstant)).isSuccess
+    }
+
+    if (!valid) {
+      throw new HoodieException(s"Got an invalid instant ($queryInstant)")
+    }
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 82f79eeb44e..e3d235591d4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -20,14 +20,16 @@ package org.apache.hudi.functional
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
 import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
+import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieCompactionConfig, HoodieWriteConfig}
 import org.apache.hudi.functional.TestDataSourceForBootstrap.{dropMetaCols, sort}
 import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.testutils.HoodieClientTestUtils
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkRecordMerger}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.functions.{col, lit}
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
@@ -35,7 +37,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.io.TempDir
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
 
 import java.time.Instant
 import java.util.Collections
@@ -56,6 +58,12 @@ class TestDataSourceForBootstrap {
     DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
     HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
   )
+
+  val sparkRecordTypeOpts = Map(
+    HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> classOf[HoodieSparkRecordMerger].getName,
+    HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet"
+  )
+
   var basePath: String = _
   var srcPath: String = _
   var fs: FileSystem = _
@@ -153,12 +161,18 @@ class TestDataSourceForBootstrap {
     assertEquals(numRecords, hoodieROViewDF1WithBasePath.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF1WithBasePath.filter(s"timestamp == $updateTimestamp").count())
 
-    verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = false, isHiveStylePartitioned = false)
+    verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = false, isHiveStylePartitioned = true)
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("METADATA_ONLY", "FULL_RECORD"))
-  def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapMode: String): Unit = {
+  @CsvSource(value = Array(
+    "METADATA_ONLY,AVRO",
+    // TODO(HUDI-5807) enable for spark native records
+    /* "METADATA_ONLY,SPARK", */
+    "FULL_RECORD,AVRO",
+    "FULL_RECORD,SPARK"
+  ))
+  def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapMode: String, recordType: HoodieRecordType): Unit = {
     val timestamp = Instant.now.toEpochMilli
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
 
@@ -181,16 +195,15 @@ class TestDataSourceForBootstrap {
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
-      readOpts,
+      readOpts ++ getRecordTypeOpts(recordType),
       classOf[SimpleKeyGenerator].getName)
 
     // check marked directory clean up
     assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
 
-    // TODO(HUDI-5602) troubleshoot
     val expectedDF = bootstrapMode match {
       case "METADATA_ONLY" =>
-        sort(sourceDF).withColumn("datestr", lit(null))
+        sort(sourceDF)
       case "FULL_RECORD" =>
         sort(sourceDF)
     }
@@ -208,9 +221,11 @@ class TestDataSourceForBootstrap {
     val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava,
       jsc, spark.sqlContext)
 
+    val writeOpts = commonOpts ++ getRecordTypeOpts(recordType)
+
     updateDF.write
       .format("hudi")
-      .options(commonOpts)
+      .options(writeOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
@@ -234,28 +249,31 @@ class TestDataSourceForBootstrap {
     verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = true)
   }
 
-  @Test def testMetadataBootstrapCOWPartitioned(): Unit = {
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieRecordType],
+    // TODO(HUDI-5807) enable for spark native records
+    names = Array("AVRO" /*, "SPARK" */))
+  def testMetadataBootstrapCOWPartitioned(recordType: HoodieRecordType): Unit = {
     val timestamp = Instant.now.toEpochMilli
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
 
     val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
       spark.sqlContext)
 
-    // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence
-    // have partitioned columns stored in the data file
-    partitionPaths.foreach(partitionPath => {
-      sourceDF
-        .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
-        .write
-        .format("parquet")
-        .mode(SaveMode.Overwrite)
-        .save(srcPath + "/" + partitionPath)
-    })
+    sourceDF.write.format("parquet")
+      .partitionBy("datestr")
+      .mode(SaveMode.Overwrite)
+      .save(srcPath)
+
+    val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) ++ Map(
+      DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr"
+    )
 
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
-      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+      writeOpts,
       classOf[SimpleKeyGenerator].getName)
 
     // Read bootstrapped table and verify count using glob path
@@ -270,10 +288,9 @@ class TestDataSourceForBootstrap {
     val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
     updateDf1.write
       .format("hudi")
-      .options(commonOpts)
+      .options(writeOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -290,10 +307,9 @@ class TestDataSourceForBootstrap {
 
     updateDF2.write
       .format("hudi")
-      .options(commonOpts)
+      .options(writeOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -309,31 +325,34 @@ class TestDataSourceForBootstrap {
     assertEquals(numRecords, hoodieROViewDF4.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF4.filter(s"timestamp == $updateTimestamp").count())
 
-    verifyIncrementalViewResult(commitInstantTime1, commitInstantTime3, isPartitioned = true, isHiveStylePartitioned = false)
+    verifyIncrementalViewResult(commitInstantTime1, commitInstantTime3, isPartitioned = true, isHiveStylePartitioned = true)
   }
 
-  @Test def testMetadataBootstrapMORPartitionedInlineCompactionOn(): Unit = {
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieRecordType],
+    // TODO(HUDI-5807) enable for spark native records
+    names = Array("AVRO" /*, "SPARK" */))
+  def testMetadataBootstrapMORPartitionedInlineCompactionOn(recordType: HoodieRecordType): Unit = {
     val timestamp = Instant.now.toEpochMilli
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
 
     val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
       spark.sqlContext)
 
-    // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence
-    // have partitioned columns stored in the data file
-    partitionPaths.foreach(partitionPath => {
-      sourceDF
-        .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
-        .write
-        .format("parquet")
-        .mode(SaveMode.Overwrite)
-        .save(srcPath + "/" + partitionPath)
-    })
+    sourceDF.write.format("parquet")
+      .partitionBy("datestr")
+      .mode(SaveMode.Overwrite)
+      .save(srcPath)
+
+    val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) ++ Map(
+      DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr"
+    )
 
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
       DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
-      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+      writeOpts,
       classOf[SimpleKeyGenerator].getName)
 
     // Read bootstrapped table and verify count
@@ -350,10 +369,9 @@ class TestDataSourceForBootstrap {
 
     updateDF.write
       .format("hudi")
-      .options(commonOpts)
+      .options(writeOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
       .option(HoodieCompactionConfig.INLINE_COMPACT.key, "true")
       .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key, "1")
       .mode(SaveMode.Append)
@@ -379,28 +397,29 @@ class TestDataSourceForBootstrap {
     assertEquals(numRecordsUpdate, hoodieROViewDFWithBasePath.filter(s"timestamp == $updateTimestamp").count())
   }
 
-  @Test def testMetadataBootstrapMORPartitioned(): Unit = {
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
+  def testMetadataBootstrapMORPartitioned(recordType: HoodieRecordType): Unit = {
     val timestamp = Instant.now.toEpochMilli
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
 
     val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
       spark.sqlContext)
 
-    // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence
-    // have partitioned columns stored in the data file
-    partitionPaths.foreach(partitionPath => {
-      sourceDF
-        .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
-        .write
-        .format("parquet")
-        .mode(SaveMode.Overwrite)
-        .save(srcPath + "/" + partitionPath)
-    })
+    sourceDF.write.format("parquet")
+      .partitionBy("datestr")
+      .mode(SaveMode.Overwrite)
+      .save(srcPath)
+
+    val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) ++ Map(
+      DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr"
+    )
 
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
       DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
-      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+      writeOpts,
       classOf[SimpleKeyGenerator].getName)
 
     // Read bootstrapped table and verify count
@@ -423,10 +442,9 @@ class TestDataSourceForBootstrap {
     val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
     updateDf1.write
       .format("hudi")
-      .options(commonOpts)
+      .options(writeOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -446,10 +464,9 @@ class TestDataSourceForBootstrap {
 
     updateDF2.write
       .format("hudi")
-      .options(commonOpts)
+      .options(writeOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -466,31 +483,31 @@ class TestDataSourceForBootstrap {
     assertEquals(0, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count())
   }
 
-  @Test def testFullBootstrapCOWPartitioned(): Unit = {
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
+  def testFullBootstrapCOWPartitioned(recordType: HoodieRecordType): Unit = {
     val timestamp = Instant.now.toEpochMilli
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
 
     val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
       spark.sqlContext)
 
-    // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence
-    // have partitioned columns stored in the data file
-    partitionPaths.foreach(partitionPath => {
-      sourceDF
-        .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
-        .write
-        .format("parquet")
-        .mode(SaveMode.Overwrite)
-        .save(srcPath + "/" + partitionPath)
-    })
+    sourceDF.write.format("parquet")
+      .partitionBy("datestr")
+      .mode(SaveMode.Overwrite)
+      .save(srcPath)
+
+    val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) ++ Map(
+      DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr"
+    )
 
     // Perform bootstrap
     val bootstrapDF = spark.emptyDataFrame
     bootstrapDF.write
       .format("hudi")
-      .options(commonOpts)
+      .options(writeOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
       .option(HoodieBootstrapConfig.BASE_PATH.key, srcPath)
       .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
       .option(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key, classOf[FullRecordBootstrapModeSelector].getName)
@@ -515,10 +532,9 @@ class TestDataSourceForBootstrap {
 
     updateDF.write
       .format("hudi")
-      .options(commonOpts)
+      .options(writeOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -530,7 +546,7 @@ class TestDataSourceForBootstrap {
     assertEquals(numRecords, hoodieROViewDF2.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
 
-    verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = false)
+    verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = true)
   }
 
   def runMetadataBootstrapAndVerifyCommit(tableType: String,
@@ -596,6 +612,12 @@ class TestDataSourceForBootstrap {
         hoodieIncViewDF3.count())
     }
   }
+
+  def getRecordTypeOpts(recordType: HoodieRecordType): Map[String, String] =
+    recordType match {
+      case HoodieRecordType.SPARK => sparkRecordTypeOpts
+      case _ => Map.empty
+    }
 }
 
 object TestDataSourceForBootstrap {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index eb6ab80b5f9..f85e55dfd40 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -650,6 +650,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     cfg.configs.add(String.format("hoodie.bootstrap.base.path=%s", bootstrapSourcePath));
     cfg.configs.add(String.format("%s=%s", DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "rider"));
     cfg.configs.add(String.format("hoodie.bootstrap.keygen.class=%s", SimpleKeyGenerator.class.getName()));
+    cfg.configs.add("hoodie.datasource.write.hive_style_partitioning=true");
     cfg.configs.add("hoodie.bootstrap.parallelism=5");
     cfg.targetBasePath = newDatasetBasePath;
     new HoodieDeltaStreamer(cfg, jsc).sync();
@@ -660,6 +661,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     TestHelpers.assertRecordCount(1950, newDatasetBasePath, sqlContext);
     res.registerTempTable("bootstrapped");
     assertEquals(1950, sqlContext.sql("select distinct _hoodie_record_key from bootstrapped").count());
+    // NOTE: To fetch record's count Spark will optimize the query fetching minimal possible amount
+    //       of data, which might not provide adequate amount of test coverage
+    sqlContext.sql("select * from bootstrapped").show();
 
     StructField[] fields = res.schema().fields();
     List<String> fieldNames = Arrays.asList(res.schema().fieldNames());