You are viewing a plain text version of this content. The canonical version is the original mailing-list archive entry; its hyperlink was not preserved in this plain-text copy.
Posted to commits@hudi.apache.org by xu...@apache.org on 2022/09/25 15:42:51 UTC
[hudi] branch master updated: [HUDI-4915] improve avro serializer/deserializer (#6788)
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 79b3e2b899 [HUDI-4915] improve avro serializer/deserializer (#6788)
79b3e2b899 is described below
commit 79b3e2b899cc303490c22610fda0e5ac2013cf02
Author: Yann Byron <bi...@gmail.com>
AuthorDate: Sun Sep 25 23:42:44 2022 +0800
[HUDI-4915] improve avro serializer/deserializer (#6788)
---
.../org/apache/spark/sql/avro/AvroDeserializer.scala | 20 +++++++-------------
.../org/apache/spark/sql/avro/AvroSerializer.scala | 17 ++++++-----------
.../org/apache/spark/sql/avro/AvroDeserializer.scala | 20 +++++++-------------
.../org/apache/spark/sql/avro/AvroSerializer.scala | 19 +++++++------------
.../org/apache/spark/sql/avro/AvroDeserializer.scala | 20 +++++++-------------
.../org/apache/spark/sql/avro/AvroSerializer.scala | 19 +++++++------------
.../org/apache/spark/sql/avro/AvroDeserializer.scala | 20 +++++++-------------
.../org/apache/spark/sql/avro/AvroSerializer.scala | 19 +++++++------------
8 files changed, 55 insertions(+), 99 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 9725fb63f5..921e6deb58 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -49,33 +49,27 @@ import scala.collection.mutable.ArrayBuffer
class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
private lazy val decimalConversions = new DecimalConversion()
- private val converter: Any => Any = rootCatalystType match {
+ def deserialize(data: Any): Any = rootCatalystType match {
// A shortcut for empty schema.
case st: StructType if st.isEmpty =>
- (data: Any) => InternalRow.empty
+ InternalRow.empty
case st: StructType =>
val resultRow = new SpecificInternalRow(st.map(_.dataType))
val fieldUpdater = new RowUpdater(resultRow)
val writer = getRecordWriter(rootAvroType, st, Nil)
- (data: Any) => {
- val record = data.asInstanceOf[GenericRecord]
- writer(fieldUpdater, record)
- resultRow
- }
+ val record = data.asInstanceOf[GenericRecord]
+ writer(fieldUpdater, record)
+ resultRow
case _ =>
val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
val fieldUpdater = new RowUpdater(tmpRow)
val writer = newWriter(rootAvroType, rootCatalystType, Nil)
- (data: Any) => {
- writer(fieldUpdater, 0, data)
- tmpRow.get(0, rootCatalystType)
- }
+ writer(fieldUpdater, 0, data)
+ tmpRow.get(0, rootCatalystType)
}
- def deserialize(data: Any): Any = converter(data)
-
/**
* Creates a writer to write avro values to Catalyst values at the given ordinal with the given
* updater.
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 2b88be8165..e0c7344138 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -47,10 +47,6 @@ import org.apache.spark.sql.types._
class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) {
def serialize(catalystData: Any): Any = {
- converter.apply(catalystData)
- }
-
- private val converter: Any => Any = {
val actualAvroType = resolveNullableType(rootAvroType, nullable)
val baseConverter = rootCatalystType match {
case st: StructType =>
@@ -63,14 +59,13 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
converter.apply(tmpRow, 0)
}
if (nullable) {
- (data: Any) =>
- if (data == null) {
- null
- } else {
- baseConverter.apply(data)
- }
+ if (catalystData == null) {
+ null
+ } else {
+ baseConverter.apply(catalystData)
+ }
} else {
- baseConverter
+ baseConverter.apply(catalystData)
}
}
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 5fb6d907bd..61482ab96f 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -69,34 +69,28 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
private val timestampRebaseFunc = createTimestampRebaseFuncInRead(
datetimeRebaseMode, "Avro")
- private val converter: Any => Option[Any] = rootCatalystType match {
+ def deserialize(data: Any): Option[Any] = rootCatalystType match {
// A shortcut for empty schema.
case st: StructType if st.isEmpty =>
- (data: Any) => Some(InternalRow.empty)
+ Some(InternalRow.empty)
case st: StructType =>
val resultRow = new SpecificInternalRow(st.map(_.dataType))
val fieldUpdater = new RowUpdater(resultRow)
val applyFilters = filters.skipRow(resultRow, _)
val writer = getRecordWriter(rootAvroType, st, Nil, applyFilters)
- (data: Any) => {
- val record = data.asInstanceOf[GenericRecord]
- val skipRow = writer(fieldUpdater, record)
- if (skipRow) None else Some(resultRow)
- }
+ val record = data.asInstanceOf[GenericRecord]
+ val skipRow = writer(fieldUpdater, record)
+ if (skipRow) None else Some(resultRow)
case _ =>
val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
val fieldUpdater = new RowUpdater(tmpRow)
val writer = newWriter(rootAvroType, rootCatalystType, Nil)
- (data: Any) => {
- writer(fieldUpdater, 0, data)
- Some(tmpRow.get(0, rootCatalystType))
- }
+ writer(fieldUpdater, 0, data)
+ Some(tmpRow.get(0, rootCatalystType))
}
- def deserialize(data: Any): Option[Any] = converter(data)
-
/**
* Creates a writer to write avro values to Catalyst values at the given ordinal with the given
* updater.
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 36d86c1e01..2397186a17 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -57,17 +57,13 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE)))
}
- def serialize(catalystData: Any): Any = {
- converter.apply(catalystData)
- }
-
private val dateRebaseFunc = createDateRebaseFuncInWrite(
datetimeRebaseMode, "Avro")
private val timestampRebaseFunc = createTimestampRebaseFuncInWrite(
datetimeRebaseMode, "Avro")
- private val converter: Any => Any = {
+ def serialize(catalystData: Any): Any = {
val actualAvroType = resolveNullableType(rootAvroType, nullable)
val baseConverter = rootCatalystType match {
case st: StructType =>
@@ -80,14 +76,13 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
converter.apply(tmpRow, 0)
}
if (nullable) {
- (data: Any) =>
- if (data == null) {
- null
- } else {
- baseConverter.apply(data)
- }
+ if (catalystData == null) {
+ null
+ } else {
+ baseConverter.apply(catalystData)
+ }
} else {
- baseConverter
+ baseConverter.apply(catalystData)
}
}
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 0b60933075..9f3b60b8c3 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -72,39 +72,33 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
private val timestampRebaseFunc = createTimestampRebaseFuncInRead(datetimeRebaseSpec, "Avro")
- private val converter: Any => Option[Any] = try {
+ def deserialize(data: Any): Option[Any] = try {
rootCatalystType match {
// A shortcut for empty schema.
case st: StructType if st.isEmpty =>
- (_: Any) => Some(InternalRow.empty)
+ Some(InternalRow.empty)
case st: StructType =>
val resultRow = new SpecificInternalRow(st.map(_.dataType))
val fieldUpdater = new RowUpdater(resultRow)
val applyFilters = filters.skipRow(resultRow, _)
val writer = getRecordWriter(rootAvroType, st, Nil, Nil, applyFilters)
- (data: Any) => {
- val record = data.asInstanceOf[GenericRecord]
- val skipRow = writer(fieldUpdater, record)
- if (skipRow) None else Some(resultRow)
- }
+ val record = data.asInstanceOf[GenericRecord]
+ val skipRow = writer(fieldUpdater, record)
+ if (skipRow) None else Some(resultRow)
case _ =>
val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
val fieldUpdater = new RowUpdater(tmpRow)
val writer = newWriter(rootAvroType, rootCatalystType, Nil, Nil)
- (data: Any) => {
- writer(fieldUpdater, 0, data)
- Some(tmpRow.get(0, rootCatalystType))
- }
+ writer(fieldUpdater, 0, data)
+ Some(tmpRow.get(0, rootCatalystType))
}
} catch {
case ise: IncompatibleSchemaException => throw new IncompatibleSchemaException(
s"Cannot convert Avro type $rootAvroType to SQL type ${rootCatalystType.sql}.", ise)
}
- def deserialize(data: Any): Option[Any] = converter(data)
-
/**
* Creates a writer to write avro values to Catalyst values at the given ordinal with the given
* updater.
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index ba9812b026..0f9b60c701 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -66,17 +66,13 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE)))
}
- def serialize(catalystData: Any): Any = {
- converter.apply(catalystData)
- }
-
private val dateRebaseFunc = createDateRebaseFuncInWrite(
datetimeRebaseMode, "Avro")
private val timestampRebaseFunc = createTimestampRebaseFuncInWrite(
datetimeRebaseMode, "Avro")
- private val converter: Any => Any = {
+ def serialize(catalystData: Any): Any = {
val actualAvroType = resolveNullableType(rootAvroType, nullable)
val baseConverter = try {
rootCatalystType match {
@@ -94,14 +90,13 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
s"Cannot convert SQL type ${rootCatalystType.sql} to Avro type $rootAvroType.", ise)
}
if (nullable) {
- (data: Any) =>
- if (data == null) {
- null
- } else {
- baseConverter.apply(data)
- }
+ if (catalystData == null) {
+ null
+ } else {
+ baseConverter.apply(catalystData)
+ }
} else {
- baseConverter
+ baseConverter.apply(catalystData)
}
}
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 5e7bab3e51..0b00b6d1ab 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -71,39 +71,33 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
private val timestampRebaseFunc = createTimestampRebaseFuncInRead(datetimeRebaseSpec, "Avro")
- private val converter: Any => Option[Any] = try {
+ def deserialize(data: Any): Option[Any] = try {
rootCatalystType match {
// A shortcut for empty schema.
case st: StructType if st.isEmpty =>
- (_: Any) => Some(InternalRow.empty)
+ Some(InternalRow.empty)
case st: StructType =>
val resultRow = new SpecificInternalRow(st.map(_.dataType))
val fieldUpdater = new RowUpdater(resultRow)
val applyFilters = filters.skipRow(resultRow, _)
val writer = getRecordWriter(rootAvroType, st, Nil, Nil, applyFilters)
- (data: Any) => {
- val record = data.asInstanceOf[GenericRecord]
- val skipRow = writer(fieldUpdater, record)
- if (skipRow) None else Some(resultRow)
- }
+ val record = data.asInstanceOf[GenericRecord]
+ val skipRow = writer(fieldUpdater, record)
+ if (skipRow) None else Some(resultRow)
case _ =>
val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
val fieldUpdater = new RowUpdater(tmpRow)
val writer = newWriter(rootAvroType, rootCatalystType, Nil, Nil)
- (data: Any) => {
- writer(fieldUpdater, 0, data)
- Some(tmpRow.get(0, rootCatalystType))
- }
+ writer(fieldUpdater, 0, data)
+ Some(tmpRow.get(0, rootCatalystType))
}
} catch {
case ise: IncompatibleSchemaException => throw new IncompatibleSchemaException(
s"Cannot convert Avro type $rootAvroType to SQL type ${rootCatalystType.sql}.", ise)
}
- def deserialize(data: Any): Option[Any] = converter(data)
-
/**
* Creates a writer to write avro values to Catalyst values at the given ordinal with the given
* updater.
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 450d9d7346..dfa970f573 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -65,17 +65,13 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE)))
}
- def serialize(catalystData: Any): Any = {
- converter.apply(catalystData)
- }
-
private val dateRebaseFunc = createDateRebaseFuncInWrite(
datetimeRebaseMode, "Avro")
private val timestampRebaseFunc = createTimestampRebaseFuncInWrite(
datetimeRebaseMode, "Avro")
- private val converter: Any => Any = {
+ def serialize(catalystData: Any): Any = {
val actualAvroType = resolveNullableType(rootAvroType, nullable)
val baseConverter = try {
rootCatalystType match {
@@ -93,14 +89,13 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
s"Cannot convert SQL type ${rootCatalystType.sql} to Avro type $rootAvroType.", ise)
}
if (nullable) {
- (data: Any) =>
- if (data == null) {
- null
- } else {
- baseConverter.apply(data)
- }
+ if (catalystData == null) {
+ null
+ } else {
+ baseConverter.apply(catalystData)
+ }
} else {
- baseConverter
+ baseConverter.apply(catalystData)
}
}