Posted to commits@hudi.apache.org by si...@apache.org on 2022/04/22 00:36:49 UTC
[hudi] 02/15: Handle incompatibilities b/w Spark 3.2.0 and 3.2.1 in `Spark32HoodieParquetFileFormat`
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch release-0.11-0-apr21-5378-patched
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit bdcbbe710fdc26cbd65613353b60a91f75716c90
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Wed Apr 20 14:27:42 2022 -0700
Handle incompatibilities b/w Spark 3.2.0 and 3.2.1 in `Spark32HoodieParquetFileFormat`
---
.../scala/org/apache/hudi/HoodieSparkUtils.scala | 8 +-
.../parquet/Spark32HoodieParquetFileFormat.scala | 183 +++++++++++++++++----
2 files changed, 160 insertions(+), 31 deletions(-)
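
Context for the change: Spark 3.2.1 changed the constructor signatures of `ParquetFilters`, `VectorizedParquetRecordReader` and `ParquetReadSupport` relative to 3.2.0 (they now take rebase "specs" where 3.2.0 took rebase "modes"), so a single Hudi binary compiled against one patch release fails to link against the other. The patch gates every affected call site on a runtime version check: on Spark >= 3.2.1 the constructors are invoked directly (the module compiles against 3.2.1), while on 3.2.0 the old-shaped constructors, which do not exist on the compile-time classpath, are invoked via reflection. Roughly, the pattern looks like the following sketch, assuming a hypothetical `Widget` class whose constructor changed between patch releases:

    // A minimal, self-contained sketch of the pattern. `Widget` is a
    // hypothetical stand-in for Spark's ParquetFilters /
    // VectorizedParquetRecordReader / ParquetReadSupport; here both
    // constructor shapes exist so the sketch compiles, whereas in the real
    // case the 3.2.0 shape is absent from the 3.2.1 compile-time classpath.
    class Widget(val name: String, val spec: String) {   // "3.2.1"-style ctor
      def this(name: String) = this(name, "CORRECTED")   // "3.2.0"-style ctor
    }

    object CompatShim {
      // Stand-in for org.apache.spark.SPARK_VERSION.
      val runtimeVersion: String = "3.2.1"

      def createWidget(name: String): Widget =
        if (runtimeVersion >= "3.2.1") {
          // Direct call against the new signature. The JVM links a
          // constructor reference lazily, on first execution, so this
          // instruction is never resolved on a 3.2.0 runtime.
          new Widget(name, "CORRECTED")
        } else {
          // Old signature invoked reflectively, so the compiled class file
          // carries no direct reference to it (the commit does the same via
          // ReflectionUtils.loadClass, looking the class up by name).
          classOf[Widget].getConstructor(classOf[String]).newInstance(name)
        }
    }

The approach relies on lazy constructor-reference resolution: the direct `new` against the 3.2.1 signature can only fail to link if that branch actually executes on a 3.2.0 runtime, which the version guard prevents.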
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 54bc06bd76..7a8f8a1580 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -53,13 +53,15 @@ object HoodieSparkUtils extends SparkAdapterSupport {
def isSpark3_1: Boolean = SPARK_VERSION.startsWith("3.1")
+ def gteqSpark3_1: Boolean = SPARK_VERSION > "3.1"
+
+ def gteqSpark3_1_3: Boolean = SPARK_VERSION >= "3.1.3"
+
def isSpark3_2: Boolean = SPARK_VERSION.startsWith("3.2")
def gteqSpark3_2: Boolean = SPARK_VERSION > "3.2"
- def gteqSpark3_1: Boolean = SPARK_VERSION > "3.1"
-
- def gteqSpark3_1_3: Boolean = SPARK_VERSION >= "3.1.3"
+ def gteqSpark3_2_1: Boolean = SPARK_VERSION >= "3.2.1"
def getMetaSchema: StructType = {
StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
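
Note that these helpers compare version strings lexicographically. That is sound for the versions gated here ("3.2.0" < "3.2.1"), and the strict `>` in `gteqSpark3_1`/`gteqSpark3_2` works because `SPARK_VERSION` always carries a patch component, though lexicographic ordering would misorder multi-digit components. A quick illustration:

    // Lexicographic string comparison, as relied upon by the helpers above:
    assert("3.2.0" < "3.2.1")    // gteqSpark3_2_1 is false on Spark 3.2.0
    assert("3.2.1" >= "3.2.1")   // ...and true from 3.2.1 onwards
    assert("3.1.0" > "3.1")      // why strict `>` suffices for gteqSpark3_1
    assert("3.10.0" < "3.2.0")   // the caveat: '1' < '2' character by character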
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
index f2a0a21df8..dfeedd7ae4 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
@@ -22,11 +22,12 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.util.InternalSchemaCache
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.common.util.{InternalSchemaCache, ReflectionUtils}
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.action.InternalSchemaMerger
import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -35,17 +36,18 @@ import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
import org.apache.spark.TaskContext
-import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat.{pruneInternalSchema, rebuildFilterFromParquet}
+import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat._
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
import java.net.URI
@@ -158,21 +160,38 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
lazy val footerFileMetaData =
ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
- val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
- footerFileMetaData.getKeyValueMetaData.get,
- datetimeRebaseModeInRead)
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
- val parquetFilters = new ParquetFilters(
- parquetSchema,
- pushDownDate,
- pushDownTimestamp,
- pushDownDecimal,
- pushDownStringStartWith,
- pushDownInFilterThreshold,
- isCaseSensitive,
- datetimeRebaseSpec)
+ val parquetFilters = if (HoodieSparkUtils.gteqSpark3_2_1) {
+ // NOTE: The code below can only be compiled against Spark >= 3.2.1,
+ // and unfortunately won't compile against Spark 3.2.0.
+ // It is, however, runtime-compatible with both Spark 3.2.0 and Spark >= 3.2.1
+ val datetimeRebaseSpec =
+ DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+ new ParquetFilters(
+ parquetSchema,
+ pushDownDate,
+ pushDownTimestamp,
+ pushDownDecimal,
+ pushDownStringStartWith,
+ pushDownInFilterThreshold,
+ isCaseSensitive,
+ datetimeRebaseSpec)
+ } else {
+ // Spark 3.2.0
+ val datetimeRebaseMode =
+ Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+ createParquetFilters(
+ parquetSchema,
+ pushDownDate,
+ pushDownTimestamp,
+ pushDownDecimal,
+ pushDownStringStartWith,
+ pushDownInFilterThreshold,
+ isCaseSensitive,
+ datetimeRebaseMode)
+ }
filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null)))
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -198,10 +217,6 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
None
}
- val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
- footerFileMetaData.getKeyValueMetaData.get,
- int96RebaseModeInRead)
-
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
// Clone new conf
@@ -225,6 +240,10 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
if (enableVectorizedReader) {
val vectorizedReader =
if (shouldUseInternalSchema) {
+ val int96RebaseSpec =
+ DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+ val datetimeRebaseSpec =
+ DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
new Spark32HoodieVectorizedParquetRecordReader(
convertTz.orNull,
datetimeRebaseSpec.mode.toString,
@@ -234,7 +253,14 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
enableOffHeapColumnVector && taskContext.isDefined,
capacity,
typeChangeInfos)
- } else {
+ } else if (HoodieSparkUtils.gteqSpark3_2_1) {
+ // NOTE: The code below can only be compiled against Spark >= 3.2.1,
+ // and unfortunately won't compile against Spark 3.2.0.
+ // It is, however, runtime-compatible with both Spark 3.2.0 and Spark >= 3.2.1
+ val int96RebaseSpec =
+ DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+ val datetimeRebaseSpec =
+ DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
new VectorizedParquetRecordReader(
convertTz.orNull,
datetimeRebaseSpec.mode.toString,
@@ -243,7 +269,20 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
int96RebaseSpec.timeZone,
enableOffHeapColumnVector && taskContext.isDefined,
capacity)
+ } else {
+ // Spark 3.2.0
+ val datetimeRebaseMode =
+ Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+ val int96RebaseMode =
+ Spark32HoodieParquetFileFormat.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+ createVectorizedParquetRecordReader(
+ convertTz.orNull,
+ datetimeRebaseMode.toString,
+ int96RebaseMode.toString,
+ enableOffHeapColumnVector && taskContext.isDefined,
+ capacity)
}
+
// SPARK-37089: We cannot register a task completion listener to close this iterator here
// because downstream exec nodes have already registered their listeners. Since listeners
// are executed in reverse order of registration, a listener registered here would close the
@@ -279,12 +318,32 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
}
} else {
logDebug(s"Falling back to parquet-mr")
- // ParquetRecordReader returns InternalRow
- val readSupport = new ParquetReadSupport(
- convertTz,
- enableVectorizedReader = false,
- datetimeRebaseSpec,
- int96RebaseSpec)
+ val readSupport = if (HoodieSparkUtils.gteqSpark3_2_1) {
+ // ParquetRecordReader returns InternalRow
+ // NOTE: The code below can only be compiled against Spark >= 3.2.1,
+ // and unfortunately won't compile against Spark 3.2.0.
+ // It is, however, runtime-compatible with both Spark 3.2.0 and Spark >= 3.2.1
+ val int96RebaseSpec =
+ DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+ val datetimeRebaseSpec =
+ DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+ new ParquetReadSupport(
+ convertTz,
+ enableVectorizedReader = false,
+ datetimeRebaseSpec,
+ int96RebaseSpec)
+ } else {
+ val datetimeRebaseMode =
+ Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+ val int96RebaseMode =
+ Spark32HoodieParquetFileFormat.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+ createParquetReadSupport(
+ convertTz,
+ /* enableVectorizedReader = */ false,
+ datetimeRebaseMode,
+ int96RebaseMode)
+ }
+
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
@@ -332,10 +391,78 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
}
}
}
+
}
object Spark32HoodieParquetFileFormat {
+ private val PARQUET_FILTERS_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetFilters"
+ private val PARQUET_VECTORIZED_READER_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader"
+ private val PARQUET_READ_SUPPORT_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport"
+
+ private def createParquetFilters(args: Any*): ParquetFilters = {
+ val parquetFiltersInstance = ReflectionUtils.loadClass(PARQUET_FILTERS_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
+ parquetFiltersInstance.asInstanceOf[ParquetFilters]
+ }
+
+ private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = {
+ val vectorizedRecordReader =
+ ReflectionUtils.loadClass(PARQUET_VECTORIZED_READER_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
+ vectorizedRecordReader.asInstanceOf[VectorizedParquetRecordReader]
+ }
+
+ private def createParquetReadSupport(args: Any*): ParquetReadSupport = {
+ val parquetReadSupport =
+ ReflectionUtils.loadClass(PARQUET_READ_SUPPORT_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
+ parquetReadSupport.asInstanceOf[ParquetReadSupport]
+ }
+
+ // TODO scala-doc
+ // Spark 3.2.0
+ // scalastyle:off
+ def int96RebaseMode(lookupFileMeta: String => String,
+ modeByConfig: String): LegacyBehaviorPolicy.Value = {
+ if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
+ return LegacyBehaviorPolicy.CORRECTED
+ }
+ // If there is no version, we return the mode specified by the config.
+ Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
+ // Files written by Spark 3.0 and earlier follow the legacy hybrid calendar and we need to
+ // rebase the INT96 timestamp values.
+ // Files written by Spark 3.1 and later may also need the rebase if they were written with
+ // the "LEGACY" rebase mode.
+ if (version < "3.1.0" || lookupFileMeta("org.apache.spark.legacyINT96") != null) {
+ LegacyBehaviorPolicy.LEGACY
+ } else {
+ LegacyBehaviorPolicy.CORRECTED
+ }
+ }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+ }
+ // scalastyle:on
+
+ // TODO scala-doc
+ // Spark 3.2.0
+ // scalastyle:off
+ def datetimeRebaseMode(lookupFileMeta: String => String,
+ modeByConfig: String): LegacyBehaviorPolicy.Value = {
+ if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
+ return LegacyBehaviorPolicy.CORRECTED
+ }
+ // If there is no version, we return the mode specified by the config.
+ Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
+ // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
+ // rebase the datetime values.
+ // Files written by Spark 3.0 and later may also need the rebase if they were written with
+ // the "LEGACY" rebase mode.
+ if (version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null) {
+ LegacyBehaviorPolicy.LEGACY
+ } else {
+ LegacyBehaviorPolicy.CORRECTED
+ }
+ }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+ }
+ // scalastyle:on
+
def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {
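
For reference, the two rebase-mode helpers introduced above reproduce the resolution logic as it existed in Spark 3.2.0 (hence the "// Spark 3.2.0" markers): they trust the writer version recorded in the Parquet footer and only fall back to the session-configured mode when that metadata is absent. A small usage sketch with made-up footer metadata (the map contents are illustrative; real values come from `footerFileMetaData.getKeyValueMetaData` in the read path):

    import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
    import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat
    import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

    // A Parquet footer written by Spark 2.4 records the writer version under
    // SPARK_VERSION_METADATA_KEY; the lookup returns null for absent keys.
    val writtenBySpark24: String => String =
      key => Map(SPARK_VERSION_METADATA_KEY -> "2.4.8").get(key).orNull

    // Pre-3.0 writers used the hybrid Julian/Gregorian calendar, so the
    // resolved mode is LEGACY regardless of the configured default:
    assert(Spark32HoodieParquetFileFormat.datetimeRebaseMode(writtenBySpark24, "EXCEPTION")
      == LegacyBehaviorPolicy.LEGACY)

    // With no writer version in the footer, the configured mode wins:
    assert(Spark32HoodieParquetFileFormat.datetimeRebaseMode(_ => null, "CORRECTED")
      == LegacyBehaviorPolicy.CORRECTED)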