Posted to issues@spark.apache.org by "Jungtaek Lim (Jira)" <ji...@apache.org> on 2022/04/01 10:10:00 UTC

[jira] [Resolved] (SPARK-38684) Stream-stream outer join has a possible correctness issue due to weakly read consistent on outer iterators

     [ https://issues.apache.org/jira/browse/SPARK-38684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jungtaek Lim resolved SPARK-38684.
----------------------------------
    Fix Version/s: 3.3.0
                   3.2.2
       Resolution: Fixed

Issue resolved by pull request 36002
[https://github.com/apache/spark/pull/36002]

> Stream-stream outer join has a possible correctness issue due to weakly read consistent on outer iterators
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-38684
>                 URL: https://issues.apache.org/jira/browse/SPARK-38684
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.2.1, 3.3.0
>            Reporter: Jungtaek Lim
>            Assignee: Jungtaek Lim
>            Priority: Blocker
>              Labels: correctness
>             Fix For: 3.3.0, 3.2.2
>
>
> We found that stream-stream join has the same issue as SPARK-38320, on the appended iterators. Since the root cause is the same as in SPARK-38320, this is currently reproducible only with the RocksDB state store provider. Even with the HDFS-backed state store provider, however, correct behavior is not guaranteed by the interface contract, so it may depend on the JVM vendor, version, etc.
> I can easily construct a “data loss” scenario in the state store.
> The conditions are as follows:
>  * Use a stream-stream time-interval outer join
>  ** a left outer join has the issue on the left side, a right outer join on the right side, and a full outer join on both sides
>  * In batch N, produce non-late row(s) on the problematic side
>  * In the same batch (batch N), some row(s) on the problematic side must be evicted by the watermark condition
> When these conditions are met, keyToNumValues goes out of sync between the state and the iterator used in the eviction phase. If a row is evicted for the grouping key (which updates keyToNumValues), the eviction phase “overwrites” keyToNumValues in the state with the value it calculated.
> Because the eviction phase “does not know” about the new rows (its keyToNumValues is out of sync), this effectively discards from the state all rows added in batch N.
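
The root cause described above is a read-consistency question: whether an iterator created over the state store observes writes made after it was created. A minimal sketch of the weakly consistent case, assuming a toy in-memory store (ToySnapshotStore is a hypothetical name, not Spark's StateStore API) whose iterators read a copy taken at creation time while point lookups read live data:

```python
class ToySnapshotStore:
    """Hypothetical toy key-value store, not Spark's StateStore API.

    Iterators see a copy taken when they are created; get() sees live data.
    """

    def __init__(self):
        self._data = {}

    def put(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)

    def iterator(self):
        # Copy now: puts made after this call are invisible to the iterator.
        return iter(sorted(self._data.items()))


store = ToySnapshotStore()
store.put("a", 1)
it = store.iterator()
store.put("b", 2)       # written after the iterator was created
print(list(it))         # [('a', 1)]
print(store.get("b"))   # 2
```

Under this assumption the iterator and get() disagree about key "b"; any logic that trusts the iterator's view and then writes back values derived from it can silently undo the later write.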

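The conditions listed in the issue can be restated as a predicate over a single grouping key on a single side. This is a simplified sketch, not Spark code: may_lose_rows and AFFECTED_SIDES are hypothetical names, a row counts as non-late when its event time is at least the watermark, and a state row counts as evicted when its event time is below a single eviction threshold (in a real time-interval join that threshold is derived from the watermark and the join's time bounds).

```python
# Hypothetical names; a toy restatement of the issue's conditions, not Spark code.
AFFECTED_SIDES = {
    "left_outer": {"left"},
    "right_outer": {"right"},
    "full_outer": {"left", "right"},
}


def may_lose_rows(join_type, side, input_times, state_times, watermark, eviction_threshold):
    """True if batch N matches the conditions for one grouping key on one side.

    input_times: event times of rows arriving on `side` in batch N.
    state_times: event times of rows already in state for the same key.
    Simplification: non-late means event_time >= watermark; a state row is
    evicted when event_time < eviction_threshold.
    """
    if side not in AFFECTED_SIDES.get(join_type, set()):
        return False  # inner joins, or the non-problematic side of an outer join
    has_new_rows = any(t >= watermark for t in input_times)
    has_evictions = any(t < eviction_threshold for t in state_times)
    return has_new_rows and has_evictions


print(may_lose_rows("left_outer", "left", [25], [5, 20], watermark=10, eviction_threshold=10))   # True
print(may_lose_rows("left_outer", "right", [25], [5, 20], watermark=10, eviction_threshold=10))  # False
```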

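The out-of-sync overwrite can be reproduced in a toy model. In this sketch all names are hypothetical, and the toy compacts rows, so it is not meant to match Spark's actual removal logic. Each side keeps a per-key count, mirroring keyToNumValues, plus rows stored by (key, index). The eviction iterator is created before batch N's rows are appended but only consumed afterwards; with consistent=False it uses the counts captured at creation time. The consistent=True case reads the live count and is included only to isolate the effect of the stale read; it does not describe the change made in pull request 36002.

```python
class ToyJoinSideState:
    """Hypothetical toy model of one join side's state, for illustration only."""

    def __init__(self):
        self.key_to_num_values = {}        # per-key row count (like keyToNumValues)
        self.key_with_index_to_value = {}  # (key, index) -> event time

    def append(self, key, event_time):
        n = self.key_to_num_values.get(key, 0)
        self.key_with_index_to_value[(key, n)] = event_time
        self.key_to_num_values[key] = n + 1

    def values(self, key):
        # Rows are only reachable through the stored count.
        n = self.key_to_num_values.get(key, 0)
        return [self.key_with_index_to_value[(key, i)] for i in range(n)]


def eviction_iterator(state, threshold, consistent):
    """Lazily evicts rows with event_time < threshold and yields them."""
    snapshot = dict(state.key_to_num_values)  # captured when the iterator is created

    def run():
        for key in list(snapshot):
            n = state.key_to_num_values[key] if consistent else snapshot[key]
            rows = [state.key_with_index_to_value[(key, i)] for i in range(n)]
            kept = [t for t in rows if t >= threshold]
            if len(kept) == n:
                continue  # nothing evicted for this key: count left untouched
            for i, t in enumerate(kept):
                state.key_with_index_to_value[(key, i)] = t
            for i in range(len(kept), n):
                del state.key_with_index_to_value[(key, i)]
            # Overwrite the stored count with the value computed from `n`.
            state.key_to_num_values[key] = len(kept)
            yield from (t for t in rows if t < threshold)

    return run()


def run_batch(consistent):
    state = ToyJoinSideState()
    state.append("k", 5)    # will be evicted in batch N
    state.append("k", 20)   # survives eviction
    evict = eviction_iterator(state, threshold=10, consistent=consistent)
    state.append("k", 25)   # non-late row arriving in batch N
    evicted = list(evict)   # eviction is only consumed now
    return evicted, state.values("k")


print(run_batch(consistent=False))  # ([5], [20])
print(run_batch(consistent=True))   # ([5], [20, 25])
```

With the stale count, the eviction writes back 1 for key "k", so row 25 (stored at index 2) is still physically present but no longer reachable through the count, which matches the “data loss” the issue describes.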

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
