Posted to commits@hudi.apache.org by me...@apache.org on 2023/03/28 07:23:49 UTC

[hudi] branch master updated: [HUDI-5977] Fix Date to String column schema evolution (#8280)

This is an automated email from the ASF dual-hosted git repository.

mengtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 04ec593413e [HUDI-5977] Fix Date to String column schema evolution (#8280)
04ec593413e is described below

commit 04ec593413e0be986dc021a7385b0d66d5659749
Author: voonhous <vo...@gmail.com>
AuthorDate: Tue Mar 28 15:23:37 2023 +0800

    [HUDI-5977] Fix Date to String column schema evolution (#8280)
---
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  | 48 ++++++++++++++++++++++
 .../parquet/Spark24HoodieParquetFileFormat.scala   |  6 ++-
 .../parquet/Spark31HoodieParquetFileFormat.scala   |  6 ++-
 .../Spark32PlusHoodieParquetFileFormat.scala       |  6 ++-
 4 files changed, 63 insertions(+), 3 deletions(-)
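
The core of the change, restated as a hedged, self-contained Scala sketch rather than the exact read path: when schema evolution requires casting a column from its on-disk type to a new type that is time-zone aware (here DATE -> STRING), the session-local time zone is now passed into Spark's Cast expression instead of being left unset. Cast, Cast.needsTimeZone, and the Option(sessionLocalTimeZone) pattern all come from the diffs below; the column name, the "UTC" zone, and the standalone object are illustrative only.

    // Hedged sketch of the idea behind the fix (illustrative; the real change lives in the
    // three Hoodie ParquetFileFormat variants shown in the diffs below).
    // Assumes Spark SQL / Catalyst is on the classpath.
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast}
    import org.apache.spark.sql.types.{DateType, StringType}

    object DateToStringCastSketch {
      def main(args: Array[String]): Unit = {
        // A column stored as DATE in Parquet whose table schema has since evolved to STRING.
        val srcType = DateType
        val dstType = StringType
        val attr = AttributeReference("date_to_string_col", srcType)()

        // In the real code this comes from SQLConf.sessionLocalTimeZone; "UTC" is a stand-in.
        val timeZoneId = Some("UTC")

        // Before this commit the Cast was built without a time zone; for conversions that
        // Spark considers time-zone aware, the schema-evolved read then failed, which is
        // what the new test below exercises.
        val cast =
          if (Cast.needsTimeZone(srcType, dstType)) Cast(attr, dstType, timeZoneId)
          else Cast(attr, dstType)

        println(cast.sql) // e.g. CAST(`date_to_string_col` AS STRING)
      }
    }

The same pattern is applied unchanged to the Spark 2.4, 3.1, and 3.2+ ParquetFileFormat variants, as the three diffs below show.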

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index de77b07fd83..7b1cf43b739 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -735,4 +735,52 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test DATE to STRING conversions when vectorized reading is not enabled") {
+    withTempDir { tmp =>
+      Seq("COPY_ON_WRITE", "MERGE_ON_READ").foreach { tableType =>
+        val tableName = generateTableName
+        val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
+        if (HoodieSparkUtils.gteqSpark3_1) {
+          // adding a struct column to force reads to use non-vectorized readers
+          spark.sql(
+            s"""
+               | create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  struct_col struct<f0: int, f1: string>,
+               |  ts long
+               |) using hudi
+               | location '$tablePath'
+               | options (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts'
+               | )
+               | partitioned by (ts)
+             """.stripMargin)
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (1, 'a1', 10, struct(1, 'f_1'), 1000)
+              """.stripMargin)
+          spark.sql(s"select * from $tableName")
+
+          spark.sql("set hoodie.schema.on.read.enable=true")
+          spark.sql(s"alter table $tableName add columns(date_to_string_col date)")
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (2, 'a2', 20, struct(2, 'f_2'), date '2023-03-22', 1001)
+              """.stripMargin)
+          spark.sql(s"alter table $tableName alter column date_to_string_col type string")
+
+          // struct and string (converted from date) column must be read to ensure that non-vectorized reader is used
+          // not checking results as we just need to ensure that the table can be read without any errors thrown
+          spark.sql(s"select * from $tableName")
+        }
+      }
+    }
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
index 1a8585b38aa..c168911302e 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
@@ -107,6 +107,7 @@ class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
     val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+    val timeZoneId = Option(sqlConf.sessionLocalTimeZone)
 
     (file: PartitionedFile) => {
       assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
@@ -238,7 +239,10 @@ class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
           }).toAttributes ++ partitionSchema.toAttributes
           val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
             if (implicitTypeChangeInfos.containsKey(i)) {
-              Cast(attr, implicitTypeChangeInfos.get(i).getLeft)
+              val srcType = implicitTypeChangeInfos.get(i).getRight
+              val dstType = implicitTypeChangeInfos.get(i).getLeft
+              val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+              Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
             } else attr
           }
           GenerateUnsafeProjection.generate(castSchema, newFullSchema)
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
index 9edd1321b12..a90d36a02de 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
@@ -130,6 +130,7 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
     val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+    val timeZoneId = Option(sqlConf.sessionLocalTimeZone)
 
     (file: PartitionedFile) => {
       assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
@@ -319,7 +320,10 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
           }).toAttributes ++ partitionSchema.toAttributes
           val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
             if (typeChangeInfos.containsKey(i)) {
-              Cast(attr, typeChangeInfos.get(i).getLeft)
+              val srcType = typeChangeInfos.get(i).getRight
+              val dstType = typeChangeInfos.get(i).getLeft
+              val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+              Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
             } else attr
           }
           GenerateUnsafeProjection.generate(castSchema, newFullSchema)
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
index ae686d33a31..112f55cfc16 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
@@ -132,6 +132,7 @@ class Spark32PlusHoodieParquetFileFormat(private val shouldAppendPartitionValues
     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
     val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
     val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+    val timeZoneId = Option(sqlConf.sessionLocalTimeZone)
 
     (file: PartitionedFile) => {
       assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
@@ -374,7 +375,10 @@ class Spark32PlusHoodieParquetFileFormat(private val shouldAppendPartitionValues
             }).toAttributes ++ partitionSchema.toAttributes
             val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
               if (typeChangeInfos.containsKey(i)) {
-                Cast(attr, typeChangeInfos.get(i).getLeft)
+                val srcType = typeChangeInfos.get(i).getRight
+                val dstType = typeChangeInfos.get(i).getLeft
+                val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+                Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
               } else attr
             }
             GenerateUnsafeProjection.generate(castSchema, newFullSchema)