Posted to issues@spark.apache.org by "Mathieu DESPRIEE (JIRA)" <ji...@apache.org> on 2018/01/26 23:29:00 UTC

[jira] [Issue Comment Deleted] (SPARK-23220) broadcast hint not applied in a streaming left anti join

     [ https://issues.apache.org/jira/browse/SPARK-23220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mathieu DESPRIEE updated SPARK-23220:
-------------------------------------
    Comment: was deleted

(was: [~viirya] Working on it. It's actually harder than I thought. I can reproduce it consistently on our EMR cluster with our full archives, but so far I haven't managed to reproduce it locally with data samples.
I suspect it depends on runtime conditions (data size, available memory, actual sinks ...).
Can the translation of a logical plan into a physical plan change significantly under such conditions?)

> broadcast hint not applied in a streaming left anti join
> --------------------------------------------------------
>
>                 Key: SPARK-23220
>                 URL: https://issues.apache.org/jira/browse/SPARK-23220
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.1
>            Reporter: Mathieu DESPRIEE
>            Priority: Major
>         Attachments: Screenshot from 2018-01-25 17-32-45.png
>
>
> We have a Structured Streaming app doing a left anti join between a stream and a static DataFrame. The static DataFrame is quite small (a few hundred rows), but the default query plan is a sort-merge join.
>   
> Sometimes we need to re-process historical data, so we feed the same app with a FileSource pointing to our S3 storage with all archives. In that situation, the first mini-batch is quite heavy (several hundred thousand input files), and the time spent in the sort-merge join is unacceptable. Additionally, the data is highly skewed, so partition sizes are completely uneven and executors tend to crash with OOMs.
> I tried to switch to a broadcast join, but Spark still applies a sort-merge join.
> {noformat}
> ds.join(broadcast(hostnames), Seq("hostname"), "leftanti")
> {noformat}
> !Screenshot from 2018-01-25 17-32-45.png!
> The logical plan is:
> {noformat}
> Project [app_id#5203, <--- snip ---> ... 18 more fields]
> +- Project ...
> <-- snip -->
>          +- Join LeftAnti, (hostname#3584 = hostname#190)
>             :- Project [app_id, ...
> <-- snip -->
>                                        +- StreamingExecutionRelation FileStreamSource[s3://xxxx{/2018/{01,02}/*/*/,/2017/{08,09,10,11,12}/*/*/}], [app_id
>  <--snip--> ... 62 more fields]
>             +- ResolvedHint isBroadcastable=true
>                +- Relation[hostname#190,descr#191] RedshiftRelation("PUBLIC"."hostname_filter")
> {noformat}
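
One way to narrow the problem down is to run the same join in batch mode and check which join strategy the physical plan ends up with. The following is a minimal sketch, not the reporter's application: the object name, the local master, the sample rows and the disabled auto-broadcast threshold are all assumptions made for illustration. Only the join expression is taken from the report.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Hypothetical batch-mode check; names and data are illustrative only.
object BroadcastAntiJoinCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("broadcast-anti-join-check")
      // Assumption: disable size-based auto-broadcast so that only the
      // explicit hint can select a broadcast join.
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()
    import spark.implicits._

    // Stand-ins for the stream contents and the small static table.
    val ds = Seq(("a.example.com", 1), ("b.example.com", 2))
      .toDF("hostname", "app_id")
    val hostnames = Seq(("a.example.com", "filtered"))
      .toDF("hostname", "descr")

    // Same join expression as in the report.
    val joined = ds.join(broadcast(hostnames), Seq("hostname"), "leftanti")

    // Print the parsed, analyzed, optimized and physical plans.
    joined.explain(true)

    // Look for a broadcast join operator in the physical plan text.
    val physical = joined.queryExecution.executedPlan.toString
    println(s"physical plan mentions a broadcast join: ${physical.contains("Broadcast")}")

    spark.stop()
  }
}
```

For the streaming case, calling `explain()` on the running `StreamingQuery` prints the plan of the most recent batch, which can be compared with the batch-mode result above.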



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)