Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/02/09 17:51:22 UTC

[GitHub] [spark] viirya opened a new pull request #16478: [SPARK-7768][SQL] Revise user defined types (UDT)

URL: https://github.com/apache/spark/pull/16478
 
 
   ## What changes were proposed in this pull request?
   
   This patch revises the current API for user-defined types (UDTs).
   
   Currently, the `UserDefinedType` API asks developers to implement methods that fill user data into Spark SQL's internal formats, such as `GenericInternalRow` and `UnsafeArrayData`.
   
   One goal of this patch is to simplify how UDTs are written. Developers can use plain Scala types, instead of internal types such as `ArrayData`, when writing UDTs; `UserDefinedType` will use Spark SQL's encoders to convert user data to the internal format.
   
   For example, the following is the serialization method of `MatrixUDT`.
   
   Before this patch:
   
       override def serialize(obj: Matrix): InternalRow = {
         val row = new GenericInternalRow(7)
         obj match {
           case sm: SparseMatrix =>
             row.setByte(0, 0)
             row.setInt(1, sm.numRows)
             row.setInt(2, sm.numCols)
             row.update(3, UnsafeArrayData.fromPrimitiveArray(sm.colPtrs))
             row.update(4, UnsafeArrayData.fromPrimitiveArray(sm.rowIndices))
             row.update(5, UnsafeArrayData.fromPrimitiveArray(sm.values))
             row.setBoolean(6, sm.isTransposed)
   
           case dm: DenseMatrix =>
             row.setByte(0, 1)
             row.setInt(1, dm.numRows)
             row.setInt(2, dm.numCols)
             row.setNullAt(3)
             row.setNullAt(4)
             row.update(5, UnsafeArrayData.fromPrimitiveArray(dm.values))
             row.setBoolean(6, dm.isTransposed)
         }
         row
       }
   
   
   After this patch:
   
       override def writeRow(obj: Matrix): Row = {
         obj match {
           case sm: SparseMatrix =>
             Row(0.toByte, sm.numRows, sm.numCols, sm.colPtrs, sm.rowIndices, sm.values, sm.isTransposed)
   
           case dm: DenseMatrix =>
             Row(1.toByte, dm.numRows, dm.numCols, null, null, dm.values, dm.isTransposed)
         }
       }
   
   Developers now manipulate the external `Row` type and plain Scala types; an encoder takes care of converting the data to Spark SQL's internal format.
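
   The patch pairs `writeRow` with a `readRow` for deserialization (see the API change below). As a sketch of what `MatrixUDT.readRow` could look like under this API, assuming the seven-field layout above (the accessor and constructor calls are my reconstruction, not code from the patch):

       override def readRow(row: Row): Matrix = {
         // Matrix, SparseMatrix and DenseMatrix come from Spark's linalg package.
         row.getByte(0) match {
           case 0 =>  // sparse layout, as written by writeRow above
             new SparseMatrix(row.getInt(1), row.getInt(2),
               row.getSeq[Int](3).toArray, row.getSeq[Int](4).toArray,
               row.getSeq[Double](5).toArray, row.getBoolean(6))
           case 1 =>  // dense layout; fields 3 and 4 were written as null
             new DenseMatrix(row.getInt(1), row.getInt(2),
               row.getSeq[Double](5).toArray, row.getBoolean(6))
         }
       }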
   
   
   ### Main API change
   
   Previously, UDTs extending `UserDefinedType` needed to implement two main methods:
   
       /** Convert the user type to a SQL datum */
       def serialize(obj: UserType): Any
   
       /** Convert a SQL datum to the user type */
       def deserialize(datum: Any): UserType
   
   That is, developers put/get user-class data into/from an internal Spark SQL type.
   
   After this patch, developers instead implement two methods that put/get user data into/from an external Spark SQL row:
   
       /** Convert the object of user type to an external row. Must be implemented in subclasses. */
       def writeRow(obj: UserType): Row
   
       /** Convert the external row to an object of user type. Must be implemented in subclasses. */
       def readRow(row: Row): UserType
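
   Putting the two together, a minimal end-to-end UDT under the proposed API could look like the following sketch (the `Point`/`PointUDT` names are invented for illustration; `sqlType` and `userClass` are the existing `UserDefinedType` members):

       import org.apache.spark.sql.Row
       import org.apache.spark.sql.types._

       case class Point(x: Double, y: Double)

       class PointUDT extends UserDefinedType[Point] {
         // Schema the external rows must follow.
         override def sqlType: DataType = StructType(Seq(
           StructField("x", DoubleType, nullable = false),
           StructField("y", DoubleType, nullable = false)))

         // User object -> external row; the encoder converts to internal format.
         override def writeRow(obj: Point): Row = Row(obj.x, obj.y)

         // External row -> user object.
         override def readRow(row: Row): Point =
           Point(row.getDouble(0), row.getDouble(1))

         override def userClass: Class[Point] = classOf[Point]
       }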
   
   ### Benchmark
   
   Ran a benchmark comparing this patch against the previous `UserDefinedType` implementation.
   
   Code:
   
       private val random = new Random(100)
       private lazy val pointsRDD = (0 to 1000).map { i =>
         val features = (0 to 100).map { _ =>
           random.nextDouble()
         }
         MyLabeledPoint(i % 10, new UDT.MyDenseVector(features.toArray))
       }.toDF()
   
       test("serialize MyLabeledPoint") {
         val N = 10L << 5
         runBenchmark("serialize MyLabeledPoint", N) {
           pointsRDD.groupBy('label).agg(max('features)).collect()
         }
       }
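
   The snippet relies on Spark's micro-benchmark harness (`runBenchmark`) and on implicits roughly like the following; these surrounding lines are my assumption about the test context, not part of the patch:

       // Assumed context for the snippet above (not shown in the patch);
       // `spark` is the active SparkSession of the test suite.
       import scala.util.Random                    // for `new Random(100)`
       import org.apache.spark.sql.functions.max   // for `max('features)`
       import spark.implicits._  // enables .toDF() and the 'label Symbol syntax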
   
   
   Before this patch:
   
       Java HotSpot(TM) 64-Bit Server VM 1.8.0_102-b14 on Linux 4.4.39-moby
       Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz
       serialize MyLabeledPoint:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
       ------------------------------------------------------------------------------------------------
       serialize MyLabeledPoint wholestage off        271 /  328          0.0      845767.7       1.0X
       serialize MyLabeledPoint wholestage on         131 /  261          0.0      408173.6       2.1X
   
   After this patch:
   
       Java HotSpot(TM) 64-Bit Server VM 1.8.0_102-b14 on Linux 4.4.39-moby
       Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz
       serialize MyLabeledPoint:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
       ------------------------------------------------------------------------------------------------
       serialize MyLabeledPoint wholestage off        221 /  331          0.0      692100.4       1.0X
       serialize MyLabeledPoint wholestage on         125 /  145          0.0      391120.1       1.8X
   
   Basically, after this patch the serialization/deserialization of `UserDefinedType` matches or slightly outperforms the previous implementation.
   
   ## How was this patch tested?
   
   Existing Jenkins tests.
   
   
