Posted to issues@spark.apache.org by "Tathagata Das (JIRA)" <ji...@apache.org> on 2017/09/21 22:43:00 UTC

[jira] [Resolved] (SPARK-22053) Implement stream-stream inner join in Append mode

     [ https://issues.apache.org/jira/browse/SPARK-22053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tathagata Das resolved SPARK-22053.
-----------------------------------
       Resolution: Fixed
    Fix Version/s: 2.3.0

Issue resolved by pull request 19271
[https://github.com/apache/spark/pull/19271]

> Implement stream-stream inner join in Append mode
> -------------------------------------------------
>
>                 Key: SPARK-22053
>                 URL: https://issues.apache.org/jira/browse/SPARK-22053
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Tathagata Das
>            Assignee: Tathagata Das
>             Fix For: 2.3.0
>
>
> Stream-stream inner join is traditionally implemented using a two-way symmetric hash join. At a high level, we want to do the following.
> 1. For each stream, we maintain the past rows as state in the State Store.
>     - For each joining key, there can be multiple rows that have been received.
>     - So, we effectively have to maintain a key-to-list-of-values multimap as state for each stream.
> 2. In each batch, for each input row in each stream
>     - Look up the other stream's state to see if there are matching rows, and output them if they satisfy the joining condition.
>     - Add the input row to the corresponding stream’s state.
>     - If the data has a timestamp/window column with a watermark, then we will use that to calculate the threshold for keys that are required to be buffered for future matches, and drop the rest from the state.
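The two steps above can be sketched in a few lines of plain Python. This is only an illustration of the symmetric hash join idea, not the code in the pull request: the class name `SymmetricHashJoin`, its methods, and the use of in-memory dictionaries in place of the State Store are all assumptions made for the example.

```python
from collections import defaultdict


class SymmetricHashJoin:
    """Toy two-way symmetric hash join over two streams, "left" and "right"."""

    def __init__(self, condition=lambda left_row, right_row: True):
        # One key -> list-of-rows multimap per stream (hypothetical stand-in
        # for the per-stream state kept in the State Store).
        self.state = {"left": defaultdict(list), "right": defaultdict(list)}
        # Extra join condition evaluated on top of key equality.
        self.condition = condition

    def process(self, side, key, row):
        """Join one input row against the other side's state, then buffer it."""
        other = "right" if side == "left" else "left"
        matches = []
        # .get() avoids creating empty entries in the other side's multimap.
        for buffered in self.state[other].get(key, []):
            if side == "left":
                left_row, right_row = row, buffered
            else:
                left_row, right_row = buffered, row
            if self.condition(left_row, right_row):
                matches.append((left_row, right_row))
        # Buffer the row so that future rows from the other stream can match it.
        self.state[side][key].append(row)
        return matches

    def evict(self, side, keep):
        """Drop buffered rows of `side` for which keep(row) is False."""
        store = self.state[side]
        for key in list(store):
            store[key] = [r for r in store[key] if keep(r)]
            if not store[key]:
                del store[key]
```

Each input row is matched against rows already buffered for the other stream and then buffered itself, so every matching pair is emitted exactly once, by whichever row arrives second. The `evict` method is where a watermark-derived threshold would be applied.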
> Cleaning up old, unnecessary state rows depends entirely on whether a watermark has been defined and on what the join conditions are. We definitely want to support state cleanup for two types of queries that are likely to be common.
> - Queries with time range conditions - E.g. {{SELECT * FROM leftTable JOIN rightTable ON leftKey = rightKey AND leftTime > rightTime - INTERVAL 8 MINUTES AND leftTime < rightTime + INTERVAL 1 HOUR}}
> - Queries with windows as the matching key - E.g. {{SELECT * FROM leftTable JOIN rightTable ON leftKey = rightKey AND window(leftTime, "1 hour") = window(rightTime, "1 hour")}} (pseudo-SQL)
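As a worked example of how a state threshold could be derived for these two query shapes, here is a small Python sketch. It assumes each stream has its own watermark and that no future row arrives with an event time at or below it; the actual implementation may combine watermarks differently. The function names `state_thresholds`, `window_start` and `window_expired` are hypothetical.

```python
from datetime import datetime, timedelta


def state_thresholds(left_watermark, right_watermark, lower, upper):
    """Thresholds for:  rightTime - lower < leftTime < rightTime + upper.

    Returns (left_threshold, right_threshold). A buffered row whose event time
    is at or below the threshold for its side cannot match any future row.
    """
    # Future right rows have rightTime > right_watermark, and a match needs
    # leftTime > rightTime - lower, so left rows at or below this are dead.
    left_threshold = right_watermark - lower
    # Future left rows have leftTime > left_watermark, and a match needs
    # rightTime > leftTime - upper, so right rows at or below this are dead.
    right_threshold = left_watermark - upper
    return left_threshold, right_threshold


def window_start(ts, size):
    """Start of the tumbling window of length `size` that contains `ts`."""
    epoch = datetime(1970, 1, 1)
    return epoch + ((ts - epoch) // size) * size


def window_expired(ts, size, watermark):
    """True once the window containing `ts` has ended at or before the watermark."""
    return window_start(ts, size) + size <= watermark


# Time range example from above: lower = 8 minutes, upper = 1 hour.
left_thr, right_thr = state_thresholds(
    left_watermark=datetime(2017, 9, 21, 12, 0),
    right_watermark=datetime(2017, 9, 21, 12, 30),
    lower=timedelta(minutes=8),
    upper=timedelta(hours=1),
)
# left_thr  is 2017-09-21 12:22, so left rows with leftTime <= 12:22 can be dropped
# right_thr is 2017-09-21 11:00, so right rows with rightTime <= 11:00 can be dropped
```

For the window-keyed query, the state for a window key can be dropped once `window_expired` holds for that window, since under the same assumption no later row can fall into it.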


