You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Evan Galpin <eg...@apache.org> on 2023/05/25 21:14:51 UTC

[Dataflow][Stateful] Bypass Dataflow Overrides?

Hi all,

I'm running into a scenario where I feel that Dataflow Overrides
(specifically BatchStatefulParDoOverrides.GbkBeforeStatefulParDo ) are
unnecessarily causing a batch pipeline to "pause" throughput since a GBK
needs to have processed all the data in a window before it can output.

Is it strictly required that GbkBeforeStatefulParDo must run before any
stateful DoFn? If not, what failure modes is GbkBeforeStatefulParDo trying
to protect against, and how can it be bypassed/disabled while still using
DataflowRunner?

Thanks,
Evan

Re: [Dataflow][Stateful] Bypass Dataflow Overrides?

Posted by Robert Bradshaw via user <us...@beam.apache.org>.
The key in GroupIntoBatches is actually not semantically meaningful, and
for a batch pipeline the use of state/timers is not needed either. If all
you need to do is batch elements into groups of (at most) N, you can write
a DoFn that collects things in its process method and emits them when the
batch is full (and also in the finish bundle method, though some care needs
to be taken to handle windowing correctly). On the other hand, if you're
trying to limit the parallelism across all workers you'd likely need to
limit the number of concurrently-processed keys (which would require a
grouping of some sort onto a finite number of keys, unless you want to cap
your entire pipeline at a certain number of workers).

On Thu, May 25, 2023 at 2:34 PM Evan Galpin <eg...@apache.org> wrote:

> Understood, thanks for the clarification, I'll need to look more in-depth
> at my pipeline code then.  I'm definitely observing that all steps
> downstream from the Stateful step in my pipeline do not start until steps
> upstream of the Stateful step are fully completed.  The Stateful step is a
> RateLimit[1] transfer which borrows heavily from GroupIntoBatches.
>
> [1] https://gist.github.com/egalpin/162a04b896dc7be1d0899acf17e676b3
>
> On Thu, May 25, 2023 at 2:25 PM Robert Bradshaw via user <
> user@beam.apache.org> wrote:
>
>> The GbkBeforeStatefulParDo is an implementation detail used to send all
>> elements with the same key to the same worker (so that they can share
>> state, which is itself partitioned by worker). This does cause a global
>> barrier in batch pipelines.
>>
>> On Thu, May 25, 2023 at 2:15 PM Evan Galpin <eg...@apache.org> wrote:
>>
>>> Hi all,
>>>
>>> I'm running into a scenario where I feel that Dataflow Overrides
>>> (specifically BatchStatefulParDoOverrides.GbkBeforeStatefulParDo ) are
>>> unnecessarily causing a batch pipeline to "pause" throughput since a GBK
>>> needs to have processed all the data in a window before it can output.
>>>
>>> Is it strictly required that GbkBeforeStatefulParDo must run before any
>>> stateful DoFn? If not, what failure modes is GbkBeforeStatefulParDo trying
>>> to protect against, and how can it be bypassed/disabled while still using
>>> DataflowRunner?
>>>
>>> Thanks,
>>> Evan
>>>
>>

Re: [Dataflow][Stateful] Bypass Dataflow Overrides?

Posted by Evan Galpin <eg...@apache.org>.
Understood, thanks for the clarification, I'll need to look more in-depth
at my pipeline code then.  I'm definitely observing that all steps
downstream from the Stateful step in my pipeline do not start until steps
upstream of the Stateful step are fully completed.  The Stateful step is a
RateLimit[1] transfer which borrows heavily from GroupIntoBatches.

[1] https://gist.github.com/egalpin/162a04b896dc7be1d0899acf17e676b3

On Thu, May 25, 2023 at 2:25 PM Robert Bradshaw via user <
user@beam.apache.org> wrote:

> The GbkBeforeStatefulParDo is an implementation detail used to send all
> elements with the same key to the same worker (so that they can share
> state, which is itself partitioned by worker). This does cause a global
> barrier in batch pipelines.
>
> On Thu, May 25, 2023 at 2:15 PM Evan Galpin <eg...@apache.org> wrote:
>
>> Hi all,
>>
>> I'm running into a scenario where I feel that Dataflow Overrides
>> (specifically BatchStatefulParDoOverrides.GbkBeforeStatefulParDo ) are
>> unnecessarily causing a batch pipeline to "pause" throughput since a GBK
>> needs to have processed all the data in a window before it can output.
>>
>> Is it strictly required that GbkBeforeStatefulParDo must run before any
>> stateful DoFn? If not, what failure modes is GbkBeforeStatefulParDo trying
>> to protect against, and how can it be bypassed/disabled while still using
>> DataflowRunner?
>>
>> Thanks,
>> Evan
>>
>

Re: [Dataflow][Stateful] Bypass Dataflow Overrides?

Posted by Robert Bradshaw via user <us...@beam.apache.org>.
The GbkBeforeStatefulParDo is an implementation detail used to send all
elements with the same key to the same worker (so that they can share
state, which is itself partitioned by worker). This does cause a global
barrier in batch pipelines.

On Thu, May 25, 2023 at 2:15 PM Evan Galpin <eg...@apache.org> wrote:

> Hi all,
>
> I'm running into a scenario where I feel that Dataflow Overrides
> (specifically BatchStatefulParDoOverrides.GbkBeforeStatefulParDo ) are
> unnecessarily causing a batch pipeline to "pause" throughput since a GBK
> needs to have processed all the data in a window before it can output.
>
> Is it strictly required that GbkBeforeStatefulParDo must run before any
> stateful DoFn? If not, what failure modes is GbkBeforeStatefulParDo trying
> to protect against, and how can it be bypassed/disabled while still using
> DataflowRunner?
>
> Thanks,
> Evan
>