Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/04/19 18:55:27 UTC

[GitHub] [hudi] alexeykudinkin opened a new pull request, #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

alexeykudinkin opened a new pull request, #5364:
URL: https://github.com/apache/hudi/pull/5364

   ## What is the purpose of the pull request
   
   After we fell back to `HadoopFsRelation` in #5352, the old issues from HUDI-3204 resurfaced, because Spark forcibly appends partition values parsed from the actual partition path to the fetched dataset.
   
   Unfortunately, this behavior is not configurable. To address the problem, we have to replace the default `ParquetFileFormat` with our own extension of it that makes this behavior configurable. The caller can choose either:
   
    - To append partition values parsed from the actual partition path
    - To avoid appending such partition values and instead rely on reading the source columns used as partition values (Hudi-specific behavior)
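
The two options above can be illustrated with a minimal, self-contained sketch. It does not use Spark or the real `ParquetFileFormat` API: `PartitionValuesSketch`, `parsePartitionPath` and `readRow` are hypothetical names, rows are modelled as plain (column, value) pairs, and the `ts` example (a partition path holding a shortened form of the source column) is an illustrative assumption rather than something taken from this PR.

```scala
object PartitionValuesSketch {
  // A row is modelled as an ordered list of (column name, value) pairs.
  type Row = Seq[(String, String)]

  // Parses a "k1=v1/k2=v2" style partition path into (column, value) pairs.
  def parsePartitionPath(path: String): Row =
    path.split("/").toSeq.filter(_.contains("=")).map { segment =>
      val Array(key, value) = segment.split("=", 2)
      key -> value
    }

  // Hypothetical reader: `fileRow` holds the columns persisted in the data file.
  def readRow(fileRow: Row, partitionPath: String, appendPartitionValues: Boolean): Row =
    if (appendPartitionValues) {
      val fromPath = parsePartitionPath(partitionPath)
      val overridden = fromPath.map(_._1).toSet
      // Values parsed from the path replace same-named file columns
      // and are appended at the end of the row.
      fileRow.filterNot { case (name, _) => overridden.contains(name) } ++ fromPath
    } else {
      // Rely on the source columns stored in the data file.
      fileRow
    }
}

object PartitionValuesSketchDemo extends App {
  val fileRow = Seq("id" -> "1", "ts" -> "2022-04-19 18:55:27")
  // Path-derived value wins: the time-of-day part of `ts` is lost.
  println(PartitionValuesSketch.readRow(fileRow, "ts=2022-04-19", appendPartitionValues = true))
  // Source column wins: the row is returned as stored in the file.
  println(PartitionValuesSketch.readRow(fileRow, "ts=2022-04-19", appendPartitionValues = false))
}
```

In this model, appending path-derived values silently replaces the value stored in the file, which is the kind of discrepancy the configurable flag is meant to avoid.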
   
   ## Brief change log
   
    - Scaffolded `Spark24HoodieParquetFileFormat`, which extends `ParquetFileFormat` and overrides the behavior of adding partition columns to every row
    - Amended `SparkAdapter`'s `createHoodieParquetFileFormat` API so that the caller can configure whether to append partition values
    - Fall back to appending partition values when the source columns are not persisted in the data file
    - Fixed `HoodieBaseRelation` incorrectly handling mandatory columns
   
   ## Verify this pull request
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103256312

   ## CI report:
   
   * 6fa8f1506ac4cfeb00e2296b4a7f9fe5338fde96 UNKNOWN
   * 27d58e93df6310a8f35f93e73bff5ae3f2378bdd UNKNOWN
   * 3dc0bd1b03d19253dd4034e72f365d6e63e0fce0 UNKNOWN
   * f4eaa8e08eb3759160f93e2bdb19ba7ac473a7df Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8150) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
alexeykudinkin commented on code in PR #5364:
URL: https://github.com/apache/hudi/pull/5364#discussion_r853472942


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkHoodieParquetFileFormat.scala:
##########
@@ -28,20 +28,19 @@ import org.apache.spark.sql.types.StructType
 
 
 class SparkHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport {
-  override def shortName(): String = "HoodieParquet"
+  override def shortName(): String = "hoodie-parquet"

Review Comment:
   Correct





[GitHub] [hudi] hudi-bot commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103217469

   ## CI report:
   
   * 6fa8f1506ac4cfeb00e2296b4a7f9fe5338fde96 UNKNOWN
   * c040d5c57e551bc7e0fbbad03236b70ab48b2cf8 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8146) 
   * 27d58e93df6310a8f35f93e73bff5ae3f2378bdd UNKNOWN
   * 3dc0bd1b03d19253dd4034e72f365d6e63e0fce0 UNKNOWN
   * f4eaa8e08eb3759160f93e2bdb19ba7ac473a7df Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8150) 
   




[GitHub] [hudi] xiarixiaoyao commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
xiarixiaoyao commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103787179

   @alexeykudinkin @nsivabalan @yihua  
   LGTM
   
   I will put up another patch to deal with schema evolution.




[GitHub] [hudi] hudi-bot commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103454476

   ## CI report:
   
   * 6fa8f1506ac4cfeb00e2296b4a7f9fe5338fde96 UNKNOWN
   * 27d58e93df6310a8f35f93e73bff5ae3f2378bdd UNKNOWN
   * 3dc0bd1b03d19253dd4034e72f365d6e63e0fce0 UNKNOWN
   * f4eaa8e08eb3759160f93e2bdb19ba7ac473a7df Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8150) 
   * e56dcad8f272d0450e2803790e2ca9dab1a8c4e7 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8157) 
   




[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
alexeykudinkin commented on code in PR #5364:
URL: https://github.com/apache/hudi/pull/5364#discussion_r856510393


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -223,56 +228,62 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
     val fileSplits = collectFileSplits(partitionFilters, dataFilters)
 
-    val partitionSchema = if (dropPartitionColumnsWhenWrite) {
-      // when hoodie.datasource.write.drop.partition.columns is true, partition columns can't be persisted in
-      // data files.
-      StructType(partitionColumns.map(StructField(_, StringType)))
-    } else {
-      StructType(Nil)
-    }
 
-    val tableSchema = HoodieTableSchema(tableStructSchema, if (internalSchema.isEmptySchema) tableAvroSchema.toString else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString, internalSchema)
-    val dataSchema = if (dropPartitionColumnsWhenWrite) {
-      val dataStructType = StructType(tableStructSchema.filterNot(f => partitionColumns.contains(f.name)))
-      HoodieTableSchema(
-        dataStructType,
-        sparkAdapter.getAvroSchemaConverters.toAvroType(dataStructType, nullable = false, "record").toString()
-      )
-    } else {
-      tableSchema
-    }
-    val requiredSchema = if (dropPartitionColumnsWhenWrite) {
-      val requiredStructType = StructType(requiredStructSchema.filterNot(f => partitionColumns.contains(f.name)))
-      HoodieTableSchema(
-        requiredStructType,
-        sparkAdapter.getAvroSchemaConverters.toAvroType(requiredStructType, nullable = false, "record").toString()
-      )
+    val tableAvroSchemaStr =
+      if (internalSchema.isEmptySchema) tableAvroSchema.toString
+      else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString
+
+    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchema)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, requiredInternalSchema)
+
+    // Since schema requested by the caller might contain partition columns, we might need to
+    // prune it, removing all partition columns from it in case these columns are not persisted
+    // in the data files
+    //
+    // NOTE: This partition schema is only relevant to file reader to be able to embed
+    //       values of partition columns (hereafter referred to as partition values) encoded into
+    //       the partition path, and omitted from the data file, back into fetched rows;
+    //       Note that, by default, partition columns are not omitted therefore specifying
+    //       partition schema for reader is not required
+    val (partitionSchema, dataSchema, prunedRequiredSchema) =
+      tryPrunePartitionColumns(tableSchema, requiredSchema)
+
+    if (fileSplits.isEmpty) {
+      sparkSession.sparkContext.emptyRDD
     } else {
-      HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, requiredInternalSchema)
+      val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, prunedRequiredSchema, filters)
+
+      // NOTE: In case when partition columns have been pruned from the required schema, we have to project
+      //       the rows from the pruned schema back into the one expected by the caller
+      val projectedRDD = if (prunedRequiredSchema.structTypeSchema != requiredSchema.structTypeSchema) {
+        rdd.mapPartitions { it =>
+          val fullPrunedSchema = StructType(prunedRequiredSchema.structTypeSchema.fields ++ partitionSchema.fields)
+          val unsafeProjection = generateUnsafeProjection(fullPrunedSchema, requiredSchema.structTypeSchema)

Review Comment:
   @YannByron this is the problem you were hitting with mandatory columns: by filtering partition columns out of the schema, you effectively re-ordered the columns relative to what the caller (Spark) was expecting. Spark simply projected the rows on the assumption that BaseRelation returns them adhering to the requested schema, while they were actually returned with the columns re-ordered (the partition columns appended at the end).
   
   The proper fix is to project the rows here back into the schema that the caller expects.
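
The re-ordering problem and the projection back can be sketched without Spark as follows. This is only an illustration under simplifying assumptions: schemas are plain lists of column names, rows are `Seq[Any]`, and `ProjectionSketch`, `prunePartitionColumns` and `projectBack` are hypothetical stand-ins, not the `tryPrunePartitionColumns` or `generateUnsafeProjection` helpers from the diff.

```scala
object ProjectionSketch {
  type Row = Seq[Any]

  // Splits the requested columns into those read from the data file and
  // those restored from the partition path. Returns (data, partition).
  def prunePartitionColumns(required: Seq[String],
                            partitionColumns: Set[String]): (Seq[String], Seq[String]) = {
    val (partition, data) = required.partition(partitionColumns.contains)
    (data, partition)
  }

  // Builds a function that re-orders a row laid out as `actual`
  // into the column order of `expected`.
  def projectBack(actual: Seq[String], expected: Seq[String]): Row => Row = {
    val ordinals = expected.map(name => actual.indexOf(name))
    require(!ordinals.contains(-1), "every expected column must exist in the actual layout")
    row => ordinals.map(i => row(i))
  }
}

object ProjectionSketchDemo extends App {
  val required = Seq("id", "dt", "amount")
  val (data, partition) = ProjectionSketch.prunePartitionColumns(required, Set("dt"))

  // The reader emits data columns first, with partition columns appended at the end.
  val readerLayout = data ++ partition
  val rowFromReader: Seq[Any] = Seq(1, 9.5, "2022-04-19")

  // Without this step the caller would read "amount" where it expects "dt".
  val project = ProjectionSketch.projectBack(readerLayout, required)
  println(project(rowFromReader))
}
```

The key point is that the reader's physical layout (data columns followed by partition columns) differs from the requested order whenever a partition column is not the last requested column, so an explicit projection is needed before handing rows back.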



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##########
@@ -114,16 +114,37 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
    *       rule; you can find more details in HUDI-3896)
    */
   def toHadoopFsRelation: HadoopFsRelation = {
+    // We're delegating to Spark to append partition values to every row only in cases
+    // when these corresponding partition-values are not persisted w/in the data file itself
+    val shouldAppendPartitionColumns = omitPartitionColumnsInFile
+
     val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match {
-      case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet")
+      case HoodieFileFormat.PARQUET => (sparkAdapter.createHoodieParquetFileFormat(shouldAppendPartitionColumns).get, "hoodie-parquet")
       case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
     }
 
     if (globPaths.isEmpty) {
+      // NOTE: There are currently 2 ways partition values could be fetched:
+      //          - Source columns (producing the values used for physical partitioning) will be read
+      //          from the data file
+      //          - Values parsed from the actual partition pat would be appended to the final dataset

Review Comment:
   Addressed in a follow-up





[GitHub] [hudi] hudi-bot commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103207245

   ## CI report:
   
   * 6fa8f1506ac4cfeb00e2296b4a7f9fe5338fde96 UNKNOWN
   * c040d5c57e551bc7e0fbbad03236b70ab48b2cf8 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8146) 
   * 27d58e93df6310a8f35f93e73bff5ae3f2378bdd UNKNOWN
   * 3dc0bd1b03d19253dd4034e72f365d6e63e0fce0 UNKNOWN
   * f4eaa8e08eb3759160f93e2bdb19ba7ac473a7df UNKNOWN
   




[GitHub] [hudi] hudi-bot commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103190387

   ## CI report:
   
   * 6fa8f1506ac4cfeb00e2296b4a7f9fe5338fde96 UNKNOWN
   * c040d5c57e551bc7e0fbbad03236b70ab48b2cf8 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8146) 
   * 27d58e93df6310a8f35f93e73bff5ae3f2378bdd UNKNOWN
   




[GitHub] [hudi] hudi-bot commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103481704

   ## CI report:
   
   * 6fa8f1506ac4cfeb00e2296b4a7f9fe5338fde96 UNKNOWN
   * 27d58e93df6310a8f35f93e73bff5ae3f2378bdd UNKNOWN
   * 3dc0bd1b03d19253dd4034e72f365d6e63e0fce0 UNKNOWN
   * e56dcad8f272d0450e2803790e2ca9dab1a8c4e7 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8157) 
   * f9e166e9146345498e0aaee8eff844b9444f3a8d UNKNOWN
   * 64dd7b1a59d215289d97578dcd0943b4d9764dcd Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8161) 
   




[GitHub] [hudi] yihua commented on a diff in pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
yihua commented on code in PR #5364:
URL: https://github.com/apache/hudi/pull/5364#discussion_r853619711


##########
hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala:
##########
@@ -55,14 +54,7 @@ class Spark3_1Adapter extends BaseSpark3Adapter {
     }
   }
 
-  override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = {
-    if (SPARK_VERSION.startsWith("3.1")) {
-      val loadClassName = "org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat"
-      val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader)
-      val ctor = clazz.getConstructors.head
-      Some(ctor.newInstance().asInstanceOf[ParquetFileFormat])
-    } else {
-      None
-    }
+  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
+    Some(new Spark312HoodieParquetFileFormat(appendPartitionValues))

Review Comment:
   Got it





[GitHub] [hudi] xiarixiaoyao commented on a diff in pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
xiarixiaoyao commented on code in PR #5364:
URL: https://github.com/apache/hudi/pull/5364#discussion_r853677692


##########
hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala:
##########
@@ -17,279 +17,301 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.net.URI
-import java.util
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.HoodieSparkUtils
-import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.{InternalSchemaCache, StringUtils}
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
-
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.AvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat.{createParquetFilters, pruneInternalSchema, rebuildFilterFromParquet}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
-import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
-class Spark312HoodieParquetFileFormat extends ParquetFileFormat {
-
-  // reference ParquetFileFormat from spark project
-  override def buildReaderWithPartitionValues(
-                                               sparkSession: SparkSession,
-                                               dataSchema: StructType,
-                                               partitionSchema: StructType,
-                                               requiredSchema: StructType,
-                                               filters: Seq[Filter],
-                                               options: Map[String, String],
-                                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) {
-      // fallback to origin parquet File read
-      super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
-    } else {
-      hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
-      hadoopConf.set(
-        ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-        requiredSchema.json)
-      hadoopConf.set(
-        ParquetWriteSupport.SPARK_ROW_SCHEMA,
-        requiredSchema.json)
-      hadoopConf.set(
-        SQLConf.SESSION_LOCAL_TIMEZONE.key,
-        sparkSession.sessionState.conf.sessionLocalTimeZone)
-      hadoopConf.setBoolean(
-        SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
-        sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
-      hadoopConf.setBoolean(
-        SQLConf.CASE_SENSITIVE.key,
-        sparkSession.sessionState.conf.caseSensitiveAnalysis)
-
-      ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
-
-      // Sets flags for `ParquetToSparkSchemaConverter`
-      hadoopConf.setBoolean(
-        SQLConf.PARQUET_BINARY_AS_STRING.key,
-        sparkSession.sessionState.conf.isParquetBinaryAsString)
-      hadoopConf.setBoolean(
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-        sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
-      // for dataSource v1, we have no method to do project for spark physical plan.
-      // it's safe to do cols project here.
-      val internalSchemaString = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
-      val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
-      if (querySchemaOption.isPresent && !requiredSchema.isEmpty) {
-        val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get())
-        hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(prunedSchema))
+import java.net.URI
+
+
+/**
+ * This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior
+ * that's not possible to customize in any other way
+ *
+ * NOTE: This is a version of [[AvroDeserializer]] impl from Spark 3.1.2 w/ the following changes applied to it:
+ * <ol>
+ *   <li>Avoiding appending partition values to the rows read from the data file</li>
+ *   <li>Schema on-read</li>
+ * </ol>
+ */
+class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter],
+                                              options: Map[String, String],
+                                              hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
+    hadoopConf.set(
+      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      SQLConf.SESSION_LOCAL_TIMEZONE.key,
+      sparkSession.sessionState.conf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(
+      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+      sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
+    hadoopConf.setBoolean(
+      SQLConf.CASE_SENSITIVE.key,
+      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+
+    ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+
+    // Sets flags for `ParquetToSparkSchemaConverter`
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_BINARY_AS_STRING.key,
+      sparkSession.sessionState.conf.isParquetBinaryAsString)
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+
+    val internalSchemaStr = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+    // For Spark DataSource v1, there's no Physical Plan projection/schema pruning w/in Spark itself,
+    // therefore it's safe to do schema projection here
+    if (!isNullOrEmpty(internalSchemaStr)) {
+      val prunedInternalSchemaStr =
+        pruneInternalSchema(internalSchemaStr, requiredSchema)
+      hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
+    }
+
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+    // TODO: if you move this into the closure it reverts to the default values.
+    // If true, enable using the custom RecordReader for parquet. This only works for
+    // a subset of the types (no complex types).
+    val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
+    val sqlConf = sparkSession.sessionState.conf
+    val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
+    val enableVectorizedReader: Boolean =
+      sqlConf.parquetVectorizedReaderEnabled &&
+        resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
+    val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
+    val capacity = sqlConf.parquetVectorizedReaderBatchSize
+    val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
+    // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+    val returningBatch = supportBatch(sparkSession, resultSchema)
+    val pushDownDate = sqlConf.parquetFilterPushDownDate
+    val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
+    val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
+    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
+    val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+
+    (file: PartitionedFile) => {
+      assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
+
+      val filePath = new Path(new URI(file.filePath))
+      val split =
+        new org.apache.parquet.hadoop.ParquetInputSplit(
+          filePath,
+          file.start,
+          file.start + file.length,
+          file.length,
+          Array.empty,
+          null)
+
+      val sharedConf = broadcastedHadoopConf.value.value
+
+      // Fetch internal schema
+      val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+      // Internal schema has to be pruned at this point
+      val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
+
+      val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent
+
+      val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
+      val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
+      val fileSchema = if (shouldUseInternalSchema) {
+        val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
+        InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
+      } else {
+        null
       }
-      val broadcastedHadoopConf =
-        sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
-
-      // TODO: if you move this into the closure it reverts to the default values.
-      // If true, enable using the custom RecordReader for parquet. This only works for
-      // a subset of the types (no complex types).
-      val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
-      val sqlConf = sparkSession.sessionState.conf
-      val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
-      val enableVectorizedReader: Boolean =
-        sqlConf.parquetVectorizedReaderEnabled &&
-          resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
-      val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
-      val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
-      val capacity = sqlConf.parquetVectorizedReaderBatchSize
-      val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
-      // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
-      val returningBatch = supportBatch(sparkSession, resultSchema)
-      val pushDownDate = sqlConf.parquetFilterPushDownDate
-      val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
-      val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-      val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
-      val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
-      val isCaseSensitive = sqlConf.caseSensitiveAnalysis
-
-      (file: PartitionedFile) => {
-        assert(file.partitionValues.numFields == partitionSchema.size)
-        val filePath = new Path(new URI(file.filePath))
-        val split =
-          new org.apache.parquet.hadoop.ParquetInputSplit(
-            filePath,
-            file.start,
-            file.start + file.length,
-            file.length,
-            Array.empty,
-            null)
-        val sharedConf = broadcastedHadoopConf.value.value
-        // do deal with internalSchema
-        val internalSchemaString = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
-        // querySchema must be a pruned schema.
-        val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
-        val internalSchemaChangeEnabled = if (internalSchemaString.isEmpty || !querySchemaOption.isPresent) false else true
-        val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
-        val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
-        val fileSchema = if (internalSchemaChangeEnabled) {
-          val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
-          InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
+
+      lazy val footerFileMetaData =
+        ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        footerFileMetaData.getKeyValueMetaData.get,
+        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
+      // Try to push down filters when filter push-down is enabled.
+      val pushed = if (enableParquetFilterPushDown) {
+        val parquetSchema = footerFileMetaData.getSchema
+        val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) {
+          createParquetFilters(
+            parquetSchema,
+            pushDownDate,
+            pushDownTimestamp,
+            pushDownDecimal,
+            pushDownStringStartWith,
+            pushDownInFilterThreshold,
+            isCaseSensitive,
+            datetimeRebaseMode)
         } else {
-          // this should not happened, searchSchemaAndCache will deal with correctly.
-          null
+          createParquetFilters(
+            parquetSchema,
+            pushDownDate,
+            pushDownTimestamp,
+            pushDownDecimal,
+            pushDownStringStartWith,
+            pushDownInFilterThreshold,
+            isCaseSensitive)
         }
+        filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null)))
+          // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+          // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+          // is used here.
+          .flatMap(parquetFilters.createFilter)
+          .reduceOption(FilterApi.and)
+      } else {
+        None
+      }
 
-        lazy val footerFileMetaData =
-          ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
-        val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
-          footerFileMetaData.getKeyValueMetaData.get,
-          SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
-        // Try to push down filters when filter push-down is enabled.
-        val pushed = if (enableParquetFilterPushDown) {
-          val parquetSchema = footerFileMetaData.getSchema
-          val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) {
-            Spark312HoodieParquetFileFormat.createParquetFilters(
-              parquetSchema,
-              pushDownDate,
-              pushDownTimestamp,
-              pushDownDecimal,
-              pushDownStringStartWith,
-              pushDownInFilterThreshold,
-              isCaseSensitive,
-              datetimeRebaseMode)
-          } else {
-            Spark312HoodieParquetFileFormat.createParquetFilters(
-              parquetSchema,
-              pushDownDate,
-              pushDownTimestamp,
-              pushDownDecimal,
-              pushDownStringStartWith,
-              pushDownInFilterThreshold,
-              isCaseSensitive)
-          }
-          filters.map(Spark312HoodieParquetFileFormat.rebuildFilterFromParquet(_, fileSchema, querySchemaOption.get()))
-            // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-            // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-            // is used here.
-            .flatMap(parquetFilters.createFilter(_))
-            .reduceOption(FilterApi.and)
+      // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
+      // *only* if the file was created by something other than "parquet-mr", so check the actual
+      // writer here for this file.  We have to do this per-file, as each file in the table may
+      // have different writers.
+      // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
+      def isCreatedByParquetMr: Boolean =
+        footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+      val convertTz =
+        if (timestampConversion && !isCreatedByParquetMr) {
+          Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
         } else {
           None
         }
 
-        // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
-        // *only* if the file was created by something other than "parquet-mr", so check the actual
-        // writer here for this file.  We have to do this per-file, as each file in the table may
-        // have different writers.
-        // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
-        def isCreatedByParquetMr: Boolean =
-          footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
-
-        val convertTz =
-          if (timestampConversion && !isCreatedByParquetMr) {
-            Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
-          } else {
-            None
-          }
-        val int96RebaseMode = DataSourceUtils.int96RebaseMode(
-          footerFileMetaData.getKeyValueMetaData.get,
-          SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
-
-        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
-        // use new conf
-        val hadoopAttempConf = new Configuration(broadcastedHadoopConf.value.value)
-        //
-        // reset request schema
-        var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
-        if (internalSchemaChangeEnabled) {
-          val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
-          val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
-          typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
-          hadoopAttempConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
-        }
-        val hadoopAttemptContext =
-          new TaskAttemptContextImpl(hadoopAttempConf, attemptId)
+      val int96RebaseMode = DataSourceUtils.int96RebaseMode(
+        footerFileMetaData.getKeyValueMetaData.get,
+        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
 
-        // Try to push down filters when filter push-down is enabled.
-        // Notice: This push-down is RowGroups level, not individual records.
-        if (pushed.isDefined) {
-          ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
-        }
-        val taskContext = Option(TaskContext.get())
-        if (enableVectorizedReader) {
-          val vectorizedReader = new Spark312HoodieVectorizedParquetRecordReader(
-            convertTz.orNull,
-            datetimeRebaseMode.toString,
-            int96RebaseMode.toString,
-            enableOffHeapColumnVector && taskContext.isDefined,
-            capacity, typeChangeInfos)
-          val iter = new RecordReaderIterator(vectorizedReader)
-          // SPARK-23457 Register a task completion listener before `initialization`.
-          taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
-          vectorizedReader.initialize(split, hadoopAttemptContext)
+      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+
+      // Clone new conf
+      val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
+      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
+      if (shouldUseInternalSchema) {
+        val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
+        val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
+        typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+        hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
+      }
+      val hadoopAttemptContext =
+        new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
+
+      // Try to push down filters when filter push-down is enabled.
+      // Notice: This push-down is RowGroups level, not individual records.
+      if (pushed.isDefined) {
+        ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
+      }
+      val taskContext = Option(TaskContext.get())
+      if (enableVectorizedReader) {
+        val vectorizedReader = new Spark312HoodieVectorizedParquetRecordReader(
+          convertTz.orNull,

Review Comment:
   Please use `shouldUseInternalSchema` to fall back to the original `VectorizedParquetRecordReader` when it is false. `Spark312HoodieVectorizedParquetRecordReader` is used for schema evolution.
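
The suggestion above could be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual Hudi code: `RecordReader`, `StockVectorizedReader`, `SchemaEvolutionReader`, and `ReaderSelection.selectReader` are hypothetical stand-ins, and the type-change map is simplified to plain strings.

```scala
// Hypothetical stand-ins for the real Spark/Hudi reader classes (assumption).
sealed trait RecordReader { def name: String }

// Plays the role of Spark's stock VectorizedParquetRecordReader.
case object StockVectorizedReader extends RecordReader {
  val name = "VectorizedParquetRecordReader"
}

// Plays the role of Spark312HoodieVectorizedParquetRecordReader, which carries
// per-column type-change information needed for schema evolution.
final case class SchemaEvolutionReader(typeChanges: Map[Int, (String, String)]) extends RecordReader {
  val name = "Spark312HoodieVectorizedParquetRecordReader"
}

object ReaderSelection {
  // Use the schema-evolution-aware reader only when an internal schema is in play;
  // otherwise fall back to the stock reader.
  def selectReader(shouldUseInternalSchema: Boolean,
                   typeChanges: Map[Int, (String, String)]): RecordReader =
    if (shouldUseInternalSchema) SchemaEvolutionReader(typeChanges)
    else StockVectorizedReader
}
```

With this shape, a table that never enabled schema evolution would keep going through the stock reader path.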





[GitHub] [hudi] hudi-bot commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1102986099

   ## CI report:
   
   * 6fa8f1506ac4cfeb00e2296b4a7f9fe5338fde96 UNKNOWN
   * ceb5addbf5d6d1100443ecf598f06b4323e1d6fe UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] yihua commented on a diff in pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
yihua commented on code in PR #5364:
URL: https://github.com/apache/hudi/pull/5364#discussion_r853541246


##########
hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala:
##########
@@ -55,14 +54,7 @@ class Spark3_1Adapter extends BaseSpark3Adapter {
     }
   }
 
-  override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = {
-    if (SPARK_VERSION.startsWith("3.1")) {
-      val loadClassName = "org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat"
-      val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader)
-      val ctor = clazz.getConstructors.head
-      Some(ctor.newInstance().asInstanceOf[ParquetFileFormat])
-    } else {
-      None
-    }
+  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
+    Some(new Spark312HoodieParquetFileFormat(appendPartitionValues))

Review Comment:
   Is there any reason why the class loader was used before, instead of directly creating a new instance of the class? @xushiyan do you have any context here, to make sure there is no historical workaround and we're not breaking any logic?
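
For context, the two instantiation styles being compared can be sketched side by side. This is a self-contained illustration under stated assumptions: `DemoFileFormat` and the `Instantiation` helpers are hypothetical, not Hudi classes. One common reason for reflective loading, which may or may not apply to this code, is that the caller has to compile even when the target class is absent from the compile-time classpath.

```scala
// Hypothetical stand-in for a Spark-version-specific file format class (assumption).
class DemoFileFormat(val appendPartitionValues: Boolean)

object Instantiation {
  // Reflective variant: the class is resolved by name at runtime, so this code
  // compiles even if the named class is not on the compile-time classpath.
  def viaReflection(className: String, appendPartitionValues: Boolean): Option[DemoFileFormat] =
    try {
      val clazz = Class.forName(className, true, Thread.currentThread().getContextClassLoader)
      val ctor = clazz.getConstructors.head
      Some(ctor.newInstance(Boolean.box(appendPartitionValues)).asInstanceOf[DemoFileFormat])
    } catch {
      // The class is not available in this runtime.
      case _: ClassNotFoundException => None
    }

  // Direct variant: simpler and checked by the compiler, but the class must be
  // available at compile time.
  def direct(appendPartitionValues: Boolean): Option[DemoFileFormat] =
    Some(new DemoFileFormat(appendPartitionValues))
}
```

If each adapter module is only ever compiled against its own Spark version, the direct variant loses nothing and gains compile-time checking of the constructor arguments.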



##########
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala:
##########
@@ -27,233 +25,262 @@ import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
-import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
+import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat.{pruneInternalSchema, rebuildFilterFromParquet}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
-class Spark32HoodieParquetFileFormat extends ParquetFileFormat {
-
-  // reference ParquetFileFormat from spark project
-  override def buildReaderWithPartitionValues(
-                                               sparkSession: SparkSession,
-                                               dataSchema: StructType,
-                                               partitionSchema: StructType,
-                                               requiredSchema: StructType,
-                                               filters: Seq[Filter],
-                                               options: Map[String, String],
-                                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) {
-      // fallback to origin parquet File read
-      super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)

Review Comment:
   If `shouldAppendPartitionValues` is true and the existing if condition is true, can we still fall back to the original parquet file read?
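
The condition being asked about could look roughly like the sketch below. It is a hypothetical helper, not code from this PR: `FallbackDecision.canDelegateToStockReader` is an illustrative name, and it assumes that the stock `ParquetFileFormat` reader always appends partition values, as the PR description states.

```scala
object FallbackDecision {
  // Hypothetical helper (assumption): delegating to the stock reader is only safe
  // when no internal (query) schema is set AND the caller wants partition values
  // appended, because the stock reader appends them unconditionally.
  def canDelegateToStockReader(querySchemaJson: String,
                               shouldAppendPartitionValues: Boolean): Boolean = {
    val noSchemaEvolution = querySchemaJson == null || querySchemaJson.isEmpty
    noSchemaEvolution && shouldAppendPartitionValues
  }
}
```

When the helper returns false, the custom read path would still be needed, either to apply schema evolution or to skip appending partition values.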



##########
hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala:
##########
@@ -17,279 +17,301 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.net.URI
-import java.util
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.HoodieSparkUtils
-import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.{InternalSchemaCache, StringUtils}
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
-
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.AvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat.{createParquetFilters, pruneInternalSchema, rebuildFilterFromParquet}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
-import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
-class Spark312HoodieParquetFileFormat extends ParquetFileFormat {
-
-  // reference ParquetFileFormat from spark project
-  override def buildReaderWithPartitionValues(
-                                               sparkSession: SparkSession,
-                                               dataSchema: StructType,
-                                               partitionSchema: StructType,
-                                               requiredSchema: StructType,
-                                               filters: Seq[Filter],
-                                               options: Map[String, String],
-                                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) {

Review Comment:
   Similar question here.





[GitHub] [hudi] hudi-bot commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103448431

   ## CI report:
   
   * 6fa8f1506ac4cfeb00e2296b4a7f9fe5338fde96 UNKNOWN
   * 27d58e93df6310a8f35f93e73bff5ae3f2378bdd UNKNOWN
   * 3dc0bd1b03d19253dd4034e72f365d6e63e0fce0 UNKNOWN
   * f4eaa8e08eb3759160f93e2bdb19ba7ac473a7df Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8150) 
   * e56dcad8f272d0450e2803790e2ca9dab1a8c4e7 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] xiarixiaoyao commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
xiarixiaoyao commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103525747

   @alexeykudinkin  thanks for addressing this.
   @alexeykudinkin  @nsivabalan @yihua  schema evolution currently does not work with this PR; let me find the reason, please wait a moment.




[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
alexeykudinkin commented on code in PR #5364:
URL: https://github.com/apache/hudi/pull/5364#discussion_r853568411


##########
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala:
##########
@@ -27,233 +25,262 @@ import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
-import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
+import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat.{pruneInternalSchema, rebuildFilterFromParquet}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
-class Spark32HoodieParquetFileFormat extends ParquetFileFormat {
-
-  // reference ParquetFileFormat from spark project
-  override def buildReaderWithPartitionValues(
-                                               sparkSession: SparkSession,
-                                               dataSchema: StructType,
-                                               partitionSchema: StructType,
-                                               requiredSchema: StructType,
-                                               filters: Seq[Filter],
-                                               options: Map[String, String],
-                                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) {
-      // fallback to origin parquet File read
-      super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)

Review Comment:
   `shouldAppendPartitionValues` is almost never true now (only in cases when we drop the source columns)
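
   A minimal sketch of what this flag toggles, assuming a simplified model in which rows are plain Scala maps rather than Spark `InternalRow`s. `PartitionValueSketch`, `readRow`, and the `ts` column are hypothetical names used only for illustration; they are not part of Hudi or Spark:

```scala
// Hypothetical, simplified model: a row is a Map from column name to value.
object PartitionValueSketch {
  type Row = Map[String, Any]

  // When the flag is true, values parsed from the partition path are added to the row
  // (here they also overwrite a same-named column). When it is false, the row is
  // returned exactly as read from the data file, so source columns keep their values.
  def readRow(fileRow: Row, pathPartitionValues: Row, shouldAppendPartitionValues: Boolean): Row =
    if (shouldAppendPartitionValues) fileRow ++ pathPartitionValues
    else fileRow

  def main(args: Array[String]): Unit = {
    val fileRow: Row = Map("id" -> 1, "ts" -> "2022-04-19 18:55:27")
    val fromPath: Row = Map("ts" -> "2022-04-19")

    // Path-derived value replaces the source column.
    println(readRow(fileRow, fromPath, shouldAppendPartitionValues = true)("ts"))
    // Source column is preserved.
    println(readRow(fileRow, fromPath, shouldAppendPartitionValues = false)("ts"))
  }
}
```

   In this model the second call corresponds to the Hudi-specific behavior described in the PR: the value stored in the data file wins over whatever the partition path encodes.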



##########
hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala:
##########
@@ -17,279 +17,301 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.net.URI
-import java.util
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.HoodieSparkUtils
-import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.{InternalSchemaCache, StringUtils}
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
-
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.AvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat.{createParquetFilters, pruneInternalSchema, rebuildFilterFromParquet}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
-import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
-class Spark312HoodieParquetFileFormat extends ParquetFileFormat {
-
-  // reference ParquetFileFormat from spark project
-  override def buildReaderWithPartitionValues(
-                                               sparkSession: SparkSession,
-                                               dataSchema: StructType,
-                                               partitionSchema: StructType,
-                                               requiredSchema: StructType,
-                                               filters: Seq[Filter],
-                                               options: Map[String, String],
-                                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) {

Review Comment:
   Responded above





[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
alexeykudinkin commented on code in PR #5364:
URL: https://github.com/apache/hudi/pull/5364#discussion_r853567337


##########
hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala:
##########
@@ -55,14 +54,7 @@ class Spark3_1Adapter extends BaseSpark3Adapter {
     }
   }
 
-  override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = {
-    if (SPARK_VERSION.startsWith("3.1")) {
-      val loadClassName = "org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat"
-      val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader)
-      val ctor = clazz.getConstructors.head
-      Some(ctor.newInstance().asInstanceOf[ParquetFileFormat])
-    } else {
-      None
-    }
+  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
+    Some(new Spark312HoodieParquetFileFormat(appendPartitionValues))

Review Comment:
   My hunch is that @xiarixiaoyao was using reflection to load this component to handle the case of Spark 3.0. But given that we're dropping support for it in 0.11, I just dropped the reflection and instantiate it directly.
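
   The trade-off between the two approaches can be sketched as follows. `FileFormatSketch` and `AdapterSketch` are hypothetical stand-ins, not the Hudi or Spark classes, and the class-loader choice is an assumption made to keep the example self-contained:

```scala
// Hypothetical stand-in for a file format that takes the flag as a constructor argument.
class FileFormatSketch(val appendPartitionValues: Boolean)

object AdapterSketch {
  // Reflection: the class is resolved by name at runtime, so a missing class
  // surfaces as None instead of a link-time failure.
  def viaReflection(className: String, appendPartitionValues: Boolean): Option[FileFormatSketch] =
    try {
      val clazz = Class.forName(className, true, getClass.getClassLoader)
      val ctor = clazz.getConstructor(classOf[Boolean])
      Some(ctor.newInstance(Boolean.box(appendPartitionValues)).asInstanceOf[FileFormatSketch])
    } catch {
      case _: ClassNotFoundException => None
    }

  // Direct instantiation: checked by the compiler, but the class must always be
  // present on the classpath.
  def direct(appendPartitionValues: Boolean): Option[FileFormatSketch] =
    Some(new FileFormatSketch(appendPartitionValues))
}
```

   Once every supported Spark version ships the class, the direct form is simpler and lets the compiler catch constructor-signature changes such as the new `appendPartitionValues` parameter.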





[GitHub] [hudi] alexeykudinkin commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
alexeykudinkin commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103241149

   @xiarixiaoyao please take a look as well




[GitHub] [hudi] yihua commented on a diff in pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
yihua commented on code in PR #5364:
URL: https://github.com/apache/hudi/pull/5364#discussion_r853620708


##########
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala:
##########
@@ -27,233 +25,262 @@ import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
-import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
+import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat.{pruneInternalSchema, rebuildFilterFromParquet}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
-class Spark32HoodieParquetFileFormat extends ParquetFileFormat {
-
-  // reference ParquetFileFormat from spark project
-  override def buildReaderWithPartitionValues(
-                                               sparkSession: SparkSession,
-                                               dataSchema: StructType,
-                                               partitionSchema: StructType,
-                                               requiredSchema: StructType,
-                                               filters: Seq[Filter],
-                                               options: Map[String, String],
-                                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) {
-      // fallback to origin parquet File read
-      super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)

Review Comment:
   Sounds good





[GitHub] [hudi] hudi-bot commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103465752

   ## CI report:
   
   * 6fa8f1506ac4cfeb00e2296b4a7f9fe5338fde96 UNKNOWN
   * 27d58e93df6310a8f35f93e73bff5ae3f2378bdd UNKNOWN
   * 3dc0bd1b03d19253dd4034e72f365d6e63e0fce0 UNKNOWN
   * e56dcad8f272d0450e2803790e2ca9dab1a8c4e7 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8157) 
   * f9e166e9146345498e0aaee8eff844b9444f3a8d UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
alexeykudinkin commented on code in PR #5364:
URL: https://github.com/apache/hudi/pull/5364#discussion_r853630693


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##########
@@ -114,16 +114,38 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
    *       rule; you can find more details in HUDI-3896)
    */
   def toHadoopFsRelation: HadoopFsRelation = {
+    // We're delegating to Spark to append partition values to every row only in cases
+    // when these corresponding partition-values are not persisted w/in the data file itself
+    val shouldAppendPartitionColumns = omitPartitionColumnsInFile

Review Comment:
   Good call!





[GitHub] [hudi] hudi-bot commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103555789

   ## CI report:
   
   * 6fa8f1506ac4cfeb00e2296b4a7f9fe5338fde96 UNKNOWN
   * 27d58e93df6310a8f35f93e73bff5ae3f2378bdd UNKNOWN
   * 3dc0bd1b03d19253dd4034e72f365d6e63e0fce0 UNKNOWN
   * f9e166e9146345498e0aaee8eff844b9444f3a8d UNKNOWN
   * 64dd7b1a59d215289d97578dcd0943b4d9764dcd Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8161) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
alexeykudinkin commented on code in PR #5364:
URL: https://github.com/apache/hudi/pull/5364#discussion_r853393411


##########
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.AvroDeserializer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.{PartitionedFile, RecordReaderIterator}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.{AtomicType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+import java.net.URI
+
+/**
+ * This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior
+ * that's not possible to customize in any other way
+ *
+ * NOTE: This is a version of [[AvroDeserializer]] impl from Spark 2.4.4 w/ the following changes applied to it:
+ * <ol>
+ *   <li>Avoiding appending partition values to the rows read from the data file</li>
+ * </ol>
+ */
+class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {

Review Comment:
   The only aspects that diverge from the Spark source are the ones using `shouldAppendPartitionValues`





[GitHub] [hudi] hudi-bot commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1102982994

   ## CI report:
   
   * 6fa8f1506ac4cfeb00e2296b4a7f9fe5338fde96 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] xushiyan merged pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
xushiyan merged PR #5364:
URL: https://github.com/apache/hudi/pull/5364




[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
alexeykudinkin commented on code in PR #5364:
URL: https://github.com/apache/hudi/pull/5364#discussion_r854290453


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##########
@@ -114,16 +114,38 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
    *       rule; you can find more details in HUDI-3896)
    */
   def toHadoopFsRelation: HadoopFsRelation = {
+    // We're delegating to Spark to append partition values to every row only in cases
+    // when these corresponding partition-values are not persisted w/in the data file itself
+    val shouldAppendPartitionColumns = omitPartitionColumnsInFile

Review Comment:
   @nsivabalan on second thought, this flag actually controls whether we omit partition columns when persisting data files, so I kept it as `omitPartitionColumns` to align with the config value





[GitHub] [hudi] hudi-bot commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103069053

   ## CI report:
   
   * 6fa8f1506ac4cfeb00e2296b4a7f9fe5338fde96 UNKNOWN
   * ceb5addbf5d6d1100443ecf598f06b4323e1d6fe Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8145) 
   * c040d5c57e551bc7e0fbbad03236b70ab48b2cf8 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8146) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103199446

   ## CI report:
   
   * 6fa8f1506ac4cfeb00e2296b4a7f9fe5338fde96 UNKNOWN
   * c040d5c57e551bc7e0fbbad03236b70ab48b2cf8 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8146) 
   * 27d58e93df6310a8f35f93e73bff5ae3f2378bdd UNKNOWN
   * 3dc0bd1b03d19253dd4034e72f365d6e63e0fce0 UNKNOWN
   


[GitHub] [hudi] hudi-bot commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103185550

   ## CI report:
   
   * 6fa8f1506ac4cfeb00e2296b4a7f9fe5338fde96 UNKNOWN
   * c040d5c57e551bc7e0fbbad03236b70ab48b2cf8 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8146) 
   


[GitHub] [hudi] xiarixiaoyao commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
xiarixiaoyao commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103415549

   @alexeykudinkin now that we use hoodieparquetFile,
   we may need to modify this function, `HoodieDataSourceHelper.getConfigurationWithInternalSchema`,
   to reduce the impact of schema evolution:
   ```
     def getConfigurationWithInternalSchema(conf: Configuration, internalSchema: InternalSchema, tablePath: String, validCommits: String): Configuration = {
       val querySchemaString = SerDeHelper.toJson(internalSchema)
       if (!querySchemaString.isEmpty) {
         conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, querySchemaString)
         conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, tablePath)
         conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
       }
       conf
     }
   ```




[GitHub] [hudi] hudi-bot commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103016126

   ## CI report:
   
   * 6fa8f1506ac4cfeb00e2296b4a7f9fe5338fde96 UNKNOWN
   * ceb5addbf5d6d1100443ecf598f06b4323e1d6fe Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8145) 
   * c040d5c57e551bc7e0fbbad03236b70ab48b2cf8 UNKNOWN
   


[GitHub] [hudi] nsivabalan commented on a diff in pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on code in PR #5364:
URL: https://github.com/apache/hudi/pull/5364#discussion_r853598977


##########
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala:
##########
@@ -289,6 +324,16 @@ class Spark32HoodieParquetFileFormat extends ParquetFileFormat {
 
 object Spark32HoodieParquetFileFormat {
 
+  def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {

Review Comment:
   Feel free to fix this in a follow-up PR if need be. Maybe we can move this to a util class and use it across adapters? I see the exact same method in the Spark312HoodieParquetFileFormat class as well.
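
A minimal sketch of the suggestion above, in plain Scala with no Spark or Hudi dependencies. All names here (`SketchSchema`, `SchemaPruningSketch`, `Spark31FormatSketch`, `Spark32FormatSketch`) are hypothetical stand-ins, not the actual Hudi classes; the point is only that both version-specific classes can delegate to one shared helper instead of each carrying a copy of the method.

```scala
// Hypothetical stand-ins for the real schema types; names are illustrative only.
final case class SketchField(name: String)
final case class SketchSchema(fields: Seq[SketchField])

// One shared helper, so each version-specific format class does not duplicate it.
object SchemaPruningSketch {
  // Keep only the fields the query asked for; an empty request leaves the schema untouched.
  def prune(full: SketchSchema, requiredNames: Seq[String]): SketchSchema =
    if (requiredNames.isEmpty) full
    else SketchSchema(full.fields.filter(f => requiredNames.contains(f.name)))
}

// Both adapter-specific classes delegate to the same helper.
class Spark31FormatSketch {
  def prunedFor(full: SketchSchema, required: Seq[String]): SketchSchema =
    SchemaPruningSketch.prune(full, required)
}

class Spark32FormatSketch {
  def prunedFor(full: SketchSchema, required: Seq[String]): SketchSchema =
    SchemaPruningSketch.prune(full, required)
}
```

Whether the real helper can be shared this directly depends on how much the Spark 3.1 and 3.2 code paths differ, which this sketch does not model.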



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##########
@@ -114,16 +114,38 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
    *       rule; you can find more details in HUDI-3896)
    */
   def toHadoopFsRelation: HadoopFsRelation = {
+    // We're delegating to Spark to append partition values to every row only in cases
+    // when these corresponding partition-values are not persisted w/in the data file itself
+    val shouldAppendPartitionColumns = omitPartitionColumnsInFile

Review Comment:
   Minor: instead of "omitPartitionColumnsInFile" (present tense), maybe we can name the variable "isPartitionColumnPersistedInDataFile" (past tense).





[GitHub] [hudi] yihua commented on a diff in pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
yihua commented on code in PR #5364:
URL: https://github.com/apache/hudi/pull/5364#discussion_r853429120


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##########
@@ -114,16 +114,37 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
    *       rule; you can find more details in HUDI-3896)
    */
   def toHadoopFsRelation: HadoopFsRelation = {
+    // We're delegating to Spark to append partition values to every row only in cases
+    // when these corresponding partition-values are not persisted w/in the data file itself
+    val shouldAppendPartitionColumns = omitPartitionColumnsInFile
+
     val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match {
-      case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet")
+      case HoodieFileFormat.PARQUET => (sparkAdapter.createHoodieParquetFileFormat(shouldAppendPartitionColumns).get, "hoodie-parquet")

Review Comment:
   nit: create a constant for "hoodie-parquet" so it can be referenced everywhere.
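
One way the nit above could look, as a self-contained sketch. The object and constant names (`HoodieFormatNamesSketch`, `HoodieParquetFormatName`) and the simplified format enum are assumptions made for illustration, not identifiers from the PR.

```scala
object HoodieFormatNamesSketch {
  // Single source of truth for the short name (the constant name is hypothetical).
  val HoodieParquetFormatName: String = "hoodie-parquet"
}

// Simplified stand-in for the base file format enum.
sealed trait BaseFileFormatSketch
case object ParquetSketch extends BaseFileFormatSketch
case object OrcSketch extends BaseFileFormatSketch

object FormatNameResolverSketch {
  // Every call site references the constant instead of repeating the literal.
  def formatClassName(format: BaseFileFormatSketch): String = format match {
    case ParquetSketch => HoodieFormatNamesSketch.HoodieParquetFormatName
    case OrcSketch     => "orc"
  }
}
```

With the literal defined once, a later rename of the short name only touches one line.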



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##########
@@ -114,16 +114,37 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
    *       rule; you can find more details in HUDI-3896)
    */
   def toHadoopFsRelation: HadoopFsRelation = {
+    // We're delegating to Spark to append partition values to every row only in cases
+    // when these corresponding partition-values are not persisted w/in the data file itself
+    val shouldAppendPartitionColumns = omitPartitionColumnsInFile
+
     val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match {
-      case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet")
+      case HoodieFileFormat.PARQUET => (sparkAdapter.createHoodieParquetFileFormat(shouldAppendPartitionColumns).get, "hoodie-parquet")
       case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
     }
 
     if (globPaths.isEmpty) {
+      // NOTE: There are currently 2 ways partition values could be fetched:
+      //          - Source columns (producing the values used for physical partitioning) will be read
+      //          from the data file
+      //          - Values parsed from the actual partition pat would be appended to the final dataset

Review Comment:
   typo: "pat"



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkHoodieParquetFileFormat.scala:
##########
@@ -28,20 +28,19 @@ import org.apache.spark.sql.types.StructType
 
 
 class SparkHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport {
-  override def shortName(): String = "HoodieParquet"
+  override def shortName(): String = "hoodie-parquet"

Review Comment:
   I assume this is used by Spark to identify the format?
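
For context on the question above: to my understanding, Spark resolves a data source by comparing the requested name against each registered format's `shortName()`, ignoring case. The sketch below models only that lookup in plain Scala; the trait, classes, and registry are hypothetical stand-ins, not Spark's actual `DataSourceRegister` machinery.

```scala
// Hypothetical stand-in for a format that registers itself under a short name.
trait RegisteredFormatSketch {
  def shortName: String
}

final class HoodieParquetSketch extends RegisteredFormatSketch {
  override def shortName: String = "hoodie-parquet"
}

final class OrcFormatSketch extends RegisteredFormatSketch {
  override def shortName: String = "orc"
}

object FormatLookupSketch {
  // In Spark the candidates are discovered via a service loader; here they are listed by hand.
  private val registered: Seq[RegisteredFormatSketch] =
    Seq(new HoodieParquetSketch, new OrcFormatSketch)

  // Case-insensitive match of the requested name against each short name.
  def lookup(name: String): Option[RegisteredFormatSketch] =
    registered.find(_.shortName.equalsIgnoreCase(name))
}
```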



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala:
##########
@@ -183,7 +183,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
       sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath)
       sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
       val formatClassName = metaClient.getTableConfig.getBaseFileFormat match {
-        case HoodieFileFormat.PARQUET => if (!internalSchema.isEmptySchema) "HoodieParquet" else "parquet"
+        case HoodieFileFormat.PARQUET => "hoodie-parquet"

Review Comment:
   same here for constant





[GitHub] [hudi] hudi-bot commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103020293

   ## CI report:
   
   * 6fa8f1506ac4cfeb00e2296b4a7f9fe5338fde96 UNKNOWN
   * ceb5addbf5d6d1100443ecf598f06b4323e1d6fe Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8145) 
   * c040d5c57e551bc7e0fbbad03236b70ab48b2cf8 UNKNOWN
   


[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
alexeykudinkin commented on code in PR #5364:
URL: https://github.com/apache/hudi/pull/5364#discussion_r853515259


##########
hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala:
##########
@@ -17,279 +17,301 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.net.URI
-import java.util
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.HoodieSparkUtils
-import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.{InternalSchemaCache, StringUtils}
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
-
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.AvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat.{createParquetFilters, pruneInternalSchema, rebuildFilterFromParquet}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
-import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
-class Spark312HoodieParquetFileFormat extends ParquetFileFormat {
-
-  // reference ParquetFileFormat from spark project
-  override def buildReaderWithPartitionValues(
-                                               sparkSession: SparkSession,
-                                               dataSchema: StructType,
-                                               partitionSchema: StructType,
-                                               requiredSchema: StructType,
-                                               filters: Seq[Filter],
-                                               options: Map[String, String],
-                                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) {
-      // fallback to origin parquet File read
-      super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
-    } else {
-      hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
-      hadoopConf.set(
-        ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-        requiredSchema.json)
-      hadoopConf.set(
-        ParquetWriteSupport.SPARK_ROW_SCHEMA,
-        requiredSchema.json)
-      hadoopConf.set(
-        SQLConf.SESSION_LOCAL_TIMEZONE.key,
-        sparkSession.sessionState.conf.sessionLocalTimeZone)
-      hadoopConf.setBoolean(
-        SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
-        sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
-      hadoopConf.setBoolean(
-        SQLConf.CASE_SENSITIVE.key,
-        sparkSession.sessionState.conf.caseSensitiveAnalysis)
-
-      ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
-
-      // Sets flags for `ParquetToSparkSchemaConverter`
-      hadoopConf.setBoolean(
-        SQLConf.PARQUET_BINARY_AS_STRING.key,
-        sparkSession.sessionState.conf.isParquetBinaryAsString)
-      hadoopConf.setBoolean(
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-        sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
-      // for dataSource v1, we have no method to do project for spark physical plan.
-      // it's safe to do cols project here.
-      val internalSchemaString = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
-      val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
-      if (querySchemaOption.isPresent && !requiredSchema.isEmpty) {
-        val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get())
-        hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(prunedSchema))
+import java.net.URI
+
+
+/**
+ * This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior
+ * that's not possible to customize in any other way
+ *
+ * NOTE: This is a version of [[AvroDeserializer]] impl from Spark 3.1.2 w/ w/ the following changes applied to it:
+ * <ol>
+ *   <li>Avoiding appending partition values to the rows read from the data file</li>
+ *   <li>Schema on-read</li>
+ * </ol>
+ */
+class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,

Review Comment:
   The GitHub UI has a hard time rendering these changes properly:
   
   1. Had to remove the top-level conditional (since this FileFormat is now used to control whether partition values will be appended)
   2. Did minor cleanup of the `InternalSchema` handling to make sure it does not fail w/ NPEs
   3. Added changes to handle `shouldAppendPartitionValues`
   
   NOTE: Copy both versions into an IDEA scratchpad to compare them side by side in a more meaningful way
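
To make the effect of `shouldAppendPartitionValues` concrete, here is a rough, self-contained model of it. This is not the actual reader code: rows are modeled as plain `Vector[Any]`, and `PartitionAppendSketch` and `buildReader` are hypothetical names. The sample values in the usage note are also made up, chosen so that the value parsed from the path differs from the source column.

```scala
object PartitionAppendSketch {
  type Row = Vector[Any]

  // Returns a reader that takes the rows read from a data file plus the
  // partition values parsed from that file's partition path.
  def buildReader(shouldAppendPartitionValues: Boolean): (Iterator[Row], Row) => Iterator[Row] =
    (fileRows, partitionValues) =>
      if (shouldAppendPartitionValues) {
        // Default Spark-like behavior: path-derived values are appended to every row.
        fileRows.map(row => row ++ partitionValues)
      } else {
        // Hudi-specific behavior: rely on the source columns already stored in the file.
        fileRows
      }
}
```

For example, with file rows `Vector(1, "2022-04-19")` and a path-derived value `"2022/04/19"`, the appending reader yields `Vector(1, "2022-04-19", "2022/04/19")`, while the non-appending reader returns the row unchanged.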



##########
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala:
##########
@@ -27,233 +25,262 @@ import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
-import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
+import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat.{pruneInternalSchema, rebuildFilterFromParquet}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
-class Spark32HoodieParquetFileFormat extends ParquetFileFormat {
-
-  // reference ParquetFileFormat from spark project
-  override def buildReaderWithPartitionValues(
-                                               sparkSession: SparkSession,
-                                               dataSchema: StructType,
-                                               partitionSchema: StructType,
-                                               requiredSchema: StructType,
-                                               filters: Seq[Filter],
-                                               options: Map[String, String],
-                                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) {
-      // fallback to origin parquet File read
-      super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
-    } else {
-      hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
-      hadoopConf.set(
-        ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-        requiredSchema.json)
-      hadoopConf.set(
-        ParquetWriteSupport.SPARK_ROW_SCHEMA,
-        requiredSchema.json)
-      hadoopConf.set(
-        SQLConf.SESSION_LOCAL_TIMEZONE.key,
-        sparkSession.sessionState.conf.sessionLocalTimeZone)
-      hadoopConf.setBoolean(
-        SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
-        sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
-      hadoopConf.setBoolean(
-        SQLConf.CASE_SENSITIVE.key,
-        sparkSession.sessionState.conf.caseSensitiveAnalysis)
-
-      ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
-
-      // Sets flags for `ParquetToSparkSchemaConverter`
-      hadoopConf.setBoolean(
-        SQLConf.PARQUET_BINARY_AS_STRING.key,
-        sparkSession.sessionState.conf.isParquetBinaryAsString)
-      hadoopConf.setBoolean(
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-        sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
-      // for dataSource v1, we have no method to do project for spark physical plan.
-      // it's safe to do cols project here.
-      val internalSchemaString = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
-      val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
-      if (querySchemaOption.isPresent && !requiredSchema.isEmpty) {
-        val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get())
-        hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(prunedSchema))
+import java.net.URI
+
+/**
+ * This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior
+ * that's not possible to customize in any other way
+ *
+ * NOTE: This is a version of [[AvroDeserializer]] impl from Spark 3.2.1 w/ w/ the following changes applied to it:
+ * <ol>
+ *   <li>Avoiding appending partition values to the rows read from the data file</li>
+ *   <li>Schema on-read</li>
+ * </ol>
+ */
+class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {

Review Comment:
   Same comments as for Spark 3.1





[GitHub] [hudi] hudi-bot commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103467255

   ## CI report:
   
   * 6fa8f1506ac4cfeb00e2296b4a7f9fe5338fde96 UNKNOWN
   * 27d58e93df6310a8f35f93e73bff5ae3f2378bdd UNKNOWN
   * 3dc0bd1b03d19253dd4034e72f365d6e63e0fce0 UNKNOWN
   * e56dcad8f272d0450e2803790e2ca9dab1a8c4e7 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8157) 
   * f9e166e9146345498e0aaee8eff844b9444f3a8d UNKNOWN
   * 64dd7b1a59d215289d97578dcd0943b4d9764dcd UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103464381

   ## CI report:
   
   * 6fa8f1506ac4cfeb00e2296b4a7f9fe5338fde96 UNKNOWN
   * 27d58e93df6310a8f35f93e73bff5ae3f2378bdd UNKNOWN
   * 3dc0bd1b03d19253dd4034e72f365d6e63e0fce0 UNKNOWN
   * f4eaa8e08eb3759160f93e2bdb19ba7ac473a7df Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8150) 
   * e56dcad8f272d0450e2803790e2ca9dab1a8c4e7 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8157) 
   * f9e166e9146345498e0aaee8eff844b9444f3a8d UNKNOWN
   




[GitHub] [hudi] alexeykudinkin commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
alexeykudinkin commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1103465853

   @xiarixiaoyao addressed




[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
alexeykudinkin commented on code in PR #5364:
URL: https://github.com/apache/hudi/pull/5364#discussion_r853634821


##########
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala:
##########
@@ -289,6 +324,16 @@ class Spark32HoodieParquetFileFormat extends ParquetFileFormat {
 
 object Spark32HoodieParquetFileFormat {
 
+  def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {

Review Comment:
   Yeah, there's quite a bit of duplication we can eliminate. We can take it up as a follow-up for the sake of moving forward w/ RC3 ASAP.





[GitHub] [hudi] hudi-bot commented on pull request #5364: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5364:
URL: https://github.com/apache/hudi/pull/5364#issuecomment-1102989147

   ## CI report:
   
   * 6fa8f1506ac4cfeb00e2296b4a7f9fe5338fde96 UNKNOWN
   * ceb5addbf5d6d1100443ecf598f06b4323e1d6fe Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8145) 
   

