Posted to users@kafka.apache.org by Priya Matpadi <pr...@ecofactor.com> on 2013/10/31 17:41:07 UTC

Purgatory

Hello,
What is purgatory? I believe the following two properties relate to
consumer and producer respectively.
Could someone please explain the significance of these?
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100

Thanks,
Priya

Re: Purgatory

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks Marc! I will also go through it and suggest some edits today.

Guozhang


On Fri, Nov 8, 2013 at 7:50 AM, Marc Labbe <mr...@gmail.com> wrote:

> Thx for the feedback. It is true I never mentioned anything about the
> impact on users or the fact that this is mostly internal business in
> Kafka. I will try to rephrase some of this.
>
> Marc
> On Nov 8, 2013 10:10 AM, "Yu, Libo" <li...@citi.com> wrote:
>
> > I read it and tried to understand it. It would be great to add a summary
> > at the beginning about what it is and how it may impact a user.
> >
> > Regards,
> >
> > Libo
> >
> >
> > -----Original Message-----
> > From: Joel Koshy [mailto:jjkoshy.w@gmail.com]
> > Sent: Friday, November 08, 2013 2:01 AM
> > To: users@kafka.apache.org
> > Subject: Re: Purgatory
> >
> > Excellent - thanks for putting that together! Will review it more
> > carefully tomorrow and suggest some minor edits if required.
> >
> > On Thu, Nov 07, 2013 at 10:45:40PM -0500, Marc Labbe wrote:
> > > I've just added a page for purgatory, feel free to comment/modify at
> > > will. I hope I didn't misinterpret too much of the code.
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Request+Purgatory+(0.8)
> > >
> > > I added a few questions of my own.
> > >
> > >
> > > On Fri, Nov 1, 2013 at 9:43 PM, Joe Stein <jo...@stealth.ly>
> wrote:
> > >
> > > > To edit the Wiki you need to send an ICLA
> > > > (http://www.apache.org/licenses/#clas) to Apache and then, once that
> > > > is done, an email to private@kafka.apache.org (or to me and I will
> > > > copy private) with your Wiki username and that you sent the ICLA to
> > > > Apache.
> > > >
> > > > Then, I can add you to edit the Wiki.
> > > >
> > > > /*******************************************
> > > >  Joe Stein
> > > >  Founder, Principal Consultant
> > > >  Big Data Open Source Security LLC
> > > >  http://www.stealth.ly
> > > >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > > ********************************************/
> > > >
> > > >
> > > > On Fri, Nov 1, 2013 at 9:08 PM, Marc Labbe <mr...@gmail.com>
> wrote:
> > > >
> > > > > Hi Joel,
> > > > >
> > > > > I used to have edit access to the wiki; I made a few additions to
> > > > > it a while ago but it seems I don't have it anymore. It might have
> > > > > been lost in the Confluence update. I would be glad to add what I
> > > > > have written if I get it back. Otherwise, feel free to paste my
> > > > > words in one of the pages; I don't intend on asking for copyrights
> > > > > for this :).
> > > > >
> > > > > marc
> > > > >
> > > > >
> > > > > On Fri, Nov 1, 2013 at 4:32 PM, Joel Koshy <jj...@gmail.com>
> > wrote:
> > > > >
> > > > > > Marc, thanks for writing that up. I think it is worth adding
> > > > > > some details on the request-purgatory on a wiki (Jay had started
> > > > > > a wiki page for kafka internals [1] a while ago, but we have not
> > > > > > had time to add much to it since.) Your write-up could be
> > > > > > reviewed and added there. Do you have edit permissions on the
> > > > > > wiki?
> > > > > >
> > > > > > As for the purge interval config - yes the documentation can be
> > > > > > improved a bit. It's one of those "internal" configs that
> > > > > > generally don't need to be modified by users. The reason we
> > > > > > added that was as
> > > > > > follows:
> > > > > > - We found that for low-volume topics, replica fetch requests
> > > > > > were getting expired but sitting around in purgatory
> > > > > > - This was because we were expiring them from the delay queue
> > > > > > (used to track when requests should expire), but they were still
> > > > > > sitting in the watcherFor map - i.e., they would get purged when
> > > > > > the next producer request to that topic/partition arrived, but
> > > > > > for low volume topics this could be a long time (or never in the
> > > > > > worst case) and we would eventually run into an OOME.
> > > > > > - So we needed to periodically go through the entire watcherFor
> > > > > > map and explicitly remove those requests that had expired (see
> > > > > > the sketch after this list).
> > > > > > - More details on this are in KAFKA-664.
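> > > > > >
> > > > > > A minimal sketch of that periodic sweep, with simplified,
> > > > > > illustrative names (watchersForKey and the satisfied flag below
> > > > > > are not the exact 0.8 API):
> > > > > >
> > > > > >   import java.util.concurrent.atomic.AtomicBoolean
> > > > > >   import scala.collection.mutable
> > > > > >
> > > > > >   class DelayedRequest { val satisfied = new AtomicBoolean(false) }
> > > > > >
> > > > > >   // walk the whole watcher map and drop requests already satisfied,
> > > > > >   // so low-volume topics cannot pin expired requests in memory
> > > > > >   def purgeSatisfied(watchersForKey: mutable.Map[String, List[DelayedRequest]]): Int = {
> > > > > >     var purged = 0
> > > > > >     for (key <- watchersForKey.keys.toList) {
> > > > > >       val (done, pending) = watchersForKey(key).partition(_.satisfied.get)
> > > > > >       watchersForKey(key) = pending
> > > > > >       purged += done.size
> > > > > >     }
> > > > > >     purged
> > > > > >   }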
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Joel
> > > > > >
> > > > > > [1]
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Internals
> > > > > >
> > > > > > On Fri, Nov 1, 2013 at 12:33 PM, Marc Labbe <mr...@gmail.com>
> > wrote:
> > > > > > > Guozhang,
> > > > > > >
> > > > > > > I have to agree with Priya that the doc isn't very clear.
> > > > > > > Although the configuration is documented, it simply rewords the
> > > > > > > name of the config, which isn't particularly useful if you want
> > > > > > > more information about what the purgatory is. I searched the
> > > > > > > whole wiki and doc and could not find anything very useful, as
> > > > > > > opposed to looking at the code. In this case,
> > > > > > > kafka.server.KafkaApis and kafka.server.RequestPurgatory will
> > > > > > > be your friends.
> > > > > > >
> > > > > > > I'll try to add to Joe's answer here, mostly just reporting
> > > > > > > what's available in the Scala doc from the project. I am doing
> > > > > > > this to understand the mechanics myself, btw.
> > > > > > >
> > > > > > > As Joe said, messages are not dropped by the purgatory but
> > > > > > > simply removed from it when they are satisfied. The satisfaction
> > > > > > > conditions differ between fetch and produce requests, and each
> > > > > > > is implemented in its respective DelayedRequest implementation
> > > > > > > (DelayedFetch and DelayedProduce).
> > > > > > >
> > > > > > > Request purgatories are defined as follows in the code:
> > > > > > >  - ProducerRequestPurgatory: A holding pen for produce requests
> > > > > > > waiting to be satisfied.
> > > > > > >  - FetchRequestPurgatory: A holding pen for fetch requests
> > > > > > > waiting to be satisfied.
> > > > > > >
> > > > > > > Each request purgatory runs a thread (ExpiredRequestReaper).
> > > > > > > This thread first tries to find an expired delayed request. When
> > > > > > > one is found, it runs the purgatory's expire method to handle
> > > > > > > the delayed request's expiration. In both the produce and fetch
> > > > > > > cases, it sends a response to the client; an expired request
> > > > > > > becomes a satisfied request. The next step of the thread's loop
> > > > > > > is where it checks the configuration parameter you asked about
> > > > > > > initially (purgatory.purge.interval.requests). When the number
> > > > > > > of delayed requests given to the purgatory to watch reaches this
> > > > > > > value, it goes through all previously queued requests and
> > > > > > > removes those marked as satisfied. Because of that, it is really
> > > > > > > an interval more than a threshold, since it doesn't care about
> > > > > > > the number of satisfied requests or the size of the queue.
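> > > > > > >
> > > > > > > A rough sketch of that loop, stubbed and illustrative rather
> > > > > > > than the actual ExpiredRequestReaper code:
> > > > > > >
> > > > > > >   // simplified shape of the reaper's main loop
> > > > > > >   def reaperLoop[R](pollExpired: () => Option[R],  // blocks until a request times out
> > > > > > >                     expire: R => Unit,             // respond to the client for a timed-out request
> > > > > > >                     watched: () => Int,            // requests handed to the purgatory to watch
> > > > > > >                     purgeSatisfied: () => Unit,    // sweep out already-satisfied requests
> > > > > > >                     purgeInterval: Int,            // purgatory.purge.interval.requests
> > > > > > >                     running: () => Boolean): Unit =
> > > > > > >     while (running()) {
> > > > > > >       pollExpired().foreach(expire)
> > > > > > >       if (watched() >= purgeInterval)              // an interval, not a queue-size threshold
> > > > > > >         purgeSatisfied()
> > > > > > >     }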
> > > > > > >
> > > > > > > Producer request
> > > > > > > - When is it added to purgatory (delayed)?
> > > > > > >   * when it uses ack=-1 (actually, the code tells me anything
> > > > > > > but 0 or 1); producer config: request.required.acks
> > > > > > >   * when the partitions have more than one replica (with a
> > > > > > > single replica, ack=-1 is no different from ack=1 and it doesn't
> > > > > > > make much sense to use a delayed request)
> > > > > > >   * when not all partitions are in error
> > > > > > > - When does it expire? When it reaches the timeout defined in
> > > > > > > the produce request (ackTimeoutMs). Translates from the producer
> > > > > > > config request.timeout.ms.
> > > > > > > - What happens (on the broker) when it expires? It sends a
> > > > > > > response to the client. The response content depends on the
> > > > > > > request, of course.
> > > > > > > - When is it satisfied? I didn't find the courage to dig into
> > > > > > > the details of this one :( ... but mainly when all the followers
> > > > > > > have also acknowledged the produce request for their replicas.
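> > > > > > >
> > > > > > > As a rough standalone sketch of that acknowledgement check (the
> > > > > > > names below are illustrative, not the exact DelayedProduce code):
> > > > > > >
> > > > > > >   // a partition is satisfiable once every in-sync replica has
> > > > > > >   // caught up to the offset at which the messages were appended
> > > > > > >   def partitionSatisfied(isrEndOffsets: Seq[Long], requiredOffset: Long): Boolean =
> > > > > > >     isrEndOffsets.forall(_ >= requiredOffset)
> > > > > > >
> > > > > > >   // the delayed produce request is satisfied when all of its
> > > > > > >   // partitions are
> > > > > > >   def requestSatisfied(partitions: Seq[(Seq[Long], Long)]): Boolean =
> > > > > > >     partitions.forall { case (isr, offset) => partitionSatisfied(isr, offset) }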
> > > > > > >
> > > > > > > Fetch request
> > > > > > > - When is it added to purgatory (delayed)? Two parameters of the
> > > > > > > request are mainly relevant here: max wait time and fetch size.
> > > > > > >   * if max wait is greater than 0; otherwise, it is a blocking
> > > > > > > call by the consumer
> > > > > > >   * if the fetch size is greater than the current amount of data
> > > > > > > available to fulfil the request
> > > > > > > - When does it expire? After the wait time: the amount of time
> > > > > > > the consumer is willing to wait for data; consumer config:
> > > > > > > fetch.wait.max.ms.
> > > > > > > - When is it satisfied? When the fetch size requested is
> > > > > > > reached, i.e. the amount of data the consumer wishes to receive
> > > > > > > in one response (from consumer config: fetch.message.max.bytes)
> > > > > > >
> > > > > > > ******
> > > > > > >
> > > > > > > It would be useful to add some information about the metrics
> > > > > > > associated with this.
> > > > > > >
> > > > > > > Of course, I am all for being corrected if I said anything
> > > > > > > wrong here. The truth is always the code :-)
> > > > > > >
> > > > > > > marc
> > > > > > > - mrtheb -
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Nov 1, 2013 at 2:45 AM, Priya Matpadi
> > > > > > > <pr...@ecofactor.com>wrote:
> > > > > > >
> > > > > > >> Guozhang,
> > > > > > >> The documentation is not very clear.
> > > > > > >> Marc's response for producer purgatory makes sense.
> > > > > > >> I am not entirely clear on fetch purgatory.
> > > > > > >> How does the broker use purgatory? Is it a temporary holding
> > > > > > >> area? What happens to the messages if the purge interval is
> > > > > > >> exceeded for the producer and/or consumer? Are messages dropped
> > > > > > >> in this case?
> > > > > > >> Thanks,
> > > > > > >> Priya
> > > > > > >>
> > > > > > >>
> > > > > > >> On Thu, Oct 31, 2013 at 2:47 PM, Guozhang Wang
> > > > > > >> <wa...@gmail.com>
> > > > > > wrote:
> > > > > > >>
> > > > > > >> > Hello Priya,
> > > > > > >> >
> > > > > > >> > You can find the definitions of these two configs here:
> > > > > > >> >
> > > > > > >> > http://kafka.apache.org/documentation.html#brokerconfigs
> > > > > > >> >
> > > > > > >> > Guozhang
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > On Thu, Oct 31, 2013 at 11:20 AM, Marc Labbe
> > > > > > >> > <mr...@gmail.com>
> > > > > > wrote:
> > > > > > >> >
> > > > > > >> > > Hi Priya
> > > > > > >> > >
> > > > > > >> > > my understanding is producer requests will be delayed (and
> > > > > > >> > > put in request purgatory) only if your producer uses
> > > > > > >> > > ack=-1. It will be in the purgatory (delayed) until all
> > > > > > >> > > brokers have acknowledged the messages to be replicated.
> > > > > > >> > > The documentation suggests monitoring the
> > > > > > >> > > ProducerRequestPurgatory size metric, but it only applies
> > > > > > >> > > if you're using ack=-1; otherwise, this value will always
> > > > > > >> > > be 0.
> > > > > > >> > >
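> > > > > > >> > > For example (values illustrative, not recommendations), a
> > > > > > >> > > producer configured like this will have its requests parked
> > > > > > >> > > in ProducerRequestPurgatory until all replicas acknowledge
> > > > > > >> > > or the timeout fires:
> > > > > > >> > >
> > > > > > >> > > request.required.acks=-1
> > > > > > >> > > request.timeout.ms=10000
> > > > > > >> > >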
> > > > > > >> > > For consumer requests, they'll be in purgatory (delayed)
> > > > > > >> > > until the max allowed time to respond has been reached,
> > > > > > >> > > unless enough messages arrive to fill the buffer before
> > > > > > >> > > that. The request will not end up in the purgatory if
> > > > > > >> > > you're making a blocking request (max wait <= 0).
> > > > > > >> > >
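> > > > > > >> > > For example (values illustrative), a consumer configured
> > > > > > >> > > like this will have its fetch requests parked in purgatory
> > > > > > >> > > for up to one second, unless enough data arrives first:
> > > > > > >> > >
> > > > > > >> > > fetch.wait.max.ms=1000
> > > > > > >> > > fetch.min.bytes=65536
> > > > > > >> > >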
> > > > > > >> > > Not sure about the configuration interval though.
> > > > > > >> > >
> > > > > > >> > > marc
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > On Thu, Oct 31, 2013 at 12:41 PM, Priya Matpadi <
> > > > > > >> > > priya.matpadi@ecofactor.com
> > > > > > >> > > > wrote:
> > > > > > >> > >
> > > > > > >> > > > Hello,
> > > > > > >> > > > What is purgatory? I believe the following two
> > > > > > >> > > > properties relate to consumer and producer respectively.
> > > > > > >> > > > Could someone please explain the significance of these?
> > > > > > >> > > > fetch.purgatory.purge.interval.requests=100
> > > > > > >> > > > producer.purgatory.purge.interval.requests=100
> > > > > > >> > > >
> > > > > > >> > > > Thanks,
> > > > > > >> > > > Priya
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > --
> > > > > > >> > -- Guozhang
> > > > > > >> >
> > > > > > >>
> > > > > >
> > > > >
> > > >
> >
> >
>



-- 
-- Guozhang

RE: Purgatory

Posted by Marc Labbe <mr...@gmail.com>.
Thx for the feedback. It is true I never mentioned anything about the
impact on users or the fact that this is mostly internal business in
Kafka. I will try to rephrase some of this.

Marc
On Nov 8, 2013 10:10 AM, "Yu, Libo" <li...@citi.com> wrote:

> I read it and tried to understand it. It would be great to add a summary
> at the beginning about what it is and how it may impact a user.
>
> Regards,
>
> Libo
>

RE: Purgatory

Posted by "Yu, Libo " <li...@citi.com>.
I read it and tried to understand it. It would be great to add a summary
at the beginning about what it is and how it may impact a user.

Regards,

Libo


-----Original Message-----
From: Joel Koshy [mailto:jjkoshy.w@gmail.com] 
Sent: Friday, November 08, 2013 2:01 AM
To: users@kafka.apache.org
Subject: Re: Purgatory

Excellent - thanks for putting that together! Will review it more carefully tomorrow and suggest some minor edits if required.



Re: Purgatory

Posted by Marc Labbe <mr...@gmail.com>.
I've just updated the wiki with most of your comments, hoping to make it
clearer. I stole much of Joel's comment, so all the credit goes to him if
it works! The line between documenting the code and describing it is always
thin. My intention was not to make anything official at first either, hence
the disclaimers and email quotes :)

I agree a graph would be great, but I didn't take the time to do it just yet.

For the CSV reporter, I can easily reproduce the problem with the
PurgatorySize and NumDelayedRequests metrics. My workaround was to delete
the files once after starting the brokers, which got rid of the error logs,
but I was probably missing metrics from one of the purgatories. I'll start
a separate email thread for that. I use it extensively because it gives me
better metric accuracy than the system we use otherwise, and I can easily
generate simple graphs from it using Python's matplotlib.


On Sun, Nov 10, 2013 at 12:15 PM, Priya Matpadi
<priya.matpadi@ecofactor.com> wrote:

> Marc, thanks much for documenting the guts!
> There is one correction for Fetch Request handling:
>
> When is it satisfied?
>
> The fetch size requested is reached - i.e. the amount of data the consumer
> wishes to receive in one response
>
> Consumer configuration: *fetch.message.max.bytes*
> As per the code:
>
>   /**
>    * A holding pen for fetch requests waiting to be satisfied
>    */
>   class FetchRequestPurgatory(requestChannel: RequestChannel,
> purgeInterval: Int)
>           extends RequestPurgatory[DelayedFetch, Int](brokerId,
> purgeInterval) {
>     this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
>
>     /**
>      * A fetch request is satisfied when it has accumulated enough data to
> meet the min_bytes field
>      */
>     def checkSatisfied(messageSizeInBytes: Int, delayedFetch:
> DelayedFetch): Boolean = {
>       val accumulatedSize =
> delayedFetch.bytesAccumulated.addAndGet(messageSizeInBytes)
>       accumulatedSize >= delayedFetch.fetch.minBytes
>     }
>
>
>
> On Fri, Nov 8, 2013 at 1:01 PM, Joel Koshy <jj...@gmail.com> wrote:
>
> > Marc - thanks again for doing this.  Couple of suggestions:
> >
> > - I would suggest removing the disclaimer and email quotes since this
> >   can become a stand-alone clean document on what the purgatory is and
> >   how it works.
> > - A diagram would be helpful - it could, say, show the watcher map and
> >   the expiration queue, and it will be especially useful if it can
> >   show the flow of producer/fetch requests through the purgatory. That
> >   would also help cut down a lot of the text in the doc.
> > - I think it would be preferable to have just high-level details in
> >   this document.  Internal details (such as the purge interval
> >   settings) can either be removed or moved (to say, a short FAQ or
> >   config section at the end).
> > - In the overview we may want to comment on why we added it: i.e., it
> >   is the primary data structure we use for supporting long poll of
> >   producer/fetch requests. E.g., if we didn't do this, consumers would
> >   have to keep issuing fetch requests if there's no data yet - as
> >   opposed to just saying "respond when 'n' bytes of data are available
> >   or when 't' millisecs have elapsed, whichever is earlier" (see the
> >   sketch after this list).
> > - WRT your question on PurgatorySize - we added that just to keep a
> >   tab on how many requests are sitting in purgatory (including both
> >   watchers map and expiration queue) as a rough gauge of memory usage.
> >   Also the fetch/producer request gauges should not collide - the
> >   KafkaMetricsGroup class takes care of this. The CSV reporter might
> >   run into issues though - I thought we had fixed that but could be
> >   wrong.
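> >
> > A standalone illustration of that long-poll contract (a deliberately
> > naive sketch; the broker parks the request in purgatory rather than
> > polling like this):
> >
> >   // block until n bytes are available or t ms elapse, whichever is earlier
> >   def longPoll(availableBytes: () => Int, n: Int, t: Long): Int = {
> >     val deadline = System.currentTimeMillis + t
> >     while (availableBytes() < n && System.currentTimeMillis < deadline)
> >       Thread.sleep(1)  // the purgatory avoids this busy wait entirely
> >     availableBytes()
> >   }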
> >
> > Joel

Re: Purgatory

Posted by Priya Matpadi <pr...@ecofactor.com>.
Marc, thanks much for documenting the guts!
There is one correction for Fetch Request handling:

When is it satisfied?

The fetch size requested is reached - i.e. the amount of data the consumer
wishes to receive in one response

Consumer configuration: *fetch.message.max.bytes*
As per the code:

  /**
   * A holding pen for fetch requests waiting to be satisfied
   */
  class FetchRequestPurgatory(requestChannel: RequestChannel,
purgeInterval: Int)
          extends RequestPurgatory[DelayedFetch, Int](brokerId,
purgeInterval) {
    this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)

    /**
     * A fetch request is satisfied when it has accumulated enough data to
meet the min_bytes field
     */
    def checkSatisfied(messageSizeInBytes: Int, delayedFetch:
DelayedFetch): Boolean = {
      val accumulatedSize =
delayedFetch.bytesAccumulated.addAndGet(messageSizeInBytes)
      accumulatedSize >= delayedFetch.fetch.minBytes
    }
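
A standalone re-statement of that accumulation logic, to make the
satisfaction condition concrete (an assumed simplification, not the broker
code itself):

  import java.util.concurrent.atomic.AtomicLong

  // a delayed fetch is satisfied once the bytes accumulated for it
  // reach the request's minBytes
  class FetchAccumulator(minBytes: Long) {
    private val accumulated = new AtomicLong(0)
    def onMessages(sizeInBytes: Long): Boolean =
      accumulated.addAndGet(sizeInBytes) >= minBytes
  }

  // e.g. with minBytes = 65536: onMessages(50000) returns false (still
  // delayed), then onMessages(20000) returns true (satisfied)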




Re: Purgatory

Posted by Joel Koshy <jj...@gmail.com>.
Marc - thanks again for doing this.  A couple of suggestions:

- I would suggest removing the disclaimer and email quotes since this
  can become a stand-alone clean document on what the purgatory is and
  how it works.
- A diagram would be helpful - it could, say, show the watcher map and
  the expiration queue, and it will be especially useful if it can
  show the flow of producer/fetch requests through the purgatory. That
  would also help cut down a lot of the text in the doc. (A
  hypothetical sketch of these two structures follows this message.)
- I think it would be preferable to have just high-level details in
  this document.  Internal details (such as the purge interval
  settings) can either be removed or moved (to say, a short faq or
  config section at the end).
- In the overview you may want to comment on why we added it: i.e., it is
  the primary data structure we use for supporting long poll of
  producer/fetch requests. E.g., if we didn't do this, consumers would
  have to keep issuing fetch requests even if there's no data yet - as
  opposed to just saying "respond when 'n' bytes of data are available
  or when 't' millisecs have elapsed, whichever is earlier."
- WRT your question on PurgatorySize - we added that just to keep
  tabs on how many requests are sitting in purgatory (including both
  the watchers map and expiration queue) as a rough gauge of memory usage.
  Also the fetch/producer request gauges should not collide - the
  KafkaMetricsGroup class takes care of this. The CSV reporter might
  run into issues though - I thought we had fixed that but could be
  wrong.

Joel
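
For illustration, here is a rough, hypothetical sketch of the two structures
described above - a watchers map keyed by topic/partition and an expiration
(delay) queue. This is not Kafka's actual RequestPurgatory code; the class
and member names (SimplePurgatory, DelayedRequest, watch, update, and so on)
are made up for this sketch:

  import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}
  import scala.collection.mutable

  // A parked request knows its deadline and how to check satisfaction.
  abstract class DelayedRequest(val keys: Seq[String], deadlineMs: Long) extends Delayed {
    @volatile var satisfied = false
    def isSatisfied: Boolean   // e.g. accumulated bytes >= the request's min bytes
    def respond(): Unit        // send the (possibly empty) response to the client

    override def getDelay(unit: TimeUnit): Long =
      unit.convert(deadlineMs - System.currentTimeMillis, TimeUnit.MILLISECONDS)
    override def compareTo(other: Delayed): Int =
      java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS),
                             other.getDelay(TimeUnit.MILLISECONDS))
  }

  class SimplePurgatory {
    // who is waiting on each key - cf. the watcherFor map
    private val watchers =
      mutable.Map.empty[String, mutable.ListBuffer[DelayedRequest]]
    // requests ordered by deadline - cf. the expiration queue
    private val expirationQueue = new DelayQueue[DelayedRequest]()

    // Long poll: park a request that cannot be answered immediately.
    def watch(req: DelayedRequest): Unit = {
      req.keys.foreach(k => watchers.getOrElseUpdate(k, mutable.ListBuffer.empty) += req)
      expirationQueue.add(req)
    }

    // New data arrived for a key: respond to whoever is now satisfied.
    def update(key: String): Unit =
      watchers.get(key).foreach { reqs =>
        val done = reqs.filterNot(_.satisfied).filter(_.isSatisfied)
        done.foreach { r => r.satisfied = true; r.respond() }
        reqs --= done
      }

    // Reaper thread: respond once a request's max wait has elapsed.
    def expireNext(): Unit = {
      val expired = expirationQueue.take()  // blocks until some deadline passes
      if (!expired.satisfied) { expired.satisfied = true; expired.respond() }
    }

    // Periodic purge (cf. purgatory.purge.interval.requests): drop satisfied
    // requests still lingering in the watchers map so that low-volume keys
    // do not accumulate garbage (the KAFKA-664 scenario).
    def purgeSatisfied(): Unit =
      watchers.values.foreach(reqs => reqs --= reqs.filter(_.satisfied))
  }

With this shape, a fetch or produce request that cannot be answered right
away is watch()ed, log appends drive update(), the reaper calls expireNext(),
and purgeSatisfied() is the periodic pass the purge-interval config controls.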



Re: Purgatory

Posted by Joel Koshy <jj...@gmail.com>.
Excellent - thanks for putting that together! Will review it more
carefully tomorrow and suggest some minor edits if required.



Re: Purgatory

Posted by Marc Labbe <mr...@gmail.com>.
I've just added a page for purgatory, feel free to comment/modify at will.
I hope I didn't misinterpret too much of the code.

https://cwiki.apache.org/confluence/display/KAFKA/Request+Purgatory+(0.8)

I added a few questions of my own.



Re: Purgatory

Posted by Joe Stein <jo...@stealth.ly>.
To edit the Wiki you need to send an ICLA
http://www.apache.org/licenses/#clas to Apache and then, once that is done,
an email to private@kafka.apache.org (or to me and I will copy private)
with your Wiki username and confirmation that you sent the ICLA to Apache.

Then, I can add you to edit the Wiki.

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/



Re: Purgatory

Posted by Marc Labbe <mr...@gmail.com>.
Hi Joel,

I used to have edit access to the wiki, I made a few additions to it a while
ago but it seems I don't have it anymore. It might have been lost in the
confluence update. I would be glad to add what I have written if I get it
back. Otherwise, feel free to paste my words in one of the pages, I don't
intend on asking for copyrights for this :).

marc



Re: Purgatory

Posted by Joel Koshy <jj...@gmail.com>.
Marc, thanks for writing that up. I think it is worth adding some
details on the request-purgatory on a wiki (Jay had started a wiki
page for kafka internals [1] a while ago, but we have not had time to
add much to it since.) Your write-up could be reviewed and added
there. Do you have edit permissions on the wiki?

As for the purge interval config - yes the documentation can be
improved a bit. It's one of those "internal" configs that generally
don't need to be modified by users. The reason we added that was as
follows:
- We found that for low-volume topics, replica fetch requests were
getting expired but sitting around in purgatory
- This was because we were expiring them from the delay queue (used to
track when requests should expire), but they were still sitting in the
watcherFor map - i.e., they would get purged when the next producer
request to that topic/partition arrived, but for low volume topics
this could be a long time (or never in the worst case) and we would
eventually run into an OOME.
- So we needed to periodically go through the entire watcherFor map
and explicitly remove those requests that had expired (roughly the
sketch below).
- More details on this are in KAFKA-664.
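
In code terms the fix amounts to something like this (a simplified sketch,
not the actual RequestPurgatory code; the names are stand-ins):

    import java.util.concurrent.atomic.AtomicBoolean
    import scala.collection.mutable

    // Stand-in for a delayed request: it sits in the delay queue (for
    // expiration) *and* in a per-key watchers map (for satisfaction checks).
    class DelayedReq(val keys: Seq[String]) {
      val satisfied = new AtomicBoolean(false)
    }

    val watchersFor = mutable.Map.empty[String, mutable.ListBuffer[DelayedReq]]

    // Expiration only pulls a request off the delay queue; the watchers map
    // still references it. The periodic sweep below is what reclaims those
    // references, so low-volume keys cannot leak.
    def purgeSatisfied(): Int = {
      var purged = 0
      for (key <- watchersFor.keys.toSeq) {
        val (done, pending) = watchersFor(key).partition(_.satisfied.get)
        watchersFor(key) = pending
        purged += done.size
      }
      purged
    }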

Thanks,

Joel

[1] https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Internals


Re: Purgatory

Posted by Marc Labbe <mr...@gmail.com>.
Guozhang,

I have to agree with Priya that the doc isn't very clear. Although the
configuration is documented, the description simply rewords the name of the
config, which isn't particularly useful if you want more information about
what the purgatory is. I searched the whole wiki and doc and could not find
anything very useful, as opposed to looking at the code. In this case,
kafka.server.KafkaApis and kafka.server.RequestPurgatory will be your
friends.

I'll try to add to Joe's answer here, mostly just reporting what's
available in the Scaladoc from the project. I am doing this to understand
the mechanics myself, btw.

As Joe said, requests are not dropped by the purgatory but simply removed
from it once they are satisfied (so no messages are lost). The satisfaction
conditions differ for fetch and produce requests, and each is implemented
in its own DelayedRequest subclass (DelayedFetch and DelayedProduce).

The request purgatories are defined as follows in the code:
 - ProducerRequestPurgatory: a holding pen for produce requests waiting to
be satisfied.
 - FetchRequestPurgatory: a holding pen for fetch requests waiting to be
satisfied.

Each request purgatory runs a thread (ExpiredRequestReaper). This thread
first tries to find an expired delayed request. When one is found, it runs
the purgatory's expire method to handle the expiration; in both the produce
and fetch cases, this sends a response to the client. An expired request
counts as a satisfied request. The next step of the thread's loop checks
the configuration parameter you asked about initially
(purgatory.purge.interval.requests): when the number of delayed requests
given to the purgatory to watch reaches this value, the thread goes through
all previously queued requests and removes those marked as satisfied.
Because of that, it is really an interval more than a threshold, since it
doesn't care about the number of satisfied requests or the size of the
queue.
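
If it helps, the loop reads roughly like this (my own simplified sketch,
not the actual ExpiredRequestReaper code; the stubs stand in for the real
purgatory internals):

    import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

    class ReaperSketch(purgeInterval: Int) {
      private val running = new AtomicBoolean(true)
      private val requestCounter = new AtomicInteger(0) // bumped per watched request

      private def pollExpired(): AnyRef = null   // blocks on the delay queue in the real thing
      private def expire(req: AnyRef): Unit = () // sends the timeout response to the client
      private def purgeSatisfied(): Unit = ()    // sweeps the watcher lists

      def loop(): Unit = {
        while (running.get) {
          val expired = pollExpired()            // next request past its deadline, if any
          if (expired != null) expire(expired)   // an expired request counts as satisfied
          // purgatory.purge.interval.requests: once enough requests have been
          // handed to the purgatory to watch, sweep out the satisfied ones.
          if (requestCounter.get >= purgeInterval) {
            requestCounter.set(0)
            purgeSatisfied()
          }
        }
      }

      def shutdown(): Unit = running.set(false)
    }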

Producer request
- When is it added to purgatory (delayed)? Roughly, all of the following
must hold (rough code sketch below):
  * it uses ack=-1 (actually, the code tells me anything but 0 or 1);
producer config: request.required.acks
  * partitions have more than one replica (otherwise, ack=-1 is no
different from ack=1 and a delayed request doesn't make much sense)
  * not all partitions are in error
- When does it expire? When it reaches the timeout defined in the produce
request (ackTimeoutMs), which translates from producer config
request.timeout.ms.
- What happens (on the broker) when it expires? It sends a response to the
client; the response content depends on the request, of course.
- When is it satisfied? I didn't find the courage to dig into the details
of this one :( ... but mainly when all the followers have also acknowledged
the produce request for their replicas.
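
Putting those conditions together in code form (my paraphrase, not the
actual KafkaApis logic; the names are made up):

    // Sketch of when a produce request gets parked in the purgatory.
    // requiredAcks comes from the producer's request.required.acks.
    case class ProducePartitionState(numReplicas: Int, inError: Boolean)

    def shouldDelayProduce(requiredAcks: Int,
                           partitions: Seq[ProducePartitionState]): Boolean =
      requiredAcks != 0 && requiredAcks != 1 &&   // i.e. ack=-1 style acks
      partitions.exists(_.numReplicas > 1) &&     // otherwise ack=-1 == ack=1
      !partitions.forall(_.inError)               // not every partition failed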

Fetch request
- When is it added to purgatory (delayed)? Two request parameters are
mainly relevant here: max wait time and fetch size (rough code sketch
below). It is delayed:
  * if max wait is greater than 0 (otherwise, it is a blocking call by the
consumer)
  * if the fetch size is greater than the current size of data available to
fulfil the request
- When does it expire?
  * after the wait time: the amount of time the consumer is willing to wait
for data; consumer config: fetch.wait.max.ms
- When is it satisfied? When the requested fetch size is reached, i.e., the
amount of data the consumer wishes to receive in one response (from
consumer config: fetch.message.max.bytes).
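
In code form (again my sketch, not the actual logic):

    // Sketch of when a fetch request gets parked in the purgatory.
    def shouldDelayFetch(maxWaitMs: Long,
                         fetchSize: Long,
                         bytesAvailable: Long): Boolean =
      maxWaitMs > 0 &&            // otherwise the broker answers immediately
      fetchSize > bytesAvailable  // not enough data yet to fill the response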

******

It would be useful to add some information about the metrics associated
with this.

Of course, I am all for being corrected if I said anything wrong here. The
truth is always the code :-)

marc
- mrtheb -



Re: Purgatory

Posted by Joe Stein <jo...@stealth.ly>.
Priya, if you want you can look at RequestPurgatory.scala for some more
details.

The config is the threshold checked against the atomic requestCounter.

Basically, the purge in the purgatory is a way to check whether a delayed
request has been satisfied and can be removed. It is a background scan that
runs when the size reaches a certain point, to bring it back down again if
it can. Kind of like a minor compaction, the way I see it, so the structure
does not grow without bound.
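
A toy illustration of that bound (just the idea, not the real code):

    import java.util.concurrent.atomic.AtomicInteger

    val purgeInterval = 100                    // *.purgatory.purge.interval.requests
    val requestCounter = new AtomicInteger(0)

    def watch(): Unit = {
      // Every watched request bumps the counter; past the interval, the
      // background scan sweeps out whatever is already satisfied.
      if (requestCounter.incrementAndGet() >= purgeInterval) {
        requestCounter.set(0)
        println("purging satisfied requests")
      }
    }

    (1 to 250).foreach(_ => watch())           // triggers the purge twice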

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/



Re: Purgatory

Posted by Priya Matpadi <pr...@ecofactor.com>.
Guozhang,
The documentation is not very clear.
Marc's response for producer purgatory makes sense.
I am not entirely clear on fetch purgatory.
How does the broker use the purgatory? Is it a temporary holding area?
What happens to the messages if the purge interval is exceeded, for either
or both of the producer and consumer? Are messages dropped in this case?
Thanks,
Priya



Re: Purgatory

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Priya,

You can find the definitions of these two configs here:

http://kafka.apache.org/documentation.html#brokerconfigs

Guozhang




-- 
-- Guozhang

Re: Purgatory

Posted by Marc Labbe <mr...@gmail.com>.
Hi Priya

My understanding is that producer requests will be delayed (and put in the
request purgatory) only if your producer uses ack=-1. A request will stay
in the purgatory (delayed) until all brokers have acknowledged the messages
to be replicated. The documentation suggests monitoring the
ProducerRequestPurgatory size metric, but it only applies if you're using
ack=-1; otherwise, this value will always be 0.
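
For example, a 0.8 Scala producer set up so its requests go through the
purgatory might look like this (broker address and topic are just
placeholders):

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("request.required.acks", "-1")   // wait for all in-sync replicas
    props.put("request.timeout.ms", "10000")   // how long the broker may hold the request

    val producer = new Producer[String, String](new ProducerConfig(props))
    producer.send(new KeyedMessage[String, String]("my-topic", "hello"))
    producer.close()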

Consumer requests will sit in the purgatory (delayed) until the max
allowed time to respond has been reached, unless the broker has enough
messages to fill the buffer before that. The request will not end up in
the purgatory if you're making a blocking request (max wait <= 0).

Not sure about the configuration interval, though.

marc

