Posted to commits@hudi.apache.org by xu...@apache.org on 2022/10/06 02:41:02 UTC

[hudi] branch master updated: Revert "[HUDI-4915] improve avro serializer/deserializer (#6788)" (#6809)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 067cc24d88 Revert "[HUDI-4915] improve avro serializer/deserializer (#6788)" (#6809)
067cc24d88 is described below

commit 067cc24d88fd299f1dfc8b96a1995621799613d4
Author: Yann Byron <bi...@gmail.com>
AuthorDate: Thu Oct 6 10:40:55 2022 +0800

    Revert "[HUDI-4915] improve avro serializer/deserializer (#6788)" (#6809)
    
    This reverts commit 79b3e2b899cc303490c22610fda0e5ac2013cf02.
---
 .../org/apache/spark/sql/avro/AvroDeserializer.scala | 20 +++++++++++++-------
 .../org/apache/spark/sql/avro/AvroSerializer.scala   | 17 +++++++++++------
 .../org/apache/spark/sql/avro/AvroDeserializer.scala | 20 +++++++++++++-------
 .../org/apache/spark/sql/avro/AvroSerializer.scala   | 19 ++++++++++++-------
 .../org/apache/spark/sql/avro/AvroDeserializer.scala | 20 +++++++++++++-------
 .../org/apache/spark/sql/avro/AvroSerializer.scala   | 19 ++++++++++++-------
 .../org/apache/spark/sql/avro/AvroDeserializer.scala | 20 +++++++++++++-------
 .../org/apache/spark/sql/avro/AvroSerializer.scala   | 19 ++++++++++++-------
 8 files changed, 99 insertions(+), 55 deletions(-)
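
For context, the change restored across all four Spark profiles below is the
same: instead of re-running the rootCatalystType pattern match on every call
to deserialize (or serialize), the match runs once at construction time and
yields a converter function value, which the per-record hot path then simply
applies. Here is a minimal standalone sketch of that pattern; the types and
names are illustrative stand-ins, not the actual Spark/Hudi classes.

sealed trait DataType
case object IntType extends DataType
case object StringType extends DataType

class Deserializer(rootType: DataType) {
  // Built once: each branch returns a closure, so the per-record path
  // is a single function application with no repeated pattern match.
  private val converter: Any => Any = rootType match {
    case IntType    => (data: Any) => data.asInstanceOf[Number].intValue()
    case StringType => (data: Any) => data.toString
  }

  def deserialize(data: Any): Any = converter(data)
}

// Usage: new Deserializer(IntType).deserialize(Integer.valueOf(42)) == 42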

diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 921e6deb58..9725fb63f5 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -49,27 +49,33 @@ import scala.collection.mutable.ArrayBuffer
 class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
   private lazy val decimalConversions = new DecimalConversion()
 
-  def deserialize(data: Any): Any = rootCatalystType match {
+  private val converter: Any => Any = rootCatalystType match {
     // A shortcut for empty schema.
     case st: StructType if st.isEmpty =>
-      InternalRow.empty
+      (data: Any) => InternalRow.empty
 
     case st: StructType =>
       val resultRow = new SpecificInternalRow(st.map(_.dataType))
       val fieldUpdater = new RowUpdater(resultRow)
       val writer = getRecordWriter(rootAvroType, st, Nil)
-      val record = data.asInstanceOf[GenericRecord]
-      writer(fieldUpdater, record)
-      resultRow
+      (data: Any) => {
+        val record = data.asInstanceOf[GenericRecord]
+        writer(fieldUpdater, record)
+        resultRow
+      }
 
     case _ =>
       val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
       val fieldUpdater = new RowUpdater(tmpRow)
       val writer = newWriter(rootAvroType, rootCatalystType, Nil)
-      writer(fieldUpdater, 0, data)
-      tmpRow.get(0, rootCatalystType)
+      (data: Any) => {
+        writer(fieldUpdater, 0, data)
+        tmpRow.get(0, rootCatalystType)
+      }
   }
 
+  def deserialize(data: Any): Any = converter(data)
+
   /**
    * Creates a writer to write avro values to Catalyst values at the given ordinal with the given
    * updater.
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index e0c7344138..2b88be8165 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -47,6 +47,10 @@ import org.apache.spark.sql.types._
 class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) {
 
   def serialize(catalystData: Any): Any = {
+    converter.apply(catalystData)
+  }
+
+  private val converter: Any => Any = {
     val actualAvroType = resolveNullableType(rootAvroType, nullable)
     val baseConverter = rootCatalystType match {
       case st: StructType =>
@@ -59,13 +63,14 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
           converter.apply(tmpRow, 0)
     }
     if (nullable) {
-      if (catalystData == null) {
-        null
-      } else {
-        baseConverter.apply(catalystData)
-      }
+      (data: Any) =>
+        if (data == null) {
+          null
+        } else {
+          baseConverter.apply(data)
+        }
     } else {
-      baseConverter.apply(catalystData)
+      baseConverter
     }
   }
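
The serializer side of the revert applies the same idea to the nullable
check: whether to guard against null is decided once when the converter is
built, so a non-nullable root type pays no per-record branch. A hedged
sketch of just that wrapping decision, where baseConverter is a placeholder
rather than the real Catalyst-to-Avro converter:

class Serializer(nullable: Boolean) {
  // Placeholder for the converter derived from the Catalyst schema.
  private val baseConverter: Any => Any = (data: Any) => data.toString

  // Wrap in a null check only when the schema allows nulls; otherwise
  // hand back baseConverter itself, avoiding the extra closure.
  private val converter: Any => Any =
    if (nullable) {
      (data: Any) => if (data == null) null else baseConverter(data)
    } else {
      baseConverter
    }

  def serialize(catalystData: Any): Any = converter(catalystData)
}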
 
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 61482ab96f..5fb6d907bd 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -69,28 +69,34 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
   private val timestampRebaseFunc = createTimestampRebaseFuncInRead(
     datetimeRebaseMode, "Avro")
 
-  def deserialize(data: Any): Option[Any] = rootCatalystType match {
+  private val converter: Any => Option[Any] = rootCatalystType match {
     // A shortcut for empty schema.
     case st: StructType if st.isEmpty =>
-      Some(InternalRow.empty)
+      (data: Any) => Some(InternalRow.empty)
 
     case st: StructType =>
       val resultRow = new SpecificInternalRow(st.map(_.dataType))
       val fieldUpdater = new RowUpdater(resultRow)
       val applyFilters = filters.skipRow(resultRow, _)
       val writer = getRecordWriter(rootAvroType, st, Nil, applyFilters)
-      val record = data.asInstanceOf[GenericRecord]
-      val skipRow = writer(fieldUpdater, record)
-      if (skipRow) None else Some(resultRow)
+      (data: Any) => {
+        val record = data.asInstanceOf[GenericRecord]
+        val skipRow = writer(fieldUpdater, record)
+        if (skipRow) None else Some(resultRow)
+      }
 
     case _ =>
       val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
       val fieldUpdater = new RowUpdater(tmpRow)
       val writer = newWriter(rootAvroType, rootCatalystType, Nil)
-      writer(fieldUpdater, 0, data)
-      Some(tmpRow.get(0, rootCatalystType))
+      (data: Any) => {
+        writer(fieldUpdater, 0, data)
+        Some(tmpRow.get(0, rootCatalystType))
+      }
   }
 
+  def deserialize(data: Any): Option[Any] = converter(data)
+
   /**
    * Creates a writer to write avro values to Catalyst values at the given ordinal with the given
    * updater.
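
In the Spark 3.x profiles the deserializer's converter returns Option[Any]
rather than Any, so rows rejected by pushed-down filters (the skipRow check
in the hunk above) surface as None and are dropped by the caller. A small
sketch of that shape, with skipRow as a stand-in predicate rather than the
real filter machinery:

class FilteringDeserializer(skipRow: Map[String, Any] => Boolean) {
  // None signals "row filtered out"; Some carries the converted row.
  private val converter: Map[String, Any] => Option[Map[String, Any]] =
    (record: Map[String, Any]) =>
      if (skipRow(record)) None else Some(record)

  def deserialize(record: Map[String, Any]): Option[Map[String, Any]] =
    converter(record)
}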
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 2397186a17..36d86c1e01 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -57,13 +57,17 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
         SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE)))
   }
 
+  def serialize(catalystData: Any): Any = {
+    converter.apply(catalystData)
+  }
+
   private val dateRebaseFunc = createDateRebaseFuncInWrite(
     datetimeRebaseMode, "Avro")
 
   private val timestampRebaseFunc = createTimestampRebaseFuncInWrite(
     datetimeRebaseMode, "Avro")
 
-  def serialize(catalystData: Any): Any = {
+  private val converter: Any => Any = {
     val actualAvroType = resolveNullableType(rootAvroType, nullable)
     val baseConverter = rootCatalystType match {
       case st: StructType =>
@@ -76,13 +80,14 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
           converter.apply(tmpRow, 0)
     }
     if (nullable) {
-      if (catalystData == null) {
-        null
-      } else {
-        baseConverter.apply(catalystData)
-      }
+      (data: Any) =>
+        if (data == null) {
+          null
+        } else {
+          baseConverter.apply(data)
+        }
     } else {
-      baseConverter.apply(catalystData)
+      baseConverter
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 9f3b60b8c3..0b60933075 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -72,33 +72,39 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
 
   private val timestampRebaseFunc = createTimestampRebaseFuncInRead(datetimeRebaseSpec, "Avro")
 
-  def deserialize(data: Any): Option[Any] = try {
+  private val converter: Any => Option[Any] = try {
     rootCatalystType match {
       // A shortcut for empty schema.
       case st: StructType if st.isEmpty =>
-        Some(InternalRow.empty)
+        (_: Any) => Some(InternalRow.empty)
 
       case st: StructType =>
         val resultRow = new SpecificInternalRow(st.map(_.dataType))
         val fieldUpdater = new RowUpdater(resultRow)
         val applyFilters = filters.skipRow(resultRow, _)
         val writer = getRecordWriter(rootAvroType, st, Nil, Nil, applyFilters)
-        val record = data.asInstanceOf[GenericRecord]
-        val skipRow = writer(fieldUpdater, record)
-        if (skipRow) None else Some(resultRow)
+        (data: Any) => {
+          val record = data.asInstanceOf[GenericRecord]
+          val skipRow = writer(fieldUpdater, record)
+          if (skipRow) None else Some(resultRow)
+        }
 
       case _ =>
         val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
         val fieldUpdater = new RowUpdater(tmpRow)
         val writer = newWriter(rootAvroType, rootCatalystType, Nil, Nil)
-        writer(fieldUpdater, 0, data)
-        Some(tmpRow.get(0, rootCatalystType))
+        (data: Any) => {
+          writer(fieldUpdater, 0, data)
+          Some(tmpRow.get(0, rootCatalystType))
+        }
     }
   } catch {
     case ise: IncompatibleSchemaException => throw new IncompatibleSchemaException(
       s"Cannot convert Avro type $rootAvroType to SQL type ${rootCatalystType.sql}.", ise)
   }
 
+  def deserialize(data: Any): Option[Any] = converter(data)
+
   /**
    * Creates a writer to write avro values to Catalyst values at the given ordinal with the given
    * updater.
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 0f9b60c701..ba9812b026 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -66,13 +66,17 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
       LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE)))
   }
 
+  def serialize(catalystData: Any): Any = {
+    converter.apply(catalystData)
+  }
+
   private val dateRebaseFunc = createDateRebaseFuncInWrite(
     datetimeRebaseMode, "Avro")
 
   private val timestampRebaseFunc = createTimestampRebaseFuncInWrite(
     datetimeRebaseMode, "Avro")
 
-  def serialize(catalystData: Any): Any = {
+  private val converter: Any => Any = {
     val actualAvroType = resolveNullableType(rootAvroType, nullable)
     val baseConverter = try {
       rootCatalystType match {
@@ -90,13 +94,14 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
         s"Cannot convert SQL type ${rootCatalystType.sql} to Avro type $rootAvroType.", ise)
     }
     if (nullable) {
-      if (catalystData == null) {
-        null
-      } else {
-        baseConverter.apply(catalystData)
-      }
+      (data: Any) =>
+        if (data == null) {
+          null
+        } else {
+          baseConverter.apply(data)
+        }
     } else {
-      baseConverter.apply(catalystData)
+      baseConverter
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 0b00b6d1ab..5e7bab3e51 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -71,33 +71,39 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
 
   private val timestampRebaseFunc = createTimestampRebaseFuncInRead(datetimeRebaseSpec, "Avro")
 
-  def deserialize(data: Any): Option[Any] = try {
+  private val converter: Any => Option[Any] = try {
     rootCatalystType match {
       // A shortcut for empty schema.
       case st: StructType if st.isEmpty =>
-        Some(InternalRow.empty)
+        (_: Any) => Some(InternalRow.empty)
 
       case st: StructType =>
         val resultRow = new SpecificInternalRow(st.map(_.dataType))
         val fieldUpdater = new RowUpdater(resultRow)
         val applyFilters = filters.skipRow(resultRow, _)
         val writer = getRecordWriter(rootAvroType, st, Nil, Nil, applyFilters)
-        val record = data.asInstanceOf[GenericRecord]
-        val skipRow = writer(fieldUpdater, record)
-        if (skipRow) None else Some(resultRow)
+        (data: Any) => {
+          val record = data.asInstanceOf[GenericRecord]
+          val skipRow = writer(fieldUpdater, record)
+          if (skipRow) None else Some(resultRow)
+        }
 
       case _ =>
         val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
         val fieldUpdater = new RowUpdater(tmpRow)
         val writer = newWriter(rootAvroType, rootCatalystType, Nil, Nil)
-        writer(fieldUpdater, 0, data)
-        Some(tmpRow.get(0, rootCatalystType))
+        (data: Any) => {
+          writer(fieldUpdater, 0, data)
+          Some(tmpRow.get(0, rootCatalystType))
+        }
     }
   } catch {
     case ise: IncompatibleSchemaException => throw new IncompatibleSchemaException(
       s"Cannot convert Avro type $rootAvroType to SQL type ${rootCatalystType.sql}.", ise)
   }
 
+  def deserialize(data: Any): Option[Any] = converter(data)
+
   /**
    * Creates a writer to write avro values to Catalyst values at the given ordinal with the given
    * updater.
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index dfa970f573..450d9d7346 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -65,13 +65,17 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
       LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE)))
   }
 
+  def serialize(catalystData: Any): Any = {
+    converter.apply(catalystData)
+  }
+
   private val dateRebaseFunc = createDateRebaseFuncInWrite(
     datetimeRebaseMode, "Avro")
 
   private val timestampRebaseFunc = createTimestampRebaseFuncInWrite(
     datetimeRebaseMode, "Avro")
 
-  def serialize(catalystData: Any): Any = {
+  private val converter: Any => Any = {
     val actualAvroType = resolveNullableType(rootAvroType, nullable)
     val baseConverter = try {
       rootCatalystType match {
@@ -89,13 +93,14 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
         s"Cannot convert SQL type ${rootCatalystType.sql} to Avro type $rootAvroType.", ise)
     }
     if (nullable) {
-      if (catalystData == null) {
-        null
-      } else {
-        baseConverter.apply(catalystData)
-      }
+      (data: Any) =>
+        if (data == null) {
+          null
+        } else {
+          baseConverter.apply(data)
+        }
     } else {
-      baseConverter.apply(catalystData)
+      baseConverter
     }
   }