Posted to user@beam.apache.org by Nimalan Mahendran <ni...@liminalinsights.com> on 2023/07/19 00:43:07 UTC

Growing checkpoint size with Python SDF for reading from Redis streams

Hello,

I am running a pipeline built with the Python SDK that reads from a Redis
stream <https://redis.io/docs/data-types/streams/> via an SDF, in the
following environment (a rough mapping to FlinkRunner pipeline options
follows the list):

   - Python 3.11
   - Apache Beam 2.48.0
   - Flink 1.16
   - Checkpoint interval: 60s
   - state.backend (Flink): hashmap
   - state_backend (Beam): filesystem
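
For context, the environment above maps roughly onto the following
FlinkRunner pipeline options. This is a hypothetical reconstruction rather
than our exact configuration: the flink_master address is a placeholder,
and state.backend=hashmap is set in flink-conf.yaml rather than via
pipeline options.

    from apache_beam.options.pipeline_options import PipelineOptions

    runner_options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_master=localhost:8081",   # placeholder address
        "--checkpointing_interval=60000",  # 60s checkpoint interval
        "--state_backend=filesystem",      # Beam state_backend
    ])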

The issue I am observing is that the checkpoint size keeps growing, even
when there are no items to read on the Redis stream. Since there are no
items to read, the Redis stream SDF simply repeats the following steps as
part of DoFn.process, i.e. the user-initiated checkpoint pattern described
in the Apache Beam programming guide
<https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint>
for polling for new items with a delay when the last poll returned no items
(a minimal sketch of this pattern follows the list):

   1. Make the call to the Redis client to read items from the Redis stream.
   2. Receive no items from the Redis stream, and hence,
   3. Call tracker.defer_remainder(Duration.of(5)) and return, to defer
   execution for 5 seconds. That code is located here
   <https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-beam-py-L534-L541>.
   4. Go back to step 1.
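
For reference, here is a minimal sketch of this polling pattern as an
unbounded SDF. It is not the actual connector code: it uses a plain
OffsetRange restriction in place of the real Redis stream ids, and
poll_redis_stream() is a hypothetical stand-in for the Redis client call.

    import apache_beam as beam
    from apache_beam.io.restriction_trackers import (
        OffsetRange, OffsetRestrictionTracker)
    from apache_beam.transforms.core import RestrictionProvider
    from apache_beam.utils.timestamp import Duration

    class PollRestrictionProvider(RestrictionProvider):
        def initial_restriction(self, element):
            # A huge offset range stands in for the real [>, $) restriction;
            # the SDF is never expected to reach the end.
            return OffsetRange(0, 2**63 - 1)

        def create_tracker(self, restriction):
            return OffsetRestrictionTracker(restriction)

        def restriction_size(self, element, restriction):
            return restriction.size()

    @beam.DoFn.unbounded_per_element()
    class RedisStreamPollingDoFn(beam.DoFn):
        def process(
            self,
            element,
            tracker=beam.DoFn.RestrictionParam(PollRestrictionProvider())):
            while True:
                # Hypothetical helper returning (offset, payload) pairs.
                items = poll_redis_stream(element)
                if not items:
                    # Nothing to read: checkpoint and ask the runner to
                    # resume this restriction in ~5 seconds.
                    tracker.defer_remainder(Duration.of(5))
                    return
                for offset, payload in items:
                    if not tracker.try_claim(offset):
                        return
                    yield payload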

This checkpoint size growth happens regardless of whether I'm using
heap-based or RocksDB-based checkpoints. Eventually, the checkpoint grows
large enough to crash the task manager by exhausting Java heap space. The
rate of checkpoint size growth is proportional to the number of
tracker.defer_remainder() calls made, i.e. increasing parallelism and/or
decreasing the timeout used in tracker.defer_remainder will increase the
rate of checkpoint growth.

I took a look at the heap-based checkpoint files that I observed were
getting larger with each checkpoint (just using the less command) and
noticed that many copies of the residual restriction were present, which
seemed like a red flag. The residual restriction here is the one that
results from calling tracker.defer_remainder(), which results in a
tracker.try_split(0.0).
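
To make that relationship concrete, here is what a zero-fraction split looks
like on a plain OffsetRestrictionTracker (standing in for my custom tracker),
assuming nothing has been claimed yet:

    from apache_beam.io.restriction_trackers import (
        OffsetRange, OffsetRestrictionTracker)

    tracker = OffsetRestrictionTracker(OffsetRange(0, 100))
    # defer_remainder() boils down to a checkpoint split like this one:
    primary, residual = tracker.try_split(0.0)
    # primary covers only what has already been claimed (nothing here);
    # residual covers everything that remains and is handed back to the
    # runner to be resumed later.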

I've included the SDF code and jobmanager logs showing growing checkpoint
size here: https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e.
I've included the restriction provider/tracker and other pieces for
completeness, but the SDF is towards the bottom.

Any help would be appreciated! 🙏🏾

Thanks,
-- 
Nimalan Mahendran
ML Engineer at Liminal Insights

Re: Growing checkpoint size with Python SDF for reading from Redis streams

Posted by Robert Bradshaw via user <us...@beam.apache.org>.
Your SDF looks fine. I wonder if there is an issue with how Flink is
implementing SDFs (e.g. not garbage collecting previous remainders).


Re: Growing checkpoint size with Python SDF for reading from Redis streams

Posted by Pavel Solomin <p....@gmail.com>.
Hi Nimalan,

Thank you for this tremendous effort of narrowing this down and creating
the minimal example!

I have only used the Beam Java SDK so far, and have seen checkpoint size
grow in only 2 cases:
1 - some resources leaking in user code in a DoFn
2 - data overload not being handled well by connectors (rare)

I usually used Java memory dumps to investigate both. While waiting for
replies from Python SDK users, you could try running your OOM case with a
memory dump <https://www.baeldung.com/java-heap-dump-capture> to see what
gets accumulated in the Java heap inside your Flink app.

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>

Re: Growing checkpoint size with Python SDF for reading from Redis streams

Posted by Nimalan Mahendran <ni...@liminalinsights.com>.
I wanted to share an update: I pared down my setup to reproduce this bug
and filed an issue: https://github.com/apache/beam/issues/27648. It
includes steps to reproduce. I would be very interested to hear whether
others are experiencing the same issue, or whether they are running
unbounded SDFs on Flink without seeing it.

I'm also really hoping to get some traction on this issue, as it is quite a
serious bug for the system we have built on top of Beam/Flink ☹️


Re: Growing checkpoint size with Python SDF for reading from Redis streams

Posted by Nimalan Mahendran <ni...@liminalinsights.com>.
An interesting update - I found PeriodicImpulse, which is a Python-based
unbounded SDF. I created the following minimal pipeline:

    import apache_beam as beam
    from apache_beam.transforms.periodicsequence import PeriodicImpulse

    with beam.Pipeline(options=runner_options) as pipeline:
        traceable_measurements = pipeline | PeriodicImpulse(fire_interval=5)
        traceable_measurements | beam.Map(print)

And I am still seeing the checkpoint size grow in Flink. I'll keep the
pipeline running to confirm that this trend continues, but this is pretty
concerning:
2023-07-21 17:24:22,401 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1689960262390 for job a5e3323d73491f3e6c409c79f160c555.
2023-07-21 17:24:22,487 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 1 for job a5e3323d73491f3e6c409c79f160c555 (500341 bytes, checkpointDuration=94 ms, finalizationTime=3 ms).
2023-07-21 17:25:22,389 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 2 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1689960322388 for job a5e3323d73491f3e6c409c79f160c555.
2023-07-21 17:25:22,431 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 2 for job a5e3323d73491f3e6c409c79f160c555 (895320 bytes, checkpointDuration=42 ms, finalizationTime=1 ms).

Once again, any pointers would be appreciated.


Re: Growing checkpoint size with Python SDF for reading from Redis streams

Posted by Nimalan Mahendran <ni...@liminalinsights.com>.
Hello again,

I tried to follow a process of elimination and simplify my code as much as
possible, see
https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-simplified_beam-py.
This splittable DoFn is pared down to only doing the polling. It no longer
uses the Redis client or any libraries internal to us. I have also included
a simplified pipeline, see
https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-simplified_pipeline-py.
It now simply reads from the dummy Redis reader splittable DoFn and writes
using the dummy writer DoFn. I also did away with dataclasses and simply
represent restrictions as ints.

I was surprised to see that I'm still experiencing checkpoint growth. I
have to say that I am at quite a loss. I also looked at an old mailing list
thread (https://www.mail-archive.com/user@beam.apache.org/msg05513.html)
and tried windowing/triggering the PCollection coming out of the dummy
Redis reader splittable DoFn, but still saw unbounded checkpoint size
growth.

I thought it might be worth flagging that my Redis reader splittable DoFn
works such that an element never finishes processing, i.e. the
unboundedness is at the element level. I went with this design to mirror
how Redis streams are structured, i.e. the message ids in Redis streams
start at 0-0, go up to ">" (which represents the part of the stream that
has not been consumed yet) and finish at "$", which is kind of like
Long.MAX_VALUE (i.e. you are never meant to get there).

Another thing that may be worth flagging is that when I am polling an empty
stream, I am essentially returning the same consumed and residual
restriction on each poll. The input restriction to
try_split(fraction_of_remainder=0) is [>, $), i.e. all unread messages
available on the stream. The output consumed partition is [>, >), i.e. no
messages were read. The output residual partition is [>, $) again, i.e. all
unread messages available on the stream.
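
To spell that behaviour out, here is a tiny sketch of just the split
semantics described above (not a real Beam RestrictionTracker), with the
restriction modelled as a (start_id, stop_id) pair of Redis stream
positions:

    # ">" means "not yet consumed"; "$" is the end-of-stream sentinel.
    def checkpoint_split(restriction):
        start_id, stop_id = restriction    # e.g. (">", "$")
        primary = (start_id, start_id)     # [>, >): no messages were read
        residual = (start_id, stop_id)     # [>, $): all unread messages
        return primary, residual

    print(checkpoint_split((">", "$")))    # ((">", ">"), (">", "$"))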

I've also been looking through the apache/beam GitHub repo to try to get a
better sense of what might be going on. I noticed that the residual
restrictions I return are being added to
execution_context.delayed_applications, which is then returned via a
protobuf message in the residual_roots field, but I cannot tell where/how
execution_context.delayed_applications is actually getting cleared. Maybe
that's a red herring. I also tried to look at the KafkaIO connector (which
is in Java, of course) to see if it would be informative about any issues
with how I am using the various Beam abstractions, but that led nowhere for
me.

It would be really helpful to hear from people on this mailing list
regarding:

   1. Any potential causes for the checkpoint size growth. The only
   possible cause I can think of so far is that perhaps the SDF is implicitly
   storing some state that is not getting cleared out.
   2. A complete working example of an unbounded SDF written using the
   Python SDK that uses tracker.defer_remainder, so I can at least start from
   something that works (i.e. does not have unbounded checkpoint size growth).
   3. Whether what I am flagging above might just be red herrings?
   4. Anything else that might help me make progress.

Thanks again,
