Posted to commits@hudi.apache.org by ak...@apache.org on 2023/02/24 16:44:00 UTC
[hudi] branch master updated: [HUDI-915][HUDI-5656] Rebased `HoodieBootstrapRelation` onto `HoodieBaseRelation` (#7804)
This is an automated email from the ASF dual-hosted git repository.
akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2770ff50714 [HUDI-915][HUDI-5656] Rebased `HoodieBootstrapRelation` onto `HoodieBaseRelation` (#7804)
2770ff50714 is described below
commit 2770ff507141f013f7500354595137b52a543e8b
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Fri Feb 24 08:43:49 2023 -0800
[HUDI-915][HUDI-5656] Rebased `HoodieBootstrapRelation` onto `HoodieBaseRelation` (#7804)
Currently, `HoodieBootstrapRelation` treats partitioned tables improperly, resulting in an NPE while trying to read a bootstrapped table.
To address that, `HoodieBootstrapRelation` has been rebased onto `HoodieBaseRelation`, sharing the core reading semantics (schema handling, file-listing, etc.) with Hudi's other file-based Relation implementations for COW and MOR.
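For context, reading a bootstrapped table goes through the regular Hudi datasource path; a minimal sketch of the read that previously hit the NPE on partitioned tables (paths and column names are illustrative, and the hudi-spark bundle is assumed on the classpath):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("bootstrap-read-sketch")
      .master("local[*]")
      .getOrCreate()

    // Snapshot-read the bootstrapped table; partition values (e.g. `datestr`) are now
    // extracted from the partition path instead of triggering an NPE
    val df = spark.read.format("hudi").load("/tmp/hoodie_bootstrap_table")
    df.filter("datestr = '2023-01-01'").show()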
---
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 47 ++--
.../scala/org/apache/hudi/HoodieBootstrapRDD.scala | 103 ++++----
.../org/apache/hudi/HoodieBootstrapRelation.scala | 259 +++++++++++----------
.../apache/hudi/MergeOnReadSnapshotRelation.scala | 2 +-
.../spark/sql/hudi/HoodieSqlCommonUtils.scala | 30 ++-
.../functional/TestDataSourceForBootstrap.scala | 166 +++++++------
.../deltastreamer/TestHoodieDeltaStreamer.java | 4 +
7 files changed, 344 insertions(+), 267 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 99b5b5c87ba..cb02c59a690 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -48,6 +48,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.HoodieCatalystExpressionUtils.{convertToCatalystExpression, generateUnsafeProjection}
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources._
@@ -66,7 +67,12 @@ import scala.util.{Failure, Success, Try}
trait HoodieFileSplit {}
-case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: Option[InternalSchema] = None)
+case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: Option[InternalSchema] = None) {
+
+ def this(structTypeSchema: StructType) =
+ this(structTypeSchema, convertToAvroSchema(structTypeSchema).toString)
+
+}
case class HoodieTableState(tablePath: String,
latestCommitTimestamp: Option[String],
@@ -98,6 +104,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected val sparkSession: SparkSession = sqlContext.sparkSession
+ protected lazy val resolver: Resolver = sparkSession.sessionState.analyzer.resolver
+
protected lazy val conf: Configuration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
protected lazy val jobConf = new JobConf(conf)
@@ -174,8 +182,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected lazy val tableStructSchema: StructType = {
val converted = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
-
- val resolver = sparkSession.sessionState.analyzer.resolver
val metaFieldMetadata = sparkAdapter.createCatalystMetadataForMetaField
// NOTE: Here we annotate meta-fields with corresponding metadata such that Spark (>= 3.2)
@@ -466,10 +472,15 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
* When hoodie.datasource.write.drop.partition.columns is enabled, we need to create an InternalRow from the
* partition values and pass it to the parquet reader, so that we can query the partition columns.
*/
- protected def getPartitionColumnsAsInternalRow(file: FileStatus): InternalRow = {
+
+ protected def getPartitionColumnsAsInternalRow(file: FileStatus): InternalRow =
+ getPartitionColumnsAsInternalRowInternal(file, shouldExtractPartitionValuesFromPartitionPath)
+
+ protected def getPartitionColumnsAsInternalRowInternal(file: FileStatus,
+ extractPartitionValuesFromPartitionPath: Boolean): InternalRow = {
try {
val tableConfig = metaClient.getTableConfig
- if (shouldExtractPartitionValuesFromPartitionPath) {
+ if (extractPartitionValuesFromPartitionPath) {
val relativePath = new URI(metaClient.getBasePath).relativize(new URI(file.getPath.getParent.toString)).toString
val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean
if (hiveStylePartitioningEnabled) {
@@ -514,7 +525,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
requiredDataSchema: HoodieTableSchema,
filters: Seq[Filter],
options: Map[String, String],
- hadoopConf: Configuration): BaseFileReader = {
+ hadoopConf: Configuration,
+ shouldAppendPartitionValuesOverride: Option[Boolean] = None): BaseFileReader = {
val tableBaseFileFormat = tableConfig.getBaseFileFormat
// NOTE: PLEASE READ CAREFULLY
@@ -535,7 +547,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
hadoopConf = hadoopConf,
// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
- appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
+ appendPartitionValues = shouldAppendPartitionValuesOverride.getOrElse(shouldExtractPartitionValuesFromPartitionPath)
)
// Since partition values by default are omitted, and not persisted w/in data-files by Spark,
// data-file readers (such as [[ParquetFileFormat]]) have to inject partition values while reading
@@ -589,6 +601,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = {
+ tryPrunePartitionColumnsInternal(tableSchema, requiredSchema, shouldExtractPartitionValuesFromPartitionPath)
+ }
+
+ protected def tryPrunePartitionColumnsInternal(tableSchema: HoodieTableSchema,
+ requiredSchema: HoodieTableSchema,
+ extractPartitionValuesFromPartitionPath: Boolean): (StructType, HoodieTableSchema, HoodieTableSchema) = {
// Since schema requested by the caller might contain partition columns, we might need to
// prune it, removing all partition columns from it in case these columns are not persisted
// in the data files
@@ -598,21 +616,24 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
// the partition path, and omitted from the data file, back into fetched rows;
// Note that, by default, partition columns are not omitted therefore specifying
// partition schema for reader is not required
- if (shouldExtractPartitionValuesFromPartitionPath) {
- val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType)))
+ if (extractPartitionValuesFromPartitionPath) {
+ val partitionSchema = filterInPartitionColumns(tableSchema.structTypeSchema)
val prunedDataStructSchema = prunePartitionColumns(tableSchema.structTypeSchema)
val prunedRequiredSchema = prunePartitionColumns(requiredSchema.structTypeSchema)
(partitionSchema,
- HoodieTableSchema(prunedDataStructSchema, convertToAvroSchema(prunedDataStructSchema).toString),
- HoodieTableSchema(prunedRequiredSchema, convertToAvroSchema(prunedRequiredSchema).toString))
+ new HoodieTableSchema(prunedDataStructSchema),
+ new HoodieTableSchema(prunedRequiredSchema))
} else {
(StructType(Nil), tableSchema, requiredSchema)
}
}
- private def prunePartitionColumns(dataStructSchema: StructType): StructType =
- StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name)))
+ private def filterInPartitionColumns(structType: StructType): StructType =
+ StructType(structType.filter(f => partitionColumns.exists(col => resolver(f.name, col))))
+
+ private def prunePartitionColumns(structType: StructType): StructType =
+ StructType(structType.filterNot(f => partitionColumns.exists(pc => resolver(f.name, pc))))
private def getConfigValue(config: ConfigProperty[String],
defaultValueOption: Option[String]=Option.empty): String = {
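A standalone sketch of the resolver-based pruning introduced in this file (toy schema, not Hudi's actual wiring; Spark's Resolver is just a (String, String) => Boolean, case-insensitive by default, so partition columns match regardless of casing):

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val resolver: (String, String) => Boolean = _.equalsIgnoreCase(_)
    val partitionColumns = Seq("datestr")

    val schema = StructType(Seq(
      StructField("_hoodie_record_key", StringType),
      StructField("rider", StringType),
      StructField("DateStr", StringType)))  // differs in case from "datestr"

    // filterInPartitionColumns analogue: keep only the partition columns
    val partitionSchema =
      StructType(schema.filter(f => partitionColumns.exists(resolver(f.name, _))))
    // prunePartitionColumns analogue: drop the partition columns from the data schema
    val prunedSchema =
      StructType(schema.filterNot(f => partitionColumns.exists(resolver(f.name, _))))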
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala
index ea997c86acb..b72c41bbd66 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala
@@ -18,23 +18,22 @@
package org.apache.hudi
-import org.apache.spark.{Partition, TaskContext}
+import org.apache.hudi.HoodieBaseRelation.BaseFileReader
+import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.types.StructType
-
-import org.apache.hudi.HoodieDataSourceHelper._
+import org.apache.spark.{Partition, TaskContext}
class HoodieBootstrapRDD(@transient spark: SparkSession,
- dataReadFunction: PartitionedFile => Iterator[InternalRow],
- skeletonReadFunction: PartitionedFile => Iterator[InternalRow],
- regularReadFunction: PartitionedFile => Iterator[InternalRow],
- dataSchema: StructType,
- skeletonSchema: StructType,
- requiredColumns: Array[String],
- tableState: HoodieBootstrapTableState)
+ bootstrapDataFileReader: BaseFileReader,
+ bootstrapSkeletonFileReader: BaseFileReader,
+ regularFileReader: BaseFileReader,
+ requiredSchema: HoodieTableSchema,
+ @transient splits: Seq[HoodieBootstrapSplit])
extends RDD[InternalRow](spark.sparkContext, Nil) {
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
@@ -51,59 +50,57 @@ class HoodieBootstrapRDD(@transient spark: SparkSession,
}
}
- var partitionedFileIterator: Iterator[InternalRow] = null
+ bootstrapPartition.split.skeletonFile match {
+ case Some(skeletonFile) =>
+ // It is a bootstrap split. Check both skeleton and data files.
+ val (iterator, schema) = if (bootstrapDataFileReader.schema.isEmpty) {
+ // No data column to fetch, hence fetch only from skeleton file
+ (bootstrapSkeletonFileReader.read(skeletonFile), bootstrapSkeletonFileReader.schema)
+ } else if (bootstrapSkeletonFileReader.schema.isEmpty) {
+ // No metadata column to fetch, hence fetch only from data file
+ (bootstrapDataFileReader.read(bootstrapPartition.split.dataFile), bootstrapDataFileReader.schema)
+ } else {
+ // Fetch from both data and skeleton file, and merge
+ val dataFileIterator = bootstrapDataFileReader.read(bootstrapPartition.split.dataFile)
+ val skeletonFileIterator = bootstrapSkeletonFileReader.read(skeletonFile)
+ val mergedSchema = StructType(bootstrapSkeletonFileReader.schema.fields ++ bootstrapDataFileReader.schema.fields)
- if (bootstrapPartition.split.skeletonFile.isDefined) {
- // It is a bootstrap split. Check both skeleton and data files.
- if (dataSchema.isEmpty) {
- // No data column to fetch, hence fetch only from skeleton file
- partitionedFileIterator = skeletonReadFunction(bootstrapPartition.split.skeletonFile.get)
- } else if (skeletonSchema.isEmpty) {
- // No metadata column to fetch, hence fetch only from data file
- partitionedFileIterator = dataReadFunction(bootstrapPartition.split.dataFile)
- } else {
- // Fetch from both data and skeleton file, and merge
- val dataFileIterator = dataReadFunction(bootstrapPartition.split.dataFile)
- val skeletonFileIterator = skeletonReadFunction(bootstrapPartition.split.skeletonFile.get)
- partitionedFileIterator = merge(skeletonFileIterator, dataFileIterator)
- }
- } else {
- partitionedFileIterator = regularReadFunction(bootstrapPartition.split.dataFile)
+ (merge(skeletonFileIterator, dataFileIterator), mergedSchema)
+ }
+
+ // NOTE: Here we have to project the [[InternalRow]]s fetched into the expected target schema.
+ // These could diverge, for example, when the requested schema contains partition columns which might not be
+ // persisted w/in the data file, but instead would be parsed from the partition path. In that case
+ // output of the file-reader will have different ordering of the fields than the original required
+ // schema (for more details please check out [[ParquetFileFormat]] implementation).
+ val unsafeProjection = generateUnsafeProjection(schema, requiredSchema.structTypeSchema)
+
+ iterator.map(unsafeProjection)
+
+ case _ =>
+ // NOTE: Regular file-reader is already projected into the required schema
+ regularFileReader.read(bootstrapPartition.split.dataFile)
}
- partitionedFileIterator
}
- def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator: Iterator[InternalRow])
- : Iterator[InternalRow] = {
+ def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator: Iterator[InternalRow]): Iterator[InternalRow] = {
new Iterator[InternalRow] {
- override def hasNext: Boolean = dataFileIterator.hasNext && skeletonFileIterator.hasNext
- override def next(): InternalRow = {
- mergeInternalRow(skeletonFileIterator.next(), dataFileIterator.next())
- }
- }
- }
+ private val combinedRow = new JoinedRow()
- def mergeInternalRow(skeletonRow: InternalRow, dataRow: InternalRow): InternalRow = {
- val skeletonArr = skeletonRow.copy().toSeq(skeletonSchema)
- val dataArr = dataRow.copy().toSeq(dataSchema)
- // We need to return it in the order requested
- val mergedArr = requiredColumns.map(col => {
- if (skeletonSchema.fieldNames.contains(col)) {
- val idx = skeletonSchema.fieldIndex(col)
- skeletonArr(idx)
- } else {
- val idx = dataSchema.fieldIndex(col)
- dataArr(idx)
+ override def hasNext: Boolean = {
+ checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
+ "Bootstrap data-file iterator and skeleton-file iterator have to be in-sync!")
+ dataFileIterator.hasNext && skeletonFileIterator.hasNext
}
- })
- logDebug("Merged data and skeleton values => " + mergedArr.mkString(","))
- val mergedRow = InternalRow.fromSeq(mergedArr)
- mergedRow
+ override def next(): InternalRow = {
+ combinedRow(skeletonFileIterator.next(), dataFileIterator.next())
+ }
+ }
}
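The rewritten merge zips the two iterators positionally (matching record ordering across skeleton- and data-files is the correctness invariant), reusing a single JoinedRow instead of copying and reshuffling fields per row. A simplified, Spark-free analogue of the pattern (plain Scala, not Hudi's actual classes):

    // Zip two iterators row-by-row, failing fast if they ever fall out of sync
    def mergeSketch[A, B](skeleton: Iterator[A], data: Iterator[B]): Iterator[(A, B)] =
      new Iterator[(A, B)] {
        override def hasNext: Boolean = {
          require(skeleton.hasNext == data.hasNext,
            "skeleton-file and data-file iterators have to be in-sync!")
          skeleton.hasNext && data.hasNext
        }
        override def next(): (A, B) = (skeleton.next(), data.next())
      }

    // e.g. pairing meta-field rows with data rows positionally
    mergeSketch(Iterator("key-1", "key-2"), Iterator("row-1", "row-2")).foreach(println)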
override protected def getPartitions: Array[Partition] = {
- tableState.files.zipWithIndex.map(file => {
+ splits.zipWithIndex.map(file => {
if (file._1.skeletonFile.isDefined) {
logDebug("Forming partition with => Index: " + file._2 + ", Files: " + file._1.dataFile.filePath
+ "," + file._1.skeletonFile.get.filePath)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
index 0dd54237ef5..5c58c10493d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -19,20 +19,20 @@
package org.apache.hudi
import org.apache.hadoop.fs.Path
-import org.apache.hudi.common.model.HoodieBaseFile
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.exception.HoodieException
-import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
-import org.apache.spark.internal.Logging
+import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader}
+import org.apache.hudi.HoodieBootstrapRelation.validate
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
-import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isMetaField, removeMetaFields}
+import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Row, SQLContext}
-import scala.collection.JavaConverters._
+case class HoodieBootstrapSplit(dataFile: PartitionedFile, skeletonFile: Option[PartitionedFile] = None) extends HoodieFileSplit
/**
* This is a Spark relation that can be used for querying metadata-bootstrapped/fully-bootstrapped Hudi tables, as well as
@@ -44,150 +44,161 @@ import scala.collection.JavaConverters._
* bootstrapped files, because then the metadata file and the data file can return different numbers of rows,
* causing errors while merging.
*
- * @param _sqlContext Spark SQL Context
+ * @param sqlContext Spark SQL Context
* @param userSchema User specified schema in the datasource query
* @param globPaths The global paths to query. If it is not empty, read from the globPaths,
* else read data from tablePath using HoodieFileIndex.
* @param metaClient Hoodie table meta client
* @param optParams DataSource options passed by the user
*/
-class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
- val userSchema: Option[StructType],
- val globPaths: Seq[Path],
- val metaClient: HoodieTableMetaClient,
- val optParams: Map[String, String]) extends BaseRelation
- with PrunedFilteredScan with Logging {
+case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
+ private val userSchema: Option[StructType],
+ private val globPaths: Seq[Path],
+ override val metaClient: HoodieTableMetaClient,
+ override val optParams: Map[String, String],
+ private val prunedDataSchema: Option[StructType] = None)
+ extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema, prunedDataSchema) {
- val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema
- var dataSchema: StructType = _
- var fullSchema: StructType = _
+ override type FileSplit = HoodieBootstrapSplit
+ override type Relation = HoodieBootstrapRelation
- val fileIndex: HoodieBootstrapFileIndex = buildFileIndex()
+ private lazy val skeletonSchema = HoodieSparkUtils.getMetaSchema
- override def sqlContext: SQLContext = _sqlContext
+ override val mandatoryFields: Seq[String] = Seq.empty
- override val needConversion: Boolean = false
+ protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit] = {
+ val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters)
+ fileSlices.map { fileSlice =>
+ val baseFile = fileSlice.getBaseFile.get()
- override def schema: StructType = inferFullSchema()
+ if (baseFile.getBootstrapBaseFile.isPresent) {
+ val partitionValues =
+ getPartitionColumnsAsInternalRowInternal(baseFile.getFileStatus, extractPartitionValuesFromPartitionPath = true)
+ val dataFile = PartitionedFile(partitionValues, baseFile.getBootstrapBaseFile.get().getPath, 0, baseFile.getBootstrapBaseFile.get().getFileLen)
+ val skeletonFile = Option(PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen))
- override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
- logInfo("Starting scan..")
-
- // Compute splits
- val bootstrapSplits = fileIndex.files.map(hoodieBaseFile => {
- var skeletonFile: Option[PartitionedFile] = Option.empty
- var dataFile: PartitionedFile = null
-
- if (hoodieBaseFile.getBootstrapBaseFile.isPresent) {
- skeletonFile = Option(PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen))
- dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getBootstrapBaseFile.get().getPath, 0,
- hoodieBaseFile.getBootstrapBaseFile.get().getFileLen)
- } else {
- dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen)
- }
- HoodieBootstrapSplit(dataFile, skeletonFile)
- })
- val tableState = HoodieBootstrapTableState(bootstrapSplits)
-
- // Get required schemas for column pruning
- var requiredDataSchema = StructType(Seq())
- var requiredSkeletonSchema = StructType(Seq())
- // requiredColsSchema is the schema of requiredColumns, note that requiredColumns is in a random order
- // so requiredColsSchema is not always equal to (requiredSkeletonSchema.fields ++ requiredDataSchema.fields)
- var requiredColsSchema = StructType(Seq())
- requiredColumns.foreach(col => {
- var field = dataSchema.find(_.name == col)
- if (field.isDefined) {
- requiredDataSchema = requiredDataSchema.add(field.get)
+ HoodieBootstrapSplit(dataFile, skeletonFile)
} else {
- field = skeletonSchema.find(_.name == col)
- requiredSkeletonSchema = requiredSkeletonSchema.add(field.get)
+ val dataFile = PartitionedFile(getPartitionColumnsAsInternalRow(baseFile.getFileStatus), baseFile.getPath, 0, baseFile.getFileLen)
+ HoodieBootstrapSplit(dataFile)
}
- requiredColsSchema = requiredColsSchema.add(field.get)
- })
+ }
+ }
- // Prepare readers for reading data file and skeleton files
- val dataReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader(
- sparkSession = _sqlContext.sparkSession,
- dataSchema = dataSchema,
- partitionSchema = StructType(Seq.empty),
- requiredSchema = requiredDataSchema,
- filters = if (requiredSkeletonSchema.isEmpty) filters else Seq() ,
+ protected override def composeRDD(fileSplits: Seq[FileSplit],
+ tableSchema: HoodieTableSchema,
+ requiredSchema: HoodieTableSchema,
+ requestedColumns: Array[String],
+ filters: Array[Filter]): RDD[InternalRow] = {
+ val requiredSkeletonFileSchema =
+ StructType(skeletonSchema.filter(f => requestedColumns.exists(col => resolver(f.name, col))))
+
+ val (bootstrapDataFileReader, bootstrapSkeletonFileReader) =
+ createBootstrapFileReaders(tableSchema, requiredSchema, requiredSkeletonFileSchema, filters)
+
+ val regularFileReader = createRegularFileReader(tableSchema, requiredSchema, filters)
+
+ new HoodieBootstrapRDD(sqlContext.sparkSession, bootstrapDataFileReader, bootstrapSkeletonFileReader, regularFileReader,
+ requiredSchema, fileSplits)
+ }
+
+ private def createBootstrapFileReaders(tableSchema: HoodieTableSchema,
+ requiredSchema: HoodieTableSchema,
+ requiredSkeletonFileSchema: StructType,
+ filters: Array[Filter]): (BaseFileReader, BaseFileReader) = {
+ // NOTE: The "data" schema here refers to the whole table's schema excluding only the partition
+ // columns, as opposed to the data-file schema, which in the case of the Bootstrap relation
+ // also excludes all meta-field columns
+ val (partitionSchema, dataSchema, requiredDataSchema) =
+ tryPrunePartitionColumnsInternal(tableSchema, requiredSchema, extractPartitionValuesFromPartitionPath = true)
+
+ val bootstrapDataFileSchema = StructType(dataSchema.structTypeSchema.filterNot(sf => isMetaField(sf.name)))
+ val requiredBootstrapDataFileSchema = StructType(requiredDataSchema.structTypeSchema.filterNot(sf => isMetaField(sf.name)))
+
+ validate(requiredDataSchema, requiredBootstrapDataFileSchema, requiredSkeletonFileSchema)
+
+ val bootstrapDataFileReader = createBaseFileReader(
+ spark = sqlContext.sparkSession,
+ dataSchema = new HoodieTableSchema(bootstrapDataFileSchema),
+ partitionSchema = partitionSchema,
+ requiredDataSchema = new HoodieTableSchema(requiredBootstrapDataFileSchema),
+ // NOTE: For bootstrapped files we can't apply any filtering in case we'd need to merge it with
+ // a skeleton-file as we rely on matching ordering of the records across bootstrap- and skeleton-files
+ filters = if (requiredSkeletonFileSchema.isEmpty) filters else Seq(),
options = optParams,
- hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
+ hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf(),
+ // NOTE: Bootstrap relation always has to extract partition values from the partition-path as this is the
+ // default Spark behavior: Spark by default strips partition-columns from the data schema and does
+ // NOT persist them in the data files, instead parsing them from partition-paths (on the fly) whenever
+ // table is queried
+ shouldAppendPartitionValuesOverride = Some(true)
)
- val skeletonReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader(
- sparkSession = _sqlContext.sparkSession,
- dataSchema = skeletonSchema,
+ val bootstrapSkeletonFileReader = createBaseFileReader(
+ spark = sqlContext.sparkSession,
+ dataSchema = new HoodieTableSchema(skeletonSchema),
+ // NOTE: Here we specify partition-schema as empty since we don't need Spark to inject partition-values
+ // parsed from the partition-path
partitionSchema = StructType(Seq.empty),
- requiredSchema = requiredSkeletonSchema,
- filters = if (requiredDataSchema.isEmpty) filters else Seq(),
+ requiredDataSchema = new HoodieTableSchema(requiredSkeletonFileSchema),
+ // NOTE: For bootstrapped files we can't apply any filtering in case we'd need to merge it with
+ // a skeleton-file as we rely on matching ordering of the records across bootstrap- and skeleton-files
+ filters = if (requiredBootstrapDataFileSchema.isEmpty) filters else Seq(),
options = optParams,
- hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
+ hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf(),
+ // NOTE: We override Spark to avoid injecting partition values into the records read from
+ // skeleton-file
+ shouldAppendPartitionValuesOverride = Some(false)
)
- val regularReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader(
- sparkSession = _sqlContext.sparkSession,
- dataSchema = fullSchema,
- partitionSchema = StructType(Seq.empty),
- requiredSchema = requiredColsSchema,
+ (bootstrapDataFileReader, bootstrapSkeletonFileReader)
+ }
+
+ private def createRegularFileReader(tableSchema: HoodieTableSchema,
+ requiredSchema: HoodieTableSchema,
+ filters: Array[Filter]): BaseFileReader = {
+ // NOTE: The "data" schema here refers to the whole table's schema excluding only the partition
+ // columns, as opposed to the data-file schema, which in the case of the Bootstrap relation
+ // also excludes all meta-field columns
+ val (partitionSchema, dataSchema, requiredDataSchema) =
+ tryPrunePartitionColumns(tableSchema, requiredSchema)
+
+ // NOTE: A bootstrapped table allows Hudi-created file-slices to be co-located w/ the "bootstrapped"
+ // ones (ie persisted by Spark). Therefore, to be able to read the data from a bootstrapped
+ // table, we also need to create a regular file-reader to read the file-slices created by Hudi
+ val regularFileReader = createBaseFileReader(
+ spark = sqlContext.sparkSession,
+ dataSchema = dataSchema,
+ partitionSchema = partitionSchema,
+ requiredDataSchema = requiredDataSchema,
filters = filters,
options = optParams,
- hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
+ hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
)
- val rdd = new HoodieBootstrapRDD(_sqlContext.sparkSession, dataReadFunction, skeletonReadFunction,
- regularReadFunction, requiredDataSchema, requiredSkeletonSchema, requiredColumns, tableState)
- rdd.asInstanceOf[RDD[Row]]
+ // NOTE: In some cases the schema of the reader's output (the reader's schema) might not match the schema expected by the caller.
+ // This could occur, for example, when the requested schema contains partition columns which might not be persisted w/in the
+ // data file, but instead would be parsed from the partition path. In that case output of the file-reader will have
+ // different ordering of the fields than the original required schema (for more details please check out
+ // [[ParquetFileFormat]] impl). In that case we have to project the rows from the file-reader's schema
+ // back into the one expected by the caller
+ projectReader(regularFileReader, requiredSchema.structTypeSchema)
}
- def inferFullSchema(): StructType = {
- if (fullSchema == null) {
- logInfo("Inferring schema..")
- val schemaResolver = new TableSchemaResolver(metaClient)
- val tableSchema = schemaResolver.getTableAvroSchema(false)
- dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
- fullSchema = StructType(skeletonSchema.fields ++ dataSchema.fields)
- }
- fullSchema
- }
-
- def buildFileIndex(): HoodieBootstrapFileIndex = {
- logInfo("Building file index..")
- val fileStatuses = if (globPaths.nonEmpty) {
- // Load files from the global paths if it has defined to be compatible with the original mode
- val inMemoryFileIndex = HoodieInMemoryFileIndex.create(_sqlContext.sparkSession, globPaths)
- inMemoryFileIndex.allFiles()
- } else { // Load files by the HoodieFileIndex.
- HoodieFileIndex(sqlContext.sparkSession, metaClient, Some(schema), optParams,
- FileStatusCache.getOrCreate(sqlContext.sparkSession)).allFiles
- }
- if (fileStatuses.isEmpty) {
- throw new HoodieException("No files found for reading in user provided path.")
- }
+ override def updatePrunedDataSchema(prunedSchema: StructType): HoodieBootstrapRelation =
+ this.copy(prunedDataSchema = Some(prunedSchema))
+}
- val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitsTimeline
- .filterCompletedInstants, fileStatuses.toArray)
- val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList
-
- if (log.isDebugEnabled) {
- latestFiles.foreach(file => {
- logDebug("Printing indexed files:")
- if (file.getBootstrapBaseFile.isPresent) {
- logDebug("Skeleton File: " + file.getPath + ", Data File: " + file.getBootstrapBaseFile.get().getPath)
- } else {
- logDebug("Regular Hoodie File: " + file.getPath)
- }
- })
- }
- HoodieBootstrapFileIndex(latestFiles)
- }
-}
+object HoodieBootstrapRelation {
-case class HoodieBootstrapFileIndex(files: List[HoodieBaseFile])
+ private def validate(requiredDataSchema: HoodieTableSchema, requiredDataFileSchema: StructType, requiredSkeletonFileSchema: StructType): Unit = {
+ val requiredDataColumns: Seq[String] = requiredDataSchema.structTypeSchema.fieldNames.toSeq
+ val combinedColumns = (requiredSkeletonFileSchema.fieldNames ++ requiredDataFileSchema.fieldNames).toSeq
-case class HoodieBootstrapTableState(files: List[HoodieBootstrapSplit])
+ // NOTE: Here we validate that all required data columns are covered by the combination of the columns
+ // from both skeleton file and the corresponding data file
+ checkState(combinedColumns.sorted == requiredDataColumns.sorted)
+ }
-case class HoodieBootstrapSplit(dataFile: PartitionedFile, skeletonFile: Option[PartitionedFile])
+}
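The two bootstrap readers are driven by a split of the required schema into meta-field (skeleton) columns and data-file columns, which validate() then checks for full coverage. A sketch of that split (toy isMetaField stand-in; Hudi's real check lives in HoodieSqlCommonUtils, though meta-fields do use the `_hoodie_` prefix):

    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    def isMetaField(name: String): Boolean = name.startsWith("_hoodie_")

    val requiredSchema = StructType(Seq(
      StructField("_hoodie_record_key", StringType),
      StructField("rider", StringType),
      StructField("timestamp", LongType)))

    // The skeleton file serves the meta-fields, the bootstrap data file serves the rest
    val skeletonFileSchema = StructType(requiredSchema.filter(f => isMetaField(f.name)))
    val dataFileSchema     = StructType(requiredSchema.filterNot(f => isMetaField(f.name)))

    // validate() analogue: the two readers must jointly cover every required column
    assert((skeletonFileSchema.fieldNames ++ dataFileSchema.fieldNames).sorted
      .sameElements(requiredSchema.fieldNames.sorted))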
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index accfc8f2470..94168755cbf 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -190,7 +190,7 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext,
StructType(requiredDataSchema.structTypeSchema.fields
.filterNot(f => unusedMandatoryColumnNames.contains(f.name)))
- HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema).toString)
+ new HoodieTableSchema(prunedStructSchema)
}
val requiredSchemaReaderSkipMerging = createBaseFileReader(
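The auxiliary constructor used here saves each call site from passing the derived Avro schema explicitly. A self-contained sketch of the same convenience, using spark-avro's SchemaConverters as a stand-in for Hudi's convertToAvroSchema helper:

    import org.apache.spark.sql.avro.SchemaConverters
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    case class TableSchema(structTypeSchema: StructType, avroSchemaStr: String) {
      // Derive the Avro schema string from the StructType instead of requiring both
      def this(structTypeSchema: StructType) =
        this(structTypeSchema, SchemaConverters.toAvroType(structTypeSchema).toString)
    }

    val pruned = StructType(Seq(StructField("rider", StringType)))
    val schema = new TableSchema(pruned)  // avroSchemaStr derived automatically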
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index 8e589abbc18..54c58bace7c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -23,9 +23,11 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieMetadataConfig}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline.parseDateFromInstantTime
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator, HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.PartitionPathEncodeUtils
+import org.apache.hudi.exception.HoodieException
import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper, DataSourceReadOptions, SparkAdapterSupport}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -42,6 +44,7 @@ import java.text.SimpleDateFormat
import java.util.{Locale, Properties}
import scala.collection.JavaConverters._
import scala.collection.immutable.Map
+import scala.util.Try
object HoodieSqlCommonUtils extends SparkAdapterSupport {
// NOTE: {@code SimpleDataFormat} is NOT thread-safe
@@ -251,11 +254,13 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
*/
def formatQueryInstant(queryInstant: String): String = {
val instantLength = queryInstant.length
- if (instantLength == 19 || instantLength == 23) { // for yyyy-MM-dd HH:mm:ss[.SSS]
+ if (instantLength == 19 || instantLength == 23) {
+ // Handle "yyyy-MM-dd HH:mm:ss[.SSS]" format
HoodieInstantTimeGenerator.getInstantForDateString(queryInstant)
} else if (instantLength == HoodieInstantTimeGenerator.SECS_INSTANT_ID_LENGTH
- || instantLength == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) { // for yyyyMMddHHmmss[SSS]
- HoodieActiveTimeline.parseDateFromInstantTime(queryInstant) // validate the format
+ || instantLength == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) {
+ // Handle already serialized "yyyyMMddHHmmss[SSS]" format
+ validateInstant(queryInstant)
queryInstant
} else if (instantLength == 10) { // for yyyy-MM-dd
HoodieActiveTimeline.formatDate(defaultDateFormat.get().parse(queryInstant))
@@ -356,4 +361,21 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
}.mkString(",")
partitionsToDrop
}
+
+ private def validateInstant(queryInstant: String): Unit = {
+ // Provided instant has to either
+ // - Match one of the bootstrapping instants
+ // - Be parse-able (as a date)
+ val valid = queryInstant match {
+ case HoodieTimeline.INIT_INSTANT_TS |
+ HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS |
+ HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS => true
+
+ case _ => Try(parseDateFromInstantTime(queryInstant)).isSuccess
+ }
+
+ if (!valid) {
+ throw new HoodieException(s"Got an invalid instant ($queryInstant)")
+ }
+ }
}
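The Try-based check above accepts the special bootstrap instants verbatim and otherwise requires the instant to parse as a date. A standalone sketch of the pattern (the sentinel value and format string are illustrative, not Hudi's actual constants):

    import java.text.SimpleDateFormat
    import scala.util.Try

    val MetadataBootstrapInstantTs = "00000000000001"  // hypothetical sentinel

    def isValidInstant(queryInstant: String): Boolean = queryInstant match {
      case MetadataBootstrapInstantTs => true  // sentinels never parse as real dates
      case _ =>
        Try(new SimpleDateFormat("yyyyMMddHHmmss").parse(queryInstant)).isSuccess
    }

    assert(isValidInstant("20230224164400"))
    assert(!isValidInstant("not-an-instant"))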
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 82f79eeb44e..e3d235591d4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -20,14 +20,16 @@ package org.apache.hudi.functional
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
+import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.functional.TestDataSourceForBootstrap.{dropMetaCols, sort}
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.testutils.HoodieClientTestUtils
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkRecordMerger}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
@@ -35,7 +37,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.io.TempDir
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
import java.time.Instant
import java.util.Collections
@@ -56,6 +58,12 @@ class TestDataSourceForBootstrap {
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
)
+
+ val sparkRecordTypeOpts = Map(
+ HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> classOf[HoodieSparkRecordMerger].getName,
+ HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet"
+ )
+
var basePath: String = _
var srcPath: String = _
var fs: FileSystem = _
@@ -153,12 +161,18 @@ class TestDataSourceForBootstrap {
assertEquals(numRecords, hoodieROViewDF1WithBasePath.count())
assertEquals(numRecordsUpdate, hoodieROViewDF1WithBasePath.filter(s"timestamp == $updateTimestamp").count())
- verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = false, isHiveStylePartitioned = false)
+ verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = false, isHiveStylePartitioned = true)
}
@ParameterizedTest
- @ValueSource(strings = Array("METADATA_ONLY", "FULL_RECORD"))
- def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapMode: String): Unit = {
+ @CsvSource(value = Array(
+ "METADATA_ONLY,AVRO",
+ // TODO(HUDI-5807) enable for spark native records
+ /* "METADATA_ONLY,SPARK", */
+ "FULL_RECORD,AVRO",
+ "FULL_RECORD,SPARK"
+ ))
+ def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapMode: String, recordType: HoodieRecordType): Unit = {
val timestamp = Instant.now.toEpochMilli
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
@@ -181,16 +195,15 @@ class TestDataSourceForBootstrap {
// Perform bootstrap
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
- readOpts,
+ readOpts ++ getRecordTypeOpts(recordType),
classOf[SimpleKeyGenerator].getName)
// check marked directory clean up
assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
- // TODO(HUDI-5602) troubleshoot
val expectedDF = bootstrapMode match {
case "METADATA_ONLY" =>
- sort(sourceDF).withColumn("datestr", lit(null))
+ sort(sourceDF)
case "FULL_RECORD" =>
sort(sourceDF)
}
@@ -208,9 +221,11 @@ class TestDataSourceForBootstrap {
val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava,
jsc, spark.sqlContext)
+ val writeOpts = commonOpts ++ getRecordTypeOpts(recordType)
+
updateDF.write
.format("hudi")
- .options(commonOpts)
+ .options(writeOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
@@ -234,28 +249,31 @@ class TestDataSourceForBootstrap {
verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = true)
}
- @Test def testMetadataBootstrapCOWPartitioned(): Unit = {
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieRecordType],
+ // TODO(HUDI-5807) enable for spark native records
+ names = Array("AVRO" /*, "SPARK" */))
+ def testMetadataBootstrapCOWPartitioned(recordType: HoodieRecordType): Unit = {
val timestamp = Instant.now.toEpochMilli
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
spark.sqlContext)
- // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence
- // have partitioned columns stored in the data file
- partitionPaths.foreach(partitionPath => {
- sourceDF
- .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
- .write
- .format("parquet")
- .mode(SaveMode.Overwrite)
- .save(srcPath + "/" + partitionPath)
- })
+ sourceDF.write.format("parquet")
+ .partitionBy("datestr")
+ .mode(SaveMode.Overwrite)
+ .save(srcPath)
+
+ val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) ++ Map(
+ DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr"
+ )
// Perform bootstrap
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
- commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+ writeOpts,
classOf[SimpleKeyGenerator].getName)
// Read bootstrapped table and verify count using glob path
@@ -270,10 +288,9 @@ class TestDataSourceForBootstrap {
val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
updateDf1.write
.format("hudi")
- .options(commonOpts)
+ .options(writeOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
- .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
.mode(SaveMode.Append)
.save(basePath)
@@ -290,10 +307,9 @@ class TestDataSourceForBootstrap {
updateDF2.write
.format("hudi")
- .options(commonOpts)
+ .options(writeOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
- .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
.mode(SaveMode.Append)
.save(basePath)
@@ -309,31 +325,34 @@ class TestDataSourceForBootstrap {
assertEquals(numRecords, hoodieROViewDF4.count())
assertEquals(numRecordsUpdate, hoodieROViewDF4.filter(s"timestamp == $updateTimestamp").count())
- verifyIncrementalViewResult(commitInstantTime1, commitInstantTime3, isPartitioned = true, isHiveStylePartitioned = false)
+ verifyIncrementalViewResult(commitInstantTime1, commitInstantTime3, isPartitioned = true, isHiveStylePartitioned = true)
}
- @Test def testMetadataBootstrapMORPartitionedInlineCompactionOn(): Unit = {
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieRecordType],
+ // TODO(HUDI-5807) enable for spark native records
+ names = Array("AVRO" /*, "SPARK" */))
+ def testMetadataBootstrapMORPartitionedInlineCompactionOn(recordType: HoodieRecordType): Unit = {
val timestamp = Instant.now.toEpochMilli
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
spark.sqlContext)
- // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence
- // have partitioned columns stored in the data file
- partitionPaths.foreach(partitionPath => {
- sourceDF
- .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
- .write
- .format("parquet")
- .mode(SaveMode.Overwrite)
- .save(srcPath + "/" + partitionPath)
- })
+ sourceDF.write.format("parquet")
+ .partitionBy("datestr")
+ .mode(SaveMode.Overwrite)
+ .save(srcPath)
+
+ val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) ++ Map(
+ DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr"
+ )
// Perform bootstrap
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
- commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+ writeOpts,
classOf[SimpleKeyGenerator].getName)
// Read bootstrapped table and verify count
@@ -350,10 +369,9 @@ class TestDataSourceForBootstrap {
updateDF.write
.format("hudi")
- .options(commonOpts)
+ .options(writeOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
- .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
.option(HoodieCompactionConfig.INLINE_COMPACT.key, "true")
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key, "1")
.mode(SaveMode.Append)
@@ -379,28 +397,29 @@ class TestDataSourceForBootstrap {
assertEquals(numRecordsUpdate, hoodieROViewDFWithBasePath.filter(s"timestamp == $updateTimestamp").count())
}
- @Test def testMetadataBootstrapMORPartitioned(): Unit = {
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
+ def testMetadataBootstrapMORPartitioned(recordType: HoodieRecordType): Unit = {
val timestamp = Instant.now.toEpochMilli
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
spark.sqlContext)
- // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence
- // have partitioned columns stored in the data file
- partitionPaths.foreach(partitionPath => {
- sourceDF
- .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
- .write
- .format("parquet")
- .mode(SaveMode.Overwrite)
- .save(srcPath + "/" + partitionPath)
- })
+ sourceDF.write.format("parquet")
+ .partitionBy("datestr")
+ .mode(SaveMode.Overwrite)
+ .save(srcPath)
+
+ val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) ++ Map(
+ DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr"
+ )
// Perform bootstrap
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
- commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+ writeOpts,
classOf[SimpleKeyGenerator].getName)
// Read bootstrapped table and verify count
@@ -423,10 +442,9 @@ class TestDataSourceForBootstrap {
val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
updateDf1.write
.format("hudi")
- .options(commonOpts)
+ .options(writeOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
- .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
.mode(SaveMode.Append)
.save(basePath)
@@ -446,10 +464,9 @@ class TestDataSourceForBootstrap {
updateDF2.write
.format("hudi")
- .options(commonOpts)
+ .options(writeOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
- .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
.mode(SaveMode.Append)
.save(basePath)
@@ -466,31 +483,31 @@ class TestDataSourceForBootstrap {
assertEquals(0, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count())
}
- @Test def testFullBootstrapCOWPartitioned(): Unit = {
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
+ def testFullBootstrapCOWPartitioned(recordType: HoodieRecordType): Unit = {
val timestamp = Instant.now.toEpochMilli
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
spark.sqlContext)
- // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence
- // have partitioned columns stored in the data file
- partitionPaths.foreach(partitionPath => {
- sourceDF
- .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
- .write
- .format("parquet")
- .mode(SaveMode.Overwrite)
- .save(srcPath + "/" + partitionPath)
- })
+ sourceDF.write.format("parquet")
+ .partitionBy("datestr")
+ .mode(SaveMode.Overwrite)
+ .save(srcPath)
+
+ val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) ++ Map(
+ DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr"
+ )
// Perform bootstrap
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write
.format("hudi")
- .options(commonOpts)
+ .options(writeOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
- .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
.option(HoodieBootstrapConfig.BASE_PATH.key, srcPath)
.option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
.option(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key, classOf[FullRecordBootstrapModeSelector].getName)
@@ -515,10 +532,9 @@ class TestDataSourceForBootstrap {
updateDF.write
.format("hudi")
- .options(commonOpts)
+ .options(writeOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
- .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
.mode(SaveMode.Append)
.save(basePath)
@@ -530,7 +546,7 @@ class TestDataSourceForBootstrap {
assertEquals(numRecords, hoodieROViewDF2.count())
assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
- verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = false)
+ verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = true)
}
def runMetadataBootstrapAndVerifyCommit(tableType: String,
@@ -596,6 +612,12 @@ class TestDataSourceForBootstrap {
hoodieIncViewDF3.count())
}
}
+
+ def getRecordTypeOpts(recordType: HoodieRecordType): Map[String, String] =
+ recordType match {
+ case HoodieRecordType.SPARK => sparkRecordTypeOpts
+ case _ => Map.empty
+ }
}
object TestDataSourceForBootstrap {
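The tests now fan out over record types via JUnit 5 parameterization; a minimal sketch of the two provider styles used above (toy test class, not the actual Hudi suite):

    import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
    import org.junit.jupiter.params.ParameterizedTest
    import org.junit.jupiter.params.provider.{CsvSource, EnumSource}

    class ParameterizationSketch {

      @ParameterizedTest
      @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
      def perRecordType(recordType: HoodieRecordType): Unit = {
        // one invocation per enum constant listed in `names`
        assert(recordType != null)
      }

      @ParameterizedTest
      @CsvSource(value = Array("METADATA_ONLY,AVRO", "FULL_RECORD,SPARK"))
      def perModeAndType(bootstrapMode: String, recordType: HoodieRecordType): Unit = {
        // JUnit converts the second CSV column to the enum automatically
        assert(bootstrapMode.nonEmpty && recordType != null)
      }
    }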
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index eb6ab80b5f9..f85e55dfd40 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -650,6 +650,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg.configs.add(String.format("hoodie.bootstrap.base.path=%s", bootstrapSourcePath));
cfg.configs.add(String.format("%s=%s", DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "rider"));
cfg.configs.add(String.format("hoodie.bootstrap.keygen.class=%s", SimpleKeyGenerator.class.getName()));
+ cfg.configs.add("hoodie.datasource.write.hive_style_partitioning=true");
cfg.configs.add("hoodie.bootstrap.parallelism=5");
cfg.targetBasePath = newDatasetBasePath;
new HoodieDeltaStreamer(cfg, jsc).sync();
@@ -660,6 +661,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
TestHelpers.assertRecordCount(1950, newDatasetBasePath, sqlContext);
res.registerTempTable("bootstrapped");
assertEquals(1950, sqlContext.sql("select distinct _hoodie_record_key from bootstrapped").count());
+ // NOTE: To fetch the record count, Spark will optimize the query to fetch the minimal possible amount
+ // of data, which might not provide an adequate amount of test coverage
+ sqlContext.sql("select * from bootstrapped").show();
StructField[] fields = res.schema().fields();
List<String> fieldNames = Arrays.asList(res.schema().fieldNames());