Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/21 10:30:58 UTC
[hudi] 07/13: Extracted `int96RebaseMode`, `datetimeRebaseMode` into `Spark32DataSourceUtils`
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch rc3-patched-for-test
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit a4b2ff77f10222cac0d71bca778140c4dda1eb11
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Wed Apr 20 14:44:46 2022 -0700
Extracted `int96RebaseMode`, `datetimeRebaseMode` into `Spark32DataSourceUtils`
---
.../parquet/Spark32DataSourceUtils.scala | 77 ++++++++++++++++++++++
.../parquet/Spark32HoodieParquetFileFormat.scala | 56 ++--------------
2 files changed, 82 insertions(+), 51 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala
new file mode 100644
index 0000000000..6d1c76380f
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
+import org.apache.spark.util.Utils
+
+object Spark32DataSourceUtils {
+
+  /**
+   * NOTE: This method was copied from Spark 3.2.0, and is required to maintain runtime
+   *       compatibility with Spark 3.2.0
+   */
+  // scalastyle:off
+  def int96RebaseMode(lookupFileMeta: String => String,
+                      modeByConfig: String): LegacyBehaviorPolicy.Value = {
+    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
+      return LegacyBehaviorPolicy.CORRECTED
+    }
+    // If there is no version, we return the mode specified by the config.
+    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
+      // Files written by Spark 3.0 and earlier follow the legacy hybrid calendar and we need to
+      // rebase the INT96 timestamp values.
+      // Files written by Spark 3.1 and later may also need the rebase if they were written with
+      // the "LEGACY" rebase mode.
+      if (version < "3.1.0" || lookupFileMeta("org.apache.spark.legacyINT96") != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+  }
+  // scalastyle:on
+
+  /**
+   * NOTE: This method was copied from Spark 3.2.0, and is required to maintain runtime
+   *       compatibility with Spark 3.2.0
+   */
+  // scalastyle:off
+  def datetimeRebaseMode(lookupFileMeta: String => String,
+                         modeByConfig: String): LegacyBehaviorPolicy.Value = {
+    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
+      return LegacyBehaviorPolicy.CORRECTED
+    }
+    // If there is no version, we return the mode specified by the config.
+    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
+      // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
+      // rebase the datetime values.
+      // Files written by Spark 3.0 and later may also need the rebase if they were written with
+      // the "LEGACY" rebase mode.
+      if (version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+  }
+  // scalastyle:on
+
+}
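For context, here is a minimal usage sketch (not part of this commit) showing how the relocated helpers are driven by a Parquet file footer. The file path and the "EXCEPTION" fallback value are illustrative placeholders; the call shape mirrors the call sites patched below.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.spark.sql.execution.datasources.parquet.Spark32DataSourceUtils

object RebaseModeExample {
  def main(args: Array[String]): Unit = {
    // Open the footer of a Parquet file; the path is a placeholder.
    val inputFile = HadoopInputFile.fromPath(new Path("/tmp/example.parquet"), new Configuration())
    val reader = ParquetFileReader.open(inputFile)
    try {
      val footerFileMetaData = reader.getFooter.getFileMetaData
      // Resolve both rebase modes from the file's key-value metadata. The second
      // argument is the configured fallback ("EXCEPTION", "LEGACY" or "CORRECTED")
      // used when the file carries no Spark version marker.
      val datetimeRebaseMode =
        Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, "EXCEPTION")
      val int96RebaseMode =
        Spark32DataSourceUtils.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, "EXCEPTION")
      println(s"datetime rebase: $datetimeRebaseMode, INT96 rebase: $int96RebaseMode")
    } finally {
      reader.close()
    }
  }
}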
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
index dfeedd7ae4..99cb83cf51 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
@@ -181,7 +181,7 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
         } else {
           // Spark 3.2.0
           val datetimeRebaseMode =
-            Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+            Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
           createParquetFilters(
             parquetSchema,
             pushDownDate,
@@ -272,9 +272,9 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
         } else {
           // Spark 3.2.0
           val datetimeRebaseMode =
-            Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+            Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
           val int96RebaseMode =
-            Spark32HoodieParquetFileFormat.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+            Spark32DataSourceUtils.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
           createVectorizedParquetRecordReader(
             convertTz.orNull,
             datetimeRebaseMode.toString,
@@ -334,9 +334,9 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
             int96RebaseSpec)
         } else {
           val datetimeRebaseMode =
-            Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+            Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
           val int96RebaseMode =
-            Spark32HoodieParquetFileFormat.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+            Spark32DataSourceUtils.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
           createParquetReadSupport(
             convertTz,
             /* enableVectorizedReader = */ false,
@@ -417,52 +417,6 @@ object Spark32HoodieParquetFileFormat {
     parquetReadSupport.asInstanceOf[ParquetReadSupport]
   }
 
-  // TODO scala-doc
-  // Spark 3.2.0
-  // scalastyle:off
-  def int96RebaseMode(lookupFileMeta: String => String,
-                      modeByConfig: String): LegacyBehaviorPolicy.Value = {
-    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
-      return LegacyBehaviorPolicy.CORRECTED
-    }
-    // If there is no version, we return the mode specified by the config.
-    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
-      // Files written by Spark 3.0 and earlier follow the legacy hybrid calendar and we need to
-      // rebase the INT96 timestamp values.
-      // Files written by Spark 3.1 and latter may also need the rebase if they were written with
-      // the "LEGACY" rebase mode.
-      if (version < "3.1.0" || lookupFileMeta("org.apache.spark.legacyINT96") != null) {
-        LegacyBehaviorPolicy.LEGACY
-      } else {
-        LegacyBehaviorPolicy.CORRECTED
-      }
-    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
-  }
-  // scalastyle:on
-
-  // TODO scala-doc
-  // Spark 3.2.0
-  // scalastyle:off
-  def datetimeRebaseMode(lookupFileMeta: String => String,
-                         modeByConfig: String): LegacyBehaviorPolicy.Value = {
-    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
-      return LegacyBehaviorPolicy.CORRECTED
-    }
-    // If there is no version, we return the mode specified by the config.
-    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
-      // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
-      // rebase the datetime values.
-      // Files written by Spark 3.0 and latter may also need the rebase if they were written with
-      // the "LEGACY" rebase mode.
-      if (version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null) {
-        LegacyBehaviorPolicy.LEGACY
-      } else {
-        LegacyBehaviorPolicy.CORRECTED
-      }
-    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
-  }
-  // scalastyle:on
-
   def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
     val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
     if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {