You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "Kenneth Knowles (JIRA)" <ji...@apache.org> on 2016/11/08 05:11:58 UTC

[jira] [Comment Edited] (BEAM-912) Range join in Beam

    [ https://issues.apache.org/jira/browse/BEAM-912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15646524#comment-15646524 ] 

Kenneth Knowles edited comment on BEAM-912 at 11/8/16 5:11 AM:
---------------------------------------------------------------

Very good point. My idea does not work for you. Actually the difficult part of your problem isn't really windowing. Windows are how we decide to trigger and evict data when allowed lateness is past - that is why {{BoundedWindow}} only has a maximum timestamp. But your problem is challenging if we just talk about doing a range join in Beam, where only equi-joins on shared keys are in the SDK/model. A naive way to do it is a full cross product and then filter the data that matches. Some prominent big data products still do this, even though it is very inefficient.

In Beam one way we could make this more efficient is with a side input (see https://s.apache.org/beam-side-inputs-1-pager) that supports range queries. Then collection B would be the main input, and collection A would be the side inputs. The windowing of each of them would be just like the {{assignWindows}} above, but with no merging. Then, because side inputs are only eventually consistent, you would want to trigger collection B after some allowed lateness, and then downstream join it with A and whatever elements from A had arrived would be the ones that are joined. In cases where the side input is small and bounded, you may be able to do this in memory or on local disk.

Triggers cannot solve this problem, and they are not designed to do your main computation like this - they only control when results get output (sometimes we say "materialized") from a {{GroupByKey}} or {{Combine}} - see https://s.apache.org/beam-triggers.

The solution you describe is how you might use a stateful {{ParDo}} to implement a per-key range join, though I think the need for a data structure that supports range queries will be the same to support out of order data. Your {{onElement}} becomes {{@ProcessElement}} and {{BagState}} is the same. The state will be partitioned by key and window, so your example would be globally windowed. Using state in your {{DoFn}} is something I am working on right now, so you should follow BEAM-25 for updates on when it is ready for you to use, and check out https://s.apache.org/beam-state.


was (Author: kenn):
Very good point. My idea does not work for you. Actually the difficult part of your problem isn't really windowing. Windows are how we decide to trigger and evict data when allowed lateness is past - that is why {{BoundedWindow}} only has a maximum timestamp. But your problem is challenging if we just talk about doing a range join in Beam, where only equi-joins on shared keys are in the SDK/model. A naive way to do it is a full cross product and then filter the data that matches. Some prominent big data products still do this, even though it is very inefficient.

In Beam one way we could make this more efficient is with a side input (see https://s.apache.org/beam-side-inputs-1-pager) that supports range queries. Then collection B would be the main input, and collection A would be the side inputs. The windowing of each of them would be just like the {{assignWindows}} above, but with no merging. Then, because side inputs are only eventually consistent, you would want to trigger collection B after some allowed lateness, and then downstream join it with A and whatever elements from A had arrived would be the ones that are joined. In cases where the side input is small or bounded, you may be able to do this in memory or on local disk.

Triggers cannot solve this problem, and they are not designed to do your main computation like this - they only control when results get output (sometimes we say "materialized") from a {{GroupByKey}} or {{Combine}} - see https://s.apache.org/beam-triggers.

The solution you describe is how you might use a stateful {{ParDo}} to implement a per-key range join, though I think the need for a data structure that supports range queries will be the same to support out of order data. Your {{onElement}} becomes {{@ProcessElement}} and {{BagState}} is the same. The state will be partitioned by key and window, so your example would be globally windowed. Using state in your {{DoFn}} is something I am working on right now, so you should follow BEAM-25 for updates on when it is ready for you to use, and check out https://s.apache.org/beam-state.

> Range join in Beam
> ------------------
>
>                 Key: BEAM-912
>                 URL: https://issues.apache.org/jira/browse/BEAM-912
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>            Reporter: Jingsong Lee
>            Assignee: Kenneth Knowles
>         Attachments: betweenJoin.png
>
>
> 1.We can support some data-driven trigger, so we need expose data in OnElementContext of onElement method. 
> 2.We can support more flexible join, so we need expose buffer tag in TriggerContext, now this buffer tag is in SystemReduceFn.
> for example: SELECT STREAM * FROM Orders AS o JOIN Shipments AS s
> ON o.orderId = s.orderId AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR;
> link: https://issues.apache.org/jira/browse/BEAM-101



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)