You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/06/10 03:19:26 UTC

[GitHub] [hudi] KnightChess commented on a diff in pull request #5825: [HUDI-4217] improve repeat init object in ExpressionPayload

KnightChess commented on code in PR #5825:
URL: https://github.com/apache/hudi/pull/5825#discussion_r894120312


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala:
##########
@@ -17,27 +17,59 @@
 
 package org.apache.spark.sql.hudi.command.payload
 
+import com.google.common.cache.CacheBuilder
 import org.apache.avro.Schema
 import org.apache.avro.generic.IndexedRecord
-import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}
+import org.apache.hudi.HoodieSparkUtils.sparkAdapter
+import org.apache.hudi.AvroConversionUtils
+import org.apache.spark.sql.avro.HoodieAvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.hudi.command.payload.SqlTypedRecord.{getAvroDeserializer, getSqlType}
+import org.apache.spark.sql.types.StructType
+
+import java.util.concurrent.Callable
 
 /**
  * A sql typed record which will convert the avro field to sql typed value.
  */
-class SqlTypedRecord(val record: IndexedRecord) extends IndexedRecord with SparkAdapterSupport {
+class SqlTypedRecord(val record: IndexedRecord) extends IndexedRecord {
 
-  private lazy val sqlType = AvroConversionUtils.convertAvroSchemaToStructType(getSchema)
-  private lazy val avroDeserializer = sparkAdapter.createAvroDeserializer(record.getSchema, sqlType)
-  private lazy val sqlRow = avroDeserializer.deserialize(record).get.asInstanceOf[InternalRow]
+  private lazy val sqlRow = getAvroDeserializer(getSchema).deserialize(record).get.asInstanceOf[InternalRow]
 
   override def put(i: Int, v: Any): Unit = {
     record.put(i, v)
   }
 
   override def get(i: Int): AnyRef = {
-    sqlRow.get(i, sqlType(i).dataType)
+    sqlRow.get(i, getSqlType(getSchema)(i).dataType)
   }
 
   override def getSchema: Schema = record.getSchema
 }
+
+object SqlTypedRecord {
+
+  private val sqlTypeCache = CacheBuilder.newBuilder().maximumSize(16).build[Schema, StructType]()

Review Comment:
   16 just the guava cache default size. In batch model, I think only one source and target schema will be cache, but considering streaming or spark thrift server( or livy kyuubi), the target or source schema will diff. For long process task, may be someone can give a appropriate suggestion for it
   
   >I also met this issue recently, but I'm not sure it's a good idea if we don't have a way to invalidate the schema if we don't use it anymore.
   
   that's why I use LRU cache to cache it, just for long process task. And we also can set some expiration strategy if its necessary, but I think LRU cache will enough



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org