You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Tathagata Das (JIRA)" <ji...@apache.org> on 2017/09/18 22:22:00 UTC

[jira] [Updated] (SPARK-22053) Implement stream-stream inner join

     [ https://issues.apache.org/jira/browse/SPARK-22053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tathagata Das updated SPARK-22053:
----------------------------------
    Description: 
Stream-stream inner join is traditionally implemented using a two-way symmetric hash join. At a high level, we want to do the following.
- For each stream, we maintain the past rows as state in State Store. 
    - For each joining key, there can be multiple rows that have been received. 
    - So, we have to effectively maintain a key-to-list-of-values multimap as state for each stream.
- In each batch, for each input row in each stream
    - Look up the other streams state to see if there are are matching rows, and output them if they satisfy the joining condition
    - Add the input row to corresponding stream’s state.
    - If the data has a timestamp/window column with watermark, then we will use that to calculate the threshold for keys that are required to buffered for future matches, and drop the rest from the state.





  was:
Stream-stream inner join is traditionally implemented using a two-way symmetric hash join. At a high level, we want to do the following.
- For each stream, we maintain the past rows as state in State Store. 
  - For each joining key, there can be multiple rows that have been received. 
  - So, we have to effectively maintain a key-to-list-of-values multimap as state for each stream.
- In each batch, for each input row in each stream
  - Look up the other streams state to see if there are are matching rows, and output them if they satisfy the joining condition
  - Add the input row to corresponding stream’s state.
  - If the data has a timestamp/window column with watermark, then we will use that to calculate the threshold for keys that are required to buffered for future matches, and drop the rest from the state.






> Implement stream-stream inner join
> ----------------------------------
>
>                 Key: SPARK-22053
>                 URL: https://issues.apache.org/jira/browse/SPARK-22053
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Tathagata Das
>            Assignee: Tathagata Das
>
> Stream-stream inner join is traditionally implemented using a two-way symmetric hash join. At a high level, we want to do the following.
> - For each stream, we maintain the past rows as state in State Store. 
>     - For each joining key, there can be multiple rows that have been received. 
>     - So, we have to effectively maintain a key-to-list-of-values multimap as state for each stream.
> - In each batch, for each input row in each stream
>     - Look up the other streams state to see if there are are matching rows, and output them if they satisfy the joining condition
>     - Add the input row to corresponding stream’s state.
>     - If the data has a timestamp/window column with watermark, then we will use that to calculate the threshold for keys that are required to buffered for future matches, and drop the rest from the state.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org