You are viewing a plain text version of this content; the canonical (hyperlinked) version is available from the mailing-list archive.
Posted to commits@hudi.apache.org by me...@apache.org on 2023/03/28 07:23:49 UTC
[hudi] branch master updated: [HUDI-5977] Fix Date to String column schema evolution (#8280)
This is an automated email from the ASF dual-hosted git repository.
mengtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 04ec593413e [HUDI-5977] Fix Date to String column schema evolution (#8280)
04ec593413e is described below
commit 04ec593413e0be986dc021a7385b0d66d5659749
Author: voonhous <vo...@gmail.com>
AuthorDate: Tue Mar 28 15:23:37 2023 +0800
[HUDI-5977] Fix Date to String column schema evolution (#8280)
---
.../org/apache/spark/sql/hudi/TestSpark3DDL.scala | 48 ++++++++++++++++++++++
.../parquet/Spark24HoodieParquetFileFormat.scala | 6 ++-
.../parquet/Spark31HoodieParquetFileFormat.scala | 6 ++-
.../Spark32PlusHoodieParquetFileFormat.scala | 6 ++-
4 files changed, 63 insertions(+), 3 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index de77b07fd83..7b1cf43b739 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -735,4 +735,52 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
}
}
}
+
+ test("Test DATE to STRING conversions when vectorized reading is not enabled") {
+ withTempDir { tmp =>
+ Seq("COPY_ON_WRITE", "MERGE_ON_READ").foreach { tableType =>
+ val tableName = generateTableName
+ val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
+ if (HoodieSparkUtils.gteqSpark3_1) {
+ // adding a struct column to force reads to use non-vectorized readers
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | struct_col struct<f0: int, f1: string>,
+ | ts long
+ |) using hudi
+ | location '$tablePath'
+ | options (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ | partitioned by (ts)
+ """.stripMargin)
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | values (1, 'a1', 10, struct(1, 'f_1'), 1000)
+ """.stripMargin)
+ spark.sql(s"select * from $tableName")
+
+ spark.sql("set hoodie.schema.on.read.enable=true")
+ spark.sql(s"alter table $tableName add columns(date_to_string_col date)")
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | values (2, 'a2', 20, struct(2, 'f_2'), date '2023-03-22', 1001)
+ """.stripMargin)
+ spark.sql(s"alter table $tableName alter column date_to_string_col type string")
+
+ // struct and string (converted from date) column must be read to ensure that non-vectorized reader is used
+ // not checking results as we just need to ensure that the table can be read without any errors thrown
+ spark.sql(s"select * from $tableName")
+ }
+ }
+ }
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
index 1a8585b38aa..c168911302e 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
@@ -107,6 +107,7 @@ class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+ val timeZoneId = Option(sqlConf.sessionLocalTimeZone)
(file: PartitionedFile) => {
assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
@@ -238,7 +239,10 @@ class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
}).toAttributes ++ partitionSchema.toAttributes
val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
if (implicitTypeChangeInfos.containsKey(i)) {
- Cast(attr, implicitTypeChangeInfos.get(i).getLeft)
+ val srcType = implicitTypeChangeInfos.get(i).getRight
+ val dstType = implicitTypeChangeInfos.get(i).getLeft
+ val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+ Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
} else attr
}
GenerateUnsafeProjection.generate(castSchema, newFullSchema)
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
index 9edd1321b12..a90d36a02de 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
@@ -130,6 +130,7 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+ val timeZoneId = Option(sqlConf.sessionLocalTimeZone)
(file: PartitionedFile) => {
assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
@@ -319,7 +320,10 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
}).toAttributes ++ partitionSchema.toAttributes
val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
if (typeChangeInfos.containsKey(i)) {
- Cast(attr, typeChangeInfos.get(i).getLeft)
+ val srcType = typeChangeInfos.get(i).getRight
+ val dstType = typeChangeInfos.get(i).getLeft
+ val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+ Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
} else attr
}
GenerateUnsafeProjection.generate(castSchema, newFullSchema)
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
index ae686d33a31..112f55cfc16 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
@@ -132,6 +132,7 @@ class Spark32PlusHoodieParquetFileFormat(private val shouldAppendPartitionValues
val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+ val timeZoneId = Option(sqlConf.sessionLocalTimeZone)
(file: PartitionedFile) => {
assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
@@ -374,7 +375,10 @@ class Spark32PlusHoodieParquetFileFormat(private val shouldAppendPartitionValues
}).toAttributes ++ partitionSchema.toAttributes
val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
if (typeChangeInfos.containsKey(i)) {
- Cast(attr, typeChangeInfos.get(i).getLeft)
+ val srcType = typeChangeInfos.get(i).getRight
+ val dstType = typeChangeInfos.get(i).getLeft
+ val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+ Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
} else attr
}
GenerateUnsafeProjection.generate(castSchema, newFullSchema)