Posted to dev@samza.apache.org by Gaurav Agarwal <ga...@gmail.com> on 2016/07/29 06:39:01 UTC

Samza retries on kafka exception

Hi All,

We are using Samza (0.10.0) in our system and recently ran into a problem
where, due to a Kafka broker being unstable for a few moments, our Samza tasks
got exceptions while trying to write messages to Kafka. From that moment on,
they went into a very long retry loop (Integer.MAX_VALUE attempts).

The warning line that repeats in our container logs is:

```
WARN [2016-05-23 06:41:36,645] [U:260,F:293,T:552,M:2,267] producer.internals.Sender:[Sender:completeBatch:257] - [kafka-producer-network-thread | samza_producer-job4-1-1463686278936-2] - Got error produce response with correlation id 5888322 on topic-partition Topic3-0, retrying (2144537752 attempts left). Error: CORRUPT_MESSAGE
```

We experimented with setting the Kafka producer 'retries' configuration to
a smaller number, but it appears that Samza does not permit overriding this
parameter. On top of that, there is additional Samza-level retry logic that
re-sends the message if Kafka errored with a 'RetriableException'.
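
For reference, here is a sketch of the client-level knob we were trying to cap. This is a raw Kafka producer, not the one Samza constructs internally; the broker address, serializers, and retry count below are placeholders for illustration only:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

// Illustration only: a plain Kafka producer with a small, finite retry count.
// In a Samza job the producer is created by Samza itself, which is why our
// attempt to lower 'retries' had no effect.
public final class BoundedRetriesProducer {
    public static KafkaProducer<byte[], byte[]> create(String bootstrapServers, int retries) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.RETRIES_CONFIG, retries); // what we wanted: a bounded retry count
        return new KafkaProducer<>(props);
    }
}
```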

May I know what is the reason for disallowing this override? Additionally,
what is the recommended way to handle such situations?

I would have thought that a possible policy would be: if, after K Kafka
retries (K configured by the user), samza-kafka was still unable to send the
message, it could throw an exception out to user land and let the user decide
what to do. In our case we would have chosen to kill the container and have
the Samza application master request a new one from YARN.
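
A rough sketch of that policy in plain Java (nothing Samza-specific; the Send interface and the attempt limit are made up for illustration):

```java
// Sketch of the proposed policy: try at most maxAttempts times, then surface
// the last failure to user code instead of retrying forever. User code can
// then decide, e.g. to let the container die so a new one is requested.
public final class BoundedRetry {
    /** Hypothetical hook wrapping the actual Kafka send call. */
    public interface Send {
        void attempt() throws Exception;
    }

    public static void sendWithLimit(Send send, int maxAttempts) throws Exception {
        Exception last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                send.attempt();
                return;          // success - stop retrying
            } catch (Exception e) {
                last = e;        // remember the most recent failure and try again
            }
        }
        throw new Exception("Send failed after " + maxAttempts + " attempts", last);
    }
}
```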

There seem to be at least a couple of related bugs already open:


   1. https://issues.apache.org/jira/browse/SAMZA-610
   2. https://issues.apache.org/jira/browse/SAMZA-911


cheers,
gaurav

Re: Samza retries on kafka exception

Posted by Gaurav Agarwal <ga...@gmail.com>.
Thanks Jagdish. I realized that SAMZA-911 attempted to fix this, but I was
unclear on the reason for attempting 30 retries from the Samza send() method
when Kafka had already retried enough times. In any case, that is much better
than 0.9 or 0.10!

In current master, though, it appears that the retry logic on the Samza side
has been removed completely - which is great.

Could you please point me to the Samza 0.10.1 release (or branch or tag)? I
could not locate it on GitHub.


On Fri, Jul 29, 2016 at 5:47 PM, Jagadish Venkatraman <jagadish1989@gmail.com> wrote:

> Heya Gaurav,
>
> Thanks for the diligent observation and walk through. We ran into a similar
> issue at LinkedIn and addressed it in SAMZA-911.
>
> I think the 0.10.1 release had this fix.
>
> Thanks
> Jagdish

Re: Samza retries on kafka exception

Posted by Jagadish Venkatraman <ja...@gmail.com>.
Heya Gaurav,

Thanks for the diligent observation and walk through. We ran into a similar
issue at LinkedIn and addressed it in SAMZA-911.

I think the 0.10.1 release had this fix.

Thanks
Jagdish



-- 
Sent from my iphone.

Re: Samza retries on kafka exception

Posted by Gaurav Agarwal <ga...@gmail.com>.
More debugging notes and questions:

In Samza 0.10, the `retries` parameter is honoured and passed to Kafka;
however, Samza itself retries sending the failed message whenever the
exception is an instance of RetriableException:
```
// In class KafkaSystemProducer (send-completion callback):

(exception, loop) => {
  if (exception != null && !exception.isInstanceOf[RetriableException]) {
    // Exception is thrown & not retriable
    debug("Exception detail : ", exception)
    // Close producer
    stop()
    producer = null
    // Mark loop as done as we are not going to retry
    loop.done
    metrics.sendFailed.inc
    throw new SamzaException("Failed to send message. Exception:\n%s".format(exception))
  } else {
    warn("Retrying send message due to RetriableException - %s. Turn on debugging to get a full stack trace".format(exception))
    debug("Exception detail:", exception)
    metrics.retries.inc
  }
}
```

On checking the Kafka codebase, it appears that the Kafka client itself only
retries if the exception thrown is an instance of RetriableException:
```
// In class Sender (Kafka producer internals):

private boolean canRetry(RecordBatch batch, Errors error) {
    return batch.attempts < this.retries
        && error.exception() instanceof RetriableException;
}
```
And when it gives up, it returns the exception as-is to the caller.
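
For context, this is roughly what that looks like with the raw producer API - a minimal sketch, where the producer is assumed to be created elsewhere and the topic name is just taken from the log line above as an example:

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Sketch: the exception the Sender gives up on (non-retriable, or retriable
// but out of client-side attempts) is handed unchanged to the send callback.
public final class SendCallbackSketch {
    public static void sendOnce(KafkaProducer<byte[], byte[]> producer, byte[] payload) {
        producer.send(new ProducerRecord<>("Topic3", payload), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // This is the point where the failure finally reaches caller code.
                    System.err.println("send failed: " + exception);
                }
            }
        });
    }
}
```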

So, if I understand correctly, Samza will keep retrying this message with
Kafka, and unless the problem that caused the failure in the first place has
been resolved by then, this becomes an effectively infinite attempt loop.

Have I understood this correctly? And if so, should the user be given a hook
to control the behavior on underlying failures?


Re: Samza retries on kafka exception

Posted by Gaurav Agarwal <ga...@gmail.com>.
(Correction: we are using Samza 0.9.0.)
