You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/04/22 00:37:01 UTC

[hudi] 14/15: Hardening the code

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.11-0-apr21-5378-patched
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 12b9b783b9c74519044200592d897bf90efcf3ad
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Thu Apr 21 13:00:23 2022 -0700

    Hardening the code
---
 .../parquet/Spark32HoodieParquetFileFormat.scala    | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
index 85f8918137..7135f19e95 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
@@ -25,9 +25,9 @@ import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
 import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util.InternalSchemaCache
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.common.util.{InternalSchemaCache, ReflectionUtils}
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -220,13 +220,17 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
 
       // Clone new conf
       val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
-      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
-      if (shouldUseInternalSchema) {
+      val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
         val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
-        typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+
         hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
+
+        SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+      } else {
+        new java.util.HashMap()
       }
+
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
 
@@ -395,6 +399,9 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
 
 object Spark32HoodieParquetFileFormat {
 
+  /**
+   * NOTE: This method is specific to Spark 3.2.0
+   */
   private def createParquetFilters(args: Any*): ParquetFilters = {
     // NOTE: ParquetFilters ctor args contain Scala enum, therefore we can't look it
     //       up by arg types, and have to instead rely on the number of args based on individual class;
@@ -404,6 +411,9 @@ object Spark32HoodieParquetFileFormat {
       .asInstanceOf[ParquetFilters]
   }
 
+  /**
+   * NOTE: This method is specific to Spark 3.2.0
+   */
   private def createParquetReadSupport(args: Any*): ParquetReadSupport = {
     // NOTE: ParquetReadSupport ctor args contain Scala enum, therefore we can't look it
     //       up by arg types, and have to instead rely on the number of args based on individual class;
@@ -413,6 +423,9 @@ object Spark32HoodieParquetFileFormat {
       .asInstanceOf[ParquetReadSupport]
   }
 
+  /**
+   * NOTE: This method is specific to Spark 3.2.0
+   */
   private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = {
     // NOTE: ParquetReadSupport ctor args contain Scala enum, therefore we can't look it
     //       up by arg types, and have to instead rely on the number of args based on individual class;