Posted to user@beam.apache.org by "Kathula, Sandeep" <Sa...@intuit.com> on 2020/08/10 16:27:14 UTC

Beam flink runner job not keeping up with input rate after downscaling

Hi,
   We started a Beam application on the Flink runner with a parallelism of 50. It is a stateless application. With the initial parallelism of 50, the application was able to process up to 50,000 records per second. After a week, we took a savepoint and restarted from it with a parallelism of 18. The application is now able to process only 7,000 records per second, whereas we expected it to process almost 18,000 records per second. The number of records processed per task manager is almost half of what each task manager processed previously, when there were 50 task managers.
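
The expectation of roughly 18,000 records per second follows from linear scaling. A minimal sketch of that arithmetic, assuming throughput scales linearly with parallelism and that every parallel instance does an equal share of the work (the function name is illustrative only, and the result is an average over all instances rather than a per-task-manager measurement):

```python
def per_instance_rate(total_records_per_sec: float, parallelism: int) -> float:
    """Average records per second handled by one parallel instance."""
    return total_records_per_sec / parallelism


# Figures taken from the message above.
baseline = per_instance_rate(50_000, 50)   # rate per instance at parallelism 50
expected_total = baseline * 18             # linear-scaling expectation at 18
observed = per_instance_rate(7_000, 18)    # average rate actually seen at 18
ratio = observed / baseline                # fraction of the earlier rate

print(f"baseline per instance: {baseline:.0f} records/s")
print(f"expected total at 18:  {expected_total:.0f} records/s")
print(f"observed per instance: {observed:.0f} records/s ({ratio:.0%} of baseline)")
```

Averaged over all 18 instances, the observed rate works out to a bit under 40% of the earlier per-instance rate.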

 When we started a new application with 18 pods and no savepoint, it was able to process ~18,500 records per second. The problem occurs only when we downscale after taking a savepoint. We ported the same application to a plain Flink application without Apache Beam, and there it scales well, without any issues, after restarting from a savepoint with lower parallelism. So the problem should be with Apache Beam or with some config we are passing to Beam/Flink. We are using the following config:

numberOfExecutionRetries=2
externalizedCheckpointsEnabled=true
retainExternalizedCheckpointsOnCancellation=true


We did not set maxParallelism in our Beam application; we only specified parallelism.
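
For context, when no maximum parallelism is set, Flink derives a default from the operator parallelism. The sketch below reproduces that rule as described in the Flink documentation (roughly parallelism + parallelism / 2, rounded up to a power of two, bounded between 128 and 32768). Whether the Beam Flink runner in Beam 2.19 falls back to this default is an assumption, and the function name is made up for illustration. The value mainly affects how keyed state is split into key groups, so it may not matter for a stateless job.

```python
def default_max_parallelism(parallelism: int) -> int:
    """Sketch of Flink's default max parallelism rule (illustrative name)."""
    lower_bound, upper_bound = 128, 32768
    candidate = parallelism + parallelism // 2
    # Round the candidate up to the next power of two.
    power = 1
    while power < candidate:
        power *= 2
    return min(max(power, lower_bound), upper_bound)


for p in (50, 18):
    print(f"parallelism {p} -> default max parallelism {default_max_parallelism(p)}")
```

Under this rule both 50 and 18 map to the same default, so the default by itself would not change between the two runs.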

Beam version - 2.19
Flink version - 1.9

Any suggestions/help would be appreciated.


Thanks
Sandeep Kathula



Re: Beam flink runner job not keeping up with input rate after downscaling

Posted by David Morávek <dm...@apache.org>.
Hello Sandeep,

Are you seeing any skew in your data (affected TMs are receiving more data
than others)? How many partitions does your source topic have (this could
explain why some TMs would have more work to perform)?
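
One way such skew can arise is purely from counting: when a fixed number of source partitions is spread over fewer workers, some workers end up with one partition more than others. A small sketch, assuming a simple round-robin assignment and a hypothetical count of 50 partitions (the actual partition count of the topic is not stated in this thread, and the real assignment logic may differ):

```python
from collections import Counter


def load_per_worker(num_partitions: int, num_workers: int) -> list:
    """Number of partitions each worker receives under round-robin assignment."""
    counts = Counter(p % num_workers for p in range(num_partitions))
    return [counts.get(w, 0) for w in range(num_workers)]


# Hypothetical figures: 50 partitions read by 50 workers, then by 18 workers.
loads_50 = load_per_worker(50, 50)
loads_18 = load_per_worker(50, 18)

print("50 workers:", Counter(loads_50))
print("18 workers:", Counter(loads_18))
```

With these hypothetical numbers, 14 of the 18 workers would read three partitions and 4 would read two, so the busier workers carry 50% more input than the others.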

Also, would it be possible to retry your test with the latest SDK?

D.

On Sun, Aug 16, 2020 at 6:44 AM Eleanore Jin <el...@gmail.com> wrote:

> [...]

Re: Beam flink runner job not keeping up with input rate after downscaling

Posted by Eleanore Jin <el...@gmail.com>.
Hi Sandeep,

As I am also exploring Beam's KafkaIO, I would like to share some of my
thoughts.
1. My understanding is that, in order to guarantee no message loss, you need
to use KafkaExactlyOnceSink [1], and that it is not possible to relax this to
at-least-once with the current KafkaIO.
2. When KafkaExactlyOnceSink is enabled, the checkpoint includes the source
offsets and all the messages between the last checkpoint and the current
checkpoint.
3. Only after the checkpoint has completed does KafkaExactlyOnceSink start
publishing the messages that have been checkpointed.
4. If a failure occurs while publishing these messages, they are retried.
Each message is assigned a sequenceId, which is used to determine which
messages were published successfully and which ones need to be retried. E.g.,
say messages 5 - 10 are in the checkpoint and only 5 and 6 are published
successfully; then, on restart from the checkpoint, only 7 to 10 will be
published again.
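
The retry example in point 4 can be written down as a toy model. This is only a sketch of the idea of filtering by sequence ID, not KafkaExactlyOnceSink's actual implementation; the function and variable names are hypothetical:

```python
def to_republish(checkpointed_ids, published_ids):
    """Sequence IDs from the checkpoint that still have to be published."""
    already_published = set(published_ids)
    return [seq for seq in checkpointed_ids if seq not in already_published]


# Messages 5 - 10 are in the checkpoint; only 5 and 6 were published
# before the failure.
remaining = to_republish(range(5, 11), [5, 6])
print(remaining)
```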

My question about your setup:
If you just enable checkpointing and still use KafkaWriter [2], and your
application is stateless, then the only state is the source offset.
Consider the scenario below:
The checkpoint records offset 10 and succeeds, then the message with offset
10 fails to be published and the job restarts. It resumes from the
checkpoint and starts from offset 11, so message 10 is lost.
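
The scenario can be made concrete with a toy simulation. It compares two orderings, checkpointing an offset before the message is published versus after, and makes no claim about which ordering KafkaWriter on the Flink runner actually follows. The model assumes one checkpoint per message and a single failure followed by a restart; all names are hypothetical:

```python
def simulate(first, last, failing_offset, publish_before_checkpoint):
    """Return the offsets delivered to the sink, given one publish failure."""
    delivered = []
    resume_from = first      # offset a restart would resume from
    failed_once = False
    pos = first
    while pos <= last:
        fails_now = pos == failing_offset and not failed_once
        if not publish_before_checkpoint:
            # Checkpoint first: the offset is marked done before publishing.
            resume_from = pos + 1
        if fails_now:
            failed_once = True
            pos = resume_from    # the job restarts from the checkpoint
            continue
        delivered.append(pos)
        if publish_before_checkpoint:
            # Publish first: the offset is marked done only after success.
            resume_from = pos + 1
        pos += 1
    return delivered


lost_case = simulate(8, 12, 10, publish_before_checkpoint=False)
safe_case = simulate(8, 12, 10, publish_before_checkpoint=True)
print("checkpoint before publish:", lost_case)
print("publish before checkpoint:", safe_case)
```

In this model message 10 is missing only when the offset is checkpointed before the publish is confirmed. With less frequent checkpoints, the publish-first ordering would replay some messages after a restart, which is the duplicate behaviour that at-least-once delivery accepts.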

Please correct me if I am missing anything.

Thanks a lot!
Eleanore

[1]
https://github.com/apache/beam/blob/release-2.23.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
[2]
https://github.com/apache/beam/blob/release-2.23.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java

On Tue, Aug 11, 2020 at 11:19 PM Eleanore Jin <el...@gmail.com>
wrote:

>
> Hi Sandeep,
>
> Thanks a lot for the information! I am on a similar track, which requires
> scaling the stateless pipeline up/down from a savepoint.
>
> It’s good to learn from your experience.
>
> Thanks!
> Eleanore
>
> On Tue, Aug 11, 2020 at 10:21 AM Kathula, Sandeep <
> Sandeep_Kathula@intuit.com> wrote:
>
>> Hi Eleanore,
>>
>> We are using at-least-once semantics when writing to Kafka. We are OK with
>> duplicate messages.
>>
>> Thanks
>>
>> Sandeep Kathula
>>
>>
>>

Re: Beam flink runner job not keeping up with input rate after downscaling

Posted by "Kathula, Sandeep" <Sa...@intuit.com>.
Hi Eleanore,
We are using at-least-once semantics when writing to Kafka. We are OK with duplicate messages.
Thanks
Sandeep Kathula

From: Eleanore Jin <el...@gmail.com>
Date: Monday, August 10, 2020 at 11:32 AM
To: "Kathula, Sandeep" <Sa...@intuit.com>
Cc: "user@beam.apache.org" <us...@beam.apache.org>, "Vora, Jainik" <Ja...@intuit.com>, "Benenson, Mikhail" <Mi...@intuit.com>, "Deshpande, Omkar" <Om...@intuit.com>, "LeVeck, Matt" <Ma...@intuit.com>
Subject: Re: Beam flink runner job not keeping up with input rate after downscaling

Hi Sandeep,

Thanks a lot for sharing! On a separate note, I see you are using KafkaIO.write, but not with EOS (exactly-once semantics). From my understanding, just enabling checkpointing will not be enough to guarantee no message loss? I pasted part of my DAG with KafkaIO EOS enabled. I am also reading from and writing to Kafka with KafkaIO.

Thanks a lot!
Eleanore



Re: Beam flink runner job not keeping up with input rate after downscaling

Posted by Eleanore Jin <el...@gmail.com>.
Hi Sandeep,

Thanks a lot for sharing! On a separate note, I see you are using
KafkaIO.write, but not with EOS (exactly-once semantics). From my
understanding, just enabling checkpointing will not be enough to guarantee
no message loss? I pasted part of my DAG with KafkaIO EOS enabled. I am
also reading from and writing to Kafka with KafkaIO.

[image: image.png]
Thanks a lot!
Eleanore

On Mon, Aug 10, 2020 at 11:07 AM Kathula, Sandeep <
Sandeep_Kathula@intuit.com> wrote:

> Hi Eleanore,
>
> We are also observing that a few task managers are able to keep up with the
> incoming load, but a few task managers are lagging behind after starting
> from the savepoint with lower parallelism. Not all task managers are
> affected by this problem. We repeated this test multiple times to confirm.
>
>
>
> Thanks
>
> Sandeep Kathula
>
>
>

Re: Beam flink runner job not keeping up with input rate after downscaling

Posted by "Kathula, Sandeep" <Sa...@intuit.com>.
Hi Eleanore,
We are also observing that a few task managers are able to keep up with the incoming load, but a few task managers are lagging behind after starting from the savepoint with lower parallelism. Not all task managers are affected by this problem. We repeated this test multiple times to confirm.

Thanks
Sandeep Kathula

From: "Kathula, Sandeep" <Sa...@intuit.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Monday, August 10, 2020 at 11:04 AM
To: "user@beam.apache.org" <us...@beam.apache.org>, "eleanore.jin@gmail.com" <el...@gmail.com>
Cc: "Vora, Jainik" <Ja...@intuit.com>, "Benenson, Mikhail" <Mi...@intuit.com>, "Deshpande, Omkar" <Om...@intuit.com>, "LeVeck, Matt" <Ma...@intuit.com>
Subject: Re: Beam flink runner job not keeping up with input rate after downscaling

Hi Eleanore,
                    Our DAG:
Source: Strip Metadata/EventBusIO.Read/Read Bytes From Kafka/Read(KafkaUnboundedSource) -> Flat Map -> Strip Metadata/EventBusIO.Read/MapElements/Map/ParMultiDo(Anonymous) -> Strip Metadata/EventBusIO.Read/Decode EB Bytes/ParMultiDo(EbExtractor) -> Strip Metadata/MapElements/Map/ParMultiDo(Anonymous) -> Filter Unreadeable Messages/ParDo(Anonymous)/ParMultiDo(Anonymous) -> Extract Events/Map/ParMultiDo(Anonymous) -> UaEnrichEvent/ParMultiDo(UserAgentEnrichment) -> IpEnrichEvent/ParMultiDo(GeoEnrichment) -> Keyless Write/MapElements/Map/ParMultiDo(Anonymous) -> Keyless Write/EventBusIO.Write/ParDo(EbFormatter)/ParMultiDo(EbFormatter) -> Keyless Write/EventBusIO.Write/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous) -> Keyless Write/EventBusIO.Write/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)


            We read from and write to kafka.

Thanks
Sandeep Kathula




Re: Beam flink runner job not keeping up with input rate after downscaling

Posted by "Kathula, Sandeep" <Sa...@intuit.com>.
Hi Eleanore,
Our DAG:

Source: Strip Metadata/EventBusIO.Read/Read Bytes From Kafka/Read(KafkaUnboundedSource)
-> Flat Map
-> Strip Metadata/EventBusIO.Read/MapElements/Map/ParMultiDo(Anonymous)
-> Strip Metadata/EventBusIO.Read/Decode EB Bytes/ParMultiDo(EbExtractor)
-> Strip Metadata/MapElements/Map/ParMultiDo(Anonymous)
-> Filter Unreadeable Messages/ParDo(Anonymous)/ParMultiDo(Anonymous)
-> Extract Events/Map/ParMultiDo(Anonymous)
-> UaEnrichEvent/ParMultiDo(UserAgentEnrichment)
-> IpEnrichEvent/ParMultiDo(GeoEnrichment)
-> Keyless Write/MapElements/Map/ParMultiDo(Anonymous)
-> Keyless Write/EventBusIO.Write/ParDo(EbFormatter)/ParMultiDo(EbFormatter)
-> Keyless Write/EventBusIO.Write/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous)
-> Keyless Write/EventBusIO.Write/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)

We read from and write to Kafka.

Thanks
Sandeep Kathula


From: Eleanore Jin <el...@gmail.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Monday, August 10, 2020 at 10:31 AM
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: Beam flink runner job not keeping up with input rate after downscaling

Hi Sandeep,

Can you please share your DAG? Does your job read from a source and write to a sink?

Thanks a lot!


Re: Beam flink runner job not keeping up with input rate after downscaling

Posted by Eleanore Jin <el...@gmail.com>.
Hi Sandeep,

Can you please share your DAG? Does your job read from a source and write to a sink?

Thanks a lot!
