Posted to commits@beam.apache.org by "Anton Kedin (JIRA)" <ji...@apache.org> on 2018/01/10 01:46:02 UTC

[jira] [Commented] (BEAM-3190) [SQL] Join Windowing Semantics

    [ https://issues.apache.org/jira/browse/BEAM-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16319536#comment-16319536 ] 

Anton Kedin commented on BEAM-3190:
-----------------------------------

At the moment, SQL joins on unbounded inputs happen per pane. In discarding mode this means we join only the deltas. In accumulating mode it means we join the full accumulated panes each time a trigger fires, which produces duplicates. As a result, it is hard to reason about the correctness of join results.
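The following plain-Java simulation makes the per-pane behavior concrete. It is a hypothetical model, not Beam code. The class PerPaneJoinModel, the "left|right" output format, and the input sequence in the usage note are all illustrative assumptions. Each simulated trigger firing inner-joins one left pane with one right pane.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java model of a per-pane inner join; not Beam code. */
final class PerPaneJoinModel {
  /** One keyed element of an input. */
  static final class Elem {
    final String key;
    final String value;

    Elem(String key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  /** Inner-joins one left pane with one right pane by key, formatting matches as "left|right". */
  static List<String> joinPane(List<Elem> left, List<Elem> right) {
    List<String> out = new ArrayList<>();
    for (Elem l : left) {
      for (Elem r : right) {
        if (l.key.equals(r.key)) {
          out.add(l.value + "|" + r.value);
        }
      }
    }
    return out;
  }

  /**
   * Simulates a sequence of trigger firings. newLeft.get(i) and newRight.get(i) hold the elements
   * that arrived just before firing i. In accumulating mode each pane holds everything seen so
   * far; in discarding mode it holds only the new elements.
   */
  static List<String> run(
      List<List<Elem>> newLeft, List<List<Elem>> newRight, boolean accumulating) {
    List<Elem> seenLeft = new ArrayList<>();
    List<Elem> seenRight = new ArrayList<>();
    List<String> emitted = new ArrayList<>();
    for (int i = 0; i < newLeft.size(); i++) {
      seenLeft.addAll(newLeft.get(i));
      seenRight.addAll(newRight.get(i));
      List<Elem> leftPane = accumulating ? seenLeft : newLeft.get(i);
      List<Elem> rightPane = accumulating ? seenRight : newRight.get(i);
      emitted.addAll(joinPane(leftPane, rightPane));
    }
    return emitted;
  }
}
```

Suppose all elements share one key. The left input delivers a at the first firing and b at the third. The right input delivers x at the second. In discarding mode the model emits nothing, because a and x never land in the same pane. In accumulating mode it emits a|x twice, plus b|x once.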

There are a few ways to extend the join implementation to mitigate this:
# accumulation: we accumulate inputs in persistent state and join them at each trigger firing:
#* this enables us to join whole streams. At each trigger firing, we iterate over the buffered inputs and emit newly matching records:
#** we need to control state/buffer expiration so that we don't accumulate PCollections forever:
#*** we need to configure timeouts that determine when to clear the state, probably separately for each input PCollection;
#*** if we know there is a one-to-one or one-to-many relationship, then we can purge the state for one input after the first match;
#*** in the case of multiple joins, it is unclear how to configure this per join. Currently, multiple joins are converted into a tree of nested binary joins;
#** the correct behavior for outer joins is unclear:
#*** we need to be able to control when to emit a result if there is no matching record in one of the inputs;
#*** and we need to be able to control what happens if such a matching record eventually does appear;
#* Beam state cells could potentially be used for this, but there are complications:
#** they are partitioned per key and per window;
#** which means it is unclear how to match records across windows;
#** if we have a single global window, then this becomes feasible:
#*** an example is [Nexmark Query3|https://github.com/apache/beam/blob/64ff21f35ee2946d38645fb0a51678628e49e62a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java], which does this using GlobalWindows with multiple firings; there the join is implemented as a custom stateful ParDo;
#*** although this can be implemented as a custom ParDo for a specific use case, it is unclear how to generalize the approach correctly;
#* other mechanisms, not Beam state cells, could potentially be used for buffering; this has not been investigated yet;
# retractions:
#* each stage of the pipeline will issue a retraction if its previous outputs are no longer valid and need to be revoked:
#** for example, GBKs and CoGBKs will retract previous results when new input arrives;
#** this would work automatically under the hood and always produce correct results, provided it is implemented correctly and the correct accumulation mode is configured for the pipeline;
#** needs more design work;
# support only specific join modes for which we know the behavior:
#* for example, if we can ensure that joins are executed only once per window:
#** we can guarantee that the complete contents of each window will be joined once;
#** there are known configurations with this property, which we can explicitly whitelist while rejecting everything else:
#*** DefaultTrigger with allowedLateness=0 in any accumulation mode;
#*** AfterWatermark.pastEndOfWindow() with allowedLateness=0 in any accumulation mode;
#** it is unclear how to enforce this for arbitrary windows and triggers;
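Here is a minimal sketch of the accumulation option. It is written in plain Java rather than with Beam state cells, so BufferedStreamJoin, Elem, and the millisecond timeouts are hypothetical names and parameters. Each input is buffered, and every trigger firing emits only the pairs that became matchable since the previous firing. Buffered records are purged after a per-input timeout. Only an inner join is modeled; the open questions about outer joins are deliberately left aside.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical plain-Java sketch of the accumulation approach (not the Beam state API): both
 * inputs are buffered, and each trigger firing emits only join results that are new.
 */
final class BufferedStreamJoin {
  /** One keyed element, with the time at which it was buffered. */
  static final class Elem {
    final String key;
    final String value;
    final long arrivalMillis;

    Elem(String key, String value, long arrivalMillis) {
      this.key = key;
      this.value = value;
      this.arrivalMillis = arrivalMillis;
    }
  }

  private final List<Elem> leftBuffer = new ArrayList<>();
  private final List<Elem> rightBuffer = new ArrayList<>();
  // Assumed per-input expiration timeouts, configured separately for each input.
  private final long leftTimeoutMillis;
  private final long rightTimeoutMillis;

  BufferedStreamJoin(long leftTimeoutMillis, long rightTimeoutMillis) {
    this.leftTimeoutMillis = leftTimeoutMillis;
    this.rightTimeoutMillis = rightTimeoutMillis;
  }

  /** Handles one trigger firing, given the elements that arrived since the previous firing. */
  List<String> fire(long nowMillis, List<Elem> newLeft, List<Elem> newRight) {
    // Purge expired state so that the buffers do not grow forever.
    leftBuffer.removeIf(e -> e.arrivalMillis < nowMillis - leftTimeoutMillis);
    rightBuffer.removeIf(e -> e.arrivalMillis < nowMillis - rightTimeoutMillis);

    List<String> out = new ArrayList<>();
    // Each pair is emitted exactly once: when the later of its two records is processed.
    for (Elem l : newLeft) {
      for (Elem r : rightBuffer) {
        if (r.key.equals(l.key)) {
          out.add(l.value + "|" + r.value);
        }
      }
      leftBuffer.add(l);
    }
    for (Elem r : newRight) {
      for (Elem l : leftBuffer) {
        if (l.key.equals(r.key)) {
          out.add(l.value + "|" + r.value);
        }
      }
      rightBuffer.add(r);
    }
    return out;
  }

  /** Number of records currently held in state. */
  int bufferedCount() {
    return leftBuffer.size() + rightBuffer.size();
  }
}
```

For example, set both timeouts to 1000 ms and feed three records on the same key: a (left, t=0), then x (right, t=100), then b (left, t=200). The sketch emits a|x and then b|x, each exactly once. Now add a right record y at t=5000. It finds nothing to join with, because everything older than t=4000 has already been purged. That is the trade-off the timeouts introduce.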
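The retractions option can be illustrated with a small model. The sketch below is hypothetical plain Java, not an existing Beam feature. RetractingJoin stands in for an accumulating CoGBK-based join. On every firing, it revokes the rows it emitted for the previous pane and then emits the rows for the new pane. The static apply() method stands in for a downstream consumer that maintains the join result as a multiset. Retracting everything and re-emitting is the simplest possible strategy; a real design would more likely emit only the difference.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical plain-Java model of retractions (not a Beam feature): an accumulating join stage
 * that, on each firing, revokes its previous output before emitting the output for the new pane.
 */
final class RetractingJoin {
  /** One keyed input element. */
  static final class Elem {
    final String key;
    final String value;

    Elem(String key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  /** An output change: either an addition of a joined row or a retraction of an earlier one. */
  static final class Change {
    final boolean retraction;
    final String row;

    Change(boolean retraction, String row) {
      this.retraction = retraction;
      this.row = row;
    }
  }

  private final List<Elem> seenLeft = new ArrayList<>();
  private final List<Elem> seenRight = new ArrayList<>();
  private List<String> previousOutput = new ArrayList<>();

  /** Handles one firing: joins the accumulated panes and retracts the previous output. */
  List<Change> fire(List<Elem> newLeft, List<Elem> newRight) {
    seenLeft.addAll(newLeft);
    seenRight.addAll(newRight);
    List<String> current = new ArrayList<>();
    for (Elem l : seenLeft) {
      for (Elem r : seenRight) {
        if (l.key.equals(r.key)) {
          current.add(l.value + "|" + r.value);
        }
      }
    }
    List<Change> changes = new ArrayList<>();
    // Naive strategy: revoke everything emitted before, then re-emit the full current result.
    for (String row : previousOutput) {
      changes.add(new Change(true, row));
    }
    for (String row : current) {
      changes.add(new Change(false, row));
    }
    previousOutput = current;
    return changes;
  }

  /** Downstream consumer that keeps the join result as a multiset of rows (row -> count). */
  static void apply(Map<String, Integer> view, List<Change> changes) {
    for (Change c : changes) {
      // A count that drops to zero maps to null, which removes the row from the view.
      view.merge(c.row, c.retraction ? -1 : 1, (a, b) -> a + b == 0 ? null : a + b);
    }
  }
}
```

Feed three records on the same key: a (left), then x (right), then b (left). The third firing emits -a|x, +a|x, and +b|x. After these changes are applied, the downstream view holds a|x and b|x once each. Without retractions, an accumulating pipeline would produce a duplicate a|x.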

The next step is to [implement the whitelisted configurations approach|BEAM-3345] until we have retractions.
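The whitelist check itself can be expressed in a few lines. The sketch below is a hypothetical plain-Java model; it is not the Beam API and not the BEAM-3345 implementation. TriggerKind and AccumulationMode are illustrative stand-ins for Beam's trigger and accumulation-mode settings. The check accepts only DefaultTrigger or AfterWatermark.pastEndOfWindow() with allowedLateness=0, in either accumulation mode, and rejects everything else.

```java
import java.time.Duration;

/**
 * Hypothetical plain-Java model of the whitelist check (not the Beam API): a join over unbounded
 * inputs is accepted only for configurations known to fire exactly once per window.
 */
final class JoinWindowingWhitelist {
  /** Illustrative stand-ins for the trigger configurations named in the comment. */
  enum TriggerKind {
    DEFAULT_TRIGGER,
    AFTER_WATERMARK_PAST_END_OF_WINDOW,
    OTHER
  }

  /** Illustrative stand-in for the pipeline's accumulation mode. */
  enum AccumulationMode {
    DISCARDING,
    ACCUMULATING
  }

  /** Whitelisted: either trigger with zero allowed lateness; the mode is intentionally ignored. */
  static boolean firesOncePerWindow(
      TriggerKind trigger, Duration allowedLateness, AccumulationMode mode) {
    boolean whitelistedTrigger =
        trigger == TriggerKind.DEFAULT_TRIGGER
            || trigger == TriggerKind.AFTER_WATERMARK_PAST_END_OF_WINDOW;
    return whitelistedTrigger && allowedLateness.isZero();
  }

  /** Rejects every configuration that is not explicitly whitelisted. */
  static void validate(TriggerKind trigger, Duration allowedLateness, AccumulationMode mode) {
    if (!firesOncePerWindow(trigger, allowedLateness, mode)) {
      throw new UnsupportedOperationException(
          "Unsupported windowing for join: trigger="
              + trigger
              + ", allowedLateness="
              + allowedLateness
              + ", mode="
              + mode);
    }
  }
}
```

A real implementation would presumably read these properties from each input PCollection's windowing strategy. This sketch does not address the harder question of enforcing the property for arbitrary windows and triggers.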

> [SQL] Join Windowing Semantics
> ------------------------------
>
>                 Key: BEAM-3190
>                 URL: https://issues.apache.org/jira/browse/BEAM-3190
>             Project: Beam
>          Issue Type: Task
>          Components: dsl-sql
>            Reporter: Anton Kedin
>
> Should the join implementation reject incorrect windowing strategies?
> Concern: discarding mode + joins + multiple trigger firings might lead to incorrect results, such as missing joins or missing data.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)