Posted to user@beam.apache.org by Andrew Wylie <ad...@yahoo.co.uk> on 2020/03/25 20:54:11 UTC

Flattening multiple inputs, multiple outputs

Hi,

So I have 5 input Pub/Sub topics, and each one of those has an output Pub/Sub topic. When I Flatten the 5 inputs, is there a way to ensure that I can route the messages to their respective output topics? Is there any kind of built-in attribute in the messages that I can read to determine their input topic, and therefore which output topic to write to?

For example, messages from Pub/Sub input topic 1 need to be output from the pipeline to Pub/Sub output topic 1.

Thanks

Re: Flattening multiple inputs, multiple outputs

Posted by Reza Rokni <re...@google.com>.
Do you happen to have more than one constructor in the class you're
creating?

BTW, if you only have one concrete class that you want to delay, then you
can probably do away with the generics and just set the coder for the cache
directly.
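
For context on the constructor question: Java lets a blank final field (one declared without an initializer) be assigned inside a constructor, but every constructor must assign it exactly once, otherwise compilation fails. Beam expects the @StateId field to be final, as Andrew notes, so a second constructor must either assign the field itself or delegate with this(...). The sketch below is plain Java so it compiles without Beam; CoderLike, CacheSpec and FinalFieldSketch are hypothetical stand-ins for Beam's Coder, StateSpec and the FutureCache DoFn, not Beam classes.

```java
import java.util.Objects;

// Hypothetical stand-in for Beam's Coder, so the sketch compiles without Beam.
final class CoderLike {
    final String name;

    CoderLike(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for the StateSpec that StateSpecs.bag(...) returns.
final class CacheSpec {
    final CoderLike elementCoder;

    CacheSpec(CoderLike elementCoder) {
        this.elementCoder = elementCoder;
    }
}

class FinalFieldSketch {
    // Blank finals: no initializer, so each constructor must assign them exactly once.
    private final CoderLike objectCoder;
    private final CacheSpec cache;

    FinalFieldSketch(CoderLike objectCoder) {
        this.objectCoder = Objects.requireNonNull(objectCoder);
        this.cache = new CacheSpec(objectCoder); // legal: the first and only assignment
    }

    // A second constructor that left cache unassigned would not compile.
    // Delegating with this(...) keeps the single assignment in one place.
    FinalFieldSketch() {
        this(new CoderLike("default"));
    }

    CacheSpec cache() {
        return cache;
    }
}
```

In FutureCache the matching line is cache = StateSpecs.bag(TimestampedValueCoder.of(objectCoder)); inside its single constructor, which is legal as written; an error usually points to another constructor or a field initializer that leaves the field unassigned or assigns it a second time.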







On Mon, Mar 30, 2020 at 8:52 PM Andrew Wylie <ad...@yahoo.co.uk> wrote:


Re: Flattening multiple inputs, multiple outputs

Posted by Andrew Wylie <ad...@yahoo.co.uk>.
Thanks very much, Rez. I'll give that a try and let you know how it goes.
Just one question on the cache variable: it has to be final, but it is set in the constructor, which causes a compilation error. Is there a nice way to set it to the correct Coder?

On Sunday, 29 March 2020, 04:32:49 BST, Reza Rokni <re...@google.com> wrote:
 

Re: Flattening multiple inputs, multiple outputs

Posted by Reza Rokni <re...@google.com>.
Andrew, regarding the delay use case, and assuming I have understood it
correctly, you can explore using State and Timers.

Something like the pattern below, from a code sample that I hope to add to
https://beam.apache.org/documentation/patterns/overview/ once I get
time... If it works for your use case, it would be great to get feedback so
we can generalize the pattern for Beam folks :-)

Please also note that I am using BagState below, as not all runners support
MapState yet. Also, for the key you should balance the key space, maybe into
a range of 1000-10000 keys. The output won't be delayed by exactly 5 minutes,
but hopefully it will be close enough to the SLO that you are looking for.

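    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
    import org.joda.time.Duration;

    // Declared as a static nested class inside the pipeline's outer class.
    public static class FutureCache<K, V> extends DoFn<KV<K, TimestampedValue<V>>, V> {

        private final Coder<V> objectCoder;

        @TimerId("Process")
        private final TimerSpec processTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

        @StateId("cache")
        private final StateSpec<BagState<TimestampedValue<V>>> cache;

        public FutureCache(Coder<V> objectCoder) {
            this.objectCoder = objectCoder;
            cache = StateSpecs.bag(TimestampedValueCoder.of(objectCoder));
        }

        @ProcessElement
        public void process(
                @TimerId("Process") Timer timer,
                @StateId("cache") BagState<TimestampedValue<V>> cache,
                @Element KV<K, TimestampedValue<V>> input) {
            cache.add(input.getValue());
            // Will just keep updating for each element.
            timer.offset(Duration.standardSeconds(1)).setRelative();
        }

        @OnTimer("Process")
        public void onTimer(
                @TimerId("Process") Timer timer,
                OnTimerContext otc,
                @StateId("cache") BagState<TimestampedValue<V>> cache) {
            // ... Output based on Timestamp being less than (now() - yourDelta..)
        }
    }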
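
The elided body of onTimer could follow the rule in its comment: emit buffered values whose timestamp is older than now minus the delay, and keep the rest. The plain-Java sketch below models only that split, under two assumptions not stated in the thread: each TimestampedValue carries the element's arrival (processing) time, and the delay is 5 minutes. Stamped, FlushResult and DelayFlush are hypothetical stand-ins (using java.time rather than Joda) so the logic can be tested without Beam.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Beam's TimestampedValue<V>.
final class Stamped<V> {
    final V value;
    final Instant timestamp;

    Stamped(V value, Instant timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Result of one timer firing: values to output now, entries to keep buffered.
final class FlushResult<V> {
    final List<V> ready = new ArrayList<>();
    final List<Stamped<V>> pending = new ArrayList<>();
}

final class DelayFlush {
    private DelayFlush() {}

    // Entries strictly older than (now - delay) are ready to emit; the rest stay
    // buffered for a later firing. The input is fully consumed before returning.
    static <V> FlushResult<V> flush(Iterable<Stamped<V>> buffered, Instant now, Duration delay) {
        Instant cutoff = now.minus(delay);
        FlushResult<V> result = new FlushResult<>();
        for (Stamped<V> entry : buffered) {
            if (entry.timestamp.isBefore(cutoff)) {
                result.ready.add(entry.value);
            } else {
                result.pending.add(entry);
            }
        }
        return result;
    }
}
```

One way to wire this into onTimer (an assumption, not code from the thread) is to run the split over cache.read(), then call cache.clear(), pass each ready value to otc.output(...), write the pending entries back with cache.add(...), and, if anything is still pending, re-arm the timer with timer.offset(...).setRelative() so leftovers are flushed even when no new input arrives for that key. Because process() re-sets the same timer for every element, a key that receives elements more often than the one-second offset can keep pushing its timer back, which is another reason to spread traffic over many keys.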



On Sat, Mar 28, 2020 at 5:47 AM Luke Cwik <lc...@google.com> wrote:


Re: Flattening multiple inputs, multiple outputs

Posted by Luke Cwik <lc...@google.com>.
Yes, since you don't care about the grouping, you could use any random key
that you want.
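
Following that point, and Reza's suggestion to spread elements over roughly 1000-10000 keys, the key can simply be a bounded random integer. The plain-Java helper below is a sketch; the class name ShardKeys and the NUM_KEYS value of 1000 are assumptions.

```java
import java.util.concurrent.ThreadLocalRandom;

final class ShardKeys {
    // Assumed key-space size, at the low end of the suggested 1000-10000 range.
    static final int NUM_KEYS = 1000;

    private ShardKeys() {}

    // The grouping itself does not matter here, so any key in [0, NUM_KEYS) will do;
    // drawing uniformly keeps the load spread evenly across keys.
    static int randomKey() {
        return ThreadLocalRandom.current().nextInt(NUM_KEYS);
    }
}
```

In a pipeline this could be applied with WithKeys, for example inputEvents.apply(WithKeys.of((PubsubMessage m) -> ShardKeys.randomKey()).withKeyType(TypeDescriptors.integers())), which produces the KV elements that GroupByKey or a stateful DoFn needs; this assumes inputEvents is a PCollection<PubsubMessage>.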

On Fri, Mar 27, 2020 at 2:45 PM Andrew Wylie <ad...@yahoo.co.uk> wrote:


Re: Flattening multiple inputs, multiple outputs

Posted by Andrew Wylie <ad...@yahoo.co.uk>.
To be able to invoke GroupByKey, does that mean I need to create a KV object for the Pub/Sub messages that I receive from the topic?

> On 27 Mar 2020, at 18:30, Luke Cwik <lc...@google.com> wrote:

Re: Flattening multiple inputs, multiple outputs

Posted by Andrew Wylie <ad...@yahoo.co.uk>.
So my 'input topics' are a real-time feed, and I also want to provide a delayed feed of the same information; my 'output topics' are for that delayed feed. I was hoping to use Beam to do this in Dataflow.

> On 27 Mar 2020, at 18:30, Luke Cwik <lc...@google.com> wrote:


Re: Flattening multiple inputs, multiple outputs

Posted by Luke Cwik <lc...@google.com>.
The trigger only controls when the output of a GroupByKey is produced, and
it won't introduce the delay without one. So as long as inputEvents is
followed by a GroupByKey, you'll see the delay.

Why do you want the delay? There might be different solutions for the
problem you're trying to solve.
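
To see why a processing-time trigger is only an approximate delay even with a GroupByKey in place: AfterProcessingTime.pastFirstElementInPane().plusDelayOf(d) fires a pane d after the first element of that pane (per key and window) arrives, and everything buffered in the pane is emitted at that moment, so later arrivals wait less than d. The plain-Java model below computes those waits for a single pane; PaneDelayModel is a hypothetical name, and the model ignores runner timer granularity, allowed lateness, and anything after the first firing.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class PaneDelayModel {
    private PaneDelayModel() {}

    // Simplified model of pastFirstElementInPane().plusDelayOf(delay) for one pane:
    // the pane fires at (earliest arrival + delay) and emits every buffered element then.
    // Returns how long each element (in input order) waits before being emitted.
    static List<Duration> waits(List<Instant> arrivals, Duration delay) {
        List<Duration> result = new ArrayList<>();
        if (arrivals.isEmpty()) {
            return result;
        }
        Instant fireAt = Collections.min(arrivals).plus(delay);
        for (Instant arrival : arrivals) {
            if (arrival.isAfter(fireAt)) {
                // Such an element would land in a later pane, which this model does not cover.
                throw new IllegalArgumentException("arrival after the pane fired: " + arrival);
            }
            result.add(Duration.between(arrival, fireAt));
        }
        return result;
    }
}
```

For example, with a 5-minute delay and arrivals at 0, 60 and 240 seconds into a pane, the waits come out as 300, 240 and 60 seconds, so only the first message is held for the full 5 minutes.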

On Fri, Mar 27, 2020 at 11:16 AM Andrew Wylie <ad...@yahoo.co.uk> wrote:


Re: Flattening multiple inputs, multiple outputs

Posted by Andrew Wylie <ad...@yahoo.co.uk>.
Thanks Luke. I wasn’t sure if that was advised or even possible, but it seems to be working well.

I would like to introduce a 5-minute delay between reading each message and publishing it to the output topic. Is using triggers the correct way to do this in Apache Beam?

I am trying the approach below in my pipeline, but the messages are output without the delay.

PCollection windowed_inputEvents = inputEvents.apply(
        Window.into(FixedWindows.of(Duration.standardMinutes(1)))
                .triggering(AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(5)))
                .withAllowedLateness(Duration.standardMinutes(1))
                .discardingFiredPanes());

Thanks

> On 25 Mar 2020, at 23:11, Luke Cwik <lc...@google.com> wrote:


Re: Flattening multiple inputs, multiple outputs

Posted by Luke Cwik <lc...@google.com>.
Is there a reason you don't keep them separate and apply the same
transforms to each one, with the only difference being that the final write
transform is configured for the correct topic?
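
A plain-Java model of that suggestion: rather than flattening and then trying to recover each message's source, build one branch per input/output pair, run the same transform in every branch, and let only the sink differ. Topics are modelled as in-memory lists; PerTopicBranches, the topic names and the shared transform used in the test are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class PerTopicBranches {
    private PerTopicBranches() {}

    // For each (input topic -> output topic) pair, apply the same transform to that
    // input's messages and append the results to the paired output topic.
    static Map<String, List<String>> run(
            Map<String, String> inputToOutput,
            Map<String, List<String>> inputMessages,
            Function<String, String> sharedTransform) {
        Map<String, List<String>> outputs = new LinkedHashMap<>();
        for (Map.Entry<String, String> pair : inputToOutput.entrySet()) {
            List<String> sink = outputs.computeIfAbsent(pair.getValue(), topic -> new ArrayList<>());
            for (String message : inputMessages.getOrDefault(pair.getKey(), List.of())) {
                sink.add(sharedTransform.apply(message));
            }
        }
        return outputs;
    }
}
```

In Beam the same shape would be a loop over the topic pairs that applies, per pair, a read such as PubsubIO.readStrings().fromTopic(input), the shared delay transform, and a write such as PubsubIO.writeStrings().to(output), giving each applied step a distinct name (for instance "Read " + input).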

On Wed, Mar 25, 2020 at 1:54 PM Andrew Wylie <ad...@yahoo.co.uk> wrote:
