Posted to commits@hudi.apache.org by da...@apache.org on 2022/06/15 12:21:36 UTC

[hudi] branch master updated: [HUDI-4217] improve repeat init object in ExpressionPayload (#5825)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bf0a1906d [HUDI-4217] improve repeat init object in ExpressionPayload (#5825)
2bf0a1906d is described below

commit 2bf0a1906dc65bb676119729ad7e829c6bf6c2a4
Author: KnightChess <98...@qq.com>
AuthorDate: Wed Jun 15 20:21:28 2022 +0800

    [HUDI-4217] improve repeat init object in ExpressionPayload (#5825)
---
 .../sql/hudi/command/payload/SqlTypedRecord.scala  | 44 +++++++++++++++++++---
 .../hudi/command/payload/ExpressionPayload.scala   | 42 +++++++++++++--------
 2 files changed, 65 insertions(+), 21 deletions(-)
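
The core of the change is Guava's cache-with-loader pattern: instead of each SqlTypedRecord lazily building its own StructType and Avro deserializer per instance, the conversions are memoized once per Schema in shared caches, so repeated records carrying the same schema reuse the same objects. A minimal standalone sketch of that pattern follows; it assumes Guava on the classpath (as the patch does), and the key/value types here are illustrative, not from the patch:

    import com.google.common.cache.CacheBuilder
    import java.util.concurrent.Callable

    object CacheSketch {
      // One shared cache for the whole process; a value is built at most
      // once per distinct key.
      private val cache = CacheBuilder.newBuilder().build[String, Integer]()

      def cachedLength(s: String): Integer =
        cache.get(s, new Callable[Integer] {
          // Runs only on a cache miss; get() stores the result itself.
          override def call(): Integer = Integer.valueOf(s.length)
        })

      def main(args: Array[String]): Unit = {
        cachedLength("hudi") // miss: computes and stores
        cachedLength("hudi") // hit: returns the cached value
      }
    }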

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala
index 29025877b4..cb9b42607d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala
@@ -17,27 +17,59 @@
 
 package org.apache.spark.sql.hudi.command.payload
 
+import com.google.common.cache.CacheBuilder
 import org.apache.avro.Schema
 import org.apache.avro.generic.IndexedRecord
-import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}
+import org.apache.hudi.HoodieSparkUtils.sparkAdapter
+import org.apache.hudi.AvroConversionUtils
+import org.apache.spark.sql.avro.HoodieAvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.hudi.command.payload.SqlTypedRecord.{getAvroDeserializer, getSqlType}
+import org.apache.spark.sql.types.StructType
+
+import java.util.concurrent.Callable
 
 /**
  * A sql typed record which will convert the avro field to sql typed value.
  */
-class SqlTypedRecord(val record: IndexedRecord) extends IndexedRecord with SparkAdapterSupport {
+class SqlTypedRecord(val record: IndexedRecord) extends IndexedRecord {
 
-  private lazy val sqlType = AvroConversionUtils.convertAvroSchemaToStructType(getSchema)
-  private lazy val avroDeserializer = sparkAdapter.createAvroDeserializer(record.getSchema, sqlType)
-  private lazy val sqlRow = avroDeserializer.deserialize(record).get.asInstanceOf[InternalRow]
+  private lazy val sqlRow = getAvroDeserializer(getSchema).deserialize(record).get.asInstanceOf[InternalRow]
 
   override def put(i: Int, v: Any): Unit = {
     record.put(i, v)
   }
 
   override def get(i: Int): AnyRef = {
-    sqlRow.get(i, sqlType(i).dataType)
+    sqlRow.get(i, getSqlType(getSchema)(i).dataType)
   }
 
   override def getSchema: Schema = record.getSchema
 }
+
+object SqlTypedRecord {
+
+  private val sqlTypeCache = CacheBuilder.newBuilder().build[Schema, StructType]()
+
+  private val avroDeserializerCache = CacheBuilder.newBuilder().build[Schema, HoodieAvroDeserializer]()
+
+  def getSqlType(schema: Schema): StructType = {
+    sqlTypeCache.get(schema, new Callable[StructType] {
+      override def call(): StructType = {
+        val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+        sqlTypeCache.put(schema, structType)
+        structType
+      }
+    })
+  }
+
+  def getAvroDeserializer(schema: Schema): HoodieAvroDeserializer = {
+    avroDeserializerCache.get(schema, new Callable[HoodieAvroDeserializer] {
+      override def call(): HoodieAvroDeserializer = {
+        val deserializer = sparkAdapter.createAvroDeserializer(schema, getSqlType(schema))
+        avroDeserializerCache.put(schema, deserializer)
+        deserializer
+      }
+    })
+  }
+}
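
One note on the new loaders above: Guava's Cache.get(key, loader) already inserts the loader's result into the cache before returning it, so the explicit sqlTypeCache.put(...) and avroDeserializerCache.put(...) calls inside the Callables are redundant (harmless, but each value is stored twice). Each loader could simply return the computed value, e.g.:

    sqlTypeCache.get(schema, new Callable[StructType] {
      override def call(): StructType =
        AvroConversionUtils.convertAvroSchemaToStructType(schema)
    })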
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index e59a609321..d86cd75315 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -32,7 +32,7 @@ import org.apache.hudi.sql.IExpressionEvaluator
 import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.hudi.SerDeUtils
-import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.getEvaluator
+import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.{getEvaluator, getMergedSchema}
 import org.apache.spark.sql.types.{StructField, StructType}
 
 import java.util.concurrent.Callable
@@ -228,9 +228,7 @@ class ExpressionPayload(record: GenericRecord,
    */
   private def joinRecord(sourceRecord: IndexedRecord, targetRecord: IndexedRecord): IndexedRecord = {
     val leftSchema = sourceRecord.getSchema
-    // the targetRecord is load from the disk, it contains the meta fields, so we remove it here
-    val rightSchema = HoodieAvroUtils.removeMetadataFields(targetRecord.getSchema)
-    val joinSchema = mergeSchema(leftSchema, rightSchema)
+    val joinSchema = getMergedSchema(leftSchema, targetRecord.getSchema)
 
     val values = new ArrayBuffer[AnyRef]()
     for (i <- 0 until joinSchema.getFields.size()) {
@@ -244,17 +242,6 @@ class ExpressionPayload(record: GenericRecord,
     convertToRecord(values.toArray, joinSchema)
   }
 
-  private def mergeSchema(a: Schema, b: Schema): Schema = {
-    val mergedFields =
-      a.getFields.asScala.map(field =>
-        new Schema.Field("a_" + field.name,
-          field.schema, field.doc, field.defaultVal, field.order)) ++
-        b.getFields.asScala.map(field =>
-          new Schema.Field("b_" + field.name,
-            field.schema, field.doc, field.defaultVal, field.order))
-    Schema.createRecord(a.getName, a.getDoc, a.getNamespace, a.isError, mergedFields.asJava)
-  }
-
   private def evaluate(evaluator: IExpressionEvaluator, sqlTypedRecord: SqlTypedRecord): GenericRecord = {
     try evaluator.eval(sqlTypedRecord) catch {
       case e: Throwable =>
@@ -318,5 +305,30 @@ object ExpressionPayload {
         }
       })
   }
+
+  private val mergedSchemaCache = CacheBuilder.newBuilder().build[TupleSchema, Schema]()
+
+  def getMergedSchema(source: Schema, target: Schema): Schema = {
+
+    mergedSchemaCache.get(TupleSchema(source, target), new Callable[Schema] {
+      override def call(): Schema = {
+        val rightSchema = HoodieAvroUtils.removeMetadataFields(target)
+        mergeSchema(source, rightSchema)
+      }
+    })
+  }
+
+  def mergeSchema(a: Schema, b: Schema): Schema = {
+    val mergedFields =
+      a.getFields.asScala.map(field =>
+        new Schema.Field("a_" + field.name,
+          field.schema, field.doc, field.defaultVal, field.order)) ++
+        b.getFields.asScala.map(field =>
+          new Schema.Field("b_" + field.name,
+            field.schema, field.doc, field.defaultVal, field.order))
+    Schema.createRecord(a.getName, a.getDoc, a.getNamespace, a.isError, mergedFields.asJava)
+  }
+
+  case class TupleSchema(first: Schema, second: Schema)
 }
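
Two properties the merged-schema cache relies on: Avro's Schema implements structural equals/hashCode, and TupleSchema, being a case class, derives pairwise equality over the two schemas, so equal (source, target) pairs resolve to the same cache entry. Worth noting as well that all three caches are unbounded; if an application encountered many distinct schemas over its lifetime, a size bound could be added (a sketch, not part of this patch):

    CacheBuilder.newBuilder()
      .maximumSize(1024) // illustrative limit, tune for the workload
      .build[TupleSchema, Schema]()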