Posted to dev@beam.apache.org by "Kathula, Sandeep" <Sa...@intuit.com> on 2022/06/29 18:06:39 UTC

Checkpoints timing out upgrading from Beam version 2.29 with Flink 1.12 to Beam 2.38 and Flink 1.14

Hi,
   We have a stateless application which:


  1.  Reads from Kafka.
  2.  Applies some stateless transformations, reading from in-memory databases and updating the records.
  3.  Writes back to Kafka.



With Beam 2.23 and Flink 1.9, checkpoints work fine (they take at most 1 min).

With Beam 2.29 and Flink 1.12, checkpoints take longer (sometimes up to 6-7 min).

With Beam 2.38 and Flink 1.14, checkpoints time out after 10 minutes.


I checked the Beam code and, after some logging and analysis, found that the problem is at https://github.com/apache/beam/blob/release-2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L287-L307


We are still using the old API to read from Kafka and not yet using KafkaIO based on SplittableDoFn.

There are two threads:

  1.  The legacy source thread, which reads from Kafka and does the entire processing.
  2.  The thread which emits watermarks on a timer: https://github.com/apache/beam/blob/release-2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L454-L474

Both of these code blocks are inside synchronized blocks that wait on the same checkpoint lock. Under heavy load, the thread reading from Kafka runs forever in the while loop, and the thread emitting the watermarks waits forever to acquire the lock, so it does not emit watermarks and the checkpoint times out.
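For illustration, the contention pattern described above can be sketched in isolation. This is a standalone sketch, not the code from UnboundedSourceWrapper: the class name, thread names, and the counters are hypothetical, and a plain Object stands in for the checkpoint lock. It only measures how long the timer-style thread waits for the monitor while another thread re-acquires it in a tight loop; whether real starvation shows up depends on the JVM, the hardware, and the load.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-alone sketch; not Beam or Flink code.
final class LockContentionSketch {

  /** Runs both threads for about runMillis; returns {readerIterations, watermarksEmitted, maxWaitNanos}. */
  static long[] run(long runMillis) {
    final Object checkpointLock = new Object(); // stand-in for the shared checkpoint lock
    final AtomicBoolean running = new AtomicBoolean(true);
    final AtomicLong readerIterations = new AtomicLong();
    final AtomicLong watermarksEmitted = new AtomicLong();
    final AtomicLong maxWaitNanos = new AtomicLong();

    // Tight loop: the monitor is released after each iteration but re-acquired immediately.
    Thread reader = new Thread(() -> {
      while (running.get()) {
        synchronized (checkpointLock) {
          readerIterations.incrementAndGet(); // stand-in for reading and processing one record
        }
      }
    }, "legacy-source");

    // Timer-like loop: needs the same monitor every 10 ms to "emit a watermark".
    Thread watermark = new Thread(() -> {
      while (running.get()) {
        long start = System.nanoTime();
        synchronized (checkpointLock) {
          maxWaitNanos.accumulateAndGet(System.nanoTime() - start, Math::max);
          watermarksEmitted.incrementAndGet();
        }
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
      }
    }, "watermark-timer");

    reader.start();
    watermark.start();
    try {
      Thread.sleep(runMillis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      running.set(false); // always stop both loops
    }
    try {
      reader.join(5_000);
      watermark.join(5_000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return new long[] {readerIterations.get(), watermarksEmitted.get(), maxWaitNanos.get()};
  }
}
```

Calling LockContentionSketch.run(1000) and comparing the third value (longest wait for the monitor) against the 10 ms timer period gives a rough idea of how much the tight loop delays the other thread on a given machine.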


Is this a known issue, and is there a solution? For now we are calling Thread.sleep(1) once every 10 seconds after the synchronized block, so that the thread emitting the watermarks can be unblocked and run.
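A minimal sketch of this kind of workaround, assuming a helper that is called right after leaving the synchronized block on each loop iteration. The class PeriodicYield, its method names, and the injectable clock are hypothetical and exist only to make the idea concrete and testable; this is not the actual patch applied to UnboundedSourceWrapper.

```java
import java.util.function.LongSupplier;

// Hypothetical helper; not part of Beam or Flink.
final class PeriodicYield {
  private final long intervalNanos;
  private final LongSupplier clock; // injectable so the interval logic can be tested
  private long lastYield;
  private long yields;

  PeriodicYield(long intervalNanos, LongSupplier clock) {
    this.intervalNanos = intervalNanos;
    this.clock = clock;
    this.lastYield = clock.getAsLong();
  }

  /** Call outside the synchronized block; sleeps 1 ms at most once per interval. */
  boolean maybeYield() {
    long now = clock.getAsLong();
    if (now - lastYield < intervalNanos) {
      return false; // interval not yet elapsed, keep reading
    }
    try {
      Thread.sleep(1); // give a thread waiting on the lock a chance to acquire it
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
    }
    lastYield = now;
    yields++;
    return true;
  }

  long yields() {
    return yields;
  }
}
```

In a reader loop this would be used as new PeriodicYield(java.util.concurrent.TimeUnit.SECONDS.toNanos(10), System::nanoTime), with maybeYield() invoked after each synchronized block. The sleep has to happen while the lock is not held, otherwise the waiting thread gains nothing.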

One of my colleagues tried to follow up on this (attaching the previous email here) but we didn’t get any reply. Any help on this would be appreciated.

Thanks,
Sandeep

Re: Checkpoints timing out upgrading from Beam version 2.29 with Flink 1.12 to Beam 2.38 and Flink 1.14

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Sandeep,

looking into the code, can you please elaborate on how the reading 
thread holds the lock for ever? From what I understand from the code the 
lock is released after each call to reader.advance(). Therefore the 
checkpoint should not be blocked "for ever". Am I missing something? 
Could you maybe take a thread dump and send the dumps of those two threads?
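A thread dump is usually taken with the JDK's jstack tool against the running process. Where that is not convenient, roughly the same information can be collected from inside the JVM with the standard java.lang.management API. The sketch below is one way to do that; the class name ThreadDumpSketch and the output format are hypothetical, and it prints every live thread rather than guessing the names of the two threads in question.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper; uses only the standard java.lang.management API.
final class ThreadDumpSketch {

  /** Returns a text dump of all live threads, including the lock each one is blocked on. */
  static String dumpAllThreads() {
    ThreadMXBean bean = ManagementFactory.getThreadMXBean();
    // Request monitor and synchronizer details only where the JVM supports them.
    ThreadInfo[] infos = bean.dumpAllThreads(
        bean.isObjectMonitorUsageSupported(), bean.isSynchronizerUsageSupported());

    StringBuilder out = new StringBuilder();
    for (ThreadInfo info : infos) {
      if (info == null) {
        continue; // thread terminated while the dump was being taken
      }
      out.append('"').append(info.getThreadName()).append('"')
          .append(" state=").append(info.getThreadState());
      if (info.getLockName() != null) {
        out.append(" waiting on ").append(info.getLockName());
      }
      if (info.getLockOwnerName() != null) {
        out.append(" held by \"").append(info.getLockOwnerName()).append('"');
      }
      out.append('\n');
      for (StackTraceElement frame : info.getStackTrace()) {
        out.append("    at ").append(frame).append('\n');
      }
    }
    return out.toString();
  }
}
```

In the output, a thread in state BLOCKED together with the "held by" name shows which thread currently owns the monitor that the other one is waiting for.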

Thanks,

  Jan

On 7/27/22 15:58, John Casey via dev wrote:
> Would it be possible to recreate the experiments to try and isolate 
> variables? Right now the 3 cases change both beam and flink versions.
>
>
>
> On Tue, Jul 26, 2022 at 11:35 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>     Bumping this and adding +John Casey
>     <ma...@google.com> who knows about KafkaIO and
>     unbounded sources, though probably less about the FlinkRunner. It
>     seems you have isolated it to the Flink translation logic. I'm not
>     sure who would be the best expert to evaluate if that logic is
>     still OK.
>
>     Kenn

Re: Checkpoints timing out upgrading from Beam version 2.29 with Flink 1.12 to Beam 2.38 and Flink 1.14

Posted by John Casey via dev <de...@beam.apache.org>.
Would it be possible to recreate the experiments to try and isolate
variables? Right now the 3 cases change both beam and flink versions.



On Tue, Jul 26, 2022 at 11:35 PM Kenneth Knowles <ke...@apache.org> wrote:

> Bumping this and adding +John Casey <jo...@google.com> who knows
> about KafkaIO and unbounded sources, though probably less about the
> FlinkRunner. It seems you have isolated it to the Flink translation logic.
> I'm not sure who would be the best expert to evaluate if that logic is
> still OK.
>
> Kenn

Re: Checkpoints timing out upgrading from Beam version 2.29 with Flink 1.12 to Beam 2.38 and Flink 1.14

Posted by Kenneth Knowles <ke...@apache.org>.
Bumping this and adding +John Casey <jo...@google.com> who knows about
KafkaIO and unbounded sources, though probably less about the FlinkRunner.
It seems you have isolated it to the Flink translation logic. I'm not sure
who would be the best expert to evaluate if that logic is still OK.

Kenn
