Posted to users@kafka.apache.org by "Mevada, Vatsal" <Me...@sky.optymyze.com> on 2016/12/01 13:22:43 UTC

Detecting when all the retries are expired for a message

Hi,



I am reading a file and publishing each record to Kafka. Here is my producer code:



import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.Optional;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import com.google.common.base.Throwables; // Guava

public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
    try (BufferedReader bf = getBufferedReader(filePath, encoding);
            KafkaProducer<Object, String> producer = initKafkaProducer(bootstrapServers)) {
        String line;
        while ((line = bf.readLine()) != null) {
            // send() is asynchronous; the callback reports per-record failures.
            producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
                if (e != null) {
                    e.printStackTrace();
                }
            });
        }
        producer.flush();
    } catch (IOException e) {
        Throwables.propagate(e);
    }
}

private static KafkaProducer<Object, String> initKafkaProducer(String bootstrapServer) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", bootstrapServer);
    properties.put("key.serializer", StringSerializer.class.getCanonicalName());
    properties.put("value.serializer", StringSerializer.class.getCanonicalName());
    properties.put("acks", "-1");
    properties.put("retries", 10);
    return new KafkaProducer<>(properties);
}

private BufferedReader getBufferedReader(String filePath, String encoding) throws UnsupportedEncodingException, FileNotFoundException {
    return new BufferedReader(new InputStreamReader(new FileInputStream(filePath), Optional.ofNullable(encoding).orElse("UTF-8")));
}



As per the official documentation of Callback<https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>, TimeoutException is a retriable exception. Since I have set retries to 10, the producer will try to resend a message whose delivery fails with a TimeoutException. I am looking for a reliable way to detect when delivery of a message has failed permanently, after all retries are exhausted.
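For illustration, here is a minimal sketch of what I would like to end up with, assuming the callback really fires only once per record, after all retries are exhausted (failedLines is a hypothetical collector of my own, not a Kafka API):

List<String> failedLines = Collections.synchronizedList(new ArrayList<>());
String line;
while ((line = bf.readLine()) != null) {
    final String record = line; // the lambda needs an effectively final copy
    producer.send(new ProducerRecord<>(topicName, record), (metadata, e) -> {
        if (e != null) {
            // Reached only once the producer has given up on this record.
            failedLines.add(record);
        }
    });
}
producer.flush();
// failedLines now holds every permanently failed record, e.g. for a
// dead-letter file or re-submission.

(This uses java.util.ArrayList, java.util.Collections and java.util.List in addition to the imports above.)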



Regards,

Vatsal

Re: Detecting when all the retries are expired for a message

Posted by Ismael Juma <is...@juma.me.uk>.
Hi Asaf,

That PR is for the backport to 0.9.0.x, the original change was merged to
trunk and is in 0.10.x.x.

Ismael

On Tue, Dec 6, 2016 at 10:10 AM, Asaf Mesika <as...@gmail.com> wrote:

> Vatsal:
>
> I don't think they merged the fix for this bug (retries don't work) in
> 0.9.x to 0.10.0.1: https://github.com/apache/kafka/pull/1547
>
>
> On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal <Me...@sky.optymyze.com>
> wrote:
>
> > Hello,
> >
> > Bumping up this thread in case any of you have any say on this issue.
> >
> > Regards,
> > Vatsal
> >
> > -----Original Message-----
> > From: Mevada, Vatsal
> > Sent: 02 December 2016 16:16
> > To: Kafka Users <us...@kafka.apache.org>
> > Subject: RE: Detecting when all the retries are expired for a message
> >
> > I executed the same producer code for a single-record file with the
> > following config:
> >
> >         properties.put("bootstrap.servers", bootstrapServer);
> >         properties.put("key.serializer",
> > StringSerializer.class.getCanonicalName());
> >         properties.put("value.serializer",
> > StringSerializer.class.getCanonicalName());
> >         properties.put("acks", "-1");
> >         properties.put("retries", 50000);
> >         properties.put("request.timeout.ms", 1);
> >
> > I have kept request.timeout.ms=1 to make sure that message delivery will
> > fail with TimeoutException. Since retries is 50000, the program should
> > take at least 50000 ms (50 seconds) to complete for a single record.
> > However the program completes almost instantly, with only one callback
> > with TimeoutException. I suspect that the producer is not attempting any
> > retries. Or am I missing something in my code?
> >
> > My Kafka version is 0.10.0.1.
> >
> > Regards,
> > Vatsal
> > -----Original Message-----
> > From: Ismael Juma [mailto:ismaelj@gmail.com]
> > Sent: 02 December 2016 13:30
> > To: Kafka Users <us...@kafka.apache.org>
> > Subject: RE: Detecting when all the retries are expired for a message
> >
> > The callback is called after the retries have been exhausted.
> >
> > Ismael
> >
> > On 2 Dec 2016 3:34 am, "Mevada, Vatsal" <Me...@sky.optymyze.com> wrote:
> >
> > > @Ismael:
> > >
> > > I can handle TimeoutException in the callback. However, as per the
> > > documentation of Callback (link: https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/Callback.html),
> > > TimeoutException is a retriable exception and it says that it "may be
> > > covered by increasing #.retries". So even if I get TimeoutException in
> > > the callback, wouldn't it try to send the message again until all the
> > > retries are done? Would it be safe to assume that message delivery has
> > > failed permanently just by encountering TimeoutException in the callback?
> > >
> > > Here is a snippet from above mentioned documentation:
> > > "exception - The exception thrown during processing of this record.
> > > Null if no error occurred. Possible thrown exceptions include:
> > > Non-Retriable exceptions (fatal, the message will never be sent):
> > > InvalidTopicException OffsetMetadataTooLargeException
> > > RecordBatchTooLargeException RecordTooLargeException
> > > UnknownServerException Retriable exceptions (transient, may be covered
> > > by increasing #.retries): CorruptRecordException
> > > InvalidMetadataException NotEnoughReplicasAfterAppendException
> > > NotEnoughReplicasException OffsetOutOfRangeException TimeoutException
> > > UnknownTopicOrPartitionException"
> > >
> > > @Asaf: My Kafka API version is 0.10.0.1. So I think I should not
> > > face the issue that you are mentioning. I mentioned the documentation
> > > link of 0.9 by mistake.
> > >
> > > Regards,
> > > Vatsal
> > > -----Original Message-----
> > > From: Asaf Mesika [mailto:asaf.mesika@gmail.com]
> > > Sent: 02 December 2016 00:32
> > > To: Kafka Users <us...@kafka.apache.org>
> > > Subject: Re: Detecting when all the retries are expired for a message
> > >
> > > There's a critical bug in that section that has only been fixed in
> > > 0.9.0.2, which has not been released yet. Without the fix it doesn't
> > > really retry.
> > > I forked the kafka repo, applied the fix, built it and placed it in
> > > our own Nexus Maven repository until 0.9.0.2 will be released.
> > >
> > > https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio
> > >
> > > Feel free to use it.
> > >
> > > On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma <is...@juma.me.uk> wrote:
> > >
> > > > The callback should give you what you are asking for. Has it not
> > > > worked as you expect when you tried it?
> > > >
> > > > Ismael

Re: Detecting when all the retries are expired for a message

Posted by Rajini Sivaram <ra...@googlemail.com>.
Thanks Ismael, I hadn't seen the KIP. That does cover the issue described
here.

On Wed, Dec 7, 2016 at 10:39 AM, Ismael Juma <is...@juma.me.uk> wrote:

> Note that Sumant has been working on a KIP proposal to make the producer
> timeout behaviour more intuitive:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer
>
> Ismael



-- 
Regards,

Rajini

Re: Detecting when all the retries are expired for a message

Posted by Ismael Juma <is...@juma.me.uk>.
Note that Sumant has been working on a KIP proposal to make the producer
timeout behaviour more intuitive:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer
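Under that proposal, bounding the total time for a send (retries included) would reduce to a single setting. A sketch using the names from the KIP (they may change before it lands):

Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapServer);
// Proposed upper bound on the total time between send() returning and the
// callback firing, covering retries and time spent in the accumulator.
properties.put("delivery.timeout.ms", 120000);
// With a delivery timeout, the retry count can be left effectively unbounded.
properties.put("retries", Integer.MAX_VALUE);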

Ismael


Re: Detecting when all the retries are expired for a message

Posted by Rajini Sivaram <ra...@googlemail.com>.
If you just want to test retries, you could restart Kafka while the
producer is running and you should see the producer retry while Kafka is
down/leader is being elected after Kafka restarts. If you specifically want
a TimeoutException to trigger all retries, I am not sure how you can. I
would suggest that you raise a JIRA since the current behaviour is not very
intuitive.
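
A configuration sketch for that kind of test (the values are illustrative, not a recommendation):

Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapServer);
properties.put("key.serializer", StringSerializer.class.getCanonicalName());
properties.put("value.serializer", StringSerializer.class.getCanonicalName());
properties.put("acks", "-1");
properties.put("retries", 10);
// Leave request.timeout.ms at the default (30000) so batches are not
// expired in the accumulator before a retry can happen.
properties.put("request.timeout.ms", 30000);
// Slow the retries down so they are easy to observe in the logs.
properties.put("retry.backoff.ms", 1000);

With this, killing and restarting the broker mid-run should show the producer retrying roughly once a second until the leader is available again.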





-- 
Regards,

Rajini

RE: Detecting when all the retries are expired for a message

Posted by "Mevada, Vatsal" <Me...@sky.optymyze.com>.
@Asaf



Do I need to raise a new bug for this?



@Rajini



Please suggest the configuration with which retries should work, according to you. The code is already there in the mail chain; I am adding it here again:



public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
    try (BufferedReader bf = getBufferedReader(filePath, encoding);
            KafkaProducer<Object, String> producer = initKafkaProducer(bootstrapServers)) {
        String line;
        while ((line = bf.readLine()) != null) {
            producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
                if (e != null) {
                    e.printStackTrace();
                }
            });
        }
        producer.flush();
    } catch (IOException e) {
        Throwables.propagate(e);
    }
}

private static KafkaProducer<Object, String> initKafkaProducer(String bootstrapServer) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", bootstrapServer);
    properties.put("key.serializer", StringSerializer.class.getCanonicalName());
    properties.put("value.serializer", StringSerializer.class.getCanonicalName());
    properties.put("acks", "-1");
    properties.put("retries", 50000);
    properties.put("request.timeout.ms", 1);
    return new KafkaProducer<>(properties);
}

private BufferedReader getBufferedReader(String filePath, String encoding) throws UnsupportedEncodingException, FileNotFoundException {
    return new BufferedReader(new InputStreamReader(new FileInputStream(filePath), Optional.ofNullable(encoding).orElse("UTF-8")));
}



Regards,

Vatsal



-----Original Message-----
From: Rajini Sivaram [mailto:rajinisivaram@googlemail.com]
Sent: 06 December 2016 17:27
To: users@kafka.apache.org
Subject: Re: Detecting when all the retries are expired for a message



I believe batches in RecordAccumulator are expired after request.timeout.ms, so they wouldn't get retried in this case. I think the config options are quite confusing, making it hard to figure out the behavior without looking into the code.
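
In other words (a sketch of the interaction, as I understand the 0.10.0.x code):

// Batch is expired in the RecordAccumulator after ~1 ms, before any
// retry can be attempted, so the callback fires almost immediately:
properties.put("retries", 50000);
properties.put("request.timeout.ms", 1);

// Retries only get a chance if the batch survives long enough, e.g.
// with the default timeout:
properties.put("retries", 50000);
properties.put("request.timeout.ms", 30000); // default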




Re: Detecting when all the retries are expired for a message

Posted by Rajini Sivaram <ra...@googlemail.com>.
I believe batches in RecordAccumulator are expired after request.timeout.ms,
so they wouldn't get retried in this case. I think the config options are
quite confusing, making it hard to figure out the behavior without looking
into the code.
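
So for retries to have a chance to run, the timeouts need to leave room for them; an illustrative config sketch (the values are assumptions, not recommendations):

properties.put("retries", 3);
properties.put("retry.backoff.ms", 500);      // pause between attempts
properties.put("request.timeout.ms", 5000);   // per-request timeout that can realistically expire and retry
// With request.timeout.ms=1, the batch is liable to be expired in the
// RecordAccumulator before any resend is attempted, so the callback
// fires once, almost immediately.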




-- 
Regards,
Rajini

Re: Detecting when all the retries are expired for a message

Posted by Asaf Mesika <as...@gmail.com>.
Vatsal:

I don't think the fix for this bug (retries don't actually work) in 0.9.x
has been merged into 0.10.0.1: https://github.com/apache/kafka/pull/1547



RE: Detecting when all the retries are expired for a message

Posted by "Mevada, Vatsal" <Me...@sky.optymyze.com>.
Hello,

Bumping up this thread in case any of you has any input on this issue.

Regards,
Vatsal


RE: Detecting when all the retries are expired for a message

Posted by "Mevada, Vatsal" <Me...@sky.optymyze.com>.
I executed the same producer code for a single-record file with the following config:

        properties.put("bootstrap.servers", bootstrapServer);
        properties.put("key.serializer", StringSerializer.class.getCanonicalName());
        properties.put("value.serializer", StringSerializer.class.getCanonicalName());
        properties.put("acks", "-1");
        properties.put("retries", 50000);
        properties.put("request.timeout.ms", 1);

I have kept request.timeout.ms=1 to make sure that message delivery fails with a TimeoutException. Since retries is 50000, the program should take at least 50000 ms (50 seconds) to complete for a single record. However, the program completes almost instantly, with only one callback carrying a TimeoutException. I suspect the producer is not attempting any retries. Or am I missing something in my code?

My Kafka version is 0.10.0.1.
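
For reference, a minimal way to time the single-record test (a sketch, reusing the names from the code above; timing and logging only):

long start = System.nanoTime();
producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
    long elapsedMs = (System.nanoTime() - start) / 1_000_000;
    if (e != null) {
        System.err.println("callback after " + elapsedMs + " ms: " + e.getClass().getSimpleName());
    }
});
producer.flush();
System.out.println("flush() returned after " + (System.nanoTime() - start) / 1_000_000 + " ms");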

Regards,
Vatsal

RE: Detecting when all the retries are expired for a message

Posted by Ismael Juma <is...@gmail.com>.
The callback is called after the retries have been exhausted.
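
So one option is to treat any exception that reaches the callback as a permanent failure and divert that record; a sketch of the read loop under that assumption (imports java.util.ArrayList, java.util.Collections, java.util.List assumed):

List<String> failed = Collections.synchronizedList(new ArrayList<>());
String line;
while ((line = bf.readLine()) != null) {
    final String record = line; // effectively final copy for the lambda
    producer.send(new ProducerRecord<>(topicName, record), (metadata, e) -> {
        if (e != null) {
            failed.add(record); // any configured retries have already been used up here
        }
    });
}
producer.flush();
// 'failed' now holds the lines whose delivery failed permanently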

Ismael


RE: Detecting when all the retries are expired for a message

Posted by "Mevada, Vatsal" <Me...@sky.optymyze.com>.
@Ismael:

I can handle TimeoutException in the callback. However, as per the documentation of Callback (link: https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/Callback.html), TimeoutException is a retriable exception, and the documentation says it "may be covered by increasing #.retries". So even if I get a TimeoutException in the callback, wouldn't the producer try to send the message again until all the retries are done? Would it be safe to assume that message delivery has failed permanently just because I see a TimeoutException in the callback?

Here is a snippet from the above-mentioned documentation:
"exception - The exception thrown during processing of this record. Null if no error occurred. Possible thrown exceptions include: Non-Retriable exceptions (fatal, the message will never be sent): InvalidTopicException OffsetMetadataTooLargeException RecordBatchTooLargeException RecordTooLargeException UnknownServerException Retriable exceptions (transient, may be covered by increasing #.retries): CorruptRecordException InvalidMetadataException NotEnoughReplicasAfterAppendException NotEnoughReplicasException OffsetOutOfRangeException TimeoutException UnknownTopicOrPartitionException"

@Asaf: My Kafka API version is 0.10.0.1, so I think I should not be hitting the issue you are mentioning. I linked the 0.9 documentation by mistake.

Regards,
Vatsal

Re: Detecting when all the retries are expired for a message

Posted by Asaf Mesika <as...@gmail.com>.
There's a critical bug in that section that has only been fixed in 0.9.0.2,
which has not been released yet. Without the fix it doesn't really retry.
I forked the Kafka repo, applied the fix, built it, and placed it in our own
Nexus Maven repository until 0.9.0.2 is released.

https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio

Feel free to use it.


Re: Detecting when all the retries are expired for a message

Posted by Ismael Juma <is...@juma.me.uk>.
The callback should give you what you are asking for. Has it not worked as
you expected when you tried it?

Ismael
