You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Jason Rosenberg <jb...@squareup.com> on 2013/08/24 01:09:33 UTC

Producer.send questions

I'm using the kafka.javaapi.producer.Producer class from a java client.
 I'm wondering if it ever makes sense to refresh a producer by stopping it
and creating a new one, for example in response to a downstream IO error
(e.g. a broker got restarted, or a stale socket, etc.).

Or should it always be safe to rely on the producer's implementation to
manage it's pool of BlockingChannel connections, etc.

I'm also interested in trying to understand which exceptions indicate a
failed send() request might be retryable (basically anything that doesn't
involve a data-dependent problem, like a malformed message, or a message
too large, etc.).

Unfortunately, the range of Exceptions that can be thrown by the various
javaapi methods is not yet well documented.  It would be nice to have some
notion of whether an exception is the result of a data error, or a
transient downstream connection error, etc.

Jason

Re: Producer.send questions

Posted by Neha Narkhede <ne...@gmail.com>.
Since these errors are recoverable, the producer client can retry the
requests. kafka.javaapi.producer.Producer does the same.

Thanks,
Neha


On Fri, Aug 23, 2013 at 9:41 PM, Vadim Keylis <vk...@gmail.com> wrote:

> Jun. In general how one will recover from NotLeaderForPartitionException?
> What are the steps?
>
> Thanks,
> Vadim
>
>
> On Fri, Aug 23, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com> wrote:
>
> > For the most part, only SocketExceptions and
> NotLeaderForPartitionException
> > are recoverable. MessageSizeTooLargeException may be recoverable with a
> > smaller batch size.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Fri, Aug 23, 2013 at 4:09 PM, Jason Rosenberg <jb...@squareup.com>
> wrote:
> >
> > > I'm using the kafka.javaapi.producer.Producer class from a java client.
> > >  I'm wondering if it ever makes sense to refresh a producer by stopping
> > it
> > > and creating a new one, for example in response to a downstream IO
> error
> > > (e.g. a broker got restarted, or a stale socket, etc.).
> > >
> > > Or should it always be safe to rely on the producer's implementation to
> > > manage it's pool of BlockingChannel connections, etc.
> > >
> > > I'm also interested in trying to understand which exceptions indicate a
> > > failed send() request might be retryable (basically anything that
> doesn't
> > > involve a data-dependent problem, like a malformed message, or a
> message
> > > too large, etc.).
> > >
> > > Unfortunately, the range of Exceptions that can be thrown by the
> various
> > > javaapi methods is not yet well documented.  It would be nice to have
> > some
> > > notion of whether an exception is the result of a data error, or a
> > > transient downstream connection error, etc.
> > >
> > > Jason
> > >
> >
>

Re: Producer.send questions

Posted by Vadim Keylis <vk...@gmail.com>.
Jun. In general how one will recover from NotLeaderForPartitionException?
What are the steps?

Thanks,
Vadim


On Fri, Aug 23, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com> wrote:

> For the most part, only SocketExceptions and NotLeaderForPartitionException
> are recoverable. MessageSizeTooLargeException may be recoverable with a
> smaller batch size.
>
> Thanks,
>
> Jun
>
>
> On Fri, Aug 23, 2013 at 4:09 PM, Jason Rosenberg <jb...@squareup.com> wrote:
>
> > I'm using the kafka.javaapi.producer.Producer class from a java client.
> >  I'm wondering if it ever makes sense to refresh a producer by stopping
> it
> > and creating a new one, for example in response to a downstream IO error
> > (e.g. a broker got restarted, or a stale socket, etc.).
> >
> > Or should it always be safe to rely on the producer's implementation to
> > manage it's pool of BlockingChannel connections, etc.
> >
> > I'm also interested in trying to understand which exceptions indicate a
> > failed send() request might be retryable (basically anything that doesn't
> > involve a data-dependent problem, like a malformed message, or a message
> > too large, etc.).
> >
> > Unfortunately, the range of Exceptions that can be thrown by the various
> > javaapi methods is not yet well documented.  It would be nice to have
> some
> > notion of whether an exception is the result of a data error, or a
> > transient downstream connection error, etc.
> >
> > Jason
> >
>

Re: Producer.send questions

Posted by Jason Rosenberg <jb...@squareup.com>.
Ok,

I added a comment to: https://issues.apache.org/jira/browse/KAFKA-998

I added suggestion about exposing recoverability back to the caller.  I'm
not 100% sure it's the same issue (since I'm concerned about the client
api), and the Jira seems concerned about returning quickly from the
internal retry if it's not retryable.  Let me know if it makes more sense
to create a separate Jira instead.

It seems that ticket already had info about automatically retrying with a
smaller batch size in response to a MessageToLargeException.  Let me know
if that warrants it's own Jira (can create that too).

Jason


On Sun, Aug 25, 2013 at 10:01 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Actually we do have a JIRA tracking this issue:
>
> https://issues.apache.org/jira/browse/KAFKA-998
>
> And BTW, any review comments are welcome :)
>
> Guozhang
>
>
> On Sat, Aug 24, 2013 at 8:25 PM, Neha Narkhede <neha.narkhede@gmail.com
> >wrote:
>
> > >> Ok, but perhaps the producer will handle something like this in the
> > future?
> >
> > Yes, I think we need a JIRA for this.
> >
> > >> UnretryableFailedToSendMessageException (wraps the root cause)
> > NoMoreRetriesFailedToSendMessageException (wraps the root cause, from the
> > final attempt)
> >
> > Something like this makes sense. Would you mind creating a JIRA for this
> so
> > we can
> > discuss a solution there ?
> >
> > Thanks,
> > Neha
> >
> >
> > On Sat, Aug 24, 2013 at 10:41 AM, Jason Rosenberg <jb...@squareup.com>
> > wrote:
> >
> > > Thanks Neha,
> > >
> > > On Sat, Aug 24, 2013 at 10:06 AM, Neha Narkhede <
> neha.narkhede@gmail.com
> > > >wrote:
> > > >
> > > > >> I gathered from one of your previous responses, that a
> > > > MessageSizeTooLargeException
> > > > can be rectified with a smaller batch size.
> > > >  If so, does that imply that the message size limit is measured on
> the
> > > > broker by the cumulative size of the batch, and not of any one
> message?
> > > >
> > > > That's right. The broker does the message size check on the
> compressed
> > > > message. The size of the compressed message
> > > > is proportional to the batch size. Hence, reducing the batch size on
> a
> > > > retry might make sense here, but currently the
> > > > producer doesn't do this.
> > > >
> > >
> > > Ok, but perhaps the producer will handle something like this in the
> > future?
> > >
> > > >
> > > > >> If I want to implement guaranteed delivery semantics, using the
> new
> > > > request.required.acks
> > > > configuration, I need to expose retry logic beyond
> > > > that built into the producer?
> > > >
> > > > The kafka producer must handle recoverable exceptions with a
> > configurable
> > > > number of retries and must not retry
> > > > on unrecoverable exceptions. So ideally you shouldn't have to write
> > your
> > > > own batching and retry logic.
> > > >
> > >
> > > So, it seems there might be a bit of a gray area.  There is a
> > configurable
> > > retry count, which we can increase perhaps to gain confidence that
> > anything
> > > recoverable has been sent.  But, since this retry count is finite,
> > there's
> > > no way to know for sure that it won't succeed if it were retried just
> one
> > > more time.  So, it is then difficult to conclude that  if Producer.send
> > > throws a FailedToSendMessageException, the message shouldn't be
> retried.
> > >
> > > Perhaps it would be useful to define different exception types, so
> that a
> > > caller can have clearer semantics:
> > >
> > > UnretryableFailedToSendMessageException (wraps the root cause)
> > > NoMoreRetriesFailedToSendMessageException (wraps the root cause, from
> the
> > > final attempt)
> > >
> > > Probably shorter names are possible here!  Perhaps these could be
> > > subclasses of FailedToSendMessageException.  Alternately,
> > > FailedToSendMessageException could include information, such as the
> > number
> > > of retries attempted, and a flag indicating whether it's possible to
> > retry
> > > the message.
> > >
> > > Jason
> > >
> > >
> > > >
> > > >
> > > >
> > > > On Sat, Aug 24, 2013 at 9:54 AM, Jason Rosenberg <jb...@squareup.com>
> > > wrote:
> > > >
> > > > > Jun,
> > > > >
> > > > > Thanks, this is helpful.
> > > > >
> > > > > So, can QueueFullException occur in either sync or async mode (or
> > just
> > > > > async mode)?
> > > > >
> > > > > If there's a MessageSizeTooLargeException, is there any visibility
> of
> > > > this
> > > > > to the caller?  Or will it just be a FailedToSendMessageException.
>  I
> > > > > gathered from one of your previous responses, that a
> > > > > MessageSizeTooLargeException can be rectified with a smaller batch
> > > size.
> > > > >  If so, does that imply that the message size limit is measured on
> > the
> > > > > broker by the cumulative size of the batch, and not of any one
> > message?
> > > > >  (makes sense if the broker doesn't unwrap a batch of messages
> before
> > > > > storing on the server).
> > > > >
> > > > > If I want to implement guaranteed delivery semantics, using the new
> > > > > request.required.acks configuration, I need to expose retry logic
> > > beyond
> > > > > that built into the producer?  And to do this, I need to indicate
> to
> > > the
> > > > > caller whether it's possible to retry, or whether it will be
> > fruitless.
> > > >  I
> > > > > suppose allowing message.max.send.retries to allow infinite retries
> > > (e.g.
> > > > > by setting it to -1) might be useful.  But optionally, I'd like the
> > > > caller
> > > > > to be able to handle this retry logic itself.
> > > > >
> > > > > Jason
> > > > >
> > > > >
> > > > > On Sat, Aug 24, 2013 at 8:22 AM, Jun Rao <ju...@gmail.com> wrote:
> > > > >
> > > > > > You don't need to restart the producer. The producer currently
> > > handles
> > > > > all
> > > > > > error/exceptions by refreshing the metadata and retrying. If it
> > fails
> > > > all
> > > > > > retries, it throws a FailedToSendMessageException to the caller
> (in
> > > > sync
> > > > > > mode). The original cause is not included in this exception. We
> > have
> > > > > > thought about being a bit smarter in the producer retry logic
> such
> > > that
> > > > > it
> > > > > > only retries on recoverable errors and could implement this at
> some
> > > > > point.
> > > > > > Other than FailedToSendMessageException, the producer can also
> > throw
> > > > > > QueueFullException.
> > > > > > This is an indication that the producer is sending data at a rate
> > > > faster
> > > > > > than the broker can handle. This may or may not be recoverable
> > since
> > > it
> > > > > > depends on the load.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Sat, Aug 24, 2013 at 1:44 AM, Jason Rosenberg <
> jbr@squareup.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Jun,
> > > > > > >
> > > > > > > There are several others I've seen that I would have thought
> > would
> > > be
> > > > > > > retryable (possibly after an exponential backoff delay).  I'm
> > > curious
> > > > > > > about:
> > > > > > >
> > > > > > > BrokerNotAvailableException
> > > > > > > FailedToSendMessageException
> > > > > > > QueueFullException (happens if producerType is 'async')
> > > > > > > KafkaException (this seems to wrap lots of base conditions,
> does
> > > one
> > > > > have
> > > > > > > to sort through the different wrapped exception types?)
> > > > > > > LeaderNotAvailableException
> > > > > > > MessageSizeTooLargeException (does a batch of messages get
> > treated
> > > > as a
> > > > > > > single message, when checking for message size too large?)
> > > > > > > ReplicaNotAvailableException
> > > > > > > UnavailableProducerException
> > > > > > > UnknownException
> > > > > > >
> > > > > > > Also, what about my first question, regarding whether it makes
> > > sense
> > > > to
> > > > > > > refresh a producer by closing it and restarting it after a
> > failure?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jason
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Aug 23, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com>
> > wrote:
> > > > > > >
> > > > > > > > For the most part, only SocketExceptions and
> > > > > > > NotLeaderForPartitionException
> > > > > > > > are recoverable. MessageSizeTooLargeException may be
> > recoverable
> > > > > with a
> > > > > > > > smaller batch size.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Aug 23, 2013 at 4:09 PM, Jason Rosenberg <
> > > jbr@squareup.com
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > I'm using the kafka.javaapi.producer.Producer class from a
> > java
> > > > > > client.
> > > > > > > > >  I'm wondering if it ever makes sense to refresh a producer
> > by
> > > > > > stopping
> > > > > > > > it
> > > > > > > > > and creating a new one, for example in response to a
> > downstream
> > > > IO
> > > > > > > error
> > > > > > > > > (e.g. a broker got restarted, or a stale socket, etc.).
> > > > > > > > >
> > > > > > > > > Or should it always be safe to rely on the producer's
> > > > > implementation
> > > > > > to
> > > > > > > > > manage it's pool of BlockingChannel connections, etc.
> > > > > > > > >
> > > > > > > > > I'm also interested in trying to understand which
> exceptions
> > > > > > indicate a
> > > > > > > > > failed send() request might be retryable (basically
> anything
> > > that
> > > > > > > doesn't
> > > > > > > > > involve a data-dependent problem, like a malformed message,
> > or
> > > a
> > > > > > > message
> > > > > > > > > too large, etc.).
> > > > > > > > >
> > > > > > > > > Unfortunately, the range of Exceptions that can be thrown
> by
> > > the
> > > > > > > various
> > > > > > > > > javaapi methods is not yet well documented.  It would be
> nice
> > > to
> > > > > have
> > > > > > > > some
> > > > > > > > > notion of whether an exception is the result of a data
> error,
> > > or
> > > > a
> > > > > > > > > transient downstream connection error, etc.
> > > > > > > > >
> > > > > > > > > Jason
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > So, can QueueFullException occur in either sync or async mode (or
> just
> > > > async mode)?
> > > >
> > > > If there's a MessageSizeTooLargeException, is there any visibility of
> > > this
> > > > to the caller?  Or will it just be a FailedToSendMessageException.  I
> > > > gathered from one of your previous responses, that a
> > > > MessageSizeTooLargeException can be rectified with a smaller batch
> > size.
> > > >  If so, does that imply that the message size limit is measured on
> the
> > > > broker by the cumulative size of the batch, and not of any one
> message?
> > > >  (makes sense if the broker doesn't unwrap a batch of messages before
> > > > storing on the server).
> > > >
> > > > If I want to implement guaranteed delivery semantics, using the new
> > > > request.required.acks configuration, I need to expose retry logic
> > beyond
> > > > that built into the producer?
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: Producer.send questions

Posted by Guozhang Wang <wa...@gmail.com>.
Actually we do have a JIRA tracking this issue:

https://issues.apache.org/jira/browse/KAFKA-998

And BTW, any review comments are welcome :)

Guozhang


On Sat, Aug 24, 2013 at 8:25 PM, Neha Narkhede <ne...@gmail.com>wrote:

> >> Ok, but perhaps the producer will handle something like this in the
> future?
>
> Yes, I think we need a JIRA for this.
>
> >> UnretryableFailedToSendMessageException (wraps the root cause)
> NoMoreRetriesFailedToSendMessageException (wraps the root cause, from the
> final attempt)
>
> Something like this makes sense. Would you mind creating a JIRA for this so
> we can
> discuss a solution there ?
>
> Thanks,
> Neha
>
>
> On Sat, Aug 24, 2013 at 10:41 AM, Jason Rosenberg <jb...@squareup.com>
> wrote:
>
> > Thanks Neha,
> >
> > On Sat, Aug 24, 2013 at 10:06 AM, Neha Narkhede <neha.narkhede@gmail.com
> > >wrote:
> > >
> > > >> I gathered from one of your previous responses, that a
> > > MessageSizeTooLargeException
> > > can be rectified with a smaller batch size.
> > >  If so, does that imply that the message size limit is measured on the
> > > broker by the cumulative size of the batch, and not of any one message?
> > >
> > > That's right. The broker does the message size check on the compressed
> > > message. The size of the compressed message
> > > is proportional to the batch size. Hence, reducing the batch size on a
> > > retry might make sense here, but currently the
> > > producer doesn't do this.
> > >
> >
> > Ok, but perhaps the producer will handle something like this in the
> future?
> >
> > >
> > > >> If I want to implement guaranteed delivery semantics, using the new
> > > request.required.acks
> > > configuration, I need to expose retry logic beyond
> > > that built into the producer?
> > >
> > > The kafka producer must handle recoverable exceptions with a
> configurable
> > > number of retries and must not retry
> > > on unrecoverable exceptions. So ideally you shouldn't have to write
> your
> > > own batching and retry logic.
> > >
> >
> > So, it seems there might be a bit of a gray area.  There is a
> configurable
> > retry count, which we can increase perhaps to gain confidence that
> anything
> > recoverable has been sent.  But, since this retry count is finite,
> there's
> > no way to know for sure that it won't succeed if it were retried just one
> > more time.  So, it is then difficult to conclude that  if Producer.send
> > throws a FailedToSendMessageException, the message shouldn't be retried.
> >
> > Perhaps it would be useful to define different exception types, so that a
> > caller can have clearer semantics:
> >
> > UnretryableFailedToSendMessageException (wraps the root cause)
> > NoMoreRetriesFailedToSendMessageException (wraps the root cause, from the
> > final attempt)
> >
> > Probably shorter names are possible here!  Perhaps these could be
> > subclasses of FailedToSendMessageException.  Alternately,
> > FailedToSendMessageException could include information, such as the
> number
> > of retries attempted, and a flag indicating whether it's possible to
> retry
> > the message.
> >
> > Jason
> >
> >
> > >
> > >
> > >
> > > On Sat, Aug 24, 2013 at 9:54 AM, Jason Rosenberg <jb...@squareup.com>
> > wrote:
> > >
> > > > Jun,
> > > >
> > > > Thanks, this is helpful.
> > > >
> > > > So, can QueueFullException occur in either sync or async mode (or
> just
> > > > async mode)?
> > > >
> > > > If there's a MessageSizeTooLargeException, is there any visibility of
> > > this
> > > > to the caller?  Or will it just be a FailedToSendMessageException.  I
> > > > gathered from one of your previous responses, that a
> > > > MessageSizeTooLargeException can be rectified with a smaller batch
> > size.
> > > >  If so, does that imply that the message size limit is measured on
> the
> > > > broker by the cumulative size of the batch, and not of any one
> message?
> > > >  (makes sense if the broker doesn't unwrap a batch of messages before
> > > > storing on the server).
> > > >
> > > > If I want to implement guaranteed delivery semantics, using the new
> > > > request.required.acks configuration, I need to expose retry logic
> > beyond
> > > > that built into the producer?  And to do this, I need to indicate to
> > the
> > > > caller whether it's possible to retry, or whether it will be
> fruitless.
> > >  I
> > > > suppose allowing message.max.send.retries to allow infinite retries
> > (e.g.
> > > > by setting it to -1) might be useful.  But optionally, I'd like the
> > > caller
> > > > to be able to handle this retry logic itself.
> > > >
> > > > Jason
> > > >
> > > >
> > > > On Sat, Aug 24, 2013 at 8:22 AM, Jun Rao <ju...@gmail.com> wrote:
> > > >
> > > > > You don't need to restart the producer. The producer currently
> > handles
> > > > all
> > > > > error/exceptions by refreshing the metadata and retrying. If it
> fails
> > > all
> > > > > retries, it throws a FailedToSendMessageException to the caller (in
> > > sync
> > > > > mode). The original cause is not included in this exception. We
> have
> > > > > thought about being a bit smarter in the producer retry logic such
> > that
> > > > it
> > > > > only retries on recoverable errors and could implement this at some
> > > > point.
> > > > > Other than FailedToSendMessageException, the producer can also
> throw
> > > > > QueueFullException.
> > > > > This is an indication that the producer is sending data at a rate
> > > faster
> > > > > than the broker can handle. This may or may not be recoverable
> since
> > it
> > > > > depends on the load.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Sat, Aug 24, 2013 at 1:44 AM, Jason Rosenberg <jbr@squareup.com
> >
> > > > wrote:
> > > > >
> > > > > > Jun,
> > > > > >
> > > > > > There are several others I've seen that I would have thought
> would
> > be
> > > > > > retryable (possibly after an exponential backoff delay).  I'm
> > curious
> > > > > > about:
> > > > > >
> > > > > > BrokerNotAvailableException
> > > > > > FailedToSendMessageException
> > > > > > QueueFullException (happens if producerType is 'async')
> > > > > > KafkaException (this seems to wrap lots of base conditions, does
> > one
> > > > have
> > > > > > to sort through the different wrapped exception types?)
> > > > > > LeaderNotAvailableException
> > > > > > MessageSizeTooLargeException (does a batch of messages get
> treated
> > > as a
> > > > > > single message, when checking for message size too large?)
> > > > > > ReplicaNotAvailableException
> > > > > > UnavailableProducerException
> > > > > > UnknownException
> > > > > >
> > > > > > Also, what about my first question, regarding whether it makes
> > sense
> > > to
> > > > > > refresh a producer by closing it and restarting it after a
> failure?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jason
> > > > > >
> > > > > >
> > > > > > On Fri, Aug 23, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com>
> wrote:
> > > > > >
> > > > > > > For the most part, only SocketExceptions and
> > > > > > NotLeaderForPartitionException
> > > > > > > are recoverable. MessageSizeTooLargeException may be
> recoverable
> > > > with a
> > > > > > > smaller batch size.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Aug 23, 2013 at 4:09 PM, Jason Rosenberg <
> > jbr@squareup.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > I'm using the kafka.javaapi.producer.Producer class from a
> java
> > > > > client.
> > > > > > > >  I'm wondering if it ever makes sense to refresh a producer
> by
> > > > > stopping
> > > > > > > it
> > > > > > > > and creating a new one, for example in response to a
> downstream
> > > IO
> > > > > > error
> > > > > > > > (e.g. a broker got restarted, or a stale socket, etc.).
> > > > > > > >
> > > > > > > > Or should it always be safe to rely on the producer's
> > > > implementation
> > > > > to
> > > > > > > > manage it's pool of BlockingChannel connections, etc.
> > > > > > > >
> > > > > > > > I'm also interested in trying to understand which exceptions
> > > > > indicate a
> > > > > > > > failed send() request might be retryable (basically anything
> > that
> > > > > > doesn't
> > > > > > > > involve a data-dependent problem, like a malformed message,
> or
> > a
> > > > > > message
> > > > > > > > too large, etc.).
> > > > > > > >
> > > > > > > > Unfortunately, the range of Exceptions that can be thrown by
> > the
> > > > > > various
> > > > > > > > javaapi methods is not yet well documented.  It would be nice
> > to
> > > > have
> > > > > > > some
> > > > > > > > notion of whether an exception is the result of a data error,
> > or
> > > a
> > > > > > > > transient downstream connection error, etc.
> > > > > > > >
> > > > > > > > Jason
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > So, can QueueFullException occur in either sync or async mode (or just
> > > async mode)?
> > >
> > > If there's a MessageSizeTooLargeException, is there any visibility of
> > this
> > > to the caller?  Or will it just be a FailedToSendMessageException.  I
> > > gathered from one of your previous responses, that a
> > > MessageSizeTooLargeException can be rectified with a smaller batch
> size.
> > >  If so, does that imply that the message size limit is measured on the
> > > broker by the cumulative size of the batch, and not of any one message?
> > >  (makes sense if the broker doesn't unwrap a batch of messages before
> > > storing on the server).
> > >
> > > If I want to implement guaranteed delivery semantics, using the new
> > > request.required.acks configuration, I need to expose retry logic
> beyond
> > > that built into the producer?
> > >
> >
>



-- 
-- Guozhang

Re: Producer.send questions

Posted by Neha Narkhede <ne...@gmail.com>.
>> Ok, but perhaps the producer will handle something like this in the
future?

Yes, I think we need a JIRA for this.

>> UnretryableFailedToSendMessageException (wraps the root cause)
NoMoreRetriesFailedToSendMessageException (wraps the root cause, from the
final attempt)

Something like this makes sense. Would you mind creating a JIRA for this so
we can
discuss a solution there ?

Thanks,
Neha


On Sat, Aug 24, 2013 at 10:41 AM, Jason Rosenberg <jb...@squareup.com> wrote:

> Thanks Neha,
>
> On Sat, Aug 24, 2013 at 10:06 AM, Neha Narkhede <neha.narkhede@gmail.com
> >wrote:
> >
> > >> I gathered from one of your previous responses, that a
> > MessageSizeTooLargeException
> > can be rectified with a smaller batch size.
> >  If so, does that imply that the message size limit is measured on the
> > broker by the cumulative size of the batch, and not of any one message?
> >
> > That's right. The broker does the message size check on the compressed
> > message. The size of the compressed message
> > is proportional to the batch size. Hence, reducing the batch size on a
> > retry might make sense here, but currently the
> > producer doesn't do this.
> >
>
> Ok, but perhaps the producer will handle something like this in the future?
>
> >
> > >> If I want to implement guaranteed delivery semantics, using the new
> > request.required.acks
> > configuration, I need to expose retry logic beyond
> > that built into the producer?
> >
> > The kafka producer must handle recoverable exceptions with a configurable
> > number of retries and must not retry
> > on unrecoverable exceptions. So ideally you shouldn't have to write your
> > own batching and retry logic.
> >
>
> So, it seems there might be a bit of a gray area.  There is a configurable
> retry count, which we can increase perhaps to gain confidence that anything
> recoverable has been sent.  But, since this retry count is finite, there's
> no way to know for sure that it won't succeed if it were retried just one
> more time.  So, it is then difficult to conclude that  if Producer.send
> throws a FailedToSendMessageException, the message shouldn't be retried.
>
> Perhaps it would be useful to define different exception types, so that a
> caller can have clearer semantics:
>
> UnretryableFailedToSendMessageException (wraps the root cause)
> NoMoreRetriesFailedToSendMessageException (wraps the root cause, from the
> final attempt)
>
> Probably shorter names are possible here!  Perhaps these could be
> subclasses of FailedToSendMessageException.  Alternately,
> FailedToSendMessageException could include information, such as the number
> of retries attempted, and a flag indicating whether it's possible to retry
> the message.
>
> Jason
>
>
> >
> >
> >
> > On Sat, Aug 24, 2013 at 9:54 AM, Jason Rosenberg <jb...@squareup.com>
> wrote:
> >
> > > Jun,
> > >
> > > Thanks, this is helpful.
> > >
> > > So, can QueueFullException occur in either sync or async mode (or just
> > > async mode)?
> > >
> > > If there's a MessageSizeTooLargeException, is there any visibility of
> > this
> > > to the caller?  Or will it just be a FailedToSendMessageException.  I
> > > gathered from one of your previous responses, that a
> > > MessageSizeTooLargeException can be rectified with a smaller batch
> size.
> > >  If so, does that imply that the message size limit is measured on the
> > > broker by the cumulative size of the batch, and not of any one message?
> > >  (makes sense if the broker doesn't unwrap a batch of messages before
> > > storing on the server).
> > >
> > > If I want to implement guaranteed delivery semantics, using the new
> > > request.required.acks configuration, I need to expose retry logic
> beyond
> > > that built into the producer?  And to do this, I need to indicate to
> the
> > > caller whether it's possible to retry, or whether it will be fruitless.
> >  I
> > > suppose allowing message.max.send.retries to allow infinite retries
> (e.g.
> > > by setting it to -1) might be useful.  But optionally, I'd like the
> > caller
> > > to be able to handle this retry logic itself.
> > >
> > > Jason
> > >
> > >
> > > On Sat, Aug 24, 2013 at 8:22 AM, Jun Rao <ju...@gmail.com> wrote:
> > >
> > > > You don't need to restart the producer. The producer currently
> handles
> > > all
> > > > error/exceptions by refreshing the metadata and retrying. If it fails
> > all
> > > > retries, it throws a FailedToSendMessageException to the caller (in
> > sync
> > > > mode). The original cause is not included in this exception. We have
> > > > thought about being a bit smarter in the producer retry logic such
> that
> > > it
> > > > only retries on recoverable errors and could implement this at some
> > > point.
> > > > Other than FailedToSendMessageException, the producer can also throw
> > > > QueueFullException.
> > > > This is an indication that the producer is sending data at a rate
> > faster
> > > > than the broker can handle. This may or may not be recoverable since
> it
> > > > depends on the load.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Sat, Aug 24, 2013 at 1:44 AM, Jason Rosenberg <jb...@squareup.com>
> > > wrote:
> > > >
> > > > > Jun,
> > > > >
> > > > > There are several others I've seen that I would have thought would
> be
> > > > > retryable (possibly after an exponential backoff delay).  I'm
> curious
> > > > > about:
> > > > >
> > > > > BrokerNotAvailableException
> > > > > FailedToSendMessageException
> > > > > QueueFullException (happens if producerType is 'async')
> > > > > KafkaException (this seems to wrap lots of base conditions, does
> one
> > > have
> > > > > to sort through the different wrapped exception types?)
> > > > > LeaderNotAvailableException
> > > > > MessageSizeTooLargeException (does a batch of messages get treated
> > as a
> > > > > single message, when checking for message size too large?)
> > > > > ReplicaNotAvailableException
> > > > > UnavailableProducerException
> > > > > UnknownException
> > > > >
> > > > > Also, what about my first question, regarding whether it makes
> sense
> > to
> > > > > refresh a producer by closing it and restarting it after a failure?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jason
> > > > >
> > > > >
> > > > > On Fri, Aug 23, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com> wrote:
> > > > >
> > > > > > For the most part, only SocketExceptions and
> > > > > NotLeaderForPartitionException
> > > > > > are recoverable. MessageSizeTooLargeException may be recoverable
> > > with a
> > > > > > smaller batch size.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Fri, Aug 23, 2013 at 4:09 PM, Jason Rosenberg <
> jbr@squareup.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > I'm using the kafka.javaapi.producer.Producer class from a java
> > > > client.
> > > > > > >  I'm wondering if it ever makes sense to refresh a producer by
> > > > stopping
> > > > > > it
> > > > > > > and creating a new one, for example in response to a downstream
> > IO
> > > > > error
> > > > > > > (e.g. a broker got restarted, or a stale socket, etc.).
> > > > > > >
> > > > > > > Or should it always be safe to rely on the producer's
> > > implementation
> > > > to
> > > > > > > manage it's pool of BlockingChannel connections, etc.
> > > > > > >
> > > > > > > I'm also interested in trying to understand which exceptions
> > > > indicate a
> > > > > > > failed send() request might be retryable (basically anything
> that
> > > > > doesn't
> > > > > > > involve a data-dependent problem, like a malformed message, or
> a
> > > > > message
> > > > > > > too large, etc.).
> > > > > > >
> > > > > > > Unfortunately, the range of Exceptions that can be thrown by
> the
> > > > > various
> > > > > > > javaapi methods is not yet well documented.  It would be nice
> to
> > > have
> > > > > > some
> > > > > > > notion of whether an exception is the result of a data error,
> or
> > a
> > > > > > > transient downstream connection error, etc.
> > > > > > >
> > > > > > > Jason
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > So, can QueueFullException occur in either sync or async mode (or just
> > async mode)?
> >
> > If there's a MessageSizeTooLargeException, is there any visibility of
> this
> > to the caller?  Or will it just be a FailedToSendMessageException.  I
> > gathered from one of your previous responses, that a
> > MessageSizeTooLargeException can be rectified with a smaller batch size.
> >  If so, does that imply that the message size limit is measured on the
> > broker by the cumulative size of the batch, and not of any one message?
> >  (makes sense if the broker doesn't unwrap a batch of messages before
> > storing on the server).
> >
> > If I want to implement guaranteed delivery semantics, using the new
> > request.required.acks configuration, I need to expose retry logic beyond
> > that built into the producer?
> >
>

Re: Producer.send questions

Posted by Jason Rosenberg <jb...@squareup.com>.
Thanks Neha,

On Sat, Aug 24, 2013 at 10:06 AM, Neha Narkhede <ne...@gmail.com>wrote:
>
> >> I gathered from one of your previous responses, that a
> MessageSizeTooLargeException
> can be rectified with a smaller batch size.
>  If so, does that imply that the message size limit is measured on the
> broker by the cumulative size of the batch, and not of any one message?
>
> That's right. The broker does the message size check on the compressed
> message. The size of the compressed message
> is proportional to the batch size. Hence, reducing the batch size on a
> retry might make sense here, but currently the
> producer doesn't do this.
>

Ok, but perhaps the producer will handle something like this in the future?

>
> >> If I want to implement guaranteed delivery semantics, using the new
> request.required.acks
> configuration, I need to expose retry logic beyond
> that built into the producer?
>
> The kafka producer must handle recoverable exceptions with a configurable
> number of retries and must not retry
> on unrecoverable exceptions. So ideally you shouldn't have to write your
> own batching and retry logic.
>

So, it seems there might be a bit of a gray area.  There is a configurable
retry count, which we can increase perhaps to gain confidence that anything
recoverable has been sent.  But, since this retry count is finite, there's
no way to know for sure that it won't succeed if it were retried just one
more time.  So, it is then difficult to conclude that  if Producer.send
throws a FailedToSendMessageException, the message shouldn't be retried.

Perhaps it would be useful to define different exception types, so that a
caller can have clearer semantics:

UnretryableFailedToSendMessageException (wraps the root cause)
NoMoreRetriesFailedToSendMessageException (wraps the root cause, from the
final attempt)

Probably shorter names are possible here!  Perhaps these could be
subclasses of FailedToSendMessageException.  Alternately,
FailedToSendMessageException could include information, such as the number
of retries attempted, and a flag indicating whether it's possible to retry
the message.

Jason


>
>
>
> On Sat, Aug 24, 2013 at 9:54 AM, Jason Rosenberg <jb...@squareup.com> wrote:
>
> > Jun,
> >
> > Thanks, this is helpful.
> >
> > So, can QueueFullException occur in either sync or async mode (or just
> > async mode)?
> >
> > If there's a MessageSizeTooLargeException, is there any visibility of
> this
> > to the caller?  Or will it just be a FailedToSendMessageException.  I
> > gathered from one of your previous responses, that a
> > MessageSizeTooLargeException can be rectified with a smaller batch size.
> >  If so, does that imply that the message size limit is measured on the
> > broker by the cumulative size of the batch, and not of any one message?
> >  (makes sense if the broker doesn't unwrap a batch of messages before
> > storing on the server).
> >
> > If I want to implement guaranteed delivery semantics, using the new
> > request.required.acks configuration, I need to expose retry logic beyond
> > that built into the producer?  And to do this, I need to indicate to the
> > caller whether it's possible to retry, or whether it will be fruitless.
>  I
> > suppose allowing message.max.send.retries to allow infinite retries (e.g.
> > by setting it to -1) might be useful.  But optionally, I'd like the
> caller
> > to be able to handle this retry logic itself.
> >
> > Jason
> >
> >
> > On Sat, Aug 24, 2013 at 8:22 AM, Jun Rao <ju...@gmail.com> wrote:
> >
> > > You don't need to restart the producer. The producer currently handles
> > all
> > > error/exceptions by refreshing the metadata and retrying. If it fails
> all
> > > retries, it throws a FailedToSendMessageException to the caller (in
> sync
> > > mode). The original cause is not included in this exception. We have
> > > thought about being a bit smarter in the producer retry logic such that
> > it
> > > only retries on recoverable errors and could implement this at some
> > point.
> > > Other than FailedToSendMessageException, the producer can also throw
> > > QueueFullException.
> > > This is an indication that the producer is sending data at a rate
> faster
> > > than the broker can handle. This may or may not be recoverable since it
> > > depends on the load.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Sat, Aug 24, 2013 at 1:44 AM, Jason Rosenberg <jb...@squareup.com>
> > wrote:
> > >
> > > > Jun,
> > > >
> > > > There are several others I've seen that I would have thought would be
> > > > retryable (possibly after an exponential backoff delay).  I'm curious
> > > > about:
> > > >
> > > > BrokerNotAvailableException
> > > > FailedToSendMessageException
> > > > QueueFullException (happens if producerType is 'async')
> > > > KafkaException (this seems to wrap lots of base conditions, does one
> > have
> > > > to sort through the different wrapped exception types?)
> > > > LeaderNotAvailableException
> > > > MessageSizeTooLargeException (does a batch of messages get treated
> as a
> > > > single message, when checking for message size too large?)
> > > > ReplicaNotAvailableException
> > > > UnavailableProducerException
> > > > UnknownException
> > > >
> > > > Also, what about my first question, regarding whether it makes sense
> to
> > > > refresh a producer by closing it and restarting it after a failure?
> > > >
> > > > Thanks,
> > > >
> > > > Jason
> > > >
> > > >
> > > > On Fri, Aug 23, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com> wrote:
> > > >
> > > > > For the most part, only SocketExceptions and
> > > > NotLeaderForPartitionException
> > > > > are recoverable. MessageSizeTooLargeException may be recoverable
> > with a
> > > > > smaller batch size.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Fri, Aug 23, 2013 at 4:09 PM, Jason Rosenberg <jbr@squareup.com
> >
> > > > wrote:
> > > > >
> > > > > > I'm using the kafka.javaapi.producer.Producer class from a java
> > > client.
> > > > > >  I'm wondering if it ever makes sense to refresh a producer by
> > > stopping
> > > > > it
> > > > > > and creating a new one, for example in response to a downstream
> IO
> > > > error
> > > > > > (e.g. a broker got restarted, or a stale socket, etc.).
> > > > > >
> > > > > > Or should it always be safe to rely on the producer's
> > implementation
> > > to
> > > > > > manage it's pool of BlockingChannel connections, etc.
> > > > > >
> > > > > > I'm also interested in trying to understand which exceptions
> > > indicate a
> > > > > > failed send() request might be retryable (basically anything that
> > > > doesn't
> > > > > > involve a data-dependent problem, like a malformed message, or a
> > > > message
> > > > > > too large, etc.).
> > > > > >
> > > > > > Unfortunately, the range of Exceptions that can be thrown by the
> > > > various
> > > > > > javaapi methods is not yet well documented.  It would be nice to
> > have
> > > > > some
> > > > > > notion of whether an exception is the result of a data error, or
> a
> > > > > > transient downstream connection error, etc.
> > > > > >
> > > > > > Jason
> > > > > >
> > > > >
> > > >
> > >
> >
> So, can QueueFullException occur in either sync or async mode (or just
> async mode)?
>
> If there's a MessageSizeTooLargeException, is there any visibility of this
> to the caller?  Or will it just be a FailedToSendMessageException.  I
> gathered from one of your previous responses, that a
> MessageSizeTooLargeException can be rectified with a smaller batch size.
>  If so, does that imply that the message size limit is measured on the
> broker by the cumulative size of the batch, and not of any one message?
>  (makes sense if the broker doesn't unwrap a batch of messages before
> storing on the server).
>
> If I want to implement guaranteed delivery semantics, using the new
> request.required.acks configuration, I need to expose retry logic beyond
> that built into the producer?
>

Re: Producer.send questions

Posted by Neha Narkhede <ne...@gmail.com>.
>> So, can QueueFullException occur in either sync or async mode (or just async
mode)?

QueueFullException can only occur in async mode since there is no queue in
sync mode.

>> If there's a MessageSizeTooLargeException, is there any visibility of
thisto the caller?

The kafka producer should not retry on unrecoverable exceptions. I'm not
sure we do this
correctly right now. In any case, this is a bug that should be fixed inside
the kafka producer.

>> I gathered from one of your previous responses, that a MessageSizeTooLargeException
can be rectified with a smaller batch size.
 If so, does that imply that the message size limit is measured on the
broker by the cumulative size of the batch, and not of any one message?

That's right. The broker does the message size check on the compressed
message. The size of the compressed message
is proportional to the batch size. Hence, reducing the batch size on a
retry might make sense here, but currently the
producer doesn't do this.

>> If I want to implement guaranteed delivery semantics, using the new request.required.acks
configuration, I need to expose retry logic beyond
that built into the producer?

The kafka producer must handle recoverable exceptions with a configurable
number of retries and must not retry
on unrecoverable exceptions. So ideally you shouldn't have to write your
own batching and retry logic.

Thanks,
Neha


On Sat, Aug 24, 2013 at 9:54 AM, Jason Rosenberg <jb...@squareup.com> wrote:

> Jun,
>
> Thanks, this is helpful.
>
> So, can QueueFullException occur in either sync or async mode (or just
> async mode)?
>
> If there's a MessageSizeTooLargeException, is there any visibility of this
> to the caller?  Or will it just be a FailedToSendMessageException.  I
> gathered from one of your previous responses, that a
> MessageSizeTooLargeException can be rectified with a smaller batch size.
>  If so, does that imply that the message size limit is measured on the
> broker by the cumulative size of the batch, and not of any one message?
>  (makes sense if the broker doesn't unwrap a batch of messages before
> storing on the server).
>
> If I want to implement guaranteed delivery semantics, using the new
> request.required.acks configuration, I need to expose retry logic beyond
> that built into the producer?  And to do this, I need to indicate to the
> caller whether it's possible to retry, or whether it will be fruitless.  I
> suppose allowing message.max.send.retries to allow infinite retries (e.g.
> by setting it to -1) might be useful.  But optionally, I'd like the caller
> to be able to handle this retry logic itself.
>
> Jason
>
>
> On Sat, Aug 24, 2013 at 8:22 AM, Jun Rao <ju...@gmail.com> wrote:
>
> > You don't need to restart the producer. The producer currently handles
> all
> > error/exceptions by refreshing the metadata and retrying. If it fails all
> > retries, it throws a FailedToSendMessageException to the caller (in sync
> > mode). The original cause is not included in this exception. We have
> > thought about being a bit smarter in the producer retry logic such that
> it
> > only retries on recoverable errors and could implement this at some
> point.
> > Other than FailedToSendMessageException, the producer can also throw
> > QueueFullException.
> > This is an indication that the producer is sending data at a rate faster
> > than the broker can handle. This may or may not be recoverable since it
> > depends on the load.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Sat, Aug 24, 2013 at 1:44 AM, Jason Rosenberg <jb...@squareup.com>
> wrote:
> >
> > > Jun,
> > >
> > > There are several others I've seen that I would have thought would be
> > > retryable (possibly after an exponential backoff delay).  I'm curious
> > > about:
> > >
> > > BrokerNotAvailableException
> > > FailedToSendMessageException
> > > QueueFullException (happens if producerType is 'async')
> > > KafkaException (this seems to wrap lots of base conditions, does one
> have
> > > to sort through the different wrapped exception types?)
> > > LeaderNotAvailableException
> > > MessageSizeTooLargeException (does a batch of messages get treated as a
> > > single message, when checking for message size too large?)
> > > ReplicaNotAvailableException
> > > UnavailableProducerException
> > > UnknownException
> > >
> > > Also, what about my first question, regarding whether it makes sense to
> > > refresh a producer by closing it and restarting it after a failure?
> > >
> > > Thanks,
> > >
> > > Jason
> > >
> > >
> > > On Fri, Aug 23, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com> wrote:
> > >
> > > > For the most part, only SocketExceptions and
> > > NotLeaderForPartitionException
> > > > are recoverable. MessageSizeTooLargeException may be recoverable
> with a
> > > > smaller batch size.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Fri, Aug 23, 2013 at 4:09 PM, Jason Rosenberg <jb...@squareup.com>
> > > wrote:
> > > >
> > > > > I'm using the kafka.javaapi.producer.Producer class from a java
> > client.
> > > > >  I'm wondering if it ever makes sense to refresh a producer by
> > stopping
> > > > it
> > > > > and creating a new one, for example in response to a downstream IO
> > > error
> > > > > (e.g. a broker got restarted, or a stale socket, etc.).
> > > > >
> > > > > Or should it always be safe to rely on the producer's
> implementation
> > to
> > > > > manage it's pool of BlockingChannel connections, etc.
> > > > >
> > > > > I'm also interested in trying to understand which exceptions
> > indicate a
> > > > > failed send() request might be retryable (basically anything that
> > > doesn't
> > > > > involve a data-dependent problem, like a malformed message, or a
> > > message
> > > > > too large, etc.).
> > > > >
> > > > > Unfortunately, the range of Exceptions that can be thrown by the
> > > various
> > > > > javaapi methods is not yet well documented.  It would be nice to
> have
> > > > some
> > > > > notion of whether an exception is the result of a data error, or a
> > > > > transient downstream connection error, etc.
> > > > >
> > > > > Jason
> > > > >
> > > >
> > >
> >
>
So, can QueueFullException occur in either sync or async mode (or just
async mode)?

If there's a MessageSizeTooLargeException, is there any visibility of this
to the caller?  Or will it just be a FailedToSendMessageException.  I
gathered from one of your previous responses, that a
MessageSizeTooLargeException can be rectified with a smaller batch size.
 If so, does that imply that the message size limit is measured on the
broker by the cumulative size of the batch, and not of any one message?
 (makes sense if the broker doesn't unwrap a batch of messages before
storing on the server).

If I want to implement guaranteed delivery semantics, using the new
request.required.acks configuration, I need to expose retry logic beyond
that built into the producer?

Re: Producer.send questions

Posted by Jason Rosenberg <jb...@squareup.com>.
Jun,

Thanks, this is helpful.

So, can QueueFullException occur in either sync or async mode (or just
async mode)?

If there's a MessageSizeTooLargeException, is there any visibility of this
to the caller?  Or will it just be a FailedToSendMessageException.  I
gathered from one of your previous responses, that a
MessageSizeTooLargeException can be rectified with a smaller batch size.
 If so, does that imply that the message size limit is measured on the
broker by the cumulative size of the batch, and not of any one message?
 (makes sense if the broker doesn't unwrap a batch of messages before
storing on the server).

If I want to implement guaranteed delivery semantics, using the new
request.required.acks configuration, I need to expose retry logic beyond
that built into the producer?  And to do this, I need to indicate to the
caller whether it's possible to retry, or whether it will be fruitless.  I
suppose allowing message.max.send.retries to allow infinite retries (e.g.
by setting it to -1) might be useful.  But optionally, I'd like the caller
to be able to handle this retry logic itself.

Jason


On Sat, Aug 24, 2013 at 8:22 AM, Jun Rao <ju...@gmail.com> wrote:

> You don't need to restart the producer. The producer currently handles all
> error/exceptions by refreshing the metadata and retrying. If it fails all
> retries, it throws a FailedToSendMessageException to the caller (in sync
> mode). The original cause is not included in this exception. We have
> thought about being a bit smarter in the producer retry logic such that it
> only retries on recoverable errors and could implement this at some point.
> Other than FailedToSendMessageException, the producer can also throw
> QueueFullException.
> This is an indication that the producer is sending data at a rate faster
> than the broker can handle. This may or may not be recoverable since it
> depends on the load.
>
> Thanks,
>
> Jun
>
>
> On Sat, Aug 24, 2013 at 1:44 AM, Jason Rosenberg <jb...@squareup.com> wrote:
>
> > Jun,
> >
> > There are several others I've seen that I would have thought would be
> > retryable (possibly after an exponential backoff delay).  I'm curious
> > about:
> >
> > BrokerNotAvailableException
> > FailedToSendMessageException
> > QueueFullException (happens if producerType is 'async')
> > KafkaException (this seems to wrap lots of base conditions, does one have
> > to sort through the different wrapped exception types?)
> > LeaderNotAvailableException
> > MessageSizeTooLargeException (does a batch of messages get treated as a
> > single message, when checking for message size too large?)
> > ReplicaNotAvailableException
> > UnavailableProducerException
> > UnknownException
> >
> > Also, what about my first question, regarding whether it makes sense to
> > refresh a producer by closing it and restarting it after a failure?
> >
> > Thanks,
> >
> > Jason
> >
> >
> > On Fri, Aug 23, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com> wrote:
> >
> > > For the most part, only SocketExceptions and
> > NotLeaderForPartitionException
> > > are recoverable. MessageSizeTooLargeException may be recoverable with a
> > > smaller batch size.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Fri, Aug 23, 2013 at 4:09 PM, Jason Rosenberg <jb...@squareup.com>
> > wrote:
> > >
> > > > I'm using the kafka.javaapi.producer.Producer class from a java
> client.
> > > >  I'm wondering if it ever makes sense to refresh a producer by
> stopping
> > > it
> > > > and creating a new one, for example in response to a downstream IO
> > error
> > > > (e.g. a broker got restarted, or a stale socket, etc.).
> > > >
> > > > Or should it always be safe to rely on the producer's implementation
> to
> > > > manage it's pool of BlockingChannel connections, etc.
> > > >
> > > > I'm also interested in trying to understand which exceptions
> indicate a
> > > > failed send() request might be retryable (basically anything that
> > doesn't
> > > > involve a data-dependent problem, like a malformed message, or a
> > message
> > > > too large, etc.).
> > > >
> > > > Unfortunately, the range of Exceptions that can be thrown by the
> > various
> > > > javaapi methods is not yet well documented.  It would be nice to have
> > > some
> > > > notion of whether an exception is the result of a data error, or a
> > > > transient downstream connection error, etc.
> > > >
> > > > Jason
> > > >
> > >
> >
>

Re: Producer.send questions

Posted by Jun Rao <ju...@gmail.com>.
You don't need to restart the producer. The producer currently handles all
error/exceptions by refreshing the metadata and retrying. If it fails all
retries, it throws a FailedToSendMessageException to the caller (in sync
mode). The original cause is not included in this exception. We have
thought about being a bit smarter in the producer retry logic such that it
only retries on recoverable errors and could implement this at some point.
Other than FailedToSendMessageException, the producer can also throw
QueueFullException.
This is an indication that the producer is sending data at a rate faster
than the broker can handle. This may or may not be recoverable since it
depends on the load.

Thanks,

Jun


On Sat, Aug 24, 2013 at 1:44 AM, Jason Rosenberg <jb...@squareup.com> wrote:

> Jun,
>
> There are several others I've seen that I would have thought would be
> retryable (possibly after an exponential backoff delay).  I'm curious
> about:
>
> BrokerNotAvailableException
> FailedToSendMessageException
> QueueFullException (happens if producerType is 'async')
> KafkaException (this seems to wrap lots of base conditions, does one have
> to sort through the different wrapped exception types?)
> LeaderNotAvailableException
> MessageSizeTooLargeException (does a batch of messages get treated as a
> single message, when checking for message size too large?)
> ReplicaNotAvailableException
> UnavailableProducerException
> UnknownException
>
> Also, what about my first question, regarding whether it makes sense to
> refresh a producer by closing it and restarting it after a failure?
>
> Thanks,
>
> Jason
>
>
> On Fri, Aug 23, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com> wrote:
>
> > For the most part, only SocketExceptions and
> NotLeaderForPartitionException
> > are recoverable. MessageSizeTooLargeException may be recoverable with a
> > smaller batch size.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Fri, Aug 23, 2013 at 4:09 PM, Jason Rosenberg <jb...@squareup.com>
> wrote:
> >
> > > I'm using the kafka.javaapi.producer.Producer class from a java client.
> > >  I'm wondering if it ever makes sense to refresh a producer by stopping
> > it
> > > and creating a new one, for example in response to a downstream IO
> error
> > > (e.g. a broker got restarted, or a stale socket, etc.).
> > >
> > > Or should it always be safe to rely on the producer's implementation to
> > > manage it's pool of BlockingChannel connections, etc.
> > >
> > > I'm also interested in trying to understand which exceptions indicate a
> > > failed send() request might be retryable (basically anything that
> doesn't
> > > involve a data-dependent problem, like a malformed message, or a
> message
> > > too large, etc.).
> > >
> > > Unfortunately, the range of Exceptions that can be thrown by the
> various
> > > javaapi methods is not yet well documented.  It would be nice to have
> > some
> > > notion of whether an exception is the result of a data error, or a
> > > transient downstream connection error, etc.
> > >
> > > Jason
> > >
> >
>

Re: Producer.send questions

Posted by Jason Rosenberg <jb...@squareup.com>.
Jun,

There are several others I've seen that I would have thought would be
retryable (possibly after an exponential backoff delay).  I'm curious about:

BrokerNotAvailableException
FailedToSendMessageException
QueueFullException (happens if producerType is 'async')
KafkaException (this seems to wrap lots of base conditions, does one have
to sort through the different wrapped exception types?)
LeaderNotAvailableException
MessageSizeTooLargeException (does a batch of messages get treated as a
single message, when checking for message size too large?)
ReplicaNotAvailableException
UnavailableProducerException
UnknownException

Also, what about my first question, regarding whether it makes sense to
refresh a producer by closing it and restarting it after a failure?

Thanks,

Jason


On Fri, Aug 23, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com> wrote:

> For the most part, only SocketExceptions and NotLeaderForPartitionException
> are recoverable. MessageSizeTooLargeException may be recoverable with a
> smaller batch size.
>
> Thanks,
>
> Jun
>
>
> On Fri, Aug 23, 2013 at 4:09 PM, Jason Rosenberg <jb...@squareup.com> wrote:
>
> > I'm using the kafka.javaapi.producer.Producer class from a java client.
> >  I'm wondering if it ever makes sense to refresh a producer by stopping
> it
> > and creating a new one, for example in response to a downstream IO error
> > (e.g. a broker got restarted, or a stale socket, etc.).
> >
> > Or should it always be safe to rely on the producer's implementation to
> > manage it's pool of BlockingChannel connections, etc.
> >
> > I'm also interested in trying to understand which exceptions indicate a
> > failed send() request might be retryable (basically anything that doesn't
> > involve a data-dependent problem, like a malformed message, or a message
> > too large, etc.).
> >
> > Unfortunately, the range of Exceptions that can be thrown by the various
> > javaapi methods is not yet well documented.  It would be nice to have
> some
> > notion of whether an exception is the result of a data error, or a
> > transient downstream connection error, etc.
> >
> > Jason
> >
>

Re: Producer.send questions

Posted by Jun Rao <ju...@gmail.com>.
For the most part, only SocketExceptions and NotLeaderForPartitionException
are recoverable. MessageSizeTooLargeException may be recoverable with a
smaller batch size.

Thanks,

Jun


On Fri, Aug 23, 2013 at 4:09 PM, Jason Rosenberg <jb...@squareup.com> wrote:

> I'm using the kafka.javaapi.producer.Producer class from a java client.
>  I'm wondering if it ever makes sense to refresh a producer by stopping it
> and creating a new one, for example in response to a downstream IO error
> (e.g. a broker got restarted, or a stale socket, etc.).
>
> Or should it always be safe to rely on the producer's implementation to
> manage it's pool of BlockingChannel connections, etc.
>
> I'm also interested in trying to understand which exceptions indicate a
> failed send() request might be retryable (basically anything that doesn't
> involve a data-dependent problem, like a malformed message, or a message
> too large, etc.).
>
> Unfortunately, the range of Exceptions that can be thrown by the various
> javaapi methods is not yet well documented.  It would be nice to have some
> notion of whether an exception is the result of a data error, or a
> transient downstream connection error, etc.
>
> Jason
>