Posted to user@beam.apache.org by John Casey via user <us...@beam.apache.org> on 2023/08/09 17:15:48 UTC

Re: [question]Best practices for branching pipeline.

Depending on the specifics of your processing, it may be simpler to do
both transforms within a single ParDo.

For example:

pipeline
    .apply(kafka.read())  // placeholder for your configured Kafka read
    .apply(ParDo.of(new UserTransform()));

public static class UserTransform extends DoFn<KafkaRecord, MergedType> {

  @ProcessElement
  public void processElement(@Element KafkaRecord record,
      OutputReceiver<MergedType> receiver) {
    // Run both transformations on the same record, then merge the results.
    Type1 part1 = something(record);
    Type2 part2 = somethingElse(record);
    MergedType merged = merge(part1, part2);
    receiver.output(merged);
  }

}



On Wed, Jul 26, 2023 at 11:43 PM Ruben Vargas <ru...@metova.com>
wrote:

>
> Hello?
>
> Any advice on how to do what I described? I can only find examples for
> bounded data, not for streaming.
>
> Also, can I get invited to Slack?
>
> Thank you very much!
>
> On Fri, Jul 21, 2023 at 9:34, Ruben Vargas <
> ruben.vargas@metova.com> wrote:
>
>> Hello,
>>
>> I'm starting to use Beam and I would like to know if there is a
>> recommended pattern for doing the following:
>>
>> I have a message coming from Kafka, and I would like to apply two
>> different transformations to it and merge them into a single result at
>> the end. I attached an image that describes the pipeline.
>>
>> Each message has its own unique key.
>>
>> What I'm doing is using a session window with an elementCountAtLeast
>> trigger, with the count equal to the number of processes I expect to
>> generate results (2 in the case of the diagram).
>>
>> This is the code fragment I use to construct the window:
>>
>>         Window<KV<String, OUTPUT>> joinWindow =
>>             Window.<KV<String, OUTPUT>>into(Sessions.withGapDuration(Duration.standardSeconds(60)))
>>                 .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(nProcessWait)))
>>                 .discardingFiredPanes()
>>                 .withAllowedLateness(Duration.ZERO);
>>
>>
>> and then a CoGroupByKey to join all of the results. Is this a
>> recommended approach, or is there a better way? What happens if at
>> some point I have a lot of windows "open"?
>>
>>
>> Thank you very much!
>>
>>

Re: [question]Best practices for branching pipeline.

Posted by John Casey via user <us...@beam.apache.org>.
Got it. In that case, CoGroupByKey is a good tool.

I'm not sure how best to configure the window for your use case. Because
you are only merging data you split within the window, I imagine small
windows would do, but I'm not sure how best to tune that.
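
To make the join semantics concrete, here is a minimal plain-Java model of
what the keyed join amounts to: results from each branch are buffered per
message key and emitted once the expected number has arrived. This is only
a sketch and does not use the Beam API; the class BranchJoinModel, its
methods add and openKeys, and the expectedResults parameter are hypothetical
names made up for illustration. In a real pipeline the runner keeps this
state for you, per key and window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of a per-key join buffer. Hypothetical; not Beam API. */
public class BranchJoinModel {

    // Number of branch results expected per key (2 in the diagram).
    private final int expectedResults;

    // Results buffered per message key until every branch has reported.
    private final Map<String, List<String>> pending = new HashMap<>();

    public BranchJoinModel(int expectedResults) {
        this.expectedResults = expectedResults;
    }

    /** Buffers one branch result; returns the full group once it is complete. */
    public Optional<List<String>> add(String key, String branchResult) {
        List<String> buffered = pending.computeIfAbsent(key, k -> new ArrayList<>());
        buffered.add(branchResult);
        if (buffered.size() < expectedResults) {
            return Optional.empty();
        }
        // Release the state as soon as the group is complete.
        pending.remove(key);
        return Optional.of(buffered);
    }

    /** Number of keys still waiting for results, analogous to "open" windows. */
    public int openKeys() {
        return pending.size();
    }

    public static void main(String[] args) {
        BranchJoinModel join = new BranchJoinModel(2);
        System.out.println(join.add("msg-1", "heavy:msg-1")); // Optional.empty
        System.out.println(join.add("msg-2", "light:msg-2")); // Optional.empty
        System.out.println(join.openKeys());                  // 2
        System.out.println(join.add("msg-1", "light:msg-1")); // Optional[[heavy:msg-1, light:msg-1]]
        System.out.println(join.openKeys());                  // 1
    }
}
```

The model suggests why many "open" windows matter: buffered state grows with
the number of keys still waiting for a result, so a branch that never
reports leaves its key pending until something (in Beam, the window
expiring) clears it.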

On Thu, Aug 10, 2023 at 11:40 AM Ruben Vargas <ru...@metova.com>
wrote:

> Hello, thank you very much for the reply.
>
> I was thinking of branching because I have some heavy processes that I
> would like to distribute to other workers and scale independently of the
> lighter processes.
>
> Does that make sense?

Re: [question]Best practices for branching pipeline.

Posted by Ruben Vargas <ru...@metova.com>.
Hello, thank you very much for the reply.

I was thinking of branching because I have some heavy processes that I
would like to distribute to other workers and scale independently of the
lighter processes.

Does that make sense?
