Posted to user@beam.apache.org by Kane Knight <ka...@hoxtonanalytics.com> on 2022/05/16 11:04:58 UTC

[Question] - Side inputs in golang sdk

Hi there,

I’m trying out a streaming pipeline that reads events from a PubSub topic. I want to pass a side input to the stage that processes each PubSub event, and refresh the side input value every 5 minutes using an impulse that kicks off a time.Ticker firing every 5 minutes.

The streaming side, which uses the pubsubio source, didn’t seem to work in this configuration: PubSub messages were not being acked. Without the side input, the same pipeline normally works.

So my question is: are side inputs available in the Go SDK when running on Dataflow? Is there a bug or known issue?

Thank you,
Kane Knight

Re: [Question] - Side inputs in golang sdk

Posted by Robert Burke <ro...@frantil.com>.
Ah. And on re-reading your email, that also sounds like a bug we can fix.
Unbounded PCollections are quite new to the Go SDK; for a long time, PubSub
on Dataflow was the only way to produce them. There are bound to be a few
of those.

A glance at the handling code doesn't suggest that an unbounded side input
needs anything special in the pipeline graph. We'll investigate this
experience and try to make sure it works for 2.40.

On Tue, May 17, 2022, 9:33 AM Robert Burke <ro...@frantil.com> wrote:

Re: [Question] - Side inputs in golang sdk

Posted by Robert Burke <ro...@frantil.com>.
Hello Kane!

From chatting with Jack, it looks like this won't be possible to implement
in the Go SDK until 2.40. DoFn self-checkpointing only made it into the Go
SDK a week or two ago, and it is required for unbounded PCollections.
A Go-native periodic sequence isn't implemented at head yet, but there
shouldn't be any reason it can't be.

If you're feeling spicy, you could try authoring it yourself by syncing to
@master and basing it on the Java or Python variants. [1][2]

An example of a self-checkpointing DoFn exists in the integration tests. [3]


All that said, the reason the code doesn't work is that, as far as runners
are concerned, the generator DoFn never terminates. Therefore the runner
can't do its own checkpoint and materialize the PCollections for use in
side inputs. Because the PCollection is bounded, the runner assumes that if
it waits long enough it will see all the data, and can then optimize
execution around that.

I hope this is useful!

Robert Burke
Beam Go Busybody


[1]:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
[2]:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/periodicsequence.py
[3]:
https://github.com/apache/beam/blob/master/sdks/go/test/integration/primitives/checkpointing.go

On Tue, May 17, 2022, 2:32 AM Kane Knight <ka...@hoxtonanalytics.com> wrote:

Re: [Question] - Side inputs in golang sdk

Posted by Kane Knight <ka...@hoxtonanalytics.com>.
Hi Jack,

Thanks for the quick reply. I tried adding windowing, but it doesn't seem to help. Here is an example pipeline [1].

I'm trying to re-create the slowly updating side input outlined in [2], a common Beam pattern.

The side input is created as a bounded node in the graph, and it isn't clear to me how that affects things. It doesn't yet seem possible to create unbounded sources [3], but even when I generate ticks externally and read them from PubSub, which creates an unbounded side input, it still doesn't work. This suggests to me that boundedness isn't the issue here, but if you can shed some light on how boundedness affects this situation, that would be great.

Could it have something to do with the missing ViewFn support [4]?

[1] https://gist.github.com/KaneKnight/e549b0eb16c94c315cbcbd7dbfee5ae7
[2] https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-side-input-using-windowing
[3] https://github.com/apache/beam/blob/master/CHANGES.md#2400---unreleased
[4] https://issues.apache.org/jira/browse/BEAM-3305

Thanks,

Kane Knight

On 16 May 2022, at 16:33, Jack McCluskey <jr...@google.com> wrote:

Re: [Question] - Side inputs in golang sdk

Posted by Jack McCluskey <jr...@google.com>.
Hey Kane,

The Go SDK should support side inputs on Dataflow. Are you windowing the
side input, or is it in the global window? If it isn't windowed, the side
input will block until the end of the global window, which would cause
issues.

Thanks,

Jack McCluskey

On Mon, May 16, 2022 at 7:05 AM Kane Knight <ka...@hoxtonanalytics.com>
wrote:
