Posted to commits@beam.apache.org by "Kenneth Knowles (JIRA)" <ji...@apache.org> on 2017/01/20 05:35:27 UTC

[jira] [Created] (BEAM-1287) Give new DoFn the ability to output to a particular window

Kenneth Knowles created BEAM-1287:
-------------------------------------

             Summary: Give new DoFn the ability to output to a particular window
                 Key: BEAM-1287
                 URL: https://issues.apache.org/jira/browse/BEAM-1287
             Project: Beam
          Issue Type: New Feature
          Components: beam-model, sdk-java-core
            Reporter: Kenneth Knowles
            Assignee: Kenneth Knowles


The new {{DoFn}} design allows specialized output receivers, such as a key-preserving output (the default is non-key-preserving) or a non-window-preserving output (the default is window-preserving). This JIRA is for the latter, with an emphasis on making the two as analogous as possible.

{code}
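// Proposed API sketch: OutputToWindow is the new receiver this issue asks for.
new DoFn<A, B>() {
  @ProcessElement
  public void processElement(ProcessContext c, OutputToWindow receiver) {
    // value, timestamp, and window are placeholders chosen by the DoFn.
    receiver.outputWithTimestamp(value, timestamp, window);
  }
}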
{code}

After this change, window assignment need not be a primitive.
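
To illustrate the claim, here is a minimal, self-contained sketch of fixed-window assignment written as ordinary per-element logic against a receiver of the proposed shape. Nothing in it is a Beam API: {{OutputToWindow}}, {{IntervalWindow}}, {{Emitted}}, and {{fixedWindowFor}} are hypothetical stand-ins defined locally, and timestamps are plain milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: every type here is a local, hypothetical stand-in, not a Beam API. */
class FixedWindowsSketch {

  /** Hypothetical half-open window [startMillis, endMillis). */
  static final class IntervalWindow {
    final long startMillis;
    final long endMillis;

    IntervalWindow(long startMillis, long endMillis) {
      this.startMillis = startMillis;
      this.endMillis = endMillis;
    }
  }

  /** Hypothetical stand-in for the proposed window-aware output receiver. */
  interface OutputToWindow<T> {
    void outputWithTimestamp(T value, long timestampMillis, IntervalWindow window);
  }

  /** One emitted element, as recorded by the in-memory receiver in main. */
  static final class Emitted<T> {
    final T value;
    final long timestampMillis;
    final IntervalWindow window;

    Emitted(T value, long timestampMillis, IntervalWindow window) {
      this.value = value;
      this.timestampMillis = timestampMillis;
      this.window = window;
    }
  }

  /** Returns the fixed window of the given size that contains the timestamp. */
  static IntervalWindow fixedWindowFor(long timestampMillis, long sizeMillis) {
    // floorMod keeps the window start at or before the timestamp, even for negative timestamps.
    long start = timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    return new IntervalWindow(start, start + sizeMillis);
  }

  /** Per-element logic: window assignment expressed as an explicit output to a window. */
  static <T> void processElement(
      T value, long timestampMillis, long sizeMillis, OutputToWindow<T> receiver) {
    receiver.outputWithTimestamp(
        value, timestampMillis, fixedWindowFor(timestampMillis, sizeMillis));
  }

  public static void main(String[] args) {
    List<Emitted<String>> out = new ArrayList<>();
    OutputToWindow<String> receiver = (v, ts, w) -> out.add(new Emitted<>(v, ts, w));

    // One-minute windows; the element at 125 s lands in [120 s, 180 s).
    processElement("a", 125_000L, 60_000L, receiver);

    Emitted<String> e = out.get(0);
    System.out.println(e.value + " -> [" + e.window.startMillis + ", " + e.window.endMillis + ")");
  }
}
```

The point of the sketch is only that the assignment logic needs nothing beyond the element's timestamp and a receiver that accepts a window, which is why it need not be a separate primitive.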

Why is this OK? The primary reason for keeping windows strongly separated is that they yield parallelism, provided we don't require that multiple windows for a single key be co-located or linearized. We should be able to process a single key with millions of non-merging windows in parallel without having to reify the windows (though this isn't _that_ bad). That is a major improvement over the vague assumption that keys are the atom of parallelism.

This change will not remove this property, as it pertains to input and state. The analogy with keys:

 - Stateful DoFn requires the ability to access key-and-window state. For some runners, perhaps this does not require colocation. For runners that want to do this efficiently and locally, it means some key-and-window colocation operation followed only by key-and-window-preserving transforms. So outputting to a new window breaks the invariant, just as a non-key-preserving transform would. Until we had the new {{DoFn}}, we couldn't know whether non-window-preserving output was used.

 - Non-key-preserving output also breaks any assumption that combined aggregates are actually one per key, and so on. Windows can work the same way.

 - Timestamps are interesting. By analogy with keys, timestamps would be just part of the value and able to change freely. This doesn't work so well because of lateness. To avoid digging deeper into changing anything, this proposal just suggests that a timestamp is provided, and whether it is allowed to be late is governed by the same rules as {{outputWithTimestamp}}.

 - It is not clear whether this has uses for merging windows.
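
For the timestamp point above, the following sketch shows the kind of check an output-to-window receiver could reuse. It assumes the rule in question is the one the Java SDK applies to {{outputWithTimestamp}}: the output timestamp may be no earlier than the input element's timestamp minus the {{DoFn}}'s allowed timestamp skew. The class and method names are hypothetical, and timestamps are plain milliseconds rather than SDK types.

```java
/** Sketch only: a standalone version of the assumed output-timestamp rule. */
final class TimestampRuleSketch {

  private TimestampRuleSketch() {}

  /**
   * Rejects an output timestamp that is earlier than the input timestamp minus the allowed skew.
   * Timestamps at or after that bound, including ones later than the input, are accepted.
   */
  static void checkOutputTimestamp(
      long inputTimestampMillis, long outputTimestampMillis, long allowedSkewMillis) {
    long earliestAllowed = inputTimestampMillis - allowedSkewMillis;
    if (outputTimestampMillis < earliestAllowed) {
      throw new IllegalArgumentException(
          "Output timestamp " + outputTimestampMillis
              + " is earlier than the input timestamp " + inputTimestampMillis
              + " minus the allowed skew " + allowedSkewMillis);
    }
  }

  public static void main(String[] args) {
    checkOutputTimestamp(1_000L, 1_000L, 0L); // same timestamp: accepted
    checkOutputTimestamp(1_000L, 900L, 100L); // exactly at the skew bound: accepted
    try {
      checkOutputTimestamp(1_000L, 899L, 100L); // one millisecond too early
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Under this reading, choosing a new window does not loosen the timestamp constraint: the window argument is independent of the check.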

This change is entirely backwards compatible, but given that it removes a primitive and is rather little effort, it might bear earlier consideration. No work will begin until it is brought to the dev list.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)