You are viewing a plain text version of this content. The canonical link for it is here.
Posted by GitBox on 2022/04/20 02:37:13 UTC

[GitHub] [hudi] xiarixiaoyao commented on a diff in pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

xiarixiaoyao commented on code in PR #5364:

@@ -17,279 +17,301 @@
 package org.apache.spark.sql.execution.datasources.parquet
-import java.util
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.HoodieSparkUtils
-import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.{InternalSchemaCache, StringUtils}
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.AvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat.{createParquetFilters, pruneInternalSchema, rebuildFilterFromParquet}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
-import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
-class Spark312HoodieParquetFileFormat extends ParquetFileFormat {
-  // reference ParquetFileFormat from spark project
-  override def buildReaderWithPartitionValues(
-                                               sparkSession: SparkSession,
-                                               dataSchema: StructType,
-                                               partitionSchema: StructType,
-                                               requiredSchema: StructType,
-                                               filters: Seq[Filter],
-                                               options: Map[String, String],
-                                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) {
-      // fallback to origin parquet File read
-      super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
-    } else {
-      hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
-      hadoopConf.set(
-        ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-        requiredSchema.json)
-      hadoopConf.set(
-        ParquetWriteSupport.SPARK_ROW_SCHEMA,
-        requiredSchema.json)
-      hadoopConf.set(
-        sparkSession.sessionState.conf.sessionLocalTimeZone)
-      hadoopConf.setBoolean(
-        sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
-      hadoopConf.setBoolean(
-        SQLConf.CASE_SENSITIVE.key,
-        sparkSession.sessionState.conf.caseSensitiveAnalysis)
-      ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
-      // Sets flags for `ParquetToSparkSchemaConverter`
-      hadoopConf.setBoolean(
-        sparkSession.sessionState.conf.isParquetBinaryAsString)
-      hadoopConf.setBoolean(
-        sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
-      // for dataSource v1, we have no method to do project for spark physical plan.
-      // it's safe to do cols project here.
-      val internalSchemaString = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
-      val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
-      if (querySchemaOption.isPresent && !requiredSchema.isEmpty) {
-        val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get())
-        hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(prunedSchema))
+ * This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior
+ * that's not possible to customize in any other way
+ *
+ * NOTE: This is a version of [[ParquetFileFormat]] impl from Spark 3.1.2 w/ the following changes applied to it:
+ * <ol>
+ *   <li>Avoiding appending partition values to the rows read from the data file</li>
+ *   <li>Schema on-read</li>
+ * </ol>
+ */
+class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter],
+                                              options: Map[String, String],
+                                              hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
+    hadoopConf.set(
+      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      sparkSession.sessionState.conf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(
+      sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
+    hadoopConf.setBoolean(
+      SQLConf.CASE_SENSITIVE.key,
+      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+    ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+    // Sets flags for `ParquetToSparkSchemaConverter`
+    hadoopConf.setBoolean(
+      sparkSession.sessionState.conf.isParquetBinaryAsString)
+    hadoopConf.setBoolean(
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+    val internalSchemaStr = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+    // For Spark DataSource v1, there's no Physical Plan projection/schema pruning w/in Spark itself,
+    // therefore it's safe to do schema projection here
+    if (!isNullOrEmpty(internalSchemaStr)) {
+      val prunedInternalSchemaStr =
+        pruneInternalSchema(internalSchemaStr, requiredSchema)
+      hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
+    }
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+    // TODO: if you move this into the closure it reverts to the default values.
+    // If true, enable using the custom RecordReader for parquet. This only works for
+    // a subset of the types (no complex types).
+    val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
+    val sqlConf = sparkSession.sessionState.conf
+    val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
+    val enableVectorizedReader: Boolean =
+      sqlConf.parquetVectorizedReaderEnabled &&
+        resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
+    val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
+    val capacity = sqlConf.parquetVectorizedReaderBatchSize
+    val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
+    // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+    val returningBatch = supportBatch(sparkSession, resultSchema)
+    val pushDownDate = sqlConf.parquetFilterPushDownDate
+    val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
+    val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
+    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
+    val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+    (file: PartitionedFile) => {
+      assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
+      val filePath = new Path(new URI(file.filePath))
+      val split =
+        new org.apache.parquet.hadoop.ParquetInputSplit(
+          filePath,
+          file.start,
+          file.start + file.length,
+          file.length,
+          Array.empty,
+          null)
+      val sharedConf = broadcastedHadoopConf.value.value
+      // Fetch internal schema
+      val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+      // Internal schema has to be pruned at this point
+      val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
+      val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent
+      val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
+      val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
+      val fileSchema = if (shouldUseInternalSchema) {
+        val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
+        InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
+      } else {
+        null
-      val broadcastedHadoopConf =
-        sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
-      // TODO: if you move this into the closure it reverts to the default values.
-      // If true, enable using the custom RecordReader for parquet. This only works for
-      // a subset of the types (no complex types).
-      val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
-      val sqlConf = sparkSession.sessionState.conf
-      val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
-      val enableVectorizedReader: Boolean =
-        sqlConf.parquetVectorizedReaderEnabled &&
-          resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
-      val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
-      val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
-      val capacity = sqlConf.parquetVectorizedReaderBatchSize
-      val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
-      // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
-      val returningBatch = supportBatch(sparkSession, resultSchema)
-      val pushDownDate = sqlConf.parquetFilterPushDownDate
-      val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
-      val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-      val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
-      val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
-      val isCaseSensitive = sqlConf.caseSensitiveAnalysis
-      (file: PartitionedFile) => {
-        assert(file.partitionValues.numFields == partitionSchema.size)
-        val filePath = new Path(new URI(file.filePath))
-        val split =
-          new org.apache.parquet.hadoop.ParquetInputSplit(
-            filePath,
-            file.start,
-            file.start + file.length,
-            file.length,
-            Array.empty,
-            null)
-        val sharedConf = broadcastedHadoopConf.value.value
-        // do deal with internalSchema
-        val internalSchemaString = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
-        // querySchema must be a pruned schema.
-        val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
-        val internalSchemaChangeEnabled = if (internalSchemaString.isEmpty || !querySchemaOption.isPresent) false else true
-        val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
-        val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
-        val fileSchema = if (internalSchemaChangeEnabled) {
-          val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
-          InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
+      lazy val footerFileMetaData =
+        ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        footerFileMetaData.getKeyValueMetaData.get,
+      // Try to push down filters when filter push-down is enabled.
+      val pushed = if (enableParquetFilterPushDown) {
+        val parquetSchema = footerFileMetaData.getSchema
+        val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) {
+          createParquetFilters(
+            parquetSchema,
+            pushDownDate,
+            pushDownTimestamp,
+            pushDownDecimal,
+            pushDownStringStartWith,
+            pushDownInFilterThreshold,
+            isCaseSensitive,
+            datetimeRebaseMode)
         } else {
-          // this should not happened, searchSchemaAndCache will deal with correctly.
-          null
+          createParquetFilters(
+            parquetSchema,
+            pushDownDate,
+            pushDownTimestamp,
+            pushDownDecimal,
+            pushDownStringStartWith,
+            pushDownInFilterThreshold,
+            isCaseSensitive)
+, fileSchema, querySchemaOption.orElse(null)))
+          // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+          // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+          // is used here.
+          .flatMap(parquetFilters.createFilter)
+          .reduceOption(FilterApi.and)
+      } else {
+        None
+      }
-        lazy val footerFileMetaData =
-          ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
-        val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
-          footerFileMetaData.getKeyValueMetaData.get,
-        // Try to push down filters when filter push-down is enabled.
-        val pushed = if (enableParquetFilterPushDown) {
-          val parquetSchema = footerFileMetaData.getSchema
-          val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) {
-            Spark312HoodieParquetFileFormat.createParquetFilters(
-              parquetSchema,
-              pushDownDate,
-              pushDownTimestamp,
-              pushDownDecimal,
-              pushDownStringStartWith,
-              pushDownInFilterThreshold,
-              isCaseSensitive,
-              datetimeRebaseMode)
-          } else {
-            Spark312HoodieParquetFileFormat.createParquetFilters(
-              parquetSchema,
-              pushDownDate,
-              pushDownTimestamp,
-              pushDownDecimal,
-              pushDownStringStartWith,
-              pushDownInFilterThreshold,
-              isCaseSensitive)
-          }
-, fileSchema, querySchemaOption.get()))
-            // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-            // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-            // is used here.
-            .flatMap(parquetFilters.createFilter(_))
-            .reduceOption(FilterApi.and)
+      // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
+      // *only* if the file was created by something other than "parquet-mr", so check the actual
+      // writer here for this file.  We have to do this per-file, as each file in the table may
+      // have different writers.
+      // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
+      def isCreatedByParquetMr: Boolean =
+        footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+      val convertTz =
+        if (timestampConversion && !isCreatedByParquetMr) {
+          Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
         } else {
-        // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
-        // *only* if the file was created by something other than "parquet-mr", so check the actual
-        // writer here for this file.  We have to do this per-file, as each file in the table may
-        // have different writers.
-        // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
-        def isCreatedByParquetMr: Boolean =
-          footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
-        val convertTz =
-          if (timestampConversion && !isCreatedByParquetMr) {
-            Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
-          } else {
-            None
-          }
-        val int96RebaseMode = DataSourceUtils.int96RebaseMode(
-          footerFileMetaData.getKeyValueMetaData.get,
-        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
-        // use new conf
-        val hadoopAttempConf = new Configuration(broadcastedHadoopConf.value.value)
-        //
-        // reset request schema
-        var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
-        if (internalSchemaChangeEnabled) {
-          val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
-          val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
-          typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
-          hadoopAttempConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
-        }
-        val hadoopAttemptContext =
-          new TaskAttemptContextImpl(hadoopAttempConf, attemptId)
+      val int96RebaseMode = DataSourceUtils.int96RebaseMode(
+        footerFileMetaData.getKeyValueMetaData.get,
-        // Try to push down filters when filter push-down is enabled.
-        // Notice: This push-down is RowGroups level, not individual records.
-        if (pushed.isDefined) {
-          ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
-        }
-        val taskContext = Option(TaskContext.get())
-        if (enableVectorizedReader) {
-          val vectorizedReader = new Spark312HoodieVectorizedParquetRecordReader(
-            convertTz.orNull,
-            datetimeRebaseMode.toString,
-            int96RebaseMode.toString,
-            enableOffHeapColumnVector && taskContext.isDefined,
-            capacity, typeChangeInfos)
-          val iter = new RecordReaderIterator(vectorizedReader)
-          // SPARK-23457 Register a task completion listener before `initialization`.
-          taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
-          vectorizedReader.initialize(split, hadoopAttemptContext)
+      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+      // Clone new conf
+      val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
+      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
+      if (shouldUseInternalSchema) {
+        val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
+        val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
+        typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+        hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
+      }
+      val hadoopAttemptContext =
+        new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
+      // Try to push down filters when filter push-down is enabled.
+      // Notice: This push-down is RowGroups level, not individual records.
+      if (pushed.isDefined) {
+        ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
+      }
+      val taskContext = Option(TaskContext.get())
+      if (enableVectorizedReader) {
+        val vectorizedReader = new Spark312HoodieVectorizedParquetRecordReader(
+          convertTz.orNull,

Review Comment:
   Please use `shouldUseInternalSchema` to fall back to the original `VectorizedParquetRecordReader`; `Spark312HoodieVectorizedParquetRecordReader` is meant only for schema evolution.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail:

For queries about this service, please contact Infrastructure at: