Posted to dev@sedona.apache.org by "Adam Binford (Jira)" <ji...@apache.org> on 2023/01/07 03:23:00 UTC

[jira] [Commented] (SEDONA-231) Redundant Serde Removal

    [ https://issues.apache.org/jira/browse/SEDONA-231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17655634#comment-17655634 ] 

Adam Binford commented on SEDONA-231:
-------------------------------------

This is an interesting idea! The only comment I might add is that, instead of using a method that sets a flag, you could create an "evalDeserialized" method on the SerdeAware trait that gets called instead of "eval" if the expression matches the trait.
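
A rough sketch of that alternative, to make the suggestion concrete. It is a Spark-free model rather than Sedona code: Geom, Serde, Expr, FromWkt, Upper and Extractor are hypothetical stand-ins (for a JTS Geometry, GeometrySerializer, a Catalyst expression, two functions, and the extractor logic), and only SerdeAware and evalDeserialized are names taken from this thread.

```scala
// Hypothetical stand-in for a JTS Geometry.
final case class Geom(wkt: String)

// Hypothetical stand-in for GeometrySerializer; it only counts calls.
object Serde {
  var serializeCalls = 0
  var deserializeCalls = 0
  def serialize(g: Geom): Array[Byte] = { serializeCalls += 1; g.wkt.getBytes("UTF-8") }
  def deserialize(b: Array[Byte]): Geom = { deserializeCalls += 1; Geom(new String(b, "UTF-8")) }
}

// Simplified expression: eval returns the serialized form, as Spark expects.
trait Expr { def eval(): Any }

// Expressions that can hand back the geometry without serializing it.
trait SerdeAware extends Expr {
  def evalDeserialized(): Geom
  // The regular eval path serializes exactly once, at the boundary.
  override def eval(): Any = Serde.serialize(evalDeserialized())
}

// Leaf function: builds a geometry from text.
final case class FromWkt(wkt: String) extends SerdeAware {
  def evalDeserialized(): Geom = Geom(wkt)
}

// Unary function: upper-cases the text of its input geometry.
final case class Upper(child: Expr) extends SerdeAware {
  def evalDeserialized(): Geom = Geom(Extractor.geometry(child).wkt.toUpperCase)
}

object Extractor {
  // Prefer the deserialized path when the child supports it;
  // otherwise fall back to eval followed by deserialize.
  def geometry(e: Expr): Geom = e match {
    case aware: SerdeAware => aware.evalDeserialized()
    case other             => Serde.deserialize(other.eval().asInstanceOf[Array[Byte]])
  }
}
```

Evaluating Upper(Upper(FromWkt("point (1 2)"))).eval() in this model serializes only the final result. Since nothing is mutated on the expression, there is no flag that could be left in the wrong state.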

> Redundant Serde Removal
> -----------------------
>
>                 Key: SEDONA-231
>                 URL: https://issues.apache.org/jira/browse/SEDONA-231
>             Project: Apache Sedona
>          Issue Type: Improvement
>            Reporter: Doug Dennis
>            Priority: Major
>
> Currently, Geometry objects are deserialized and reserialized during every evaluation of a function on a row in Spark. This amounts to a great deal of redundant serde during query execution. At times, objects are serialized just to be immediately deserialized.
> To demonstrate this in action, I placed print statements in the GeometrySerializer serialize and deserialize methods, the GeometryUDT serialize and deserialize methods, and in the eval methods of several functions. When the following is executed:
>  
> {noformat}
> val columns = Seq("input", "blade")
> val data = Seq(
>   ("GEOMETRYCOLLECTION ( LINESTRING (0 0, 1.5 1.5, 2 2), LINESTRING (3 3, 4.5 4.5, 5 5))", "MULTIPOINT (0.5 0.5, 1 1, 3.5 3.5, 4 4)")
> )
> val df = spark.createDataFrame(data).toDF(columns: _*)
> println(
>   df.selectExpr("ST_Normalize(ST_Split(ST_GeomFromWKT(input), ST_GeomFromWKT(blade))) AS result").collect()(0).get(0).asInstanceOf[Geometry].toText()
> ){noformat}
> I get the following output:
> {noformat}
>  **org.apache.spark.sql.sedona_sql.expressions.ST_Normalize**
>  **org.apache.spark.sql.sedona_sql.expressions.ST_Split**
>  **org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromWKT**
> Inside GeometrySerializer.serialize
> Inside GeometrySerializer.serialize
> Inside GeometrySerializer.serialize
> Inside GeometrySerializer.deserialize
>  **org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromWKT**
> Inside GeometrySerializer.serialize
> Inside GeometrySerializer.deserialize
> Inside GeometrySerializer.serialize
> Inside GeometrySerializer.deserialize
> Inside GeometrySerializer.serialize
> Inside UDT deserialize.
> Inside GeometrySerializer.deserialize
> MULTILINESTRING ((0 0, 0.5 0.5), (0.5 0.5, 1 1), (1 1, 1.5 1.5, 2 2), (3 3, 3.5 3.5), (3.5 3.5, 4 4), (4 4, 4.5 4.5, 5 5)){noformat}
> To explain what is happening:
>  # ST_Normalize.eval is called.
>  # ST_Normalize.eval calls ST_Split.eval.
>  # ST_Split.eval first calls the ST_GeomFromWKT that had the GEOMETRYCOLLECTION wkt.
>  # ST_GeomFromWKT processes the wkt string and generates a Geometry object.
>  # The Geometry object is passed to GeometrySerializer.serialize. This is the first call to serialize.
>  # This object is a GeometryCollection, and the GeometrySerializer handles collections recursively, so you see two additional calls to serialize.
>  # The GeometryCollection is then immediately deserialized and returned to ST_Split.
>  # The second ST_GeomFromWKT is called (this one has a MULTIPOINT wkt).
>  # ST_GeomFromWKT processes the WKT and then serializes the geometry.
>  # That geometry is immediately deserialized and returned to ST_Split.
>  # ST_Split performs its operation and then serializes the geometry.
>  # That geometry is then immediately deserialized and returned to ST_Normalize.
>  # ST_Normalize normalizes the geometry object and then serializes it for good.
>  # GeometryUDT.deserialize is then called to handle the collect call, which in turn calls GeometrySerializer.deserialize.
> There are multiple instances here where geometry objects are serialized and then immediately deserialized to be further operated on. That is obviously pretty wasteful.
>  
> I propose eliminating this redundancy through the following steps.
>  * Create a trait called SerdeAware which has a single method called doNotSerializeOutput.
>  * This trait is then added to the InferredUnaryExpression and InferredBinaryExpression abstract classes.
>  * When the doNotSerializeOutput is called on one of the expression classes, a serializeOutput flag is set to false.
>  * That flag is read in the class's eval method.
>  * If the flag is false then the output will not be serialized and if the flag is true then the output does get serialized.
>  * Finally, the buildExtractor method of the InferredTypes object is modified to detect whether the input expression is SerdeAware. If it is, doNotSerializeOutput is called before the input expression's eval method.
> In the test implementation I created I was able to get the following output:
> {noformat}
>  **org.apache.spark.sql.sedona_sql.expressions.ST_Normalize**
>  **org.apache.spark.sql.sedona_sql.expressions.ST_Split**
>  **org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromWKT**
>  **org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromWKT**
> Inside GeometrySerializer.serialize
> Inside UDT deserialize.
> Inside GeometrySerializer.deserialize
> MULTILINESTRING ((0 0, 0.5 0.5), (0.5 0.5, 1 1), (1 1, 1.5 1.5, 2 2), (3 3, 3.5 3.5), (3.5 3.5, 4 4), (4 4, 4.5 4.5, 5 5)){noformat}
> You can see that only a single serialization occurred, at the very end of the computation.
>  
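
For comparison, the flag-based proposal quoted above might look roughly like this. Again this is only a Spark-free sketch under assumptions: FakeGeometry, FakeSerializer, Expression, FromWkt and Reverse are hypothetical stand-ins, and the signatures of buildExtractor and InferredUnaryExpression are simplified guesses rather than Sedona's actual ones. SerdeAware, doNotSerializeOutput and serializeOutput follow the names used in the issue.

```scala
// Hypothetical stand-in for a JTS Geometry.
final case class FakeGeometry(wkt: String)

// Hypothetical stand-in for GeometrySerializer; it only counts calls.
object FakeSerializer {
  var serializeCalls = 0
  var deserializeCalls = 0
  def serialize(g: FakeGeometry): Array[Byte] = { serializeCalls += 1; g.wkt.getBytes("UTF-8") }
  def deserialize(b: Array[Byte]): FakeGeometry = { deserializeCalls += 1; FakeGeometry(new String(b, "UTF-8")) }
}

// Simplified expression: eval normally returns serialized bytes.
trait Expression { def eval(): Any }

trait SerdeAware extends Expression {
  // Output is serialized unless a parent expression opts out.
  protected var serializeOutput: Boolean = true
  def doNotSerializeOutput(): Unit = { serializeOutput = false }
}

object InferredTypes {
  // Simplified extractor: asks SerdeAware inputs to skip serialization,
  // and deserializes the output of any other input.
  def buildExtractor(input: Expression): () => FakeGeometry = input match {
    case aware: SerdeAware =>
      () => { aware.doNotSerializeOutput(); aware.eval().asInstanceOf[FakeGeometry] }
    case other =>
      () => FakeSerializer.deserialize(other.eval().asInstanceOf[Array[Byte]])
  }
}

abstract class InferredUnaryExpression(input: Expression, f: FakeGeometry => FakeGeometry)
    extends SerdeAware {
  private val extract = InferredTypes.buildExtractor(input)
  override def eval(): Any = {
    val result = f(extract())
    // The flag decides whether the result leaves as bytes or as an object.
    if (serializeOutput) FakeSerializer.serialize(result) else result
  }
}

// Leaf function that also honors the flag.
final case class FromWkt(wkt: String) extends SerdeAware {
  override def eval(): Any = {
    val g = FakeGeometry(wkt)
    if (serializeOutput) FakeSerializer.serialize(g) else g
  }
}

// Unary function: reverses the text of its input geometry.
final case class Reverse(child: Expression)
    extends InferredUnaryExpression(child, g => FakeGeometry(g.wkt.reverse))
```

In this model, Reverse(Reverse(FromWkt("abc"))).eval() serializes only the outermost result. Note that the flag is mutable state on the expression instance and stays false once a parent has cleared it.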


