Posted to user@beam.apache.org by Maximilian Michels <mx...@apache.org> on 2019/01/30 13:06:01 UTC

Re: Acknowledging Pubsub messages in Flink Runner

Just a heads-up that the underlying issue has been fixed for Beam 2.10.0.

I've shared this information earlier with Encho and Valeri when the 
corresponding ticket was resolved (Dec 30): 
https://jira.apache.org/jira/browse/BEAM-5386

Thanks,
Max

On 19.09.18 14:42, Valeri Tsolov wrote:
> Hey Max,
> I think it is possible, but I am not sure when we will be able to plan such an
> activity. I will get back to you ASAP.
> 
> Thanks,
> Valeri
> 
> On Mon, 17.09.2018 at 19:27, Maximilian Michels <mxm@apache.org> wrote:
> 
>     I wonder if you could do some profiling on the TaskManagers and see
>     where they spend most of their time? That would be very helpful. If it
>     is indeed `finalizeCheckpoint`, then we could introduce asynchronous
>     acknowledgement. If it is in `snapshotState`, then we know that the
>     bottleneck is there.
> 
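The asynchronous acknowledgement idea mentioned above could look roughly like the following. This is a minimal sketch using only the JDK, not Beam's or the Flink Runner's actual code: `AsyncAckSketch`, `ackAsync` and the `ackCall` callback are hypothetical names that stand in for whatever call really sends the acks. The point is only that the checkpoint-complete callback hands the ack off to an executor and returns immediately.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Toy model of asynchronous acknowledgement; all names are hypothetical. */
final class AsyncAckSketch {

    /**
     * Submits the (possibly slow) ack call to the executor and returns at once.
     * The returned future completes when the ack call has finished.
     */
    static CompletableFuture<Void> ackAsync(
            List<String> ackIds, Consumer<List<String>> ackCall, ExecutorService executor) {
        return CompletableFuture.runAsync(() -> ackCall.accept(ackIds), executor);
    }

    /** Acks three made-up ids on a background thread and returns how many were acked. */
    static int demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> acked = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture<Void> done =
                    ackAsync(Arrays.asList("a", "b", "c"), acked::addAll, executor);
            done.join(); // the demo waits here; a real callback would not block
            return acked.size();
        } finally {
            executor.shutdown();
        }
    }
}
```

A real implementation would also have to decide what happens when an ack fails after the checkpoint has already been reported as complete; the sketch leaves that out.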
>     Do you think profiling on the TaskManagers would be feasible?
> 
>     Another question: Did you activate asynchronous snapshots?
> 
>     Thanks,
>     Max
> 
>     On 17.09.18 17:15, Encho Mishinev wrote:
>      > Hi Max,
>      >
>      > I agree that the problem might not be in the acknowledgement itself. A
>      > very long checkpoint could go past the subscription acknowledgement
>      > deadline (10min is the maximum allowed) and hence the message might be
>      > resent yielding the behaviour we see.
>      >
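The timing argument above can be checked with simple arithmetic. A minimal sketch, assuming that a message pulled right after one checkpoint starts is only acknowledged once the next checkpoint completes (so it waits roughly one checkpoint interval plus one checkpoint duration), and ignoring any deadline extension the client might perform. The class and method names are hypothetical; the numbers are the ones quoted in this thread.

```java
import java.time.Duration;

/** Back-of-the-envelope check of ack delay against the subscription's ack deadline. */
final class AckDeadlineSketch {

    /** Assumed worst case: one full checkpoint interval plus one checkpoint duration. */
    static Duration worstCaseAckDelay(Duration checkpointInterval, Duration checkpointDuration) {
        return checkpointInterval.plus(checkpointDuration);
    }

    /** True if the assumed worst-case ack delay exceeds the ack deadline. */
    static boolean redeliveryLikely(
            Duration checkpointInterval, Duration checkpointDuration, Duration ackDeadline) {
        return worstCaseAckDelay(checkpointInterval, checkpointDuration)
                .compareTo(ackDeadline) > 0;
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofSeconds(60);   // checkpointing every 60s
        Duration slowCheckpoint = Duration.ofMinutes(3); // slow checkpoints reported in the thread
        Duration deadline = Duration.ofMinutes(10);   // maximum ack deadline mentioned above
        System.out.println(redeliveryLikely(interval, slowCheckpoint, deadline));
    }
}
```

Under these assumptions a 3 minute checkpoint stays within a 10 minute deadline, which fits the observation that the deadline alone does not explain the slowdown.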
>      > In any case, the extreme slowdown of checkpoints still remains
>      > unexplained. This occurs even if the job simply reads from Pubsub and
>      > does nothing else.
>      >
>      > We use FsStateBackend backed by HDFS. The whole setup is deployed in
>      > Kubernetes. Any ideas of why this might be happening would be of great help.
>      >
>      > Thanks,
>      > Encho
>      >
>      > On Mon, Sep 17, 2018 at 4:15 PM Maximilian Michels <mxm@apache.org> wrote:
>      >
>      >     Hi Encho,
>      >
>      >     Thanks for providing more insight into this. I've re-examined the
>      >     checkpointing code and couldn't find anything suspicious there.
>      >
>      >       > The first job I stopped right when it processed more messages than I
>      >       > had loaded. The subscription afterwards had 52 000 unacknowledged
>      >       > messages.
>      >
>      >     That does sound suspicious with a parallelism of 52, but your other
>      >     experiments don't confirm that there is something wrong with the
>      >     acknowledgment. Rather, it seems the checkpointing itself is taking
>      >     longer and longer. This could also be caused by long acknowledgments,
>      >     since they stall in-progress checkpoints.
>      >
>      >     Please check the Web UI for statistics about the checkpoints:
>      >
>     https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/checkpoint_monitoring.html
>      >
>      >
>      >
>      >     You're going through a lot of messages in between the checkpoints.
>      >     Which state backend do you use? Please try re-running your job with
>      >     the file system state backend (FsStateBackend) or the RocksDB state
>      >     backend (RocksDBStateBackend). For the RocksDB state backend you will
>      >     have to add the RocksDB dependency. The file system backend should
>      >     work out of the box, just specify a path and set
>      >     FlinkPipelineOptions#setStateBackend(..). See:
>      >
>     https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/state_backends.html
>      >
>      >     Next, I could supply you with a custom Beam version which logs more
>      >     debug information.
>      >
>      >     Best,
>      >     Max
>      >
>      >     On 13.09.18 16:40, Encho Mishinev wrote:
>      >      > Hello Max,
>      >      >
>      >      > I am currently performing more tests on it and will follow-up with
>      >      > anything I find.
>      >      >
>      >      > Currently I have the following observations:
>      >      >
>      >      > Whenever there are few (relative to the parallelism) messages left in a
>      >      > pubsub topic the checkpointing length becomes very long. I have tried
>      >      > this with different parallelism. My usual setup for testing is 13 task
>      >      > managers with 4 task slots each, 52 parallelism for the job and
>      >      > checkpointing every 60s. I've done three runs on a subscription filled
>      >      > with about 122,000,000 messages. The job works fast going through about
>      >      > 1,500,000 messages/minute until it reaches about 120,000,000 or so, when
>      >      > it progressively slows down. Checkpointing length increases from an
>      >      > average of 50-60s to 2:30min-3min. When about a few hundred thousand
>      >      > messages are left the job mostly does long checkpoints and no work.
>      >      > Messages still pass through, but it seemingly takes forever.
>      >      >
>      >      > The first job I stopped right when it processed more messages than I had
>      >      > loaded. The subscription afterwards had 52 000 unacknowledged messages.
>      >      >
>      >      > Another job with the same approach had 87 000 unacknowledged messages.
>      >      >
>      >      > A third job I left over 30 minutes after it had processed more messages
>      >      > than I had loaded. It worked very slowly with long checkpoints and
>      >      > processed a few hundred thousand messages in total over the 30 minute
>      >      > period. That subscription then had only 235 unacknowledged messages left.
>      >      >
>      >      > I have set a large acknowledgement deadline for the subscriptions so that
>      >      > the checkpointing time is less than the deadline (otherwise the messages
>      >      > are naturally resent and can't be acknowledged), so that unfortunately is
>      >      > not the problem.
>      >      >
>      >      > I then tried running the whole thing with a parallelism of 1 and about
>      >      > 100 000 messages. The job started fast once again, doing a few thousand
>      >      > a second and doing all checkpoints in under 3s. Upon reaching about
>      >      > 90 000 it again started to slow down. This time it slowly reached its
>      >      > goal and there were actually no unacknowledged messages, but the last
>      >      > 10 000 messages were processed dreadfully slowly and one checkpoint
>      >      > during that period took 45s (compared to tens of checkpoints under 3s
>      >      > before that).
>      >      >
>      >      > I am not sure how to check how many messages get acknowledged per
>      >      > checkpoint.
>      >      > I'm open to trying new runs and sharing the results - let me know if you
>      >      > want me to try and run the job with some specific parameters.
>      >      >
>      >      > Thanks for the help,
>      >      > Encho
>      >      >
>      >      > On Thu, Sep 13, 2018 at 5:20 PM Maximilian Michels <mxm@apache.org> wrote:
>      >      >
>      >      >     That is indeed strange. Would you be able to provide some debugging
>      >      >     information, e.g. how many messages get acked for each checkpoint?
>      >      >
>      >      >     What is the parallelism of your job?
>      >      >
>      >      >     Thanks,
>      >      >     Max
>      >      >
>      >      >     On 12.09.18 12:57, Encho Mishinev wrote:
>      >      >      > Hello Max,
>      >      >      >
>      >      >      > Thanks for the answer. My guess was that they are acknowledged at
>      >      >      > completion of Flink's checkpoints, but wanted to make sure since that
>      >      >      > doesn't explain my problem.
>      >      >      >
>      >      >      > Whenever a subscription is nearly empty the job gets slower overall and
>      >      >      > Flink's checkpoints start taking much more time (thrice or more)
>      >      >      > even though their state is much smaller, and of course, there always
>      >      >      > seem to be messages cycling over and over again.
>      >      >      >
>      >      >      > If you have any clue at all why this might be, let me know.
>      >      >      >
>      >      >      > Thanks for the help,
>      >      >      > Encho
>      >      >      >
>      >      >      > On Tue, Sep 11, 2018 at 1:45 PM Maximilian Michels <mxm@apache.org> wrote:
>      >      >      >
>      >      >      >     Hey Encho,
>      >      >      >
>      >      >      >     The Flink Runner acknowledges messages through PubSubIO's
>      >      >      >     `CheckpointMark#finalizeCheckpoint()` method.
>      >      >      >
>      >      >      >     The Flink Runner wraps the PubSubIO source via the
>      >      >      >     UnboundedSourceWrapper. When Flink takes a checkpoint of the running
>      >      >      >     Beam streaming job, the wrapper will retrieve the CheckpointMarks from
>      >      >      >     the PubSubIO source.
>      >      >      >
>      >      >      >     When the checkpoint is completed, there is a callback which informs the
>      >      >      >     wrapper (`notifyCheckpointComplete()`) and calls `finalizeCheckpoint()`
>      >      >      >     on all the generated CheckpointMarks.
>      >      >      >
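The flow described above can be modelled in a few lines. This is a toy sketch using only the JDK, not the Flink Runner's actual code: `Mark` and `Wrapper` are hypothetical stand-ins for the CheckpointMark and the UnboundedSourceWrapper, and the bookkeeping per checkpoint id is an assumption made for illustration. It shows that taking a checkpoint only records the marks, and that acknowledgement happens in the completion callback.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of "ack on checkpoint completion"; all names are hypothetical. */
final class CheckpointAckSketch {

    /** Stand-in for a source's checkpoint mark. */
    interface Mark {
        void finalizeCheckpoint();
    }

    /** Stand-in for the wrapper that keeps marks until their checkpoint completes. */
    static final class Wrapper {
        private final Map<Long, List<Mark>> pending = new TreeMap<>();

        /** Called when a checkpoint is taken: remember the marks, do not ack yet. */
        void snapshotState(long checkpointId, List<Mark> marks) {
            pending.put(checkpointId, new ArrayList<>(marks));
        }

        /** Called once the checkpoint has completed: finalize (ack) its marks. */
        void notifyCheckpointComplete(long checkpointId) {
            List<Mark> marks = pending.remove(checkpointId);
            if (marks == null) {
                return; // unknown or already finalized checkpoint
            }
            for (Mark mark : marks) {
                mark.finalizeCheckpoint();
            }
        }

        /** Number of checkpoints whose marks have not been finalized yet. */
        int pendingCheckpoints() {
            return pending.size();
        }
    }

    /** Runs one snapshot/complete cycle and returns the message ids acked by it. */
    static List<String> demo() {
        List<String> acked = new ArrayList<>();
        Wrapper wrapper = new Wrapper();
        Mark mark = () -> acked.addAll(Arrays.asList("msg-1", "msg-2"));
        wrapper.snapshotState(1L, Arrays.asList(mark));
        wrapper.notifyCheckpointComplete(1L);
        return acked;
    }
}
```

In this model a message stays unacknowledged for as long as its checkpoint has not completed, so slow checkpoints directly delay the acks.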
>      >      >      >     Hope that helps debugging your problem. I don't have an explanation why
>      >      >      >     this doesn't work for the last records in your PubSub queue. It
>      >      >      >     shouldn't make a difference for how the Flink Runner does checkpointing.
>      >      >      >
>      >      >      >     Best,
>      >      >      >     Max
>      >      >      >
>      >      >      >     On 10.09.18 18:17, Encho Mishinev wrote:
>      >      >      >      > Hello,
>      >      >      >      >
>      >      >      >      > I am using Flink runner with Apache Beam 2.6.0. I was wondering if there
>      >      >      >      > is information on when exactly the runner acknowledges a pubsub message
>      >      >      >      > when reading from PubsubIO?
>      >      >      >      >
>      >      >      >      > My problem is that whenever there are a few messages left in a
>      >      >      >      > subscription my streaming job never really seems to acknowledge them
>      >      >      >      > all. For example, if a subscription has 100,000,000 messages in total,
>      >      >      >      > the job will go through about 99,990,000 and then keep reading the last
>      >      >      >      > few thousand and seemingly never acknowledge them.
>      >      >      >      >
>      >      >      >      > Some clarity on when the acknowledgement happens in the pipeline might
>      >      >      >      > help me debug this problem.
>      >      >      >      >
>      >      >      >      > Thanks!
>      >      >      >
>      >      >
>      >
> 
> -- 
> Valeri Tsolov