Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/22 06:41:58 UTC

[hudi] branch release-0.11.0 updated (6fccca6c04 -> 25501c99e9)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a change to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git


    from 6fccca6c04 [HUDI-3935] Adding config to fallback to enabled Partition Values extraction from Partition path (#5377)
     new f8ce1c53ec [MINOR] Increase azure CI timeout to 120m (#5384)
     new 304c5bf20e [HUDI-3940] Fix retry count increment in lock manager (#5387)
     new 0fb2d5bae1 [HUDI-3921] Fixed schema evolution cannot work with HUDI-3855 (#5376)
     new 5bd7a677b2 [HUDI-3936] Fix projection for a nested field as pre-combined key (#5379)
     new 25501c99e9 [HUDI-3934] Fix `Spark32HoodieParquetFileFormat` not being compatible w/ Spark 3.2.0 (#5378)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 azure-pipelines.yml                                |  10 +-
 .../hudi/client/transaction/lock/LockManager.java  |   3 +-
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |  12 +-
 .../table/action/commit/HoodieMergeHelper.java     |  14 +-
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   |   8 +-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 102 ++++++++++---
 .../table/log/AbstractHoodieLogRecordReader.java   |   3 +-
 .../schema/action/InternalSchemaMerger.java        |  26 +++-
 .../internal/schema/utils/InternalSchemaUtils.java |  16 +++
 .../org/apache/hudi/avro/TestHoodieAvroUtils.java  |   7 +
 .../schema/utils/TestAvroSchemaEvolutionUtils.java |   4 +-
 .../org/apache/hudi/BaseFileOnlyRelation.scala     |   6 +-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  19 ++-
 .../hudi/MergeOnReadIncrementalRelation.scala      |   2 +-
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |   2 +-
 .../hudi/functional/TestMORDataSourceStorage.scala |  28 ++--
 .../functional/TestParquetColumnProjection.scala   |   4 +-
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  |  31 ++--
 .../apache/spark/sql/adapter/Spark3_1Adapter.scala |   4 +-
 ....scala => Spark31HoodieParquetFileFormat.scala} |  31 ++--
 .../parquet/Spark32DataSourceUtils.scala           |  77 ++++++++++
 .../parquet/Spark32HoodieParquetFileFormat.scala   | 157 +++++++++++++++++----
 22 files changed, 439 insertions(+), 127 deletions(-)
 rename hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/{Spark312HoodieParquetFileFormat.scala => Spark31HoodieParquetFileFormat.scala} (95%)
 create mode 100644 hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala


[hudi] 01/05: [MINOR] Increase azure CI timeout to 120m (#5384)

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit f8ce1c53eca05d319dccd0111173567e1ceaa921
Author: Raymond Xu <27...@users.noreply.github.com>
AuthorDate: Thu Apr 21 04:35:44 2022 -0700

    [MINOR] Increase azure CI timeout to 120m (#5384)
---
 azure-pipelines.yml | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index 8ca54c1ab3..6c01321004 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -33,7 +33,7 @@ stages:
     jobs:
       - job: UT_FT_1
         displayName: UT FT common & flink & UT client/spark-client
-        timeoutInMinutes: '90'
+        timeoutInMinutes: '120'
         steps:
           - task: Maven@3
             displayName: maven install
@@ -64,7 +64,7 @@ stages:
               mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
       - job: UT_FT_2
         displayName: FT client/spark-client
-        timeoutInMinutes: '90'
+        timeoutInMinutes: '120'
         steps:
           - task: Maven@3
             displayName: maven install
@@ -86,7 +86,7 @@ stages:
               mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
       - job: UT_FT_3
         displayName: UT FT clients & cli & utilities & sync/hive-sync
-        timeoutInMinutes: '90'
+        timeoutInMinutes: '120'
         steps:
           - task: Maven@3
             displayName: maven install
@@ -117,7 +117,7 @@ stages:
               mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
       - job: UT_FT_4
         displayName: UT FT other modules
-        timeoutInMinutes: '90'
+        timeoutInMinutes: '120'
         steps:
           - task: Maven@3
             displayName: maven install
@@ -148,7 +148,7 @@ stages:
               mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
       - job: IT
         displayName: IT modules
-        timeoutInMinutes: '90'
+        timeoutInMinutes: '120'
         steps:
           - task: AzureCLI@2
             displayName: Prepare for IT


[hudi] 02/05: [HUDI-3940] Fix retry count increment in lock manager (#5387)

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 304c5bf20ee6c8c45cb3d03213b52d6ea85530c4
Author: Sagar Sumit <sa...@gmail.com>
AuthorDate: Fri Apr 22 02:22:05 2022 +0530

    [HUDI-3940] Fix retry count increment in lock manager (#5387)
---
 .../main/java/org/apache/hudi/client/transaction/lock/LockManager.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
index 913736cad8..ca15c4fdc2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
@@ -70,11 +70,12 @@ public class LockManager implements Serializable, AutoCloseable {
           }
           LOG.info("Retrying to acquire lock...");
           Thread.sleep(maxWaitTimeInMs);
-          retryCount++;
         } catch (HoodieLockException | InterruptedException e) {
           if (retryCount >= maxRetries) {
             throw new HoodieLockException("Unable to acquire lock, lock object ", e);
           }
+        } finally {
+          retryCount++;
         }
       }
       if (!acquired) {
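
For context, the corrected control flow above moves the retry counter increment into a
finally block, so both the "lock not yet acquired" path and the exception path count toward
the retry limit. A minimal sketch of that pattern (hypothetical names, not the actual
LockManager code):

    def acquireWithRetries(tryLock: () => Boolean, maxRetries: Int, maxWaitTimeInMs: Long): Boolean = {
      var acquired = false
      var retryCount = 0
      while (!acquired && retryCount <= maxRetries) {
        try {
          acquired = tryLock()
          if (!acquired) {
            Thread.sleep(maxWaitTimeInMs)   // wait before the next attempt
          }
        } catch {
          case e: InterruptedException =>
            if (retryCount >= maxRetries) throw new RuntimeException("Unable to acquire lock", e)
        } finally {
          retryCount += 1                   // counted on every attempt, success or failure
        }
      }
      acquired
    }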


[hudi] 05/05: [HUDI-3934] Fix `Spark32HoodieParquetFileFormat` not being compatible w/ Spark 3.2.0 (#5378)

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 25501c99e9e353bb8cf3757404cc0cd1835e03b3
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Thu Apr 21 18:00:38 2022 -0700

    [HUDI-3934] Fix `Spark32HoodieParquetFileFormat` not being compatible w/ Spark 3.2.0 (#5378)
    
    - Because Spark 3.2.1 is not backwards-compatible with 3.2.0, we have to handle these incompatibilities in Spark32HoodieParquetFileFormat. This PR addresses that (see the sketch below).
    
    Co-authored-by: Raymond Xu <27...@users.noreply.github.com>
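
    The fix relies on version-gated dispatch plus reflective construction: code paths that
    only compile against Spark 3.2.1 are guarded by a runtime version check, and the Spark
    3.2.0 fallback instantiates the affected Spark classes reflectively, so neither path has
    to resolve the other version's constructor signatures at compile time. A minimal sketch
    of the reflective part, using a hypothetical helper and class names (not the actual Hudi
    code in the diff below):

        import scala.reflect.ClassTag

        // Pick the widest constructor and invoke it reflectively, so the call site never
        // compiles against a specific Spark patch version's constructor signature.
        def instantiate[T](args: Any*)(implicit ct: ClassTag[T]): T = {
          val ctor = ct.runtimeClass.getConstructors.maxBy(_.getParameterCount)
          ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*).asInstanceOf[T]
        }

        // Hypothetical usage, dispatching on the runtime Spark version:
        // if (sparkVersion >= "3.2.1") new SomeSparkClass(a, b)   // directly compiled path
        // else instantiate[SomeSparkClass](a, b)                  // reflective 3.2.0 fallback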
---
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   |   8 +-
 .../apache/spark/sql/adapter/Spark3_1Adapter.scala |   4 +-
 ....scala => Spark31HoodieParquetFileFormat.scala} |  31 ++--
 .../parquet/Spark32DataSourceUtils.scala           |  77 ++++++++++
 .../parquet/Spark32HoodieParquetFileFormat.scala   | 157 +++++++++++++++++----
 5 files changed, 229 insertions(+), 48 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 54bc06bd76..7a8f8a1580 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -53,13 +53,15 @@ object HoodieSparkUtils extends SparkAdapterSupport {
 
   def isSpark3_1: Boolean = SPARK_VERSION.startsWith("3.1")
 
+  def gteqSpark3_1: Boolean = SPARK_VERSION > "3.1"
+
+  def gteqSpark3_1_3: Boolean = SPARK_VERSION >= "3.1.3"
+
   def isSpark3_2: Boolean = SPARK_VERSION.startsWith("3.2")
 
   def gteqSpark3_2: Boolean = SPARK_VERSION > "3.2"
 
-  def gteqSpark3_1: Boolean = SPARK_VERSION > "3.1"
-
-  def gteqSpark3_1_3: Boolean = SPARK_VERSION >= "3.1.3"
+  def gteqSpark3_2_1: Boolean = SPARK_VERSION >= "3.2.1"
 
   def getMetaSchema: StructType = {
     StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
index cd5cd9c82f..22431cb257 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
@@ -23,7 +23,7 @@ import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark312HoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark31HoodieParquetFileFormat}
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_1CatalystExpressionUtils, SparkSession}
@@ -55,6 +55,6 @@ class Spark3_1Adapter extends BaseSpark3Adapter {
   }
 
   override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
-    Some(new Spark312HoodieParquetFileFormat(appendPartitionValues))
+    Some(new Spark31HoodieParquetFileFormat(appendPartitionValues))
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
similarity index 95%
rename from hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
rename to hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
index 769373866f..e99850bef0 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
-import org.apache.hudi.common.util.{InternalSchemaCache, StringUtils}
+import org.apache.hudi.common.util.{InternalSchemaCache, ReflectionUtils, StringUtils}
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat.{createParquetFilters, pruneInternalSchema, rebuildFilterFromParquet}
+import org.apache.spark.sql.execution.datasources.parquet.Spark31HoodieParquetFileFormat.{createParquetFilters, pruneInternalSchema, rebuildFilterFromParquet}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -61,7 +61,7 @@ import java.net.URI
  *   <li>Schema on-read</li>
  * </ol>
  */
-class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
 
   override def buildReaderWithPartitionValues(sparkSession: SparkSession,
                                               dataSchema: StructType,
@@ -154,8 +154,8 @@ class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: B
       val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent
 
       val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
-      val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
       val fileSchema = if (shouldUseInternalSchema) {
+        val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
         val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
         InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
       } else {
@@ -223,13 +223,17 @@ class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: B
 
       // Clone new conf
       val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
-      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
-      if (shouldUseInternalSchema) {
+      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
         val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
-        typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+
         hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
+
+        SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+      } else {
+        new java.util.HashMap()
       }
+
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
 
@@ -329,9 +333,7 @@ class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: B
   }
 }
 
-object Spark312HoodieParquetFileFormat {
-
-  val PARQUET_FILTERS_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetFilters"
+object Spark31HoodieParquetFileFormat {
 
   def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
     val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
@@ -343,10 +345,11 @@ object Spark312HoodieParquetFileFormat {
     }
   }
 
-  private def createParquetFilters(arg: Any*): ParquetFilters = {
-    val clazz = Class.forName(PARQUET_FILTERS_CLASS_NAME, true, Thread.currentThread().getContextClassLoader)
-    val ctor = clazz.getConstructors.head
-    ctor.newInstance(arg.map(_.asInstanceOf[AnyRef]): _*).asInstanceOf[ParquetFilters]
+  private def createParquetFilters(args: Any*): ParquetFilters = {
+    // ParquetFilters bears a single ctor (in Spark 3.1)
+    val ctor = classOf[ParquetFilters].getConstructors.head
+    ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
+      .asInstanceOf[ParquetFilters]
   }
 
   private def rebuildFilterFromParquet(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = {
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala
new file mode 100644
index 0000000000..6d1c76380f
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
+import org.apache.spark.util.Utils
+
+object Spark32DataSourceUtils {
+
+  /**
+   * NOTE: This method was copied from Spark 3.2.0, and is required to maintain runtime
+   * compatibility against Spark 3.2.0
+   */
+  // scalastyle:off
+  def int96RebaseMode(lookupFileMeta: String => String,
+                      modeByConfig: String): LegacyBehaviorPolicy.Value = {
+    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
+      return LegacyBehaviorPolicy.CORRECTED
+    }
+    // If there is no version, we return the mode specified by the config.
+    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
+      // Files written by Spark 3.0 and earlier follow the legacy hybrid calendar and we need to
+      // rebase the INT96 timestamp values.
+      // Files written by Spark 3.1 and latter may also need the rebase if they were written with
+      // the "LEGACY" rebase mode.
+      if (version < "3.1.0" || lookupFileMeta("org.apache.spark.legacyINT96") != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+  }
+  // scalastyle:on
+
+  /**
+   * NOTE: This method was copied from Spark 3.2.0, and is required to maintain runtime
+   * compatibility against Spark 3.2.0
+   */
+  // scalastyle:off
+  def datetimeRebaseMode(lookupFileMeta: String => String,
+                         modeByConfig: String): LegacyBehaviorPolicy.Value = {
+    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
+      return LegacyBehaviorPolicy.CORRECTED
+    }
+    // If there is no version, we return the mode specified by the config.
+    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
+      // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
+      // rebase the datetime values.
+      // Files written by Spark 3.0 and latter may also need the rebase if they were written with
+      // the "LEGACY" rebase mode.
+      if (version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+  }
+  // scalastyle:on
+
+}
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
index f2a0a21df8..7135f19e95 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
@@ -37,10 +38,10 @@ import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat.{pruneInternalSchema, rebuildFilterFromParquet}
+import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat._
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -148,8 +149,8 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
       val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent
 
       val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
-      val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
       val fileSchema = if (shouldUseInternalSchema) {
+        val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
         val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
         InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
       } else {
@@ -158,21 +159,38 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
 
       lazy val footerFileMetaData =
         ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
-      val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
-        footerFileMetaData.getKeyValueMetaData.get,
-        datetimeRebaseModeInRead)
       // Try to push down filters when filter push-down is enabled.
       val pushed = if (enableParquetFilterPushDown) {
         val parquetSchema = footerFileMetaData.getSchema
-        val parquetFilters = new ParquetFilters(
-          parquetSchema,
-          pushDownDate,
-          pushDownTimestamp,
-          pushDownDecimal,
-          pushDownStringStartWith,
-          pushDownInFilterThreshold,
-          isCaseSensitive,
-          datetimeRebaseSpec)
+        val parquetFilters = if (HoodieSparkUtils.gteqSpark3_2_1) {
+          // NOTE: Below code could only be compiled against >= Spark 3.2.1,
+          //       and unfortunately won't compile against Spark 3.2.0
+          //       However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1
+          val datetimeRebaseSpec =
+            DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+          new ParquetFilters(
+            parquetSchema,
+            pushDownDate,
+            pushDownTimestamp,
+            pushDownDecimal,
+            pushDownStringStartWith,
+            pushDownInFilterThreshold,
+            isCaseSensitive,
+            datetimeRebaseSpec)
+        } else {
+          // Spark 3.2.0
+          val datetimeRebaseMode =
+            Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+          createParquetFilters(
+            parquetSchema,
+            pushDownDate,
+            pushDownTimestamp,
+            pushDownDecimal,
+            pushDownStringStartWith,
+            pushDownInFilterThreshold,
+            isCaseSensitive,
+            datetimeRebaseMode)
+        }
         filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null)))
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -198,21 +216,21 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
           None
         }
 
-      val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
-        footerFileMetaData.getKeyValueMetaData.get,
-        int96RebaseModeInRead)
-
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
 
       // Clone new conf
       val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
-      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
-      if (shouldUseInternalSchema) {
+      val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
         val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
-        typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+
         hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
+
+        SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+      } else {
+        new java.util.HashMap()
       }
+
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
 
@@ -225,6 +243,10 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
       if (enableVectorizedReader) {
         val vectorizedReader =
           if (shouldUseInternalSchema) {
+            val int96RebaseSpec =
+              DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+            val datetimeRebaseSpec =
+              DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
             new Spark32HoodieVectorizedParquetRecordReader(
               convertTz.orNull,
               datetimeRebaseSpec.mode.toString,
@@ -234,7 +256,14 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
               enableOffHeapColumnVector && taskContext.isDefined,
               capacity,
               typeChangeInfos)
-          } else {
+          } else if (HoodieSparkUtils.gteqSpark3_2_1) {
+            // NOTE: Below code could only be compiled against >= Spark 3.2.1,
+            //       and unfortunately won't compile against Spark 3.2.0
+            //       However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1
+            val int96RebaseSpec =
+              DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+            val datetimeRebaseSpec =
+              DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
             new VectorizedParquetRecordReader(
               convertTz.orNull,
               datetimeRebaseSpec.mode.toString,
@@ -243,7 +272,20 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
               int96RebaseSpec.timeZone,
               enableOffHeapColumnVector && taskContext.isDefined,
               capacity)
+          } else {
+            // Spark 3.2.0
+            val datetimeRebaseMode =
+              Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+            val int96RebaseMode =
+              Spark32DataSourceUtils.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+            createVectorizedParquetRecordReader(
+              convertTz.orNull,
+              datetimeRebaseMode.toString,
+              int96RebaseMode.toString,
+              enableOffHeapColumnVector && taskContext.isDefined,
+              capacity)
           }
+
         // SPARK-37089: We cannot register a task completion listener to close this iterator here
         // because downstream exec nodes have already registered their listeners. Since listeners
         // are executed in reverse order of registration, a listener registered here would close the
@@ -279,12 +321,32 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
         }
       } else {
         logDebug(s"Falling back to parquet-mr")
-        // ParquetRecordReader returns InternalRow
-        val readSupport = new ParquetReadSupport(
-          convertTz,
-          enableVectorizedReader = false,
-          datetimeRebaseSpec,
-          int96RebaseSpec)
+        val readSupport = if (HoodieSparkUtils.gteqSpark3_2_1) {
+          // ParquetRecordReader returns InternalRow
+          // NOTE: Below code could only be compiled against >= Spark 3.2.1,
+          //       and unfortunately won't compile against Spark 3.2.0
+          //       However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1
+          val int96RebaseSpec =
+            DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+          val datetimeRebaseSpec =
+            DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+          new ParquetReadSupport(
+            convertTz,
+            enableVectorizedReader = false,
+            datetimeRebaseSpec,
+            int96RebaseSpec)
+        } else {
+          val datetimeRebaseMode =
+            Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+          val int96RebaseMode =
+            Spark32DataSourceUtils.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+          createParquetReadSupport(
+            convertTz,
+            /* enableVectorizedReader = */ false,
+            datetimeRebaseMode,
+            int96RebaseMode)
+        }
+
         val reader = if (pushed.isDefined && enableRecordFilter) {
           val parquetFilter = FilterCompat.get(pushed.get, null)
           new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
@@ -332,10 +394,47 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
       }
     }
   }
+
 }
 
 object Spark32HoodieParquetFileFormat {
 
+  /**
+   * NOTE: This method is specific to Spark 3.2.0
+   */
+  private def createParquetFilters(args: Any*): ParquetFilters = {
+    // NOTE: ParquetFilters ctor args contain Scala enum, therefore we can't look it
+    //       up by arg types, and have to instead rely on the number of args based on individual class;
+    //       the ctor order is not guaranteed
+    val ctor = classOf[ParquetFilters].getConstructors.maxBy(_.getParameterCount)
+    ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
+      .asInstanceOf[ParquetFilters]
+  }
+
+  /**
+   * NOTE: This method is specific to Spark 3.2.0
+   */
+  private def createParquetReadSupport(args: Any*): ParquetReadSupport = {
+    // NOTE: ParquetReadSupport ctor args contain Scala enum, therefore we can't look it
+    //       up by arg types, and have to instead rely on the number of args based on individual class;
+    //       the ctor order is not guaranteed
+    val ctor = classOf[ParquetReadSupport].getConstructors.maxBy(_.getParameterCount)
+    ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
+      .asInstanceOf[ParquetReadSupport]
+  }
+
+  /**
+   * NOTE: This method is specific to Spark 3.2.0
+   */
+  private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = {
+    // NOTE: ParquetReadSupport ctor args contain Scala enum, therefore we can't look it
+    //       up by arg types, and have to instead rely on the number of args based on individual class;
+    //       the ctor order is not guaranteed
+    val ctor = classOf[VectorizedParquetRecordReader].getConstructors.maxBy(_.getParameterCount)
+    ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
+      .asInstanceOf[VectorizedParquetRecordReader]
+  }
+
   def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
     val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
     if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {


[hudi] 04/05: [HUDI-3936] Fix projection for a nested field as pre-combined key (#5379)

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 5bd7a677b260f6252979a6f9d2245fc34e6e6b19
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Thu Apr 21 17:17:57 2022 -0700

    [HUDI-3936] Fix projection for a nested field as pre-combined key (#5379)
    
    This PR fixes the projection logic for a nested field that is used as the pre-combined key field. The fix is to check and append only the root-level field for projection, i.e., "a" for a nested field "a.b.c", among the mandatory columns.
    
     - Changes the logic in HoodieBaseRelation.appendMandatoryColumns to check and append the root-level field for a required nested field among the mandatory columns
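
    The idea is that Spark column projection operates on root-level columns, so a mandatory
    nested field such as "a.b.c" has to be reduced to its root column "a" before being
    appended to the requested columns. A minimal sketch of that reduction, using standalone
    helpers that mirror (but are not) the Hudi methods touched in the diff below:

        // Reduce possibly-nested field names to their root-level column and append the
        // ones missing from the requested projection, preserving the requested order.
        def rootLevelField(fieldName: String): String = fieldName.split("\\.")(0)

        def appendMandatoryRootFields(requested: Array[String], mandatory: Seq[String]): Array[String] = {
          val roots = mandatory.map(rootLevelField)
          requested ++ roots.filterNot(requested.contains)
        }

        // e.g. appendMandatoryRootFields(Array("driver"), Seq("_row_key", "fare.currency"))
        //      returns Array("driver", "_row_key", "fare")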
---
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 32 +++++++++++++++-------
 .../org/apache/hudi/avro/TestHoodieAvroUtils.java  |  7 +++++
 .../org/apache/hudi/BaseFileOnlyRelation.scala     |  6 ++--
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 19 ++++++++-----
 .../hudi/MergeOnReadIncrementalRelation.scala      |  2 +-
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |  2 +-
 .../hudi/functional/TestMORDataSourceStorage.scala | 28 ++++++++++++-------
 .../functional/TestParquetColumnProjection.scala   |  4 +--
 8 files changed, 66 insertions(+), 34 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 163a1b3fa5..f69d5683d1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -18,6 +18,17 @@
 
 package org.apache.hudi.avro;
 
+import org.apache.hudi.common.config.SerializableSchema;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.SchemaCompatibilityException;
+
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Conversions;
 import org.apache.avro.Conversions.DecimalConversion;
@@ -42,16 +53,6 @@ import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.io.JsonDecoder;
 import org.apache.avro.io.JsonEncoder;
 import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hudi.common.config.SerializableSchema;
-import org.apache.hudi.common.model.HoodieOperation;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.SchemaCompatibilityException;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -480,6 +481,17 @@ public class HoodieAvroUtils {
     return projectedSchema;
   }
 
+  /**
+   * Obtain the root-level field name of a full field name, possibly a nested field.
+   * For example, given "a.b.c", the output is "a"; given "a", the output is "a".
+   *
+   * @param fieldName The field name.
+   * @return Root-level field name
+   */
+  public static String getRootLevelFieldName(String fieldName) {
+    return fieldName.split("\\.")[0];
+  }
+
   /**
    * Obtain value of the provided field as string, denoted by dot notation. e.g: a.b.c
    */
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 246d74411d..bd0254da3d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -257,6 +257,13 @@ public class TestHoodieAvroUtils {
     assertEquals(expectedSchema, rec1.getSchema());
   }
 
+  @Test
+  public void testGetRootLevelFieldName() {
+    assertEquals("a", HoodieAvroUtils.getRootLevelFieldName("a.b.c"));
+    assertEquals("a", HoodieAvroUtils.getRootLevelFieldName("a"));
+    assertEquals("", HoodieAvroUtils.getRootLevelFieldName(""));
+  }
+
   @Test
   public void testGetNestedFieldVal() {
     GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index 3c667d2b42..c57f46a7b6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.hadoop.HoodieROTablePathFilter
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
 import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
@@ -54,8 +54,8 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
 
   override type FileSplit = HoodieBaseFileSplit
 
-  override lazy val mandatoryColumns: Seq[String] =
-    // TODO reconcile, record's key shouldn't be mandatory for base-file only relation
+  override lazy val mandatoryFields: Seq[String] =
+  // TODO reconcile, record's key shouldn't be mandatory for base-file only relation
     Seq(recordKeyField)
 
   override def imbueConfigs(sqlContext: SQLContext): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index f776d08ec9..4b7177f4d6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
@@ -39,10 +40,8 @@ import org.apache.hudi.io.storage.HoodieHFileReader
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.avro.HoodieAvroSchemaConverters
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
@@ -199,7 +198,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    *
    * @VisibleInTests
    */
-  val mandatoryColumns: Seq[String]
+  val mandatoryFields: Seq[String]
+
+  protected def mandatoryRootFields: Seq[String] =
+    mandatoryFields.map(col => HoodieAvroUtils.getRootLevelFieldName(col))
 
   protected def timeline: HoodieTimeline =
   // NOTE: We're including compaction here since it's not considering a "commit" operation
@@ -246,7 +248,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     //
     // (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM
     //       PROJECTION
-    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+    val fetchedColumns: Array[String] = appendMandatoryRootFields(requiredColumns)
 
     val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
       HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns, internalSchema)
@@ -362,8 +364,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       !SubqueryExpression.hasSubquery(condition)
   }
 
-  protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
-    val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
+  protected final def appendMandatoryRootFields(requestedColumns: Array[String]): Array[String] = {
+    // For a nested field in mandatory columns, we should first get the root-level field, and then
+    // check for any missing column, as the requestedColumns should only contain root-level fields
+    // We should only append root-level field as well
+    val missing = mandatoryRootFields.filter(rootField => !requestedColumns.contains(rootField))
     requestedColumns ++ missing
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 6aa7007851..806a5e371d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -153,7 +153,7 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
     Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
   }
 
-  override lazy val mandatoryColumns: Seq[String] = {
+  override lazy val mandatoryFields: Seq[String] = {
     // NOTE: This columns are required for Incremental flow to be able to handle the rows properly, even in
     //       cases when no columns are requested to be fetched (for ex, when using {@code count()} API)
     Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index a88eb63036..75bc96624e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -47,7 +47,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
 
   override type FileSplit = HoodieMergeOnReadFileSplit
 
-  override lazy val mandatoryColumns: Seq[String] =
+  override lazy val mandatoryFields: Seq[String] =
     Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
 
   protected val mergeType: String = optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index 18b639f2f9..8cf6b4174c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
@@ -32,7 +33,7 @@ import org.apache.spark.sql.functions.{col, lit}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.Tag
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.CsvSource
 
 import scala.collection.JavaConversions._
 
@@ -57,19 +58,28 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
   val updatedVerificationVal: String = "driver_update"
 
   @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testMergeOnReadStorage(isMetadataEnabled: Boolean) {
-    val dataGen = new HoodieTestDataGenerator()
+  @CsvSource(Array(
+    "true,",
+    "true,fare.currency",
+    "false,",
+    "false,fare.currency"
+  ))
+  def testMergeOnReadStorage(isMetadataEnabled: Boolean, preComineField: String) {
+    var options: Map[String, String] = commonOpts +
+      (HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled))
+    if (!StringUtils.isNullOrEmpty(preComineField)) {
+      options += (DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> preComineField)
+    }
+    val dataGen = new HoodieTestDataGenerator(0xDEEF)
     val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
     // Bulk Insert Operation
     val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
     val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("org.apache.hudi")
-      .options(commonOpts)
+      .options(options)
       .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
@@ -90,8 +100,7 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
     val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
     val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     inputDF2.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(options)
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -110,8 +119,7 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
     val inputDF3 = hudiSnapshotDF2.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
 
     inputDF3.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(options)
       .mode(SaveMode.Append)
       .save(basePath)
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index f670450c3e..945d26be3f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -19,7 +19,7 @@ package org.apache.hudi.functional
 
 import org.apache.avro.Schema
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator}
 import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
@@ -332,7 +332,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
         logWarning(s"Not matching bytes read ($bytesRead)")
       }
 
-      val readColumns = targetColumns ++ relation.mandatoryColumns
+      val readColumns = targetColumns ++ relation.mandatoryFields
       val (_, projectedStructType, _) = HoodieSparkUtils.getRequiredSchema(tableState.schema, readColumns)
 
       val row: InternalRow = rows.take(1).head


[hudi] 03/05: [HUDI-3921] Fixed schema evolution cannot work with HUDI-3855 (#5376)

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 0fb2d5bae12e6d6e10ba929e47c361997829b708
Author: xiarixiaoyao <me...@qq.com>
AuthorDate: Fri Apr 22 06:27:54 2022 +0800

    [HUDI-3921] Fixed schema evolution cannot work with HUDI-3855 (#5376)
    
    - When column names are renamed (with schema evolution enabled), renamed columns were not handled correctly while copying records from the old data file with HoodieMergeHandle.
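
    Conceptually, the merge handle rewrites every record read from the old base file into the
    current schema, and when a column has been renamed it must copy the value from the old
    field name rather than drop it. A minimal, self-contained sketch of such a rename-aware
    copy over plain maps (an illustration only, not the actual
    HoodieAvroUtils.rewriteRecordWithNewSchema implementation):

        // Copy a flat record into the new column set; here renameCols maps new name -> old name,
        // so a renamed column still picks up the value written under its previous name.
        def rewriteWithRenames(oldRecord: Map[String, Any],
                               newColumns: Seq[String],
                               renameCols: Map[String, String]): Map[String, Any] = {
          newColumns.map { col =>
            val sourceName = renameCols.getOrElse(col, col)
            col -> oldRecord.getOrElse(sourceName, null)
          }.toMap
        }

        // e.g. rewriteWithRenames(Map("colA" -> 1), Seq("colB"), Map("colB" -> "colA"))
        //      returns Map("colB" -> 1)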
---
 .../java/org/apache/hudi/io/HoodieWriteHandle.java | 12 +++-
 .../table/action/commit/HoodieMergeHelper.java     | 14 ++++-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 70 +++++++++++++++++++---
 .../table/log/AbstractHoodieLogRecordReader.java   |  3 +-
 .../schema/action/InternalSchemaMerger.java        | 26 +++++++-
 .../internal/schema/utils/InternalSchemaUtils.java | 16 +++++
 .../schema/utils/TestAvroSchemaEvolutionUtils.java |  4 +-
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  | 31 ++++------
 8 files changed, 137 insertions(+), 39 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 89babc7725..5d5760961a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -46,6 +46,9 @@ import org.apache.log4j.Logger;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.HashMap;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 
 /**
  * Base class for all write operations logically performed at the file group level.
@@ -98,6 +101,8 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
   protected final String fileId;
   protected final String writeToken;
   protected final TaskContextSupplier taskContextSupplier;
+  // For full schema evolution
+  protected final boolean schemaOnReadEnabled;
 
   public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
                            String fileId, HoodieTable<T, I, K, O> hoodieTable, TaskContextSupplier taskContextSupplier) {
@@ -120,6 +125,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
         !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
     this.taskContextSupplier = taskContextSupplier;
     this.writeToken = makeWriteToken();
+    schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
   }
 
   /**
@@ -224,11 +230,13 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
    * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
    */
   protected GenericRecord rewriteRecord(GenericRecord record) {
-    return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
+    return schemaOnReadEnabled ? HoodieAvroUtils.rewriteRecordWithNewSchema(record, writeSchemaWithMetaFields, new HashMap<>())
+        : HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
   }
 
   protected GenericRecord rewriteRecordWithMetadata(GenericRecord record, String fileName) {
-    return HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
+    return schemaOnReadEnabled ? HoodieAvroUtils.rewriteEvolutionRecordWithMetadata(record, writeSchemaWithMetaFields, fileName)
+        : HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
   }
 
   public abstract List<WriteStatus> close();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 578cdf0bc7..04dd29c63c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -36,6 +36,7 @@ import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -52,6 +53,8 @@ import org.apache.hadoop.conf.Configuration;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
@@ -93,6 +96,7 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
 
     Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
     boolean needToReWriteRecord = false;
+    Map<String, String> renameCols = new HashMap<>();
     // TODO support bootstrap
     if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
       // check implicitly add columns, and position reorder(spark sql may change cols order)
@@ -109,10 +113,14 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
                       && writeInternalSchema.findIdByName(f) == querySchema.findIdByName(f)
                       && writeInternalSchema.findIdByName(f) != -1
                       && writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
-      readSchema = AvroInternalSchemaConverter.convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false).mergeSchema(), readSchema.getName());
+      readSchema = AvroInternalSchemaConverter
+          .convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false, false).mergeSchema(), readSchema.getName());
       Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
       needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
-              || SchemaCompatibility.checkReaderWriterCompatibility(writeSchemaFromFile, readSchema).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+              || SchemaCompatibility.checkReaderWriterCompatibility(readSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+      if (needToReWriteRecord) {
+        renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
+      }
     }
 
     try {
@@ -121,7 +129,7 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
         readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
       } else {
         if (needToReWriteRecord) {
-          readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema);
+          readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema, renameCols);
         } else {
           readerIterator = reader.getRecordIterator(readSchema);
         }
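
When a rename is detected on the merge path, the rename mapping collected from the
internal schemas is threaded into the record rewrite over the base-file iterator. A
hedged sketch of that step follows; the helper method, the column names and the rename
entry are hypothetical stand-ins for the surrounding merge code.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.avro.HoodieAvroUtils;

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    class MergeRewriteSketch {
      // baseFileRecords and readSchema stand in for reader.getRecordIterator() and the
      // merged read schema produced by InternalSchemaMerger in the code above.
      static Iterator<GenericRecord> rewriteForMerge(Iterator<GenericRecord> baseFileRecords,
                                                     Schema readSchema) {
        Map<String, String> renameCols = new HashMap<>();
        // key: full column name in the query (new) schema, value: name in the file (old) schema
        renameCols.put("userx.age1", "userx.age");  // hypothetical rename
        return HoodieAvroUtils.rewriteRecordWithNewSchema(baseFileRecords, readSchema, renameCols);
      }
    }
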
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index bf540a302e..163a1b3fa5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -70,6 +70,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Deque;
+import java.util.LinkedList;
 import java.util.TimeZone;
 import java.util.stream.Collectors;
 
@@ -405,6 +407,14 @@ public class HoodieAvroUtils {
     return newRecord;
   }
 
+  // TODO Unify the logic of rewriteRecordWithMetadata and rewriteEvolutionRecordWithMetadata, then remove this method.
+  public static GenericRecord rewriteEvolutionRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) {
+    GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, new HashMap<>());
+    // do not preserve FILENAME_METADATA_FIELD
+    newRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, fileName);
+    return newRecord;
+  }
+
   /**
    * Converts list of {@link GenericRecord} provided into the {@link GenericRecord} adhering to the
    * provided {@code newSchema}.
@@ -719,14 +729,28 @@ public class HoodieAvroUtils {
    *
    * @param oldRecord oldRecord to be rewritten
    * @param newSchema newSchema used to rewrite oldRecord
+   * @param renameCols a map storing all renamed columns, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
    * @return newRecord for new Schema
    */
-  public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema) {
-    Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema);
+  public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema, Map<String, String> renameCols) {
+    Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema, renameCols, new LinkedList<>());
     return (GenericData.Record) newRecord;
   }
 
-  private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema) {
+  /**
+   * Given an Avro record with a given schema, rewrites it into the new schema, setting only the fields defined in the new schema.
+   * Supports deep rewrites of nested records and resolves renamed columns.
+   * This particular method does the following:
+   * a) Creates a new empty GenericRecord with the new schema.
+   * b) For a GenericRecord, copies the data over from the old schema to the new schema, or sets default values for all fields of the transformed schema.
+   *
+   * @param oldRecord oldRecord to be rewritten
+   * @param newSchema newSchema used to rewrite oldRecord
+   * @param renameCols a map storing all renamed columns, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
+   * @param fieldNames tracks the full name of the field currently visited while traversing the new schema
+   * @return newRecord for the new schema
+   */
+  private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
     if (oldRecord == null) {
       return null;
     }
@@ -741,10 +765,23 @@ public class HoodieAvroUtils {
 
         for (int i = 0; i < fields.size(); i++) {
           Schema.Field field = fields.get(i);
+          String fieldName = field.name();
+          fieldNames.push(fieldName);
           if (oldSchema.getField(field.name()) != null) {
             Schema.Field oldField = oldSchema.getField(field.name());
-            helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema()));
+            helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
+          } else {
+            String fieldFullName = createFullName(fieldNames);
+            String[] colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.");
+            String lastColNameFromOldSchema = colNamePartsFromOldSchema[colNamePartsFromOldSchema.length - 1];
+            // handle a renamed column
+            if (oldSchema.getField(field.name()) == null && oldSchema.getField(lastColNameFromOldSchema) != null) {
+              // the column was renamed: read the value from its old name in the old schema
+              Schema.Field oldField = oldSchema.getField(lastColNameFromOldSchema);
+              helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
+            }
           }
+          fieldNames.pop();
         }
         GenericData.Record newRecord = new GenericData.Record(newSchema);
         for (int i = 0; i < fields.size(); i++) {
@@ -765,9 +802,11 @@ public class HoodieAvroUtils {
         }
         Collection array = (Collection)oldRecord;
         List<Object> newArray = new ArrayList();
+        fieldNames.push("element");
         for (Object element : array) {
-          newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType()));
+          newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames));
         }
+        fieldNames.pop();
         return newArray;
       case MAP:
         if (!(oldRecord instanceof Map)) {
@@ -775,17 +814,29 @@ public class HoodieAvroUtils {
         }
         Map<Object, Object> map = (Map<Object, Object>) oldRecord;
         Map<Object, Object> newMap = new HashMap<>();
+        fieldNames.push("value");
         for (Map.Entry<Object, Object> entry : map.entrySet()) {
-          newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType()));
+          newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames));
         }
+        fieldNames.pop();
         return newMap;
       case UNION:
-        return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord));
+        return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames);
       default:
         return rewritePrimaryType(oldRecord, oldSchema, newSchema);
     }
   }
 
+  private static String createFullName(Deque<String> fieldNames) {
+    String result = "";
+    if (!fieldNames.isEmpty()) {
+      List<String> parentNames = new ArrayList<>();
+      fieldNames.descendingIterator().forEachRemaining(parentNames::add);
+      result = parentNames.stream().collect(Collectors.joining("."));
+    }
+    return result;
+  }
+
   private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Schema newSchema) {
     Schema realOldSchema = oldSchema;
     if (realOldSchema.getType() == UNION) {
@@ -958,9 +1009,10 @@ public class HoodieAvroUtils {
    *
   * @param oldRecords oldRecords to be rewritten
   * @param newSchema newSchema used to rewrite oldRecord
+   * @param renameCols a map storing all renamed columns, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
   * @return an iterator of rewritten GenericRecords
    */
-  public static Iterator<GenericRecord> rewriteRecordWithNewSchema(Iterator<GenericRecord> oldRecords, Schema newSchema) {
+  public static Iterator<GenericRecord> rewriteRecordWithNewSchema(Iterator<GenericRecord> oldRecords, Schema newSchema, Map<String, String> renameCols) {
     if (oldRecords == null || newSchema == null) {
       return Collections.emptyIterator();
     }
@@ -972,7 +1024,7 @@ public class HoodieAvroUtils {
 
       @Override
       public GenericRecord next() {
-        return rewriteRecordWithNewSchema(oldRecords.next(), newSchema);
+        return rewriteRecordWithNewSchema(oldRecords.next(), newSchema, renameCols);
       }
     };
   }
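
The rename map is keyed by the column's full name in the new schema (nested fields use
dot-separated paths built from the fieldNames stack, e.g. "mem.value.nn" for a field
inside a map value); the value is the column's name in the old schema, of which only the
last path segment is used to look the field up. A self-contained sketch of a flat rename,
with hypothetical schemas and values:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.avro.HoodieAvroUtils;

    import java.util.Collections;

    public class RenameRewriteSketch {
      public static void main(String[] args) {
        Schema oldSchema = SchemaBuilder.record("rec").fields()
            .optionalInt("age")
            .endRecord();
        Schema newSchema = SchemaBuilder.record("rec").fields()
            .optionalInt("age1")  // "age" renamed to "age1"
            .endRecord();

        GenericRecord oldRecord = new GenericData.Record(oldSchema);
        oldRecord.put("age", 29);

        // "age1" (name in the new schema) -> "age" (name in the old schema)
        GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(
            oldRecord, newSchema, Collections.singletonMap("age1", "age"));
        System.out.println(newRecord.get("age1"));  // expected: 29
      }
    }
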
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 9e56083b26..9687136444 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -58,6 +58,7 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -379,7 +380,7 @@ public abstract class AbstractHoodieLogRecordReader {
       Option<Schema> schemaOption = getMergedSchema(dataBlock);
       while (recordIterator.hasNext()) {
         IndexedRecord currentRecord = recordIterator.next();
-        IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get()) : currentRecord;
+        IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get(), new HashMap<>()) : currentRecord;
         processNextRecord(createHoodieRecord(record, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN,
             this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName));
         totalLogRecords.incrementAndGet();
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
index 0d93ab170b..bcea9b957b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
@@ -48,6 +48,25 @@ public class InternalSchemaMerger {
   // we can pass decimalType to reWriteRecordWithNewSchema directly, everything is ok.
   private boolean useColumnTypeFromFileSchema = true;
 
+  // deal with renames
+  // Whether to use the column name from the file schema to read files when a column name has changed.
+  // Spark's parquet reader needs the original column name to read the data; otherwise it will read nothing.
+  // e.g. a column originally named colOldName has since been renamed to colNewName:
+  // we must not pass colNewName to the parquet reader; we have to pass colOldName to read the data out.
+  // For the log reader this parameter is not needed,
+  // since rewriteRecordWithNewSchema supports rewriting renamed columns directly.
+  // e.g. a column originally named colOldName has since been renamed to colNewName:
+  // we can pass colNewName to rewriteRecordWithNewSchema directly, and everything works.
+  private boolean useColNameFromFileSchema = true;
+
+  public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchema, boolean ignoreRequiredAttribute, boolean useColumnTypeFromFileSchema, boolean useColNameFromFileSchema) {
+    this.fileSchema = fileSchema;
+    this.querySchema = querySchema;
+    this.ignoreRequiredAttribute = ignoreRequiredAttribute;
+    this.useColumnTypeFromFileSchema = useColumnTypeFromFileSchema;
+    this.useColNameFromFileSchema = useColNameFromFileSchema;
+  }
+
   public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchema, boolean ignoreRequiredAttribute, boolean useColumnTypeFromFileSchema) {
     this.fileSchema = fileSchema;
     this.querySchema = querySchema;
@@ -131,12 +150,15 @@ public class InternalSchemaMerger {
   private Types.Field dealWithRename(int fieldId, Type newType, Types.Field oldField) {
     Types.Field fieldFromFileSchema = fileSchema.findField(fieldId);
     String nameFromFileSchema = fieldFromFileSchema.name();
+    String nameFromQuerySchema = querySchema.findField(fieldId).name();
     Type typeFromFileSchema = fieldFromFileSchema.type();
     // Current design mechanism guarantees nestedType change is not allowed, so no need to consider.
     if (newType.isNestedType()) {
-      return Types.Field.get(oldField.fieldId(), oldField.isOptional(), nameFromFileSchema, newType, oldField.doc());
+      return Types.Field.get(oldField.fieldId(), oldField.isOptional(),
+          useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, newType, oldField.doc());
     } else {
-      return Types.Field.get(oldField.fieldId(), oldField.isOptional(), nameFromFileSchema, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc());
+      return Types.Field.get(oldField.fieldId(), oldField.isOptional(),
+          useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc());
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
index 3c0877f6f5..a784b409b8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
@@ -267,4 +267,20 @@ public class InternalSchemaUtils {
     }
     return result;
   }
+
+  /**
+   * Collects all renamed columns between oldSchema and newSchema.
+   *
+   * @param oldSchema oldSchema
+   * @param newSchema newSchema derived by modifying oldSchema
+   * @return renameCols map, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
+   */
+  public static Map<String, String> collectRenameCols(InternalSchema oldSchema, InternalSchema newSchema) {
+    List<String> colNamesFromWriteSchema = oldSchema.getAllColsFullName();
+    return colNamesFromWriteSchema.stream().filter(f -> {
+      int fieldIdFromWriteSchema = oldSchema.findIdByName(f);
+      // pick the columns which keep the same id but have a different column name
+      return newSchema.getAllIds().contains(fieldIdFromWriteSchema) && !newSchema.findfullName(fieldIdFromWriteSchema).equalsIgnoreCase(f);
+    }).collect(Collectors.toMap(e -> newSchema.findfullName(oldSchema.findIdByName(e)), e -> e));
+  }
 }
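
For illustration, a hedged sketch of the mapping this helper produces; the schemas and
column names are hypothetical.

    import org.apache.hudi.internal.schema.InternalSchema;
    import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;

    import java.util.Map;

    class CollectRenameColsSketch {
      static Map<String, String> renames(InternalSchema writeSchema, InternalSchema querySchema) {
        // If the column "userx.age" in writeSchema was renamed to "userx.age1" in querySchema,
        // the result would contain the entry {"userx.age1" -> "userx.age"}.
        return InternalSchemaUtils.collectRenameCols(writeSchema, querySchema);
      }
    }
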
diff --git a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
index d116697b8d..3850ef07b9 100644
--- a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
@@ -284,7 +284,7 @@ public class TestAvroSchemaEvolutionUtils {
         .updateColumnType("col6", Types.StringType.get());
     InternalSchema newSchema = SchemaChangeUtils.applyTableChanges2Schema(internalSchema, updateChange);
     Schema newAvroSchema = AvroInternalSchemaConverter.convert(newSchema, avroSchema.getName());
-    GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema);
+    GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap<>());
 
     Assertions.assertEquals(GenericData.get().validate(newAvroSchema, newRecord), true);
   }
@@ -349,7 +349,7 @@ public class TestAvroSchemaEvolutionUtils {
     );
 
     Schema newAvroSchema = AvroInternalSchemaConverter.convert(newRecord, schema.getName());
-    GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema);
+    GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap<>());
     // test the correctness of the rewrite
     Assertions.assertEquals(GenericData.get().validate(newAvroSchema, newAvroRecord), true);
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index ae828ed9f7..5416363598 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -445,28 +445,19 @@ class TestSpark3DDL extends TestHoodieSqlBase {
             Seq(null),
             Seq(Map("t1" -> 10.0d))
           )
+          spark.sql(s"alter table ${tableName} rename column members to mem")
+          spark.sql(s"alter table ${tableName} rename column mem.value.n to nn")
+          spark.sql(s"alter table ${tableName} rename column userx to us")
+          spark.sql(s"alter table ${tableName} rename column us.age to age1")
+
+          spark.sql(s"insert into ${tableName} values(2 , map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 10))")
+          spark.sql(s"select mem.value.nn, us.age1 from $tableName order by id").show()
+          checkAnswer(spark.sql(s"select mem.value.nn, us.age1 from $tableName order by id").collect())(
+            Seq(null, 29),
+            Seq(null, 291)
+          )
         }
       }
     }
   }
-
-  private def performClustering(writeDf: DataFrame, basePath: String, tableName: String, tableType: String): Unit = {
-    writeDf.write.format("org.apache.hudi")
-      .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType)
-      .option("hoodie.upsert.shuffle.parallelism", "1")
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "id")
-      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "comb")
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "par")
-      .option(HoodieWriteConfig.TBL_NAME.key, tableName)
-      .option("hoodie.schema.on.read.enable", "true")
-      // option for clustering
-      .option("hoodie.clustering.inline", "true")
-      .option("hoodie.clustering.inline.max.commits", "1")
-      .option("hoodie.clustering.plan.strategy.small.file.limit", String.valueOf(2*1024*1024L))
-      .option("hoodie.clustering.plan.strategy.max.bytes.per.group", String.valueOf(10*1024*1024L))
-      .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(4 * 1024* 1024L))
-      .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "col1, col2")
-      .mode(SaveMode.Append)
-      .save(basePath)
-  }
 }