Posted to user@beam.apache.org by Talat Uyarer <tu...@paloaltonetworks.com> on 2020/08/19 22:36:54 UTC

Resource Consumption increase With TupleTag

Hi,

I have a very simple DAG in my Dataflow job (KafkaIO->Filter->WriteGCS).
When I submit this Dataflow job per topic, it has 4kps per-instance
processing speed. However, I want to consume two different topics in one DF
job, so I used TupleTag and created a TupleTag per message type. Each topic
has different message types and also needs different filters, so my pipeline
turned into the DAG below. MessageExtractor is a very simple step that
checks the header of each Kafka message and writes to the correct TupleTag.
However, after starting to use this new DAG, Dataflow can process only 2kps
per instance.

                           |--->Filter1-->WriteGCS
KafkaIO->MessageExtractor->|
                           |--->Filter2-->WriteGCS

Do you have any idea why my processing speed decreased?

Thanks
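For readers unfamiliar with the pattern described above: MessageExtractor routes each record to one of several tagged outputs based on a Kafka header. A minimal pure-Python sketch of that routing logic (a simulation for illustration only, not the actual Beam TupleTag API; the header name and message shape are made up):

```python
# Simulation of the MessageExtractor step: each message carries a
# header identifying its type, and the extractor appends it to the
# matching tagged output. Header names and message shapes are
# hypothetical.

def extract(messages):
    """Route messages into per-tag buckets by their 'type' header."""
    outputs = {"type1": [], "type2": []}
    for msg in messages:
        tag = msg["headers"].get("type")
        if tag in outputs:  # messages with unknown types are dropped here
            outputs[tag].append(msg["payload"])
    return outputs

messages = [
    {"headers": {"type": "type1"}, "payload": "a"},
    {"headers": {"type": "type2"}, "payload": "b"},
    {"headers": {"type": "type1"}, "payload": "c"},
]

routed = extract(messages)
# Each downstream filter then sees only its own message type.
```

In the real pipeline this is a multi-output DoFn: the routing itself is cheap, which is why the slowdown discussed below is surprising.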

Re: Resource Consumption increase With TupleTag

Posted by Luke Cwik <lc...@google.com>.
On Thu, Aug 20, 2020 at 12:54 PM Talat Uyarer <tu...@paloaltonetworks.com>
wrote:

> Hi Luke,
>
>> Not really. It is more about pipeline complexity, logging, debugging,
>> monitoring, which all become more complex.
>
> Should I use a different consumer group, or the same consumer group?
>
I don't know what you're asking.


> And also, how will autoscaling decide the worker count?
>
There is some analysis that is done based upon certain metrics to optimize
CPU utilization and throughput.



> What do you mean by it's not working properly?
>
> Actually, I should correct my statement. Both jobs are using TupleTags, but
> when I add one more branch after MessageExtractor, things change.
>
How are they changing?


>> What does the timing information for the transforms tell you on the
>> Dataflow Job UI?
>
> Based on wall time in the DAG, KafkaIO is the slowest step in my pipeline;
> its wall time shows 28 days. I put the wall time for each step below.
>
>
>                                              |--->Filter1 (1 day)--->WriteGCS (1 day)
> KafkaIO (28 days)->MessageExtractor (7 hrs)->|
>                                              |--->Filter2 (13 days)--->WriteGCS (2 days)
>
How do these wall times compare when they are run as two separate
pipelines?



Re: Resource Consumption increase With TupleTag

Posted by Talat Uyarer <tu...@paloaltonetworks.com>.
Hi Luke,

> Not really. It is more about pipeline complexity, logging, debugging,
> monitoring, which all become more complex.

Should I use a different consumer group, or the same consumer group? And
also, how will autoscaling decide the worker count?

> What do you mean by it's not working properly?

Actually, I should correct my statement. Both jobs are using TupleTags, but
when I add one more branch after MessageExtractor, things change.

> What does the timing information for the transforms tell you on the
> Dataflow Job UI?

Based on wall time in the DAG, KafkaIO is the slowest step in my pipeline;
its wall time shows 28 days. I put the wall time for each step below.


                                             |--->Filter1 (1 day)--->WriteGCS (1 day)
KafkaIO (28 days)->MessageExtractor (7 hrs)->|
                                             |--->Filter2 (13 days)--->WriteGCS (2 days)

Thanks
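One way to read the numbers above: in the Dataflow UI, wall time is cumulative across all workers, so each step's share of the total suggests where worker time actually goes. A quick back-of-the-envelope check with the figures quoted in this message (days converted to hours; the numbers are taken directly from the thread, nothing else is assumed):

```python
# Back-of-the-envelope check on the wall times quoted above.
# Wall time in the Dataflow UI is cumulative across workers, so a
# step's share of the total indicates where worker time is spent.

HOURS_PER_DAY = 24

wall_time_hours = {
    "KafkaIO": 28 * HOURS_PER_DAY,
    "MessageExtractor": 7,
    "Filter1": 1 * HOURS_PER_DAY,
    "WriteGCS1": 1 * HOURS_PER_DAY,
    "Filter2": 13 * HOURS_PER_DAY,
    "WriteGCS2": 2 * HOURS_PER_DAY,
}

total = sum(wall_time_hours.values())
shares = {step: t / total for step, t in wall_time_hours.items()}
bottleneck = max(shares, key=shares.get)
# KafkaIO accounts for roughly 60% of all worker time here, which is
# consistent with the read being the slowest step.
```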


Re: Resource Consumption increase With TupleTag

Posted by Luke Cwik <lc...@google.com>.
> Do you mean I can put my simple pipeline multiple times for all topics in
> one dataflow job ?
Yes.

> Is there any side effect having multiple independent DAG on one DF job ?
Not really. It is more about pipeline complexity, logging, debugging,
monitoring, which all become more complex.

> And also why the TupleTag model is not working properly?
What do you mean by it's not working properly?

> Why is it using more resources than what it should be?
What does the timing information for the transforms tell you on the
Dataflow Job UI? (Even if MessageExtractor seems simple, it isn't free. You
now have to write to two GCS locations instead of one for each work item
that you process, so you're doing more network calls.)


On Wed, Aug 19, 2020 at 8:36 PM Talat Uyarer <tu...@paloaltonetworks.com>
wrote:

> Filter step is an independent step. We can think it is an etl step or
> something else. MessageExtractor step writes messages on TupleTags based on
> the kafka header. Yes, MessageExtractor is literally a multi-output DoFn
> already. MessageExtractor is processing 48kps but branches are processing
> their logs. Each Filter only consumes its log type. There is no any  So
> That's why I assume it should consume the same amount of workers. But it
> consumes more workers.
>
>
>
>  |--->Filter1(20kps)-->WriteGCS
> KafkaIO->MessageExtractor(48kps)-> |
>
>  |--->Filter2(28kps)-->WriteGCS
>
> Do you mean I can put my simple pipeline multiple times for all topics in
> one dataflow job ? Is there any side effect having multiple independent DAG
> on one DF job ? And also why the TupleTag model is not working properly?
> Why is it using more resources than what it should be?
>
> Thanks
>
>
>
> On Wed, Aug 19, 2020 at 5:16 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> Just to clarify, previously you had.
>>
>> KafkaIO(topic1) --20kps--> Filter1 -> WriteGCS
>> KafkaIO(topic2) --28kps--> Filter2 -> WriteGCS
>>
>> And now you have
>>
>>
>>                                                   ---48kps--> Filter1
>> -> WriteGCS
>>                                               /
>> KafkaIO(topic1, topic2) + MessageExtractor
>>                                                \
>>                                                  ---48kps--> Filter2 ->
>> WriteGCS
>>
>> Each filter is now actually consuming (and throwing away) more data than
>> before.
>>
>> Or is MessageExtractor literally a multi-output DoFn already (which is
>> why you're talking about TupleTags). This could possibly be more
>> expensive if reading Kafak with headers is more expensive than reading
>> it without.
>>
>> If topic1 and topic2 are truly independent, I would keep their reads
>> separate. This will simplify your pipeline (and sounds like it'll
>> improve performance). Note that you don't have to have a separate
>> Dataflow job for each read, you can have a single Pipeline and do as
>> many reads as you want and the'll all get executed in the same job.
>>
>> On Wed, Aug 19, 2020 at 4:14 PM Talat Uyarer
>> <tu...@paloaltonetworks.com> wrote:
>> >
>> > Hi Robert,
>> >
>> > I calculated process speed based on worker count. When I have separate
>> jobs. topic1 job used 5 workers, topic2 job used 7 workers. Based on
>> KafkaIO message count. they had 4kps processing speed per worker. After I
>> combine them in one df job. That DF job started using ~18 workers, not 12
>> workers.
>> >
>> > How can I understand if they are poorly fused or not ? I can not write
>> Filter because it is a beamsql. I just want to simplified my DAG that's why
>> i did not mentioned
>> >
>> > Thanks
>> >
>> > On Wed, Aug 19, 2020 at 3:54 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>> >>
>> >> Is this 2kps coming out of Filter1 + 2kps coming out of Filter2 (which
>> >> would be 4kps total), or only 2kps coming out of KafkaIO and
>> >> MessageExtractor?
>> >>
>> >> Though it /shouldn't/ matter, due to sibling fusion, there's a chance
>> >> things are getting fused poorly and you could write Filter1 and
>> >> Filter2 instead as a DoFn with multiple outputs (see
>> >>
>> https://urldefense.proofpoint.com/v2/url?u=https-3A__beam.apache.org_documentation_programming-2Dguide_-23additional-2Doutputs&d=DwIFaQ&c=V9IgWpI5PvzTw83UyHGVSoW3Uc1MFWe5J8PTfkrzVSo&r=BkW1L6EF7ergAVYDXCo-3Vwkpy6qjsWAz7_GD7pAR8g&m=Erfg03JLKLNG3lT2ejqq7_fbvfL95-wSxZ5hFKqzyKU&s=JsWPJxBXopYYenfBAp6nkwfB0Q1Dhs1d4Yi41fBY3a8&e=
>> ).
>> >>
>> >> - Robert
>> >>
>> >> On Wed, Aug 19, 2020 at 3:37 PM Talat Uyarer
>> >> <tu...@paloaltonetworks.com> wrote:
>> >> >
>> >> > Hi,
>> >> >
>> >> > I have a very simple DAG on my dataflow job.
>> (KafkaIO->Filter->WriteGCS). When I submit this Dataflow job per topic it
>> has 4kps per instance processing speed. However I want to consume two
>> different topics in one DF job. I used TupleTag. I created TupleTags per
>> message type. Each topic has different message types and also needs
>> different filters. So my pipeline turned to below DAG. Message Extractor is
>> a very simple step checking header of kafka messages and writing the
>> correct TupleTag. However after starting to use this new DAG, dataflow
>> canprocess 2kps per instance.
>> >> >
>> >> >
>> |--->Filter1-->WriteGCS
>> >> > KafkaIO->MessageExtractor-> |
>> >> >
>> |--->Filter2-->WriteGCS
>> >> >
>> >> > Do you have any idea why my data process speed decreased ?
>> >> >
>> >> > Thanks
>>
>

Re: Resource Consumption increase With TupleTag

Posted by Talat Uyarer <tu...@paloaltonetworks.com>.
The Filter step is an independent step; think of it as an ETL step or
something similar. The MessageExtractor step writes messages to TupleTags
based on the Kafka header. Yes, MessageExtractor is literally a multi-output
DoFn already. MessageExtractor is processing 48kps, but each branch
processes only its own logs; each Filter only consumes its log type. That's
why I assume it should consume the same number of workers, but it consumes
more workers.



                                   |--->Filter1 (20kps)-->WriteGCS
KafkaIO->MessageExtractor (48kps)->|
                                   |--->Filter2 (28kps)-->WriteGCS

Do you mean I can put my simple pipeline multiple times, for all topics, in
one Dataflow job? Is there any side effect to having multiple independent
DAGs in one DF job? And also, why is the TupleTag model not working
properly? Why is it using more resources than it should?

Thanks




Re: Resource Consumption increase With TupleTag

Posted by Robert Bradshaw <ro...@google.com>.
Just to clarify, previously you had:

KafkaIO(topic1) --20kps--> Filter1 -> WriteGCS
KafkaIO(topic2) --28kps--> Filter2 -> WriteGCS

And now you have


                                            ---48kps--> Filter1 -> WriteGCS
                                           /
KafkaIO(topic1, topic2) + MessageExtractor
                                           \
                                            ---48kps--> Filter2 -> WriteGCS

Each filter is now actually consuming (and throwing away) more data than before.

Or is MessageExtractor literally a multi-output DoFn already (which is
why you're talking about TupleTags)? This could possibly be more
expensive, if reading Kafka with headers is more expensive than reading
it without.

If topic1 and topic2 are truly independent, I would keep their reads
separate. This will simplify your pipeline (and it sounds like it'll
improve performance). Note that you don't have to have a separate
Dataflow job for each read; you can have a single Pipeline and do as
many reads as you want, and they'll all get executed in the same job.
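The "consuming and throwing away more data" point above can be made concrete with the throughput numbers from this thread (20kps for topic1, 28kps for topic2). A small arithmetic sketch comparing the two layouts (an illustration of the argument, not a measurement):

```python
# Rough illustration: in separate pipelines each filter examines only
# its own topic's records. If the combined 48kps stream were instead
# broadcast to both branches, each filter would examine all 48kps and
# discard the other topic's records. Numbers are from the thread.

TOPIC1_KPS = 20
TOPIC2_KPS = 28

# Separate pipelines: records examined per second across both filters.
separate_work = TOPIC1_KPS + TOPIC2_KPS        # 48k records/s

# Combined stream broadcast to both filters: each sees everything.
combined_work = 2 * (TOPIC1_KPS + TOPIC2_KPS)  # 96k records/s

# The broadcast layout doubles the filter-side work even though the
# useful output is unchanged. A multi-output DoFn (TupleTag) avoids
# this by routing each record to exactly one branch.
```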


Re: Resource Consumption increase With TupleTag

Posted by Talat Uyarer <tu...@paloaltonetworks.com>.
Hi Robert,

I calculated processing speed based on worker count. When I had separate
jobs, the topic1 job used 5 workers and the topic2 job used 7 workers.
Based on KafkaIO message count, they had 4kps processing speed per worker.
After I combined them into one DF job, that DF job started using ~18
workers, not 12 workers.

How can I understand whether they are poorly fused or not? I cannot rewrite
Filter because it is BeamSQL. I just wanted to simplify my DAG; that's why
I did not mention it.

Thanks


Re: Resource Consumption increase With TupleTag

Posted by Robert Bradshaw <ro...@google.com>.
Is this 2kps coming out of Filter1 + 2kps coming out of Filter2 (which
would be 4kps total), or only 2kps coming out of KafkaIO and
MessageExtractor?

Though it /shouldn't/ matter, due to sibling fusion, there's a chance
things are getting fused poorly and you could write Filter1 and
Filter2 instead as a DoFn with multiple outputs (see
https://beam.apache.org/documentation/programming-guide/#additional-outputs).

- Robert
