Posted to user@flink.apache.org by Eleanore Jin <el...@gmail.com> on 2021/04/20 06:10:10 UTC

Fwd: Upgrade to Beam 2.28 and Flink 1.12.2 seeing significant performance degradation

Hi community,

I recently upgraded from Beam 2.23/Flink 1.10.2 to Beam 2.28/Flink 1.12.2
without any code changes to my Beam pipeline, and I am seeing a significant
performance degradation.

The pipeline: read from Kafka topic 1 using Beam's KafkaIO -> filter the
messages using a side input (translated to a broadcast stream in Flink) ->
transform each message with a JSON patch -> write to another Kafka topic
using Beam's KafkaIO.

What I have observed:
1. With checkpointing enabled and my original checkpoint timeout of 30s,
checkpoints have never succeeded since the upgrade; they all time out.
2. The DAG shown in Flink has changed a lot; more rebalance and hash
partitioning steps have been introduced.
[image: image (68).png] - this is Beam 2.23 / Flink 1.10.2
[image: image (69).png] - this is Beam 2.28 / Flink 1.12.2
3. Even with checkpointing disabled, throughput dropped from 80K rps
(Beam 2.23/Flink 1.) to 5K rps.

Any suggestions would be very much appreciated.

Thanks a lot for your help!
Eleanore

Re: Upgrade to Beam 2.28 and Flink 1.12.2 seeing significant performance degradation

Posted by Eleanore Jin <el...@gmail.com>.
Hi Arvid,

Yes, I have reached out to the Beam community. Thanks for the suggestion.

Eleanore

On Wed, Apr 21, 2021 at 10:22 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Eleanore,
>
> I suspect the degradation is caused by the more complex DAG, which likely
> also adds latency and prolongs checkpointing.
>
> I recommend asking this question on the Beam mailing list; they can
> probably explain why the DAG is so complicated and how to tweak the
> pipeline to get it back to its previous shape.

Re: Upgrade to Beam 2.28 and Flink 1.12.2 seeing significant performance degradation

Posted by Arvid Heise <ar...@apache.org>.
Hi Eleanore,

I suspect the degradation is caused by the more complex DAG, which likely
also adds latency and prolongs checkpointing.

I recommend asking this question on the Beam mailing list; they can
probably explain why the DAG is so complicated and how to tweak the
pipeline to get it back to its previous shape.
