Posted to issues@spark.apache.org by "Xiao Li (JIRA)" <ji...@apache.org> on 2018/06/21 22:41:00 UTC

[jira] [Resolved] (SPARK-24588) StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from children

     [ https://issues.apache.org/jira/browse/SPARK-24588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiao Li resolved SPARK-24588.
-----------------------------
       Resolution: Fixed
    Fix Version/s: 2.4.0
                   2.3.2

> StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from children
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-24588
>                 URL: https://issues.apache.org/jira/browse/SPARK-24588
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0, 2.3.1
>            Reporter: Wenchen Fan
>            Assignee: Wenchen Fan
>            Priority: Blocker
>              Labels: correctness
>             Fix For: 2.3.2, 2.4.0
>
>
> In [https://github.com/apache/spark/pull/19080], we simplified the distribution/partitioning framework and made all join-like operators require HashClusteredPartitioning from their children. Unfortunately, the streaming join operator was missed.
> This can cause wrong results. Consider:
> {code:java}
> val input1 = MemoryStream[Int]
> val input2 = MemoryStream[Int]
> val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
> val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
> val joined = df1.join(df2, Seq("a", "b")).select('a)
> {code}
> The physical plan is
> {code:java}
> *(3) Project [a#5, b#6, c#7, c#14]
> +- StreamingSymmetricHashJoin [a#5, b#6], [a#12, b#13], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = 5a1ab77a-ed5c-4f0b-8bcb-fc5637152b97, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
>    :- Exchange hashpartitioning(a#5, b#6, 5)
>    :  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6, (value#1 * 3) AS c#7]
>    :     +- StreamingRelation MemoryStream[value#1], [value#1]
>    +- Exchange hashpartitioning(b#13, 5)
>       +- *(2) Project [value#3 AS a#12, (value#3 * 3) AS b#13, (value#3 * 4) AS c#14]
>          +- StreamingRelation MemoryStream[value#3], [value#3]
> {code}
> The left table is hash-partitioned by (a, b), while the right table is hash-partitioned by b alone. A pair of matching records can therefore land in different partitions, so a row that should appear in the join output is silently dropped.
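> The mismatch itself can be sketched in plain Scala. The {{partition}} helper below is a hypothetical stand-in for Spark's hash partitioning (the real implementation uses Murmur3Hash over the key expressions), but any deterministic hash exhibits the same problem:
> {code:java}
> // Hypothetical stand-in for hashpartitioning(keys, numPartitions);
> // illustrative only, not Spark's actual hash function.
> def partition(key: Any, numPartitions: Int): Int =
>   math.floorMod(key.hashCode, numPartitions)
>
> val a = 3
> val b = a * 2
> val left  = partition((a, b), 5) // left child: clustered by (a, b)
> val right = partition(b, 5)      // right child: clustered by b only
> // When left != right, the join task that owns the left row never
> // sees the matching right row, and the match is silently dropped.
> {code}
> Requiring HashClusteredPartitioning on (a, b) from both children forces an extra Exchange on the right side, guaranteeing that matching rows are co-located in the same partition.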



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org