Posted to user@beam.apache.org by Maximilian Michels <mx...@apache.org> on 2019/01/30 13:06:01 UTC
Re: Acknowledging Pubsub messages in Flink Runner
Just a heads-up that the underlying issue has been fixed for Beam 2.10.0.
I've shared this information earlier with Encho and Valeri when the
corresponding ticket was resolved (Dec 30):
https://jira.apache.org/jira/browse/BEAM-5386
Thanks,
Max
On 19.09.18 14:42, Valeri Tsolov wrote:
> Hey Max,
> I think it is possible, but I'm not sure when we will be able to schedule it.
> I will get back to you ASAP.
>
> Thanks,
> Valeri
>
> On Mon, 17.09.2018 at 19:27, Maximilian Michels <mxm@apache.org> wrote:
>
> I wonder if you could do some profiling on the TaskManagers and see
> where they spend most of their time? That would be very helpful. If it
> is indeed `finalizeCheckpoint`, then we could introduce asynchronous
> acknowledgement. If it is in `snapshotState`, then we know that the
> bottleneck is there.
>
> Do you think profiling on the TaskManagers would be feasible?
>
> Another question: Did you activate asynchronous snapshots?
>
> Thanks,
> Max
>
> On 17.09.18 17:15, Encho Mishinev wrote:
> > Hi Max,
> >
> > I agree that the problem might not be in the acknowledgement itself. A
> > very long checkpoint could go past the subscription acknowledgement
> > deadline (10min is the maximum allowed) and hence the message might be
> > resent yielding the behaviour we see.
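
As a rough illustration of the deadline reasoning above (a minimal sketch, not Beam or Pub/Sub code): a message read just after one checkpoint is only acknowledged once the next checkpoint completes, so in the worst case it waits about one checkpoint interval plus that checkpoint's duration. The class and method names are hypothetical, and the model deliberately ignores any deadline extension the source itself may perform.

```java
import java.time.Duration;

final class AckDeadlineCheck {
    // Maximum acknowledgement deadline mentioned in the thread (10 minutes).
    static final Duration MAX_ACK_DEADLINE = Duration.ofMinutes(10);

    /**
     * Simplified worst case: the message is read right after the previous
     * checkpoint and is acknowledged only when the next checkpoint completes.
     * Returns true if that wait exceeds the acknowledgement deadline.
     */
    static boolean mayBeRedelivered(Duration checkpointInterval,
                                    Duration checkpointDuration,
                                    Duration ackDeadline) {
        Duration worstCaseWait = checkpointInterval.plus(checkpointDuration);
        return worstCaseWait.compareTo(ackDeadline) > 0;
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofSeconds(60);
        // A 3-minute checkpoint, as observed in the thread.
        System.out.println(mayBeRedelivered(interval, Duration.ofMinutes(3), MAX_ACK_DEADLINE));
        // A hypothetical 12-minute checkpoint.
        System.out.println(mayBeRedelivered(interval, Duration.ofMinutes(12), MAX_ACK_DEADLINE));
    }
}
```

Under this model the 2:30-3min checkpoints reported in the thread stay well inside a 10-minute deadline, which fits the observation that the deadline alone does not explain the slowdown.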
> >
> > In any case, the extreme slowdown of checkpoints still remains
> > unexplained. This occurs even if the job simply reads from Pubsub and
> > does nothing else.
> >
> > We do use FsStateBackend with HDFS. The whole setup is deployed in
> > Kubernetes. Any ideas as to why this might be happening would be of great
> > help.
> >
> > Thanks,
> > Encho
> >
> > On Mon, Sep 17, 2018 at 4:15 PM Maximilian Michels <mxm@apache.org> wrote:
> >
> > Hi Encho,
> >
> > Thanks for providing more insight into this. I've re-examined the
> > checkpointing code and couldn't find anything suspicious there.
> >
> > > The first job I stopped right when it processed more messages than I
> > > had loaded. The subscription afterwards had 52 000 unacknowledged
> > > messages.
> >
> > That does sound suspicious with a parallelism of 52, but your other
> > experiments don't confirm that there is something wrong with the
> > acknowledgment. Rather, it seems the checkpointing itself is taking
> > longer and longer. This could also be caused by long acknowledgments,
> > since this stalls in-progress checkpoints.
> >
> > Please check the Web UI for statistics about the checkpoints:
> >
> > https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/checkpoint_monitoring.html
> >
> > You're going through a lot of messages in between the checkpoints. Which
> > state backend do you use? Please try re-running your job with the file
> > system state backend (FsStateBackend) or the RocksDB state backend
> > (RocksDBStateBackend). For the RocksDB state backend you will have to
> > add the RocksDB dependency. The file system backend should work out of
> > the box, just specify a path and set
> > FlinkPipelineOptions#setStateBackend(..). See:
> >
> > https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/state_backends.html
> >
> > Next, I could supply you with a custom Beam version which logs more
> > debug information.
> >
> > Best,
> > Max
> >
> > On 13.09.18 16:40, Encho Mishinev wrote:
> > > Hello Max,
> > >
> > > I am currently performing more tests on it and will follow-up with
> > > anything I find.
> > >
> > > Currently I have the following observations:
> > >
> > > Whenever there are few messages (relative to the parallelism) left in a
> > > pubsub topic, the checkpoint duration becomes very long. I have tried
> > > this with different parallelism settings. My usual setup for testing is
> > > 13 task managers with 4 task slots each, a job parallelism of 52, and
> > > checkpointing every 60s. I've done three runs on a subscription filled
> > > with about 122,000,000 messages. The job works fast, going through about
> > > 1,500,000 messages/minute until it reaches about 120,000,000 or so, when
> > > it progressively slows down. Checkpoint duration increases from an
> > > average of 50-60s to 2:30-3min. When a few hundred thousand messages are
> > > left, the job mostly does long checkpoints and no work. Messages still
> > > pass through, but it seemingly takes forever.
> > >
> > > The first job I stopped right when it processed more messages than I had
> > > loaded. The subscription afterwards had 52 000 unacknowledged messages.
> > >
> > > Another job with the same approach had 87 000 unacknowledged messages.
> > >
> > > A third job I left running for over 30 minutes after it had processed
> > > more messages than I had loaded. It worked very slowly with long
> > > checkpoints and processed a few hundred thousand messages in total over
> > > the 30-minute period. That subscription then had only 235 unacknowledged
> > > messages left.
> > >
> > > I have set a large acknowledgement deadline on the subscriptions so that
> > > the checkpointing time is less than the deadline (otherwise the messages
> > > are naturally resent and can't be acknowledged), so that unfortunately is
> > > not the problem.
> > >
> > > I then tried running the whole thing with a parallelism of 1 and about
> > > 100 000 messages. The job started fast once again, doing a few thousand
> > > messages a second and completing all checkpoints in under 3s. Upon
> > > reaching about 90 000 it again started to slow down. This time it slowly
> > > reached its goal and there were actually no unacknowledged messages, but
> > > the last 10 000 messages were processed dreadfully slowly and one
> > > checkpoint during that period took 45s (compared to tens of checkpoints
> > > under 3s before that).
> > >
> > > I am not sure how to check how many messages get acknowledged per
> > > checkpoint.
> > > I'm open to trying new runs and sharing the results - let me know if you
> > > want me to try and run the job with some specific parameters.
> > >
> > > Thanks for the help,
> > > Encho
> > >
> > > On Thu, Sep 13, 2018 at 5:20 PM Maximilian Michels <mxm@apache.org> wrote:
> > >
> > > > That is indeed strange. Would you be able to provide some debugging
> > > > information, e.g. how many messages get acked for each checkpoint?
> > >
> > > What is the parallelism of your job?
> > >
> > > Thanks,
> > > Max
> > >
> > > On 12.09.18 12:57, Encho Mishinev wrote:
> > > > Hello Max,
> > > >
> > > > > Thanks for the answer. My guess was that they are acknowledged at
> > > > > completion of Flink's checkpoints, but wanted to make sure since that
> > > > > doesn't explain my problem.
> > > >
> > > > > Whenever a subscription is nearly empty, the job gets slower overall
> > > > > and Flink's checkpoints start taking much more time (three times as
> > > > > long or more) even though their state is much smaller, and of course,
> > > > > there always seem to be messages cycling over and over again.
> > > >
> > > > If you have any clue at all why this might be, let me know.
> > > >
> > > > Thanks for the help,
> > > > Encho
> > > >
> > > > > On Tue, Sep 11, 2018 at 1:45 PM Maximilian Michels <mxm@apache.org> wrote:
> > > >
> > > > Hey Encho,
> > > >
> > > > The Flink Runner acknowledges messages through PubSubIO's
> > > > `CheckpointMark#finalizeCheckpoint()` method.
> > > >
> > > > > The Flink Runner wraps the PubSubIO source via the
> > > > > UnboundedSourceWrapper. When Flink takes a checkpoint of the running
> > > > > Beam streaming job, the wrapper will retrieve the CheckpointMarks from
> > > > > the PubSubIO source.
> > > >
> > > > > When the Checkpoint is completed, there is a callback which informs the
> > > > > wrapper (`notifyCheckpointComplete()`) and calls `finalizeCheckpoint()`
> > > > > on all the generated CheckpointMarks.
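
The lifecycle described above can be sketched in plain Java without any Beam or Flink dependencies. This is only an illustrative model under stated assumptions: `Mark`, `Wrapper`, `read`, and the `acked` list are hypothetical stand-ins, not the real `UnboundedSourceWrapper` or `CheckpointMark` classes, and the sketch finalizes only the mark belonging to the completed checkpoint id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CheckpointAckSketch {
    /** Hypothetical stand-in for a checkpoint mark: the ack ids read since the last snapshot. */
    static final class Mark {
        private final List<String> ackIds;

        Mark(List<String> ackIds) {
            this.ackIds = ackIds;
        }

        /** Models the acknowledgement step by recording the ids as acked. */
        void finalizeCheckpoint(List<String> acked) {
            acked.addAll(ackIds);
        }
    }

    /** Hypothetical stand-in for the wrapper around the source. */
    static final class Wrapper {
        private List<String> readSinceLastSnapshot = new ArrayList<>();
        private final Map<Long, Mark> pendingMarks = new HashMap<>();
        final List<String> acked = new ArrayList<>();

        /** A message was read and emitted downstream, but is not yet acknowledged. */
        void read(String ackId) {
            readSinceLastSnapshot.add(ackId);
        }

        /** Checkpoint starts: retrieve a mark from the source and keep it until completion. */
        void snapshotState(long checkpointId) {
            pendingMarks.put(checkpointId, new Mark(readSinceLastSnapshot));
            readSinceLastSnapshot = new ArrayList<>();
        }

        /** Checkpoint completed: only now are the messages of that mark acknowledged. */
        void notifyCheckpointComplete(long checkpointId) {
            Mark mark = pendingMarks.remove(checkpointId);
            if (mark != null) {
                mark.finalizeCheckpoint(acked);
            }
        }
    }

    public static void main(String[] args) {
        Wrapper wrapper = new Wrapper();
        wrapper.read("m1");
        wrapper.read("m2");
        wrapper.snapshotState(1L);
        wrapper.read("m3"); // read after the snapshot, so it belongs to the next checkpoint
        wrapper.notifyCheckpointComplete(1L);
        System.out.println("Acked after checkpoint 1: " + wrapper.acked);
    }
}
```

The point of the model is that a message stays unacknowledged from the moment it is read until the checkpoint that captured it completes, so slow checkpoints directly delay acknowledgement.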
> > > >
> > > > > Hope that helps debugging your problem. I don't have an explanation why
> > > > > this doesn't work for the last records in your PubSub queue. It
> > > > > shouldn't make a difference for how the Flink Runner does checkpointing.
> > > >
> > > > Best,
> > > > Max
> > > >
> > > > On 10.09.18 18:17, Encho Mishinev wrote:
> > > > > Hello,
> > > > >
> > > > > > I am using the Flink runner with Apache Beam 2.6.0. I was wondering if
> > > > > > there is information on when exactly the runner acknowledges a pubsub
> > > > > > message when reading from PubsubIO?
> > > > >
> > > > > > My problem is that whenever there are a few messages left in a
> > > > > > subscription my streaming job never really seems to acknowledge them
> > > > > > all. For example, if a subscription has 100,000,000 messages in total,
> > > > > > the job will go through about 99,990,000 and then keep reading the last
> > > > > > few thousand and seemingly never acknowledge them.
> > > > >
> > > > > > Some clarity on when the acknowledgement happens in the pipeline might
> > > > > > help me debug this problem.
> > > > >
> > > > > Thanks!
> > > >
> > >
> >
>
> --
> Valeri Tsolov
> Software engineer