You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@sedona.apache.org by Trang Nguyen <Tr...@inrix.com> on 2023/01/08 06:14:14 UTC

Propagating user defined attributes to spatial join

Hi,

I'm newbie to Sedona and am running into performance issues using the dataframe api to handle spatial joins with st_intersects.  From the execution plan, I see that this is largely due to  too few partitions getting used for the spatial join.

I've tried to increase the partition size by setting:
sparkSession.conf.set("spark.sql.shuffle.partitions", appConf.getInputPartitions.toString)
sparkSession.conf.set("geospark.join.numpartition", appConf.getInputPartitions.toString)
sparkSession.conf.set("spark.default.parallelism", appConf.getInputPartitions.toString)

The setting changse seem to make minimal difference.

I'm trying now to use convert the dataframes to be joined into spatialRDDs so that I can set the number of partitions for the spatial join.
However, I am running into a different issue when I try to convert back from the spatial joined result into a dataframe because the extra attributes from the original dataframes are not getting propagated through the join.

I am using the Adapter class for the conversion.


val tripRDD = Adapter.toSpatialRdd(trips, "geom", tripColumns)
val rddWithOtherAttributes = tripRDD.rawSpatialRDD.rdd.map[String](f=> {
  f.getUserData.toString
})
tripRDD.analyze()

val poiRDD = Adapter.toSpatialRdd(poiDS.toDF, "geom", poiColumns)
poiRDD.analyze()



tripRDD.spatialPartitioning(GridType.KDBTREE, appConf.getInputPartitions)





val joinRes = JoinQuery.SpatialJoinQueryFlat(tripRDD, poiRDD, usingIndex, spatialPredicate)

val df = Adapter.toDf(joinRes, tripColumns, poiColumns, sparkSession)

df.show







How can I get the original attributes to be propagated as part of the join? I searched the documentation but couldn't find any documentation on this.
By specifying the columns to be carried through in the Adapter.toSpatialRdd, I assume that the attributes would be carried through into the join as well.

Here is the error I am seeing:
va.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of geometry
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS leftgeometry#2424
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, leftgeometry), StringType), true, false, true) AS leftgeometry#2425
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, trip_id), StringType), true, false, true) AS trip_id#2426
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS rightgeometry#2427
               at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)
               at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
               at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:193)

Thanks,
Trang

Re: Propagating user defined attributes to spatial join

Posted by Jia Yu <ji...@apache.org>.
On Sat, Jan 7, 2023 at 11:26 PM Jia Yu <ji...@apache.org> wrote:

> Hi Trang,
>
> The slow join performance issue is mostly caused by too few partitions.
>
> The most straightforward way to increase the number of partitions is:
> repartition both input DataFrame right after you load them from disk.
>
> e.g.,
>
> var tripDf = spark.read(XXX)
> tripDf = tripDf.repartition(tripDf.numPartitions * 5)
>
> var poiDf = spark.read(XXX)
> poiDf = poiDf.repartition(poiDf.numPartitions * 5)
>
> Then perform the SQL spatial join
> ====
>
> If you want to use RDD API,
>
> Please read [1] and [2]
>
> [1]
> https://sedona.apache.org/1.3.1-incubating/tutorial/sql/#dataframe-to-spatialrdd
> [2]
> https://sedona.apache.org/1.3.1-incubating/tutorial/sql/#spatialpairrdd-to-dataframe
>
> Thanks,
> Jia
>
>
>
> On Sat, Jan 7, 2023 at 11:14 PM Trang Nguyen <Tr...@inrix.com>
> wrote:
>
>> Hi,
>>
>> I'm newbie to Sedona and am running into performance issues using the
>> dataframe api to handle spatial joins with st_intersects.  From the
>> execution plan, I see that this is largely due to  too few partitions
>> getting used for the spatial join.
>>
>> I've tried to increase the partition size by setting:
>> sparkSession.conf.set("spark.sql.shuffle.partitions",
>> appConf.getInputPartitions.toString)
>> sparkSession.conf.set("geospark.join.numpartition",
>> appConf.getInputPartitions.toString)
>> sparkSession.conf.set("spark.default.parallelism",
>> appConf.getInputPartitions.toString)
>>
>> The setting changse seem to make minimal difference.
>>
>> I'm trying now to use convert the dataframes to be joined into
>> spatialRDDs so that I can set the number of partitions for the spatial join.
>> However, I am running into a different issue when I try to convert back
>> from the spatial joined result into a dataframe because the extra
>> attributes from the original dataframes are not getting propagated through
>> the join.
>>
>> I am using the Adapter class for the conversion.
>>
>>
>> val tripRDD = Adapter.toSpatialRdd(trips, "geom", tripColumns)
>> val rddWithOtherAttributes = tripRDD.rawSpatialRDD.rdd.map[String](f=> {
>>   f.getUserData.toString
>> })
>> tripRDD.analyze()
>>
>> val poiRDD = Adapter.toSpatialRdd(poiDS.toDF, "geom", poiColumns)
>> poiRDD.analyze()
>>
>>
>>
>> tripRDD.spatialPartitioning(GridType.KDBTREE, appConf.getInputPartitions)
>>
>>
>>
>>
>>
>> val joinRes = JoinQuery.SpatialJoinQueryFlat(tripRDD, poiRDD, usingIndex,
>> spatialPredicate)
>>
>> val df = Adapter.toDf(joinRes, tripColumns, poiColumns, sparkSession)
>>
>> df.show
>>
>>
>>
>>
>>
>>
>>
>> How can I get the original attributes to be propagated as part of the
>> join? I searched the documentation but couldn't find any documentation on
>> this.
>> By specifying the columns to be carried through in the
>> Adapter.toSpatialRdd, I assume that the attributes would be carried through
>> into the join as well.
>>
>> Here is the error I am seeing:
>> va.lang.RuntimeException: Error while encoding:
>> java.lang.RuntimeException: java.lang.String is not a valid external type
>> for schema of geometry
>> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt)
>> null else newInstance(class
>> org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS
>> leftgeometry#2424
>> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt)
>> null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
>> StringType, fromString,
>> validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>> org.apache.spark.sql.Row, true]), 1, leftgeometry), StringType), true,
>> false, true) AS leftgeometry#2425
>> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt)
>> null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
>> StringType, fromString,
>> validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>> org.apache.spark.sql.Row, true]), 2, trip_id), StringType), true, false,
>> true) AS trip_id#2426
>> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt)
>> null else newInstance(class
>> org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS
>> rightgeometry#2427
>>                at
>> org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)
>>                at
>> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
>>                at
>> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:193)
>>
>> Thanks,
>> Trang
>>
>

RE: Propagating user defined attributes to spatial join

Posted by Trang Nguyen <Tr...@inrix.com>.
Hi Jia,

Thanks again for your advice. I am still running into performance issues and pursuing a slightly different approach of converting points into a MULTIPOINT geometry instead in order to reduce the number of spatial joins.
This is still running very slowly using the dataframe api, so I am trying with spatial RDDs.

I am running into a parsing issue on a nested array defined as:

  val joinSchema = StructType(Seq(
    StructField("leftgeometry", StringType, true),
    StructField("trip_id", StringType, true),
    StructField("probe_id", StringType, true),
    StructField("provider_id", StringType, true),
    StructField("mode", IntegerType, true),

    ...
    StructField("points", ArrayType(StructType(Seq(
      StructField("capture_time", StringType, true),
      StructField("lat", DoubleType, true),
      StructField("lon", DoubleType, true),
      StructField("raw_speed", DoubleType, true)
    )), true), true),
    StructField("rightgeometry",
...

|-- points: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- capture_time: string (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- lon: double (nullable = true)
 |    |    |-- raw_speed: double (nullable = true)
 |-- rightgeometry: string (nullable = true)

WrappedArray([2022-03-03 23:19:32.000,40.971663,-124.10638,NaN], [2022-03-03 23:27:31.000,40.972371,-124.104135,107.568], [2022-03-03 23:28:40.000,41.002168,-124.143558,325.990692] 

Error:


023-01-10 22:39:33 ERROR Executor:94 - Exception in task 0.0 in stage 10.0 (TID 13)
scala.MatchError: ArrayType(StructType(StructField(capture_time,StringType,true), StructField(lat,DoubleType,true), StructField(lon,DoubleType,true), StructField(raw_speed,DoubleType,true)),true) (of class org.apache.spark.sql.types.ArrayType)
	at org.apache.sedona.sql.utils.Adapter$.parseString(Adapter.scala:301)
	at org.apache.sedona.sql.utils.Adapter$.$anonfun$castRowToSchema$1(Adapter.scala:271)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:273)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
	at scala.collection.immutable.List.map(List.scala:298)
	at org.apache.sedona.sql.utils.Adapter$.castRowToSchema(Adapter.scala:268)
	at org.apache.sedona.sql.utils.Adapter$.$anonfun$toDf$7(Adapter.scala:189)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	
Does Sedona support the parsing of nested arrays? Also is there planned support for maps?

Thanks,
Trang

-----Original Message-----
From: Jia Yu <ji...@apache.org> 
Sent: Sunday, January 8, 2023 10:21 PM
To: dev@sedona.apache.org
Subject: Re: Propagating user defined attributes to spatial join

Use Good Judgement: This email originated outside of INRIX Do not click on links or open attachments unless you recognize the sender and know the content is safe.

Hi Trang,

1. Since Sedona 1.3.1-incubating, you can add user defined struct type in the join result (pair RDD) to recover the original types. See:
https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fsedona.apache.org%2F1.3.1-incubating%2Ftutorial%2Fsql%2F%23spatialpairrdd-to-dataframe&data=05%7C01%7CTrang.Nguyen%40inrix.com%7Cde1832975e5644e589ba08daf209b715%7C6ad2e4da8c924e588877ed06b8918379%7C0%7C0%7C638088420806028486%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000%7C%7C%7C&sdata=OH1JqGsxCD1U29T4i%2FJU15%2FWFBcTI8UxIaNiQmtRIrk%3D&reserved=0

Example 1:

val schema = StructType(Array( StructField("county", GeometryUDT, nullable = true), StructField("name", StringType, nullable = true), StructField("price"
, DoubleType, nullable = true), StructField("age", IntegerType, nullable =
true) )) val spatialDf = Adapter.toDf(spatialRDD, schema, sparkSession)


Example 2:

val schema = StructType(Array(
  StructField("leftGeometry", GeometryUDT, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("rightGeometry", GeometryUDT, nullable = true),
  StructField("category", StringType, nullable = true)
))
val joinResultDf = Adapter.toDf(joinResultPairRDD, schema, sparkSession)


2. For DataFrame SQL join,

(1) since you are using ST_Intersects, please try to swap two operands in ST_Intersects. E.g., ST_Intersects (Trips, Points) to ST_Intersects(Points, Trips). This does not affect the correctness of the join result.
(2) Your query has lots of filters on non-spatial attributes and this can actually reduce the join query overhead. Usually Sedona should be able to do filter pushdown then run the join. But if the query plan does not work that way (check it by printing joinDf.explain()), you can force Spark to it:

Run all the filters in SQL query 1, cache the result. Then run the spatial join in SQL query 2.

For example,

tripDf = spark.sql(SELECT XXX FROM XXX WHERE trip_distance_m < 1000.0 and provider_id in ('1000')).cache()
tripDf.createOrReplaceTempView("tripDf")
joinDf = spark.sql(SELECT XXX FROM tripDf, pointDf WHERE ST_Intersects(trip, points))

Thanks,
Jia


On Sun, Jan 8, 2023 at 1:03 PM Trang Nguyen <Tr...@inrix.com> wrote:

> Hi Jia,
>
>
>
> Thanks a lot for the hints. Reversing the left and right field names 
> did take care of the issue.
>
> However all the datatypes for the original fields were converted into 
> StringType. Is there a way to preserve the original type mapping as well?
>
>
>
> 0 = {StructField@21788} StructField(leftgeometry,GeometryUDT,true)
>
> 1 = {StructField@21789} StructField(poi_id,StringType,true)
>
> 2 = {StructField@21790} StructField(poi_parent_id,StringType,true)
>
> 3 = {StructField@21791} StructField(lat,StringType,true)
>
> 4 = {StructField@21792} StructField(lon,StringType,true)
>
> ....
>
> 22 = {StructField@21810} StructField(probe_id,StringType,true)
>
> 23 = {StructField@21811} StructField(trip_distance_m,StringType,true)
>
> 24 = {StructField@21812} StructField(trip_id,StringType,true)
>
> 25 = {StructField@21813} StructField(capture_time,StringType,true)
>
>
>
> --
>
>
>
> For the dataframe based implementation, you are right that 
> repartitioning the source dataframes did increase the spatial partition.
>
> However, the performance is incredibly slow. It is taking over 2 hours 
> to perform a spatial st_intersects join between two tables (poi, 9GB 
> and exploded points, 50GB).
>
> The query I am running is:
>
>           with explodedPoints
>
>           as
>
>           (select is_moving, end_time, start_time, provider_id, mode, 
> probe_id, trip_distance_m, trip_id, size(points) as point_count,
> explode(points) as point from t)
>
>           select distinct is_moving, end_time, start_time, 
> provider_id, mode, probe_id, trip_distance_m, trip_id, point.lat, 
> point.lon, point.capture_time , poi.poi_id, poi.qk, poi.poi_parent_id, 
> poi.lat as poi_lat, poi.lon as poi_lon, poi.owned, poi.fleetcentric,
> to_date(opened_on) as opened_on, to_date(opened_on) as closed_on  from 
> explodedPoints
>
>            join poi where trip_distance_m < 1000.0 and provider_id in
> ('1000') and st_intersects(geom, ST_Point(point.lon, point.lat)) and 
> point_count >= 2 and point_count <= 500
>
>
>
>
>
> DAG:
>
>
>
>
>
>
>
>
>
> Do you see any way to speed up the performance?
>
>
> Thanks,
> Trang
>
> -----Original Message-----
> From: Jia Yu <ji...@gmail.com>
> Sent: Saturday, January 7, 2023 10:38 PM
> To: dev@sedona.apache.org
> Subject: Re: Propagating user defined attributes to spatial join
>
>
>
> Use Good Judgement: This email originated outside of INRIX Do not 
> click on links or open attachments unless you recognize the sender and 
> know the content is safe.
>
>
>
> Hi Trang,
>
>
>
> 1. For Sedona SQL join, you usually don't need to set other parameters 
> via conf. The repartition will simply work. The new num partitions 
> might be preserved in the final result and it is something Sedona 
> tried to achieve but not guaranteed.
>
> 2. For RDD Join, as shown in the two links I gave you:
>
>
>
> var spatialRDD = Adapter.toSpatialRdd(spatialDf, "usacounty")
>
>
>
> This will bring field names to SpatialRDD (saved in 
> SpatialRDD.fieldNames attribute).
>
>
>
> When you finish your spatial join on two SpatialRDD and get joinResult 
> (which is a pairRdd), run.
>
>
>
> import scala.collection.JavaConversions._
>
> var joinResultDf = Adapter.toDf(joinResultPairRDD, leftRdd.fieldNames, 
> rightRdd.fieldNames, sparkSession)
>
>
>
> This will give the dataframe with the original column names. If it 
> doesn't work, try to swap the fieldNames: 
> Adapter.toDf(joinResultPairRDD, rightRdd.fieldNames, 
> leftRdd.fieldNames, sparkSession)
>
>
>
>
>
> On Sat, Jan 7, 2023 at 11:30 PM Trang Nguyen <Tr...@inrix.com>
> wrote:
>
>
>
> > Hi Jia,
>
> >
>
> > Thanks for the quick response. I also tried to repartition trips as
>
> > below to different values but never see the count I specified during
>
> > the spatial join itself.
>
> > Not exactly sure why but I am trying now against the spatial RDDs
> directly.
>
> > Is there a way to propagate the custom fields across?
>
> >
>
> > Thanks
>
> > Trang
>
> >
>
> > -----Original Message-----
>
> > From: Jia Yu <ji...@apache.org>
>
> > Sent: Saturday, January 7, 2023 10:26 PM
>
> > To: dev@sedona.apache.org
>
> > Subject: Re: Propagating user defined attributes to spatial join
>
> >
>
> > Use Good Judgement: This email originated outside of INRIX Do not
>
> > click on links or open attachments unless you recognize the sender 
> > and
>
> > know the content is safe.
>
> >
>
> > Hi Trang,
>
> >
>
> > The slow join performance issue is mostly caused by too few partitions.
>
> >
>
> > The most straightforward way to increase the number of partitions is:
>
> > repartition both input DataFrame right after you load them from disk.
>
> >
>
> > e.g.,
>
> >
>
> > var tripDf = spark.read(XXX)
>
> > tripDf = tripDf.repartition(tripDf.numPartitions * 5)
>
> >
>
> > var poiDf = spark.read(XXX)
>
> > poiDf = poiDf.repartition(poiDf.numPartitions * 5)
>
> >
>
> > Then perform the SQL spatial join
>
> > ====
>
> >
>
> > If you want to use RDD API,
>
> >
>
> > Please read [1] and [2]
>
> >
>
> > [1]
>
> >
>
> > https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fse
> > do%2F&data=05%7C01%7CTrang.Nguyen%40inrix.com%7Cde1832975e5644e589ba
> > 08daf209b715%7C6ad2e4da8c924e588877ed06b8918379%7C0%7C0%7C6380884208
> > 06028486%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzI
> > iLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000%7C%7C%7C&sdata=aZgP2BVZZ6DVRO
> > FEnGangkJGSJ1LH6VOf5H1hYxuEv4%3D&reserved=0
>
> > na.apache.org%2F1.3.1-incubating%2Ftutorial%2Fsql%2F%23dataframe-to-
> > sp
>
> > atialrdd&data=05%7C01%7CTrang.Nguyen%40inrix.com%7C56aca1c2bca3495be
> > 34
>
> > a08daf142ffc1%7C6ad2e4da8c924e588877ed06b8918379%7C0%7C0%7C638087567
> > 31
>
> > 1005763%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIi
> > LC
>
> > JBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000%7C%7C%7C&sdata=zhJ3muIkE40wFmOTH
> > wd
>
> > mWNlEorBQq5neN9taEFEvw3I%3D&reserved=0
>
> > [2]
>
> >
>
> > https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fse
> > do%2F&data=05%7C01%7CTrang.Nguyen%40inrix.com%7Cde1832975e5644e589ba
> > 08daf209b715%7C6ad2e4da8c924e588877ed06b8918379%7C0%7C0%7C6380884208
> > 06028486%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzI
> > iLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000%7C%7C%7C&sdata=aZgP2BVZZ6DVRO
> > FEnGangkJGSJ1LH6VOf5H1hYxuEv4%3D&reserved=0
>
> > na.apache.org%2F1.3.1-incubating%2Ftutorial%2Fsql%2F%23spatialpairrd
> > d-
>
> > to-dataframe&data=05%7C01%7CTrang.Nguyen%40inrix.com%7C56aca1c2bca34
> > 95
>
> > be34a08daf142ffc1%7C6ad2e4da8c924e588877ed06b8918379%7C0%7C0%7C63808
> > 75
>
> > 67311005763%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2lu
> > Mz
>
> > IiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000%7C%7C%7C&sdata=SlL8QXYxXU1yd
> > C5
>
> > KFggp2HB7E%2BF0bY0Kw5dmBvwr644%3D&reserved=0
>
> >
>
> > Thanks,
>
> > Jia
>
> >
>
> >
>
> >
>
> > On Sat, Jan 7, 2023 at 11:14 PM Trang Nguyen 
> > <Tr...@inrix.com>
>
> > wrote:
>
> >
>
> > > Hi,
>
> > >
>
> > > I'm newbie to Sedona and am running into performance issues using
>
> > > the dataframe api to handle spatial joins with st_intersects.  
> > > From
>
> > > the execution plan, I see that this is largely due to  too few
>
> > > partitions getting used for the spatial join.
>
> > >
>
> > > I've tried to increase the partition size by setting:
>
> > > sparkSession.conf.set("spark.sql.shuffle.partitions",
>
> > > appConf.getInputPartitions.toString)
>
> > > sparkSession.conf.set("geospark.join.numpartition",
>
> > > appConf.getInputPartitions.toString)
>
> > > sparkSession.conf.set("spark.default.parallelism",
>
> > > appConf.getInputPartitions.toString)
>
> > >
>
> > > The setting changse seem to make minimal difference.
>
> > >
>
> > > I'm trying now to use convert the dataframes to be joined into
>
> > > spatialRDDs so that I can set the number of partitions for the
>
> > > spatial
>
> > join.
>
> > > However, I am running into a different issue when I try to convert
>
> > > back from the spatial joined result into a dataframe because the
>
> > > extra attributes from the original dataframes are not getting
>
> > > propagated through the join.
>
> > >
>
> > > I am using the Adapter class for the conversion.
>
> > >
>
> > >
>
> > > val tripRDD = Adapter.toSpatialRdd(trips, "geom", tripColumns) val
>
> > > rddWithOtherAttributes = tripRDD.rawSpatialRDD.rdd.map[String](f=> 
> > > {
>
> > >   f.getUserData.toString
>
> > > })
>
> > > tripRDD.analyze()
>
> > >
>
> > > val poiRDD = Adapter.toSpatialRdd(poiDS.toDF, "geom", poiColumns)
>
> > > poiRDD.analyze()
>
> > >
>
> > >
>
> > >
>
> > > tripRDD.spatialPartitioning(GridType.KDBTREE,
>
> > > appConf.getInputPartitions)
>
> > >
>
> > >
>
> > >
>
> > >
>
> > >
>
> > > val joinRes = JoinQuery.SpatialJoinQueryFlat(tripRDD, poiRDD,
>
> > > usingIndex,
>
> > > spatialPredicate)
>
> > >
>
> > > val df = Adapter.toDf(joinRes, tripColumns, poiColumns,
>
> > > sparkSession)
>
> > >
>
> > > df.show
>
> > >
>
> > >
>
> > >
>
> > >
>
> > >
>
> > >
>
> > >
>
> > > How can I get the original attributes to be propagated as part of
>
> > > the join? I searched the documentation but couldn't find any
>
> > > documentation on this.
>
> > > By specifying the columns to be carried through in the
>
> > > Adapter.toSpatialRdd, I assume that the attributes would be 
> > > carried
>
> > > through into the join as well.
>
> > >
>
> > > Here is the error I am seeing:
>
> > > va.lang.RuntimeException: Error while encoding:
>
> > > java.lang.RuntimeException: java.lang.String is not a valid 
> > > external
>
> > > type for schema of geometry if (assertnotnull(input[0,
>
> > > org.apache.spark.sql.Row, true]).isNullAt) null else
>
> > > newInstance(class
>
> > > org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS
>
> > > leftgeometry#2424
>
> > > if (assertnotnull(input[0, org.apache.spark.sql.Row,
>
> > > true]).isNullAt) null else staticinvoke(class
>
> > > org.apache.spark.unsafe.types.UTF8String,
>
> > > StringType, fromString,
>
> > > validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>
> > > org.apache.spark.sql.Row, true]), 1, leftgeometry), StringType),
>
> > > true, false, true) AS leftgeometry#2425 if (assertnotnull(input[0,
>
> > > org.apache.spark.sql.Row, true]).isNullAt) null else
>
> > > staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
>
> > > StringType, fromString,
>
> > > validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>
> > > org.apache.spark.sql.Row, true]), 2, trip_id), StringType), true,
>
> > > false,
>
> > > true) AS trip_id#2426
>
> > > if (assertnotnull(input[0, org.apache.spark.sql.Row,
>
> > > true]).isNullAt) null else newInstance(class
>
> > > org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS
>
> > > rightgeometry#2427
>
> > >                at
>
> > >
>
> > org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncoding
> > Er
>
> > ror(QueryExecutionErrors.scala:1052)
>
> > >                at
>
> > >
>
> > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.
> > ap
>
> > ply(ExpressionEncoder.scala:210)
>
> > >                at
>
> > > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.
>
> > > ap
>
> > > ply(ExpressionEncoder.scala:193)
>
> > >
>
> > > Thanks,
>
> > > Trang
>
> > >
>
> >
>

Re: Propagating user defined attributes to spatial join

Posted by Jia Yu <ji...@apache.org>.
Hi Trang,

1. Since Sedona 1.3.1-incubating, you can add user defined struct type in
the join result (pair RDD) to recover the original types. See:
https://sedona.apache.org/1.3.1-incubating/tutorial/sql/#spatialpairrdd-to-dataframe

Example 1:

val schema = StructType(Array( StructField("county", GeometryUDT, nullable =
true), StructField("name", StringType, nullable = true), StructField("price"
, DoubleType, nullable = true), StructField("age", IntegerType, nullable =
true) )) val spatialDf = Adapter.toDf(spatialRDD, schema, sparkSession)


Example 2:

val schema = StructType(Array(
  StructField("leftGeometry", GeometryUDT, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("rightGeometry", GeometryUDT, nullable = true),
  StructField("category", StringType, nullable = true)
))
val joinResultDf = Adapter.toDf(joinResultPairRDD, schema, sparkSession)


2. For DataFrame SQL join,

(1) since you are using ST_Intersects, please try to swap two operands in
ST_Intersects. E.g., ST_Intersects (Trips, Points) to ST_Intersects(Points,
Trips). This does not affect the correctness of the join result.
(2) Your query has lots of filters on non-spatial attributes and this can
actually reduce the join query overhead. Usually Sedona should be able to
do filter pushdown then run the join. But if the query plan does not work
that way (check it by printing joinDf.explain()), you can force Spark to it:

Run all the filters in SQL query 1, cache the result. Then run the spatial
join in SQL query 2.

For example,

tripDf = spark.sql(SELECT XXX FROM XXX WHERE trip_distance_m < 1000.0 and
provider_id in ('1000')).cache()
tripDf.createOrReplaceTempView("tripDf")
joinDf = spark.sql(SELECT XXX FROM tripDf, pointDf WHERE
ST_Intersects(trip, points))

Thanks,
Jia


On Sun, Jan 8, 2023 at 1:03 PM Trang Nguyen <Tr...@inrix.com> wrote:

> Hi Jia,
>
>
>
> Thanks a lot for the hints. Reversing the left and right field names did
> take care of the issue.
>
> However all the datatypes for the original fields were converted into
> StringType. Is there a way to preserve the original type mapping as well?
>
>
>
> 0 = {StructField@21788} StructField(leftgeometry,GeometryUDT,true)
>
> 1 = {StructField@21789} StructField(poi_id,StringType,true)
>
> 2 = {StructField@21790} StructField(poi_parent_id,StringType,true)
>
> 3 = {StructField@21791} StructField(lat,StringType,true)
>
> 4 = {StructField@21792} StructField(lon,StringType,true)
>
> ....
>
> 22 = {StructField@21810} StructField(probe_id,StringType,true)
>
> 23 = {StructField@21811} StructField(trip_distance_m,StringType,true)
>
> 24 = {StructField@21812} StructField(trip_id,StringType,true)
>
> 25 = {StructField@21813} StructField(capture_time,StringType,true)
>
>
>
> --
>
>
>
> For the dataframe based implementation, you are right that repartitioning
> the source dataframes did increase the spatial partition.
>
> However, the performance is incredibly slow. It is taking over 2 hours to
> perform a spatial st_intersects join between two tables (poi, 9GB and
> exploded points, 50GB).
>
> The query I am running is:
>
>           with explodedPoints
>
>           as
>
>           (select is_moving, end_time, start_time, provider_id, mode,
> probe_id, trip_distance_m, trip_id, size(points) as point_count,
> explode(points) as point from t)
>
>           select distinct is_moving, end_time, start_time, provider_id,
> mode, probe_id, trip_distance_m, trip_id, point.lat, point.lon,
> point.capture_time , poi.poi_id, poi.qk, poi.poi_parent_id, poi.lat as
> poi_lat, poi.lon as poi_lon, poi.owned, poi.fleetcentric,
> to_date(opened_on) as opened_on, to_date(opened_on) as closed_on  from
> explodedPoints
>
>            join poi where trip_distance_m < 1000.0 and provider_id in
> ('1000') and st_intersects(geom, ST_Point(point.lon, point.lat)) and
> point_count >= 2 and point_count <= 500
>
>
>
>
>
> DAG:
>
>
>
>
>
>
>
>
>
> Do you see any way to speed up the performance?
>
>
> Thanks,
> Trang
>
> -----Original Message-----
> From: Jia Yu <ji...@gmail.com>
> Sent: Saturday, January 7, 2023 10:38 PM
> To: dev@sedona.apache.org
> Subject: Re: Propagating user defined attributes to spatial join
>
>
>
> Use Good Judgement: This email originated outside of INRIX Do not click on
> links or open attachments unless you recognize the sender and know the
> content is safe.
>
>
>
> Hi Trang,
>
>
>
> 1. For Sedona SQL join, you usually don't need to set other parameters via
> conf. The repartition will simply work. The new num partitions might be
> preserved in the final result and it is something Sedona tried to achieve
> but not guaranteed.
>
> 2. For RDD Join, as shown in the two links I gave you:
>
>
>
> var spatialRDD = Adapter.toSpatialRdd(spatialDf, "usacounty")
>
>
>
> This will bring field names to SpatialRDD (saved in SpatialRDD.fieldNames
> attribute).
>
>
>
> When you finish your spatial join on two SpatialRDD and get joinResult
> (which is a pairRdd), run.
>
>
>
> import scala.collection.JavaConversions._
>
> var joinResultDf = Adapter.toDf(joinResultPairRDD, leftRdd.fieldNames,
> rightRdd.fieldNames, sparkSession)
>
>
>
> This will give the dataframe with the original column names. If it doesn't
> work, try to swap the fieldNames: Adapter.toDf(joinResultPairRDD,
> rightRdd.fieldNames, leftRdd.fieldNames, sparkSession)
>
>
>
>
>
> On Sat, Jan 7, 2023 at 11:30 PM Trang Nguyen <Tr...@inrix.com>
> wrote:
>
>
>
> > Hi Jia,
>
> >
>
> > Thanks for the quick response. I also tried to repartition trips as
>
> > below to different values but never see the count I specified during
>
> > the spatial join itself.
>
> > Not exactly sure why but I am trying now against the spatial RDDs
> directly.
>
> > Is there a way to propagate the custom fields across?
>
> >
>
> > Thanks
>
> > Trang
>
> >
>
> > -----Original Message-----
>
> > From: Jia Yu <ji...@apache.org>
>
> > Sent: Saturday, January 7, 2023 10:26 PM
>
> > To: dev@sedona.apache.org
>
> > Subject: Re: Propagating user defined attributes to spatial join
>
> >
>
> > Use Good Judgement: This email originated outside of INRIX Do not
>
> > click on links or open attachments unless you recognize the sender and
>
> > know the content is safe.
>
> >
>
> > Hi Trang,
>
> >
>
> > The slow join performance issue is mostly caused by too few partitions.
>
> >
>
> > The most straightforward way to increase the number of partitions is:
>
> > repartition both input DataFrame right after you load them from disk.
>
> >
>
> > e.g.,
>
> >
>
> > var tripDf = spark.read(XXX)
>
> > tripDf = tripDf.repartition(tripDf.numPartitions * 5)
>
> >
>
> > var poiDf = spark.read(XXX)
>
> > poiDf = poiDf.repartition(poiDf.numPartitions * 5)
>
> >
>
> > Then perform the SQL spatial join
>
> > ====
>
> >
>
> > If you want to use RDD API,
>
> >
>
> > Please read [1] and [2]
>
> >
>
> > [1]
>
> >
>
> > https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fsedo
>
> > na.apache.org%2F1.3.1-incubating%2Ftutorial%2Fsql%2F%23dataframe-to-sp
>
> > atialrdd&data=05%7C01%7CTrang.Nguyen%40inrix.com%7C56aca1c2bca3495be34
>
> > a08daf142ffc1%7C6ad2e4da8c924e588877ed06b8918379%7C0%7C0%7C63808756731
>
> > 1005763%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLC
>
> > JBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000%7C%7C%7C&sdata=zhJ3muIkE40wFmOTHwd
>
> > mWNlEorBQq5neN9taEFEvw3I%3D&reserved=0
>
> > [2]
>
> >
>
> > https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fsedo
>
> > na.apache.org%2F1.3.1-incubating%2Ftutorial%2Fsql%2F%23spatialpairrdd-
>
> > to-dataframe&data=05%7C01%7CTrang.Nguyen%40inrix.com%7C56aca1c2bca3495
>
> > be34a08daf142ffc1%7C6ad2e4da8c924e588877ed06b8918379%7C0%7C0%7C6380875
>
> > 67311005763%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMz
>
> > IiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000%7C%7C%7C&sdata=SlL8QXYxXU1ydC5
>
> > KFggp2HB7E%2BF0bY0Kw5dmBvwr644%3D&reserved=0
>
> >
>
> > Thanks,
>
> > Jia
>
> >
>
> >
>
> >
>
> > On Sat, Jan 7, 2023 at 11:14 PM Trang Nguyen <Tr...@inrix.com>
>
> > wrote:
>
> >
>
> > > Hi,
>
> > >
>
> > > I'm newbie to Sedona and am running into performance issues using
>
> > > the dataframe api to handle spatial joins with st_intersects.  From
>
> > > the execution plan, I see that this is largely due to  too few
>
> > > partitions getting used for the spatial join.
>
> > >
>
> > > I've tried to increase the partition size by setting:
>
> > > sparkSession.conf.set("spark.sql.shuffle.partitions",
>
> > > appConf.getInputPartitions.toString)
>
> > > sparkSession.conf.set("geospark.join.numpartition",
>
> > > appConf.getInputPartitions.toString)
>
> > > sparkSession.conf.set("spark.default.parallelism",
>
> > > appConf.getInputPartitions.toString)
>
> > >
>
> > > The setting changse seem to make minimal difference.
>
> > >
>
> > > I'm trying now to use convert the dataframes to be joined into
>
> > > spatialRDDs so that I can set the number of partitions for the
>
> > > spatial
>
> > join.
>
> > > However, I am running into a different issue when I try to convert
>
> > > back from the spatial joined result into a dataframe because the
>
> > > extra attributes from the original dataframes are not getting
>
> > > propagated through the join.
>
> > >
>
> > > I am using the Adapter class for the conversion.
>
> > >
>
> > >
>
> > > val tripRDD = Adapter.toSpatialRdd(trips, "geom", tripColumns) val
>
> > > rddWithOtherAttributes = tripRDD.rawSpatialRDD.rdd.map[String](f=> {
>
> > >   f.getUserData.toString
>
> > > })
>
> > > tripRDD.analyze()
>
> > >
>
> > > val poiRDD = Adapter.toSpatialRdd(poiDS.toDF, "geom", poiColumns)
>
> > > poiRDD.analyze()
>
> > >
>
> > >
>
> > >
>
> > > tripRDD.spatialPartitioning(GridType.KDBTREE,
>
> > > appConf.getInputPartitions)
>
> > >
>
> > >
>
> > >
>
> > >
>
> > >
>
> > > val joinRes = JoinQuery.SpatialJoinQueryFlat(tripRDD, poiRDD,
>
> > > usingIndex,
>
> > > spatialPredicate)
>
> > >
>
> > > val df = Adapter.toDf(joinRes, tripColumns, poiColumns,
>
> > > sparkSession)
>
> > >
>
> > > df.show
>
> > >
>
> > >
>
> > >
>
> > >
>
> > >
>
> > >
>
> > >
>
> > > How can I get the original attributes to be propagated as part of
>
> > > the join? I searched the documentation but couldn't find any
>
> > > documentation on this.
>
> > > By specifying the columns to be carried through in the
>
> > > Adapter.toSpatialRdd, I assume that the attributes would be carried
>
> > > through into the join as well.
>
> > >
>
> > > Here is the error I am seeing:
>
> > > va.lang.RuntimeException: Error while encoding:
>
> > > java.lang.RuntimeException: java.lang.String is not a valid external
>
> > > type for schema of geometry if (assertnotnull(input[0,
>
> > > org.apache.spark.sql.Row, true]).isNullAt) null else
>
> > > newInstance(class
>
> > > org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS
>
> > > leftgeometry#2424
>
> > > if (assertnotnull(input[0, org.apache.spark.sql.Row,
>
> > > true]).isNullAt) null else staticinvoke(class
>
> > > org.apache.spark.unsafe.types.UTF8String,
>
> > > StringType, fromString,
>
> > > validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>
> > > org.apache.spark.sql.Row, true]), 1, leftgeometry), StringType),
>
> > > true, false, true) AS leftgeometry#2425 if (assertnotnull(input[0,
>
> > > org.apache.spark.sql.Row, true]).isNullAt) null else
>
> > > staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
>
> > > StringType, fromString,
>
> > > validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>
> > > org.apache.spark.sql.Row, true]), 2, trip_id), StringType), true,
>
> > > false,
>
> > > true) AS trip_id#2426
>
> > > if (assertnotnull(input[0, org.apache.spark.sql.Row,
>
> > > true]).isNullAt) null else newInstance(class
>
> > > org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS
>
> > > rightgeometry#2427
>
> > >                at
>
> > >
>
> > org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingEr
>
> > ror(QueryExecutionErrors.scala:1052)
>
> > >                at
>
> > >
>
> > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.ap
>
> > ply(ExpressionEncoder.scala:210)
>
> > >                at
>
> > > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.
>
> > > ap
>
> > > ply(ExpressionEncoder.scala:193)
>
> > >
>
> > > Thanks,
>
> > > Trang
>
> > >
>
> >
>

RE: Propagating user defined attributes to spatial join

Posted by Trang Nguyen <Tr...@inrix.com>.
Hi Jia,



Thanks a lot for the hints. Reversing the left and right field names did take care of the issue.

However all the datatypes for the original fields were converted into StringType. Is there a way to preserve the original type mapping as well?



0 = {StructField@21788} StructField(leftgeometry,GeometryUDT,true)

1 = {StructField@21789} StructField(poi_id,StringType,true)

2 = {StructField@21790} StructField(poi_parent_id,StringType,true)

3 = {StructField@21791} StructField(lat,StringType,true)

4 = {StructField@21792} StructField(lon,StringType,true)

....

22 = {StructField@21810} StructField(probe_id,StringType,true)

23 = {StructField@21811} StructField(trip_distance_m,StringType,true)

24 = {StructField@21812} StructField(trip_id,StringType,true)

25 = {StructField@21813} StructField(capture_time,StringType,true)



--



For the dataframe based implementation, you are right that repartitioning the source dataframes did increase the spatial partition.

However, the performance is incredibly slow. It is taking over 2 hours to perform a spatial st_intersects join between two tables (poi, 9GB and exploded points, 50GB).

The query I am running is:
          with explodedPoints
          as
          (select is_moving, end_time, start_time, provider_id, mode, probe_id, trip_distance_m, trip_id, size(points) as point_count, explode(points) as point from t)
          select distinct is_moving, end_time, start_time, provider_id, mode, probe_id, trip_distance_m, trip_id, point.lat, point.lon, point.capture_time , poi.poi_id, poi.qk, poi.poi_parent_id, poi.lat as poi_lat, poi.lon as poi_lon, poi.owned, poi.fleetcentric, to_date(opened_on) as opened_on, to_date(opened_on) as closed_on  from explodedPoints
           join poi where trip_distance_m < 1000.0 and provider_id in ('1000') and st_intersects(geom, ST_Point(point.lon, point.lat)) and point_count >= 2 and point_count <= 500





DAG:





[cid:image001.png@01D92361.6139FB50]





Do you see any way to speed up the performance?

Thanks,
Trang

-----Original Message-----
From: Jia Yu <ji...@gmail.com>
Sent: Saturday, January 7, 2023 10:38 PM
To: dev@sedona.apache.org
Subject: Re: Propagating user defined attributes to spatial join



Use Good Judgement: This email originated outside of INRIX Do not click on links or open attachments unless you recognize the sender and know the content is safe.



Hi Trang,



1. For Sedona SQL join, you usually don't need to set other parameters via conf. The repartition will simply work. The new num partitions might be preserved in the final result and it is something Sedona tried to achieve but not guaranteed.

2. For RDD Join, as shown in the two links I gave you:



var spatialRDD = Adapter.toSpatialRdd(spatialDf, "usacounty")



This will bring field names to SpatialRDD (saved in SpatialRDD.fieldNames attribute).



When you finish your spatial join on two SpatialRDD and get joinResult (which is a pairRdd), run.



import scala.collection.JavaConversions._

var joinResultDf = Adapter.toDf(joinResultPairRDD, leftRdd.fieldNames, rightRdd.fieldNames, sparkSession)



This will give the dataframe with the original column names. If it doesn't work, try to swap the fieldNames: Adapter.toDf(joinResultPairRDD, rightRdd.fieldNames, leftRdd.fieldNames, sparkSession)





On Sat, Jan 7, 2023 at 11:30 PM Trang Nguyen <Tr...@inrix.com>> wrote:



> Hi Jia,

>

> Thanks for the quick response. I also tried to repartition trips as

> below to different values but never see the count I specified during

> the spatial join itself.

> Not exactly sure why but I am trying now against the spatial RDDs directly.

> Is there a way to propagate the custom fields across?

>

> Thanks

> Trang

>

> -----Original Message-----

> From: Jia Yu <ji...@apache.org>>

> Sent: Saturday, January 7, 2023 10:26 PM

> To: dev@sedona.apache.org<ma...@sedona.apache.org>

> Subject: Re: Propagating user defined attributes to spatial join

>

> Use Good Judgement: This email originated outside of INRIX Do not

> click on links or open attachments unless you recognize the sender and

> know the content is safe.

>

> Hi Trang,

>

> The slow join performance issue is mostly caused by too few partitions.

>

> The most straightforward way to increase the number of partitions is:

> repartition both input DataFrame right after you load them from disk.

>

> e.g.,

>

> var tripDf = spark.read(XXX)

> tripDf = tripDf.repartition(tripDf.numPartitions * 5)

>

> var poiDf = spark.read(XXX)

> poiDf = poiDf.repartition(poiDf.numPartitions * 5)

>

> Then perform the SQL spatial join

> ====

>

> If you want to use RDD API,

>

> Please read [1] and [2]

>

> [1]

>

> https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fsedo

> na.apache.org%2F1.3.1-incubating%2Ftutorial%2Fsql%2F%23dataframe-to-sp

> atialrdd&data=05%7C01%7CTrang.Nguyen%40inrix.com%7C56aca1c2bca3495be34

> a08daf142ffc1%7C6ad2e4da8c924e588877ed06b8918379%7C0%7C0%7C63808756731

> 1005763%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLC

> JBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000%7C%7C%7C&sdata=zhJ3muIkE40wFmOTHwd

> mWNlEorBQq5neN9taEFEvw3I%3D&reserved=0

> [2]

>

> https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fsedo

> na.apache.org%2F1.3.1-incubating%2Ftutorial%2Fsql%2F%23spatialpairrdd-

> to-dataframe&data=05%7C01%7CTrang.Nguyen%40inrix.com%7C56aca1c2bca3495

> be34a08daf142ffc1%7C6ad2e4da8c924e588877ed06b8918379%7C0%7C0%7C6380875

> 67311005763%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMz

> IiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000%7C%7C%7C&sdata=SlL8QXYxXU1ydC5

> KFggp2HB7E%2BF0bY0Kw5dmBvwr644%3D&reserved=0

>

> Thanks,

> Jia

>

>

>

> On Sat, Jan 7, 2023 at 11:14 PM Trang Nguyen <Tr...@inrix.com>>

> wrote:

>

> > Hi,

> >

> > I'm newbie to Sedona and am running into performance issues using

> > the dataframe api to handle spatial joins with st_intersects.  From

> > the execution plan, I see that this is largely due to  too few

> > partitions getting used for the spatial join.

> >

> > I've tried to increase the partition size by setting:

> > sparkSession.conf.set("spark.sql.shuffle.partitions",

> > appConf.getInputPartitions.toString)

> > sparkSession.conf.set("geospark.join.numpartition",

> > appConf.getInputPartitions.toString)

> > sparkSession.conf.set("spark.default.parallelism",

> > appConf.getInputPartitions.toString)

> >

> > The setting changse seem to make minimal difference.

> >

> > I'm trying now to use convert the dataframes to be joined into

> > spatialRDDs so that I can set the number of partitions for the

> > spatial

> join.

> > However, I am running into a different issue when I try to convert

> > back from the spatial joined result into a dataframe because the

> > extra attributes from the original dataframes are not getting

> > propagated through the join.

> >

> > I am using the Adapter class for the conversion.

> >

> >

> > val tripRDD = Adapter.toSpatialRdd(trips, "geom", tripColumns) val

> > rddWithOtherAttributes = tripRDD.rawSpatialRDD.rdd.map[String](f=> {

> >   f.getUserData.toString

> > })

> > tripRDD.analyze()

> >

> > val poiRDD = Adapter.toSpatialRdd(poiDS.toDF, "geom", poiColumns)

> > poiRDD.analyze()

> >

> >

> >

> > tripRDD.spatialPartitioning(GridType.KDBTREE,

> > appConf.getInputPartitions)

> >

> >

> >

> >

> >

> > val joinRes = JoinQuery.SpatialJoinQueryFlat(tripRDD, poiRDD,

> > usingIndex,

> > spatialPredicate)

> >

> > val df = Adapter.toDf(joinRes, tripColumns, poiColumns,

> > sparkSession)

> >

> > df.show

> >

> >

> >

> >

> >

> >

> >

> > How can I get the original attributes to be propagated as part of

> > the join? I searched the documentation but couldn't find any

> > documentation on this.

> > By specifying the columns to be carried through in the

> > Adapter.toSpatialRdd, I assume that the attributes would be carried

> > through into the join as well.

> >

> > Here is the error I am seeing:

> > va.lang.RuntimeException: Error while encoding:

> > java.lang.RuntimeException: java.lang.String is not a valid external

> > type for schema of geometry if (assertnotnull(input[0,

> > org.apache.spark.sql.Row, true]).isNullAt) null else

> > newInstance(class

> > org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS

> > leftgeometry#2424

> > if (assertnotnull(input[0, org.apache.spark.sql.Row,

> > true]).isNullAt) null else staticinvoke(class

> > org.apache.spark.unsafe.types.UTF8String,

> > StringType, fromString,

> > validateexternaltype(getexternalrowfield(assertnotnull(input[0,

> > org.apache.spark.sql.Row, true]), 1, leftgeometry), StringType),

> > true, false, true) AS leftgeometry#2425 if (assertnotnull(input[0,

> > org.apache.spark.sql.Row, true]).isNullAt) null else

> > staticinvoke(class org.apache.spark.unsafe.types.UTF8String,

> > StringType, fromString,

> > validateexternaltype(getexternalrowfield(assertnotnull(input[0,

> > org.apache.spark.sql.Row, true]), 2, trip_id), StringType), true,

> > false,

> > true) AS trip_id#2426

> > if (assertnotnull(input[0, org.apache.spark.sql.Row,

> > true]).isNullAt) null else newInstance(class

> > org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS

> > rightgeometry#2427

> >                at

> >

> org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingEr

> ror(QueryExecutionErrors.scala:1052)

> >                at

> >

> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.ap

> ply(ExpressionEncoder.scala:210)

> >                at

> > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.

> > ap

> > ply(ExpressionEncoder.scala:193)

> >

> > Thanks,

> > Trang

> >

>

Re: Propagating user defined attributes to spatial join

Posted by Jia Yu <ji...@gmail.com>.
Hi Trang,

1. For Sedona SQL join, you usually don't need to set other parameters via
conf. The repartition will simply work. The new num partitions might be
preserved in the final result and it is something Sedona tried to achieve
but not guaranteed.
2. For RDD Join, as shown in the two links I gave you:

var spatialRDD = Adapter.toSpatialRdd(spatialDf, "usacounty")

This will bring field names to SpatialRDD (saved in SpatialRDD.fieldNames
attribute).

When you finish your spatial join on two SpatialRDD and get joinResult
(which is a pairRdd), run.

import scala.collection.JavaConversions._
var joinResultDf = Adapter.toDf(joinResultPairRDD, leftRdd.fieldNames,
rightRdd.fieldNames, sparkSession)

This will give the dataframe with the original column names. If it doesn't
work, try to swap the fieldNames: Adapter.toDf(joinResultPairRDD,
rightRdd.fieldNames, leftRdd.fieldNames, sparkSession)


On Sat, Jan 7, 2023 at 11:30 PM Trang Nguyen <Tr...@inrix.com> wrote:

> Hi Jia,
>
> Thanks for the quick response. I also tried to repartition trips as below
> to different values but never see the count I specified during the spatial
> join itself.
> Not exactly sure why but I am trying now against the spatial RDDs directly.
> Is there a way to propagate the custom fields across?
>
> Thanks
> Trang
>
> -----Original Message-----
> From: Jia Yu <ji...@apache.org>
> Sent: Saturday, January 7, 2023 10:26 PM
> To: dev@sedona.apache.org
> Subject: Re: Propagating user defined attributes to spatial join
>
> Use Good Judgement: This email originated outside of INRIX Do not click on
> links or open attachments unless you recognize the sender and know the
> content is safe.
>
> Hi Trang,
>
> The slow join performance issue is mostly caused by too few partitions.
>
> The most straightforward way to increase the number of partitions is:
> repartition both input DataFrame right after you load them from disk.
>
> e.g.,
>
> var tripDf = spark.read(XXX)
> tripDf = tripDf.repartition(tripDf.numPartitions * 5)
>
> var poiDf = spark.read(XXX)
> poiDf = poiDf.repartition(poiDf.numPartitions * 5)
>
> Then perform the SQL spatial join
> ====
>
> If you want to use RDD API,
>
> Please read [1] and [2]
>
> [1]
>
> https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fsedona.apache.org%2F1.3.1-incubating%2Ftutorial%2Fsql%2F%23dataframe-to-spatialrdd&data=05%7C01%7CTrang.Nguyen%40inrix.com%7C5af8964fe2b14c55229608daf14142e7%7C6ad2e4da8c924e588877ed06b8918379%7C0%7C0%7C638087559848338745%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000%7C%7C%7C&sdata=dJmn0XS64LzkJTp%2Fq%2FEsQc%2Fy5hke%2B02HecrZyphTVp4%3D&reserved=0
> [2]
>
> https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fsedona.apache.org%2F1.3.1-incubating%2Ftutorial%2Fsql%2F%23spatialpairrdd-to-dataframe&data=05%7C01%7CTrang.Nguyen%40inrix.com%7C5af8964fe2b14c55229608daf14142e7%7C6ad2e4da8c924e588877ed06b8918379%7C0%7C0%7C638087559848338745%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000%7C%7C%7C&sdata=8v35stCSvME%2FCB%2FlgzjvAyT7%2BoNcgFoZtu58q5ImO7A%3D&reserved=0
>
> Thanks,
> Jia
>
>
>
> On Sat, Jan 7, 2023 at 11:14 PM Trang Nguyen <Tr...@inrix.com>
> wrote:
>
> > Hi,
> >
> > I'm newbie to Sedona and am running into performance issues using the
> > dataframe api to handle spatial joins with st_intersects.  From the
> > execution plan, I see that this is largely due to  too few partitions
> > getting used for the spatial join.
> >
> > I've tried to increase the partition size by setting:
> > sparkSession.conf.set("spark.sql.shuffle.partitions",
> > appConf.getInputPartitions.toString)
> > sparkSession.conf.set("geospark.join.numpartition",
> > appConf.getInputPartitions.toString)
> > sparkSession.conf.set("spark.default.parallelism",
> > appConf.getInputPartitions.toString)
> >
> > The setting changse seem to make minimal difference.
> >
> > I'm trying now to use convert the dataframes to be joined into
> > spatialRDDs so that I can set the number of partitions for the spatial
> join.
> > However, I am running into a different issue when I try to convert
> > back from the spatial joined result into a dataframe because the extra
> > attributes from the original dataframes are not getting propagated
> > through the join.
> >
> > I am using the Adapter class for the conversion.
> >
> >
> > val tripRDD = Adapter.toSpatialRdd(trips, "geom", tripColumns) val
> > rddWithOtherAttributes = tripRDD.rawSpatialRDD.rdd.map[String](f=> {
> >   f.getUserData.toString
> > })
> > tripRDD.analyze()
> >
> > val poiRDD = Adapter.toSpatialRdd(poiDS.toDF, "geom", poiColumns)
> > poiRDD.analyze()
> >
> >
> >
> > tripRDD.spatialPartitioning(GridType.KDBTREE,
> > appConf.getInputPartitions)
> >
> >
> >
> >
> >
> > val joinRes = JoinQuery.SpatialJoinQueryFlat(tripRDD, poiRDD,
> > usingIndex,
> > spatialPredicate)
> >
> > val df = Adapter.toDf(joinRes, tripColumns, poiColumns, sparkSession)
> >
> > df.show
> >
> >
> >
> >
> >
> >
> >
> > How can I get the original attributes to be propagated as part of the
> > join? I searched the documentation but couldn't find any documentation
> > on this.
> > By specifying the columns to be carried through in the
> > Adapter.toSpatialRdd, I assume that the attributes would be carried
> > through into the join as well.
> >
> > Here is the error I am seeing:
> > va.lang.RuntimeException: Error while encoding:
> > java.lang.RuntimeException: java.lang.String is not a valid external
> > type for schema of geometry if (assertnotnull(input[0,
> > org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class
> > org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS
> > leftgeometry#2424
> > if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt)
> > null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
> > StringType, fromString,
> > validateexternaltype(getexternalrowfield(assertnotnull(input[0,
> > org.apache.spark.sql.Row, true]), 1, leftgeometry), StringType), true,
> > false, true) AS leftgeometry#2425 if (assertnotnull(input[0,
> > org.apache.spark.sql.Row, true]).isNullAt) null else
> > staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
> > StringType, fromString,
> > validateexternaltype(getexternalrowfield(assertnotnull(input[0,
> > org.apache.spark.sql.Row, true]), 2, trip_id), StringType), true,
> > false,
> > true) AS trip_id#2426
> > if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt)
> > null else newInstance(class
> > org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS
> > rightgeometry#2427
> >                at
> >
> org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)
> >                at
> >
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
> >                at
> > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.ap
> > ply(ExpressionEncoder.scala:193)
> >
> > Thanks,
> > Trang
> >
>

RE: Propagating user defined attributes to spatial join

Posted by Trang Nguyen <Tr...@inrix.com>.
Hi Jia,

Thanks for the quick response. I also tried to repartition trips as below to different values but never see the count I specified during the spatial join itself. 
Not exactly sure why but I am trying now against the spatial RDDs directly.
Is there a way to propagate the custom fields across?

Thanks
Trang

-----Original Message-----
From: Jia Yu <ji...@apache.org> 
Sent: Saturday, January 7, 2023 10:26 PM
To: dev@sedona.apache.org
Subject: Re: Propagating user defined attributes to spatial join

Use Good Judgement: This email originated outside of INRIX Do not click on links or open attachments unless you recognize the sender and know the content is safe.

Hi Trang,

The slow join performance issue is mostly caused by too few partitions.

The most straightforward way to increase the number of partitions is:
repartition both input DataFrame right after you load them from disk.

e.g.,

var tripDf = spark.read(XXX)
tripDf = tripDf.repartition(tripDf.numPartitions * 5)

var poiDf = spark.read(XXX)
poiDf = poiDf.repartition(poiDf.numPartitions * 5)

Then perform the SQL spatial join
====

If you want to use RDD API,

Please read [1] and [2]

[1]
https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fsedona.apache.org%2F1.3.1-incubating%2Ftutorial%2Fsql%2F%23dataframe-to-spatialrdd&data=05%7C01%7CTrang.Nguyen%40inrix.com%7C5af8964fe2b14c55229608daf14142e7%7C6ad2e4da8c924e588877ed06b8918379%7C0%7C0%7C638087559848338745%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000%7C%7C%7C&sdata=dJmn0XS64LzkJTp%2Fq%2FEsQc%2Fy5hke%2B02HecrZyphTVp4%3D&reserved=0
[2]
https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fsedona.apache.org%2F1.3.1-incubating%2Ftutorial%2Fsql%2F%23spatialpairrdd-to-dataframe&data=05%7C01%7CTrang.Nguyen%40inrix.com%7C5af8964fe2b14c55229608daf14142e7%7C6ad2e4da8c924e588877ed06b8918379%7C0%7C0%7C638087559848338745%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000%7C%7C%7C&sdata=8v35stCSvME%2FCB%2FlgzjvAyT7%2BoNcgFoZtu58q5ImO7A%3D&reserved=0

Thanks,
Jia



On Sat, Jan 7, 2023 at 11:14 PM Trang Nguyen <Tr...@inrix.com> wrote:

> Hi,
>
> I'm newbie to Sedona and am running into performance issues using the 
> dataframe api to handle spatial joins with st_intersects.  From the 
> execution plan, I see that this is largely due to  too few partitions 
> getting used for the spatial join.
>
> I've tried to increase the partition size by setting:
> sparkSession.conf.set("spark.sql.shuffle.partitions",
> appConf.getInputPartitions.toString)
> sparkSession.conf.set("geospark.join.numpartition",
> appConf.getInputPartitions.toString)
> sparkSession.conf.set("spark.default.parallelism",
> appConf.getInputPartitions.toString)
>
> The setting changse seem to make minimal difference.
>
> I'm trying now to use convert the dataframes to be joined into 
> spatialRDDs so that I can set the number of partitions for the spatial join.
> However, I am running into a different issue when I try to convert 
> back from the spatial joined result into a dataframe because the extra 
> attributes from the original dataframes are not getting propagated 
> through the join.
>
> I am using the Adapter class for the conversion.
>
>
> val tripRDD = Adapter.toSpatialRdd(trips, "geom", tripColumns) val 
> rddWithOtherAttributes = tripRDD.rawSpatialRDD.rdd.map[String](f=> {
>   f.getUserData.toString
> })
> tripRDD.analyze()
>
> val poiRDD = Adapter.toSpatialRdd(poiDS.toDF, "geom", poiColumns)
> poiRDD.analyze()
>
>
>
> tripRDD.spatialPartitioning(GridType.KDBTREE, 
> appConf.getInputPartitions)
>
>
>
>
>
> val joinRes = JoinQuery.SpatialJoinQueryFlat(tripRDD, poiRDD, 
> usingIndex,
> spatialPredicate)
>
> val df = Adapter.toDf(joinRes, tripColumns, poiColumns, sparkSession)
>
> df.show
>
>
>
>
>
>
>
> How can I get the original attributes to be propagated as part of the 
> join? I searched the documentation but couldn't find any documentation 
> on this.
> By specifying the columns to be carried through in the 
> Adapter.toSpatialRdd, I assume that the attributes would be carried 
> through into the join as well.
>
> Here is the error I am seeing:
> va.lang.RuntimeException: Error while encoding:
> java.lang.RuntimeException: java.lang.String is not a valid external 
> type for schema of geometry if (assertnotnull(input[0, 
> org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class 
> org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS
> leftgeometry#2424
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) 
> null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
> StringType, fromString,
> validateexternaltype(getexternalrowfield(assertnotnull(input[0,
> org.apache.spark.sql.Row, true]), 1, leftgeometry), StringType), true, 
> false, true) AS leftgeometry#2425 if (assertnotnull(input[0, 
> org.apache.spark.sql.Row, true]).isNullAt) null else 
> staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
> StringType, fromString,
> validateexternaltype(getexternalrowfield(assertnotnull(input[0,
> org.apache.spark.sql.Row, true]), 2, trip_id), StringType), true, 
> false,
> true) AS trip_id#2426
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) 
> null else newInstance(class 
> org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS
> rightgeometry#2427
>                at
> org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)
>                at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
>                at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.ap
> ply(ExpressionEncoder.scala:193)
>
> Thanks,
> Trang
>

Re: Propagating user defined attributes to spatial join

Posted by Jia Yu <ji...@apache.org>.
Hi Trang,

The slow join performance issue is mostly caused by too few partitions.

The most straightforward way to increase the number of partitions is:
repartition both input DataFrame right after you load them from disk.

e.g.,

var tripDf = spark.read(XXX)
tripDf = tripDf.repartition(tripDf.numPartitions * 5)

var poiDf = spark.read(XXX)
poiDf = poiDf.repartition(poiDf.numPartitions * 5)

Then perform the SQL spatial join
====

If you want to use RDD API,

Please read [1] and [2]

[1]
https://sedona.apache.org/1.3.1-incubating/tutorial/sql/#dataframe-to-spatialrdd
[2]
https://sedona.apache.org/1.3.1-incubating/tutorial/sql/#spatialpairrdd-to-dataframe

Thanks,
Jia



On Sat, Jan 7, 2023 at 11:14 PM Trang Nguyen <Tr...@inrix.com> wrote:

> Hi,
>
> I'm newbie to Sedona and am running into performance issues using the
> dataframe api to handle spatial joins with st_intersects.  From the
> execution plan, I see that this is largely due to  too few partitions
> getting used for the spatial join.
>
> I've tried to increase the partition size by setting:
> sparkSession.conf.set("spark.sql.shuffle.partitions",
> appConf.getInputPartitions.toString)
> sparkSession.conf.set("geospark.join.numpartition",
> appConf.getInputPartitions.toString)
> sparkSession.conf.set("spark.default.parallelism",
> appConf.getInputPartitions.toString)
>
> The setting changse seem to make minimal difference.
>
> I'm trying now to use convert the dataframes to be joined into spatialRDDs
> so that I can set the number of partitions for the spatial join.
> However, I am running into a different issue when I try to convert back
> from the spatial joined result into a dataframe because the extra
> attributes from the original dataframes are not getting propagated through
> the join.
>
> I am using the Adapter class for the conversion.
>
>
> val tripRDD = Adapter.toSpatialRdd(trips, "geom", tripColumns)
> val rddWithOtherAttributes = tripRDD.rawSpatialRDD.rdd.map[String](f=> {
>   f.getUserData.toString
> })
> tripRDD.analyze()
>
> val poiRDD = Adapter.toSpatialRdd(poiDS.toDF, "geom", poiColumns)
> poiRDD.analyze()
>
>
>
> tripRDD.spatialPartitioning(GridType.KDBTREE, appConf.getInputPartitions)
>
>
>
>
>
> val joinRes = JoinQuery.SpatialJoinQueryFlat(tripRDD, poiRDD, usingIndex,
> spatialPredicate)
>
> val df = Adapter.toDf(joinRes, tripColumns, poiColumns, sparkSession)
>
> df.show
>
>
>
>
>
>
>
> How can I get the original attributes to be propagated as part of the
> join? I searched the documentation but couldn't find any documentation on
> this.
> By specifying the columns to be carried through in the
> Adapter.toSpatialRdd, I assume that the attributes would be carried through
> into the join as well.
>
> Here is the error I am seeing:
> va.lang.RuntimeException: Error while encoding:
> java.lang.RuntimeException: java.lang.String is not a valid external type
> for schema of geometry
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null
> else newInstance(class
> org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS
> leftgeometry#2424
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null
> else staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
> StringType, fromString,
> validateexternaltype(getexternalrowfield(assertnotnull(input[0,
> org.apache.spark.sql.Row, true]), 1, leftgeometry), StringType), true,
> false, true) AS leftgeometry#2425
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null
> else staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
> StringType, fromString,
> validateexternaltype(getexternalrowfield(assertnotnull(input[0,
> org.apache.spark.sql.Row, true]), 2, trip_id), StringType), true, false,
> true) AS trip_id#2426
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null
> else newInstance(class
> org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS
> rightgeometry#2427
>                at
> org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)
>                at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
>                at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:193)
>
> Thanks,
> Trang
>