Posted to user@beam.apache.org by Dongwon Kim <ea...@gmail.com> on 2020/08/22 18:45:39 UTC

Support of per-key state after windowing

Hi all,

I'm using Beam 2.23.0 with FlinkRunner and a part of my unbounded pipeline
looks like below:

p.apply(WithKeys.of(...).withKeyType(...))      // (A)
  .apply(Window.into(FixedWindows.of(...)))     // (B)
  .apply(Combine.perKey(new MyCombinFn()))      // (C)
  .apply(ParDo.of(new MyDoFn()))                // (D)


What I want to do is
(1) to group data by key (A) and window (B),
(2) to do some aggregation (C)
(3) to perform the final computation on each group (D)

I've noticed that the ValueState for a particular key is null whenever a new
window arrives for that key, which suggests that Beam supports only
per-key+window state, not per-key state, after windowing.

I usually work with Flink DataStream API and Flink supports both per-key
state and per-key+window state [1].

Does Beam support per-key state, as opposed to per-key+window state, after
windowing (D)? If I'm missing something, please correct me.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction

Best,

Dongwon

Re: Support of per-key state after windowing

Posted by Dongwon Kim <ea...@gmail.com>.
Reuven and Kenneth,

Thanks for the tip!

Now I can get window information without having to modify the type of my
aggregator :-)

Best,

Dongwon

On Mon, Aug 24, 2020 at 3:16 AM Reuven Lax <re...@google.com> wrote:

> Kenn - shouldn't the Reify happen before the rewindow?

Re: Support of per-key state after windowing

Posted by Kenneth Knowles <ke...@apache.org>.
Yes :-)

On Sun, Aug 23, 2020 at 2:16 PM Reuven Lax <re...@google.com> wrote:

> Kenn - shouldn't the Reify happen before the rewindow?

Re: Support of per-key state after windowing

Posted by Reuven Lax <re...@google.com>.
Kenn - shouldn't the Reify happen before the rewindow?
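
A small plain-Java model (not Beam code) of why the order matters. It assumes that re-windowing replaces an element's window and that reifying copies whatever window the element currently has into its value. The class and method names are made up for illustration; in the quoted pipeline this corresponds to applying (G) before (E).

```java
/** Plain-Java model (not Beam API) of why the window is reified before re-windowing. */
class ReifyOrderModel {
  static final String GLOBAL = "GlobalWindow";

  /** An element together with the window metadata that travels alongside it. */
  static final class Windowed {
    final String value;   // payload; reify() appends the current window to it
    final String window;  // window the element currently belongs to

    Windowed(String value, String window) {
      this.value = value;
      this.window = window;
    }
  }

  /** Models (G): copy the element's current window into the value itself. */
  static Windowed reify(Windowed in) {
    return new Windowed(in.value + " in " + in.window, in.window);
  }

  /** Models (E): assign the element to the global window, replacing its old window. */
  static Windowed rewindowToGlobal(Windowed in) {
    return new Windowed(in.value, GLOBAL);
  }

  public static void main(String[] args) {
    Windowed fromCombine = new Windowed("sum=12", "[00:00, 00:01)");
    // (G) then (E): the fixed window survives inside the value.
    System.out.println(rewindowToGlobal(reify(fromCombine)).value);
    // (E) then (G): only the global window is left to copy.
    System.out.println(reify(rewindowToGlobal(fromCombine)).value);
  }
}
```

With (G) first, the value still names the fixed window after the element has moved to the global window; with (E) first, the fixed window has already been replaced by the time it is copied.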

On Sun, Aug 23, 2020 at 11:08 AM Kenneth Knowles <ke...@apache.org> wrote:

> On Sun, Aug 23, 2020 at 1:12 PM Dongwon Kim <ea...@gmail.com> wrote:
>
>> Hi Reuven,
>>
>> You and Kenneth are right; I thought GlobalWindows in unbounded streams
>> need triggers.
>>
>> p.apply(WithKeys.of(...).withKeyType(...))      // (A)
>>   .apply(Window.into(FixedWindows.of(...)))     // (B)
>>   .apply(Combine.perKey(new MyCombinFn()))      // (C)
>>   .apply(Window.into(new GlobalWindows()))      // (E)
>>   .apply(ParDo.of(new MyDoFn()))                // (D)
>>
>> So just adding (E) blurs windows and makes the state defined in MyDoFn
>> (D) a per-key state.
>> Hope I understand you and Kenneth correctly this time.
>
> That is correct. However, I think you may want:
>
> p.apply(WithKeys.of(...).withKeyType(...))      // (A)
>   .apply(Window.into(FixedWindows.of(...)))     // (B)
>   .apply(Combine.perKey(new MyCombinFn()))      // (C)
>   .apply(Window.into(new GlobalWindows()))      // (E)
>   .apply(Reify.windowsInValue())                // (G)
>   .apply(ParDo.of(new MyDoFn()))                // (D)
>
> Reify: <https://beam.apache.org/releases/javadoc/2.23.0/index.html?org/apache/beam/sdk/transforms/Reify.html>
>
> This will make the window information from (B) & (C) available to MyDoFn
> in (D)
>
> Kenn

Re: Support of per-key state after windowing

Posted by Kenneth Knowles <ke...@apache.org>.
On Sun, Aug 23, 2020 at 1:12 PM Dongwon Kim <ea...@gmail.com> wrote:

> Hi Reuven,
>
> You and Kenneth are right; I thought GlobalWindows in unbounded streams
> need triggers.
>
> p.apply(WithKeys.of(...).withKeyType(...))      // (A)
>   .apply(Window.into(FixedWindows.of(...)))     // (B)
>   .apply(Combine.perKey(new MyCombinFn()))      // (C)
>   .apply(Window.into(new GlobalWindows()))      // (E)
>   .apply(ParDo.of(new MyDoFn()))                // (D)
>
>
> So just adding (E) blurs windows and makes the state defined in MyDoFn (D)
> a per-key state.
> Hope I understand you and Kenneth correctly this time.
>

That is correct. However, I think you may want:

p.apply(WithKeys.of(...).withKeyType(...))      // (A)
  .apply(Window.into(FixedWindows.of(...)))     // (B)
  .apply(Combine.perKey(new MyCombinFn()))      // (C)
  .apply(Window.into(new GlobalWindows()))      // (E)
  .apply(Reify.windowsInValue())                // (G)
  .apply(ParDo.of(new MyDoFn()))                // (D)

Reify: <https://beam.apache.org/releases/javadoc/2.23.0/index.html?org/apache/beam/sdk/transforms/Reify.html>


This will make the window information from (B) & (C) available to MyDoFn in
(D)

Kenn



Re: Support of per-key state after windowing

Posted by Dongwon Kim <ea...@gmail.com>.
Hi Reuven,

You and Kenneth are right; I thought GlobalWindows in unbounded streams
need triggers.

p.apply(WithKeys.of(...).withKeyType(...))      // (A)
  .apply(Window.into(FixedWindows.of(...)))     // (B)
  .apply(Combine.perKey(new MyCombinFn()))      // (C)
  .apply(Window.into(new GlobalWindows()))      // (E)
  .apply(ParDo.of(new MyDoFn()))                // (D)


So just adding (E) blurs windows and makes the state defined in MyDoFn (D)
a per-key state.
Hope I understand you and Kenneth correctly this time.

Best,

Dongwon

On Mon, Aug 24, 2020 at 1:51 AM Reuven Lax <re...@google.com> wrote:

> You could simply window into GlobalWindows and add a stateful DoFn
> afterwards. No need for the triggering and GroupByKey.

Re: Support of per-key state after windowing

Posted by Reuven Lax <re...@google.com>.
You could simply window into GlobalWindows and add a stateful DoFn
afterwards. No need for the triggering and GroupByKey.
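
A rough plain-Java model (not Beam code) of the difference between the two approaches for a single key. It assumes the trigger in (E2) fires after every element, so each accumulating pane re-delivers everything seen so far, whereas a stateful DoFn sees each element once and carries a value forward in state. The running total stands in for whatever MyDoFn actually computes; all names here are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model (not Beam API) of two ways (D) can see earlier outputs of (C). */
class PerKeyAccessModel {
  /**
   * Models (E1)-(E3) plus (F) for one key, assuming the trigger fires after every
   * element: each accumulating pane is a snapshot of the whole history so far.
   */
  static List<List<Long>> accumulatingPanes(List<Long> perWindowResults) {
    List<List<Long>> panes = new ArrayList<>();
    List<Long> seen = new ArrayList<>();
    for (Long result : perWindowResults) {
      seen.add(result);
      panes.add(new ArrayList<>(seen)); // copy, so later additions do not alter this pane
    }
    return panes;
  }

  /**
   * Models a stateful DoFn in the global window for one key: each element arrives
   * once, and a single state cell carries the running total forward.
   */
  static List<Long> statefulRunningTotals(List<Long> perWindowResults) {
    List<Long> outputs = new ArrayList<>();
    long state = 0L; // stands in for a per-key ValueState
    for (Long result : perWindowResults) {
      state += result;
      outputs.add(state);
    }
    return outputs;
  }

  public static void main(String[] args) {
    List<Long> results = Arrays.asList(5L, 7L, 1L);
    System.out.println(accumulatingPanes(results));     // [[5], [5, 7], [5, 7, 1]]
    System.out.println(statefulRunningTotals(results)); // [5, 12, 13]
  }
}
```

In this model the grouped version hands the DoFn an ever-growing list on every firing, while the stateful version keeps only what it needs between elements.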

On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim <ea...@gmail.com> wrote:

> Hi Kenneth,
>
> According to your suggestion, I modified my pipeline as follows:
>
> p.apply(WithKeys.of(...).withKeyType(...))                       // (A)
>   .apply(Window.into(FixedWindows.of(...)))                      // (B)
>   .apply(Combine.perKey(new MyCombinFn()))                       // (C)
>   .apply(
>     Window
>       .into(new GlobalWindows())                                 // (E1)
>       .triggering(
>         Repeatedly.forever(AfterPane.elementCountAtLeast(1)))    // (E2)
>       .accumulatingFiredPanes()                                  // (E3)
>   )
>   .apply(GroupByKey.create())                                    // (F)
>   .apply(ParDo.of(new MyDoFn()))                                 // (D)
>
>
> I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can iterate
> over a list of output records from (C) sharing the same key.
> This way I can achieve the same effect without having a per-key state at
> (D).
>
> Do I understand your intention correctly?
> If not, please advise me with some hints on it.
>
> Thanks,
>
> Dongwon

Re: Support of per-key state after windowing

Posted by Dongwon Kim <ea...@gmail.com>.
Hi Kenneth,

According to your suggestion, I modified my pipeline as follows:

p.apply(WithKeys.of(...).withKeyType(...))                       // (A)
  .apply(Window.into(FixedWindows.of(...)))                      // (B)
  .apply(Combine.perKey(new MyCombinFn()))                       // (C)
  .apply(
    Window
      .into(new GlobalWindows())                                 // (E1)
      .triggering(
        Repeatedly.forever(AfterPane.elementCountAtLeast(1)))    // (E2)
      .accumulatingFiredPanes()                                  // (E3)
  )
  .apply(GroupByKey.create())                                    // (F)
  .apply(ParDo.of(new MyDoFn()))                                 // (D)


I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can iterate
over a list of output records from (C) sharing the same key.
This way I can achieve the same effect without having a per-key state at
(D).

Do I understand your intention correctly?
If not, please advise me with some hints on it.

Thanks,

Dongwon


On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles <ke...@apache.org> wrote:

> Hi Dongwon,
>
> You understand correctly - Beam does not include per-key state that
> crosses window boundaries. If I understand your goal correctly, you can
> achieve the same effect by copying the window metadata into the element and
> then re-windowing into the global window before (D).
>
> Kenn

Re: Support of per-key state after windowing

Posted by Kenneth Knowles <ke...@apache.org>.
Hi Dongwon,

On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim <ea...@gmail.com> wrote:

> Hi all,
>
> I'm using Beam 2.23.0 with FlinkRunner and a part of my unbounded pipeline
> looks like below:
>
> p.apply(WithKeys.of(...).withKeyType(...))      // (A)
>   .apply(Window.into(FixedWindows.of(...)))     // (B)
>   .apply(Combine.perKey(new MyCombinFn()))      // (C)
>   .apply(ParDo.of(new MyDoFn()))                // (D)
>
>
> What I want to do is
> (1) to group data by key (A) and window (B),
> (2) to do some aggregation (C)
> (3) to perform the final computation on each group (D)
>
> I've noticed that a ValueState for a particular key is NULL whenever a new
> window for the key is arriving, which gives me a feeling that Beam seems to
> support only per-key+window state, not per-key state, after windowing.
>
> I usually work with Flink DataStream API and Flink supports both per-key
> state and per-key+window state [1].
>
> Does Beam support per-key states, not per-key+window states, after
> windowing (D)? If I miss something, please correct me.
>

You understand correctly - Beam does not include per-key state that crosses
window boundaries. If I understand your goal correctly, you can achieve the
same effect by copying the window metadata into the element and then
re-windowing into the global window before (D).

Kenn
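
A minimal plain-Java model (not Beam code) of the behavior described above. It assumes state is looked up by key plus window, so a new fixed window starts with empty state, while re-windowing everything into one global window leaves a single state cell per key. The Element class, the scope string, and the running total are hypothetical stand-ins for the output of (C) and the logic in (D).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Beam API) of how the window affects state scoping. */
class StateScopeModel {
  /** One aggregated output of (C): a key, the start of its fixed window, and a value. */
  static final class Element {
    final String key;
    final long windowStart;
    final long value;

    Element(String key, long windowStart, long value) {
      this.key = key;
      this.windowStart = windowStart;
      this.value = value;
    }
  }

  /**
   * Runs a "stateful DoFn" that keeps a running total, with state cells looked up
   * by key plus window. Returns what was read from state before each element was
   * processed (null means the cell was still empty).
   */
  static List<Long> readsBeforeUpdate(List<Element> input, boolean rewindowToGlobal) {
    Map<String, Long> state = new HashMap<>();
    List<Long> reads = new ArrayList<>();
    for (Element e : input) {
      // After re-windowing, every element of a key shares the one global window.
      String window = rewindowToGlobal ? "GLOBAL" : Long.toString(e.windowStart);
      String scope = e.key + "@" + window;
      Long previous = state.get(scope);
      reads.add(previous);
      state.put(scope, (previous == null ? 0L : previous) + e.value);
    }
    return reads;
  }

  public static void main(String[] args) {
    List<Element> input = Arrays.asList(
        new Element("k", 0L, 5L), new Element("k", 60L, 7L), new Element("k", 120L, 1L));
    System.out.println(readsBeforeUpdate(input, false)); // [null, null, null]
    System.out.println(readsBeforeUpdate(input, true));  // [null, 5, 12]
  }
}
```

The first line of output mirrors the symptom in the question (state is empty for every new window); the second shows the state persisting across the original windows once they are merged into the global window.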

