You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@sedona.apache.org by "Jia Yu (Jira)" <ji...@apache.org> on 2023/01/16 23:14:00 UTC

[jira] [Commented] (SEDONA-233) Incorrect results for several joins in a single stage

    [ https://issues.apache.org/jira/browse/SEDONA-233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17677538#comment-17677538 ] 

Jia Yu commented on SEDONA-233:
-------------------------------

[~umartin] Is there a way to fix this? Maybe a naive solution could be:

use cache to force Spark finish the first join, then re-use cache?

 

E.g., 
joined_df = geo_df.alias("a").join(geo_df.alias("b"), f.expr("st_intersects(a.geom, b.geom)")).cache()

joined_df.union(joined_df).count()

> Incorrect results for several joins in a single stage
> -----------------------------------------------------
>
>                 Key: SEDONA-233
>                 URL: https://issues.apache.org/jira/browse/SEDONA-233
>             Project: Apache Sedona
>          Issue Type: Bug
>            Reporter: Martin Andersson
>            Priority: Major
>         Attachments: image-2023-01-16-17-38-00-132.png
>
>
> Queries with several joins in a single stage leads to warning logs and possibly incorrect results. One way to trigger the error is to use the union operator.
> {code:java}
> joined_df = geo_df.alias("a").join(geo_df.alias("b"), f.expr("st_intersects(a.geom, b.geom)"))
> joined_df.union(joined_df).count()
> {code}
> Logs:
> {code:java}
> 23/01/16 17:22:58 WARN JudgementBase: Didn't find partition extent for this partition: 8
> 23/01/16 17:22:58 WARN JudgementBase: Didn't find partition extent for this partition: 11
> 23/01/16 17:22:58 WARN JudgementBase: Didn't find partition extent for this partition: 12
> ...
> {code}
> Partitioned joins in Sedona assumes that TaskContext.partitionId is the same as the grid id used for partitioning (JudgementBase::initPartition). That isn't true if Spark runs several joins in a single stage.
> In the example above, if 10 partitions are used in each join, Spark will run the two joins in a single stage with 20 tasks. The second join will have partition id 10-19 instead of the expected 0-9. The second join could produce incorrect results. If the partition extent isn't found there is no deduplication. If it maps to the wrong extent it could eliminate rows that shouldn't be eliminated.
> From spark-ui. Two joins in a single stage:
> !image-2023-01-16-17-38-00-132.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)