Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/21 17:48:35 UTC

[hudi] 04/16: Extracted `int96RebaseMode`, `datetimeRebaseMode` into `Spark32DataSourceUtils`

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch rc3-patched-for-test
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b93c5a2aaf92fa36f2a3272929625c76ea25a7f4
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Wed Apr 20 14:44:46 2022 -0700

    Extracted `int96RebaseMode`, `datetimeRebaseMode` into `Spark32DataSourceUtils`
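
    For context, both helpers resolve the rebase mode from the Parquet footer's
    key-value metadata and fall back to the mode configured for the read when
    the writer's Spark version is unknown. A minimal sketch of how the datetime
    variant behaves (the metadata map and config value below are illustrative,
    not taken from this commit):

        import org.apache.spark.sql.execution.datasources.parquet.Spark32DataSourceUtils
        import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

        // Footer metadata of a file written by Spark 2.4 (illustrative values):
        val fileMeta = Map("org.apache.spark.version" -> "2.4.5")

        // The writer version predates 3.0.0, so the LEGACY branch is taken
        // regardless of the configured read mode:
        val mode = Spark32DataSourceUtils.datetimeRebaseMode(
          (k: String) => fileMeta.get(k).orNull, "CORRECTED")
        assert(mode == LegacyBehaviorPolicy.LEGACY)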
---
 .../parquet/Spark32DataSourceUtils.scala           | 77 ++++++++++++++++++++++
 .../parquet/Spark32HoodieParquetFileFormat.scala   | 56 ++--------------
 2 files changed, 82 insertions(+), 51 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala
new file mode 100644
index 0000000000..6d1c76380f
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
+import org.apache.spark.util.Utils
+
+object Spark32DataSourceUtils {
+
+  /**
+   * NOTE: This method was copied from Spark 3.2.0, and is required to maintain runtime
+   * compatibility with Spark 3.2.0
+   */
+  // scalastyle:off
+  def int96RebaseMode(lookupFileMeta: String => String,
+                      modeByConfig: String): LegacyBehaviorPolicy.Value = {
+    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
+      return LegacyBehaviorPolicy.CORRECTED
+    }
+    // If there is no version, we return the mode specified by the config.
+    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
+      // Files written by Spark 3.0 and earlier follow the legacy hybrid calendar and we need to
+      // rebase the INT96 timestamp values.
+      // Files written by Spark 3.1 and later may also need the rebase if they were written with
+      // the "LEGACY" rebase mode.
+      if (version < "3.1.0" || lookupFileMeta("org.apache.spark.legacyINT96") != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+  }
+  // scalastyle:on
+
+  /**
+   * NOTE: This method was copied from Spark 3.2.0, and is required to maintain runtime
+   * compatibility with Spark 3.2.0
+   */
+  // scalastyle:off
+  def datetimeRebaseMode(lookupFileMeta: String => String,
+                         modeByConfig: String): LegacyBehaviorPolicy.Value = {
+    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
+      return LegacyBehaviorPolicy.CORRECTED
+    }
+    // If there is no version, we return the mode specified by the config.
+    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
+      // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
+      // rebase the datetime values.
+      // Files written by Spark 3.0 and later may also need the rebase if they were written with
+      // the "LEGACY" rebase mode.
+      if (version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+  }
+  // scalastyle:on
+
+}
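
For a quick sanity check of the int96 variant: it keys off the
`org.apache.spark.legacyINT96` marker that Spark 3.1+ writers stamp into the
footer when writing in "LEGACY" mode. A hypothetical sketch (metadata values
are illustrative, not taken from this commit):

    import org.apache.spark.sql.execution.datasources.parquet.Spark32DataSourceUtils
    import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

    // A 3.1+ writer in LEGACY mode stamps the marker key; its presence,
    // not its value, is what the lookup checks:
    val legacyMeta = Map(
      "org.apache.spark.version" -> "3.2.0",
      "org.apache.spark.legacyINT96" -> "")
    assert(Spark32DataSourceUtils.int96RebaseMode(
      (k: String) => legacyMeta.get(k).orNull, "EXCEPTION") == LegacyBehaviorPolicy.LEGACY)

    // Without the marker, a 3.1+ writer resolves to CORRECTED:
    val correctedMeta = Map("org.apache.spark.version" -> "3.2.0")
    assert(Spark32DataSourceUtils.int96RebaseMode(
      (k: String) => correctedMeta.get(k).orNull, "EXCEPTION") == LegacyBehaviorPolicy.CORRECTED)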
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
index dfeedd7ae4..99cb83cf51 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
@@ -181,7 +181,7 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
         } else {
           // Spark 3.2.0
           val datetimeRebaseMode =
-            Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+            Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
           createParquetFilters(
             parquetSchema,
             pushDownDate,
@@ -272,9 +272,9 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
           } else {
             // Spark 3.2.0
             val datetimeRebaseMode =
-              Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+              Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
             val int96RebaseMode =
-              Spark32HoodieParquetFileFormat.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+              Spark32DataSourceUtils.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
             createVectorizedParquetRecordReader(
               convertTz.orNull,
               datetimeRebaseMode.toString,
@@ -334,9 +334,9 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
             int96RebaseSpec)
         } else {
           val datetimeRebaseMode =
-            Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+            Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
           val int96RebaseMode =
-            Spark32HoodieParquetFileFormat.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+            Spark32DataSourceUtils.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
           createParquetReadSupport(
             convertTz,
             /* enableVectorizedReader = */ false,
@@ -417,52 +417,6 @@ object Spark32HoodieParquetFileFormat {
     parquetReadSupport.asInstanceOf[ParquetReadSupport]
   }
 
-  // TODO scala-doc
-  // Spark 3.2.0
-  // scalastyle:off
-  def int96RebaseMode(lookupFileMeta: String => String,
-                      modeByConfig: String): LegacyBehaviorPolicy.Value = {
-    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
-      return LegacyBehaviorPolicy.CORRECTED
-    }
-    // If there is no version, we return the mode specified by the config.
-    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
-      // Files written by Spark 3.0 and earlier follow the legacy hybrid calendar and we need to
-      // rebase the INT96 timestamp values.
-      // Files written by Spark 3.1 and later may also need the rebase if they were written with
-      // the "LEGACY" rebase mode.
-      if (version < "3.1.0" || lookupFileMeta("org.apache.spark.legacyINT96") != null) {
-        LegacyBehaviorPolicy.LEGACY
-      } else {
-        LegacyBehaviorPolicy.CORRECTED
-      }
-    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
-  }
-  // scalastyle:on
-
-  // TODO scala-doc
-  // Spark 3.2.0
-  // scalastyle:off
-  def datetimeRebaseMode(lookupFileMeta: String => String,
-                         modeByConfig: String): LegacyBehaviorPolicy.Value = {
-    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
-      return LegacyBehaviorPolicy.CORRECTED
-    }
-    // If there is no version, we return the mode specified by the config.
-    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
-      // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
-      // rebase the datetime values.
-      // Files written by Spark 3.0 and later may also need the rebase if they were written with
-      // the "LEGACY" rebase mode.
-      if (version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null) {
-        LegacyBehaviorPolicy.LEGACY
-      } else {
-        LegacyBehaviorPolicy.CORRECTED
-      }
-    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
-  }
-  // scalastyle:on
-
   def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
     val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
     if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {