Posted to users@kafka.apache.org by "Bruno D. Rodrigues" <br...@litux.org> on 2013/10/07 13:52:14 UTC

Async and error handling

Apologies in advance if this is a dumb or common question, but as usual I couldn't find the answer to it anywhere.

How can we set up some kind of handler to catch async errors?

Let's say something as simple as this:

        import java.util.Arrays;
        import java.util.List;
        import java.util.Properties;

        import kafka.producer.KeyedMessage;
        import kafka.producer.Producer;
        import kafka.producer.ProducerConfig;
        import scala.collection.JavaConversions;

        final Properties props = new Properties();
        props.put("metadata.broker.list", "something:12345");
        props.put("producer.type", "async");
        final ProducerConfig config = new ProducerConfig(props);
        final Producer<String, String> producer = new Producer<String, String>(config);
        while (true) {
            // `line` is the message payload, read from somewhere else
            final KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", "foo", line);
            final List<KeyedMessage<String, String>> l = Arrays.asList(data);
            producer.send(JavaConversions.asBuffer(l).toList());
        }

Using sync, the producer.send does throw an exception if something goes wrong.

Using async, I can only see on the logs something like this:

java.nio.channels.UnresolvedAddressException
	at sun.nio.ch.Net.checkAddress(Net.java:127)
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:640)
	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)

Admittedly, in this case one can assume the host name is correct and the broker is simply down, so the code should keep running until the server comes back up. My actual problem was a missing log4j.properties file ;) which let this code keep producing messages while nothing reached the server.

For example, after pointing at the right server and enabling Snappy compression, I was impressed that the msg/sec went up to amazing values, until I noticed that no messages were going out at all. Once I had the good sense to add a log4j.properties file, I could see the real reason:


java.lang.NoClassDefFoundError: org/xerial/snappy/SnappyInputStream
	at kafka.message.ByteBufferMessageSet$.kafka$message$ByteBufferMessageSet$$create(ByteBufferMessageSet.scala:41)
	at kafka.message.ByteBufferMessageSet.<init>(ByteBufferMessageSet.scala:102)
	at kafka.producer.async.DefaultEventHandler$$anonfun$7.apply(DefaultEventHandler.scala:301)
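For reference, this NoClassDefFoundError usually means the snappy-java jar is missing from the producer's classpath; with Maven, a dependency along these lines pulls it in (the version shown is illustrative — use whatever matches your Kafka build):

```xml
<dependency>
  <groupId>org.xerial.snappy</groupId>
  <artifactId>snappy-java</artifactId>
  <version>1.0.4.1</version>
</dependency>
```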


So, coming back to the core question: is there a way to know programmatically, via some callback, what is going on when such errors occur? What I understood so far is that there were some callbacks in 0.7 that are gone in 0.8, and this change left me confused while reading the documentation and searching on Google.

thanks in advance

Re: Async and error handling

Posted by Neha Narkhede <ne...@gmail.com>.
The async producer's send() API is never supposed to block. If, for some
reason, the producer's queue is full and you try to send more messages, it
will drop those messages and raise a QueueFullException. You can configure
the "message.send.max.retries" config to retry sending the messages n
times if a send fails for some reason. If the producer still fails to send
the messages after n retries, it registers the failure in an mbean and logs
a FailedToSendMessageException.
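For example, a producer config along these lines (the broker address and values are illustrative, and the queue.enqueue.timeout.ms semantics described in the comments are how I understand the 0.8 producer):

```java
import java.util.Properties;

public class AsyncProducerProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "something:12345");
        props.put("producer.type", "async");
        // Retry a failed send up to 3 times before registering the failure.
        props.put("message.send.max.retries", "3");
        // How long send() may wait when the async queue is full:
        // 0 drops the message immediately (QueueFullException),
        // -1 blocks until space is available, >0 waits that many ms then drops.
        props.put("queue.enqueue.timeout.ms", "0");
        return props;
    }
}
```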

You can monitor the *FailedSendsPerSec mbean to detect the number of
messages dropped at the producer.
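As a sketch of how one might locate that mbean from inside the producer JVM (the exact ObjectName layout differs across Kafka versions, so this matches by substring; treat it as an illustration, not a fixed API):

```java
import java.lang.management.ManagementFactory;
import java.util.HashSet;
import java.util.Set;

import javax.management.MBeanServer;
import javax.management.ObjectName;

public class FailedSendsMonitor {
    // Scans the local platform MBean server for the producer's
    // *FailedSendsPerSec meter, matching by name substring.
    public static Set<ObjectName> findFailedSendsMeters() {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Set<ObjectName> result = new HashSet<ObjectName>();
        for (ObjectName name : server.queryNames(null, null)) {
            if (name.getCanonicalName().contains("FailedSendsPerSec")) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // In a JVM running the 0.8 producer this should list the meter(s);
        // in a plain JVM the set is empty.
        System.out.println(findFailedSendsMeters());
    }
}
```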

Thanks,
Neha


On Mon, Oct 7, 2013 at 8:10 AM, Bruno D. Rodrigues <bruno.rodrigues@litux.org> wrote:


Re: Async and error handling

Posted by "Bruno D. Rodrigues" <br...@litux.org>.
So the concept to keep in mind is: as long as we set the whole list of Kafka brokers on the producer and the ZooKeeper ensemble on the consumers, then from the producer's and consumer's perspective it should just work, the code won't get any information, and instead one should look at the logs?

What will then happen to the messages on the producer side? I've set a small value on these two properties:

props.put("batch.num.messages", "200"); // default is 200
props.put("queue.buffering.max.messages", "1000"); // default is 10000

and then expected the .send(), even in async mode, to block, but it doesn't seem to block. 

I can't tell whether those messages are being queued without bound in the producer VM, silently dropped after the first 1000, or something else entirely. What is the pattern to look at in this case?
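To make the question concrete, here is a stdlib-only sketch of a bounded queue with an enqueue timeout, which is how I understand the async producer's internal queue to behave (an illustration of the block/drop/wait alternatives, not Kafka's actual code):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class AsyncQueueSketch {
    // Mimics a bounded async-producer queue with an enqueue timeout.
    // timeoutMs < 0: block until space frees up; 0: drop immediately when
    // full; > 0: wait up to timeoutMs and then drop. Returns whether the
    // message was actually enqueued.
    public static boolean enqueue(ArrayBlockingQueue<String> queue,
                                  String msg, long timeoutMs)
            throws InterruptedException {
        if (timeoutMs < 0) {
            queue.put(msg); // blocks until there is room
            return true;
        }
        return queue.offer(msg, timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
        System.out.println(enqueue(queue, "a", 0)); // true
        System.out.println(enqueue(queue, "b", 0)); // true
        System.out.println(enqueue(queue, "c", 0)); // false: full, dropped
    }
}
```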


On 07/10/2013, at 15:33, Neha Narkhede <ne...@gmail.com> wrote:


Re: Async and error handling

Posted by Neha Narkhede <ne...@gmail.com>.
Kafka has never had a callback for the async producer, but one is proposed
for Kafka 0.9. You can find the proposal here -
https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ProposedProducerAPI

Thanks,
Neha
On Oct 7, 2013 4:52 AM, "Bruno D. Rodrigues" <br...@litux.org>
wrote:
