Posted to users@kafka.apache.org by Ilya Goberman <ig...@kcg.com> on 2015/10/30 21:25:59 UTC

question about async publisher blocking when broker is down.

I am new to Kafka and apologize if this has already been answered. I am testing simple async publisher behavior when the broker is down. I am using Kafka version 0.8.2.2.


I have set "queue.buffering.max.messages" to 200 and "queue.enqueue.timeout.ms" to -1. My understanding is that if "queue.enqueue.timeout.ms" is set to -1, the call to 'producer.send' should block once the queue of 200 messages is full. But this is not what I am seeing.


My publisher has these properties:
        Properties props = new Properties();
        props.put("metadata.broker.list", "cno-d-igoberman2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async");
        props.put("partitioner.class", "com.kcg.kafka.test.SimplePartitioner");
        props.put("request.required.acks", "1");
        props.put("queue.buffering.max.messages", "200");
        props.put("queue.enqueue.timeout.ms", "-1");


This is the scenario I am testing:

1) start broker.

2) start publishing in a loop.

3) kill broker.


At this point my producer keeps calling 'producer.send' without blocking (though it slows down considerably). I suspect messages are being lost - but this is not what I want. Is this a known limitation of producers in Kafka?
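
For reference, the publishing loop is essentially the following (a minimal sketch; the loop body is not in the original message, and the topic name, payload, and sleep interval are illustrative):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class OldProducerLoop {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties(); // the properties listed above
        props.put("metadata.broker.list", "cno-d-igoberman2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async");
        props.put("request.required.acks", "1");
        props.put("queue.buffering.max.messages", "200");
        props.put("queue.enqueue.timeout.ms", "-1");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        for (long i = 0; ; i++) {
            String key = Long.toString(i);
            // With queue.enqueue.timeout.ms = -1, this call is expected to
            // block once 200 messages are queued and the broker is down.
            producer.send(new KeyedMessage<String, String>(
                    "test", key, System.currentTimeMillis() + ": " + key));
            Thread.sleep(100);
        }
    }
}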

Any help in clarifying this would be appreciated. Also, I understand that the producers are being redesigned for the next release. When will it be available? Should I even bother with the current version?

Thanks


This is what I am seeing in the log:


2015-10-30 14:50:29 INFO  SyncProducer:68 - Disconnecting from cno-d-igoberman2:9092
2015-10-30 14:50:29 ERROR Utils$:106 - fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cno-d-igoberman2,port:9092)] failed
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cno-d-igoberman2,port:9092)] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
    at kafka.utils.Utils$.swallow(Utils.scala:172)
    at kafka.utils.Logging$class.swallowError(Logging.scala:106)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    ... 12 more
2015-10-30 14:50:29 ERROR DefaultEventHandler:97 - Failed to send requests for topics test with correlation ids in [0,8]
2015-10-30 14:50:29 ERROR ProducerSendThread:103 - Error in handling batch of 5 events
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
2015-10-30 14:50:29 TRACE ProducerSendThread:36 - Dequeued item for topic test, partition key: 5, data: 1446234628741: 5
2015-10-30 14:50:29 TRACE Producer:36 - Added to send queue an event: KeyedMessage(test,6,6,1446234629741: 6)
2015-10-30 14:50:29 TRACE Producer:36 - Remaining queue size: 200




question about async publisher detecting when broker is down with 'new' producer.

Posted by Ilya Goberman <ig...@kcg.com>.
I posted a variation of this question before about the 'old' publisher. I have figured out how to use the 'new' redesigned publisher, and I have a similar question. Basically, I want to:

#1) Detect when the broker is down when using the asynchronous publisher. If the broker is down, I want to generate an alert and stop publication.

#2) Most of all, avoid message gaps when the broker recovers - in other words, I do not want to lose messages while the broker is down and then silently resume publication when it comes back up.

Apparently, this is not possible with the old publisher when asynchronous mode is used.


My test program is below. I do not see a way to detect that the broker is down. I see errors in the log, but my publisher keeps publishing without any errors being reported via the API. Once the broker connection is up again, the Callback reports exceptions for the messages that failed to publish. However, I suspect it also publishes new messages at the point where the exception is reported, creating a 'gap'.


Is there a way to address these concerns? (One partial approach to #1 is sketched after the log output below.)


I would hate to use the sync publisher - my publishing rate drops dramatically, from 300,000 msg/s to 7,000 msg/s.


package com.kcg.kafka.test;

import java.util.*;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestProducer {
    public static void main(String[] args) {
        System.setProperty("log4j.debug", "true");
        long events = Long.parseLong(args[0]);

        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"cno-d-igoberman2:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "1");

        KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props);

        long start = System.currentTimeMillis();

        long nEvents = 0;
        for (; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            final String key = Long.toString(nEvents);
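            // Append 300 NUL characters to pad the payload to ~300 bytes.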
            String msg = runtime + ": " + key + new String(new byte[300]);
            ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>("test", key, msg);

            try {
                System.out.println("Sending " + key);

                // 'send' will not throw an exception when broker is down.
                producer.send(producerRecord,
                        new Callback() {
                            public void onCompletion(RecordMetadata metadata, Exception e) {
                                if (e != null) {
                                    e.printStackTrace();
                                    // Stop the publisher here. But is there a guarantee that
                                    // no more messages are published after this one?
                                } else {
                                    System.out.println("The offset of the record we just sent is: " + metadata.offset() + " " + key);
                                }
                            }
                });//.get(20, TimeUnit.SECONDS);
            }
            catch (Throwable e) {
                e.printStackTrace();
            }

            if (nEvents % 10000 == 0) {
                System.out.println("" + key);
            }

            try {
                Thread.sleep(1000);
            }
            catch (InterruptedException e) {
            }
        }

        long duration = (System.currentTimeMillis() - start);
        System.out.println("Published " + nEvents + " messages in " + duration + "ms. " + (int)((double)nEvents/((double)duration / 1000.0)) + " m/s.");

        producer.close();
    }
}



log4j: Trying to find [log4j.xml] using context classloader sun.misc.Launcher$AppClassLoader@6da21389.
log4j: Trying to find [log4j.xml] using sun.misc.Launcher$AppClassLoader@6da21389 class loader.
log4j: Trying to find [log4j.xml] using ClassLoader.getSystemResource().
log4j: Trying to find [log4j.properties] using context classloader sun.misc.Launcher$AppClassLoader@6da21389.
log4j: Using URL [file:/home/apprun/kafkatest/TestProducer/bin/log4j.properties] for automatic log4j configuration.
log4j: Reading configuration from URL file:/home/apprun/kafkatest/TestProducer/bin/log4j.properties
log4j: Parsing for [root] with value=[INFO, stdout].
log4j: Level token is [INFO].
log4j: Category root set to INFO
log4j: Parsing appender named "stdout".
log4j: Parsing layout options for "stdout".
log4j: Setting property [conversionPattern] to [%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n].
log4j: End of parsing for "stdout".
log4j: Setting property [target] to [System.out].
log4j: Parsed "stdout" options.
log4j: Finished configuring.
2015-11-10 10:23:16 INFO  ProducerConfig:113 - ProducerConfig values:
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    block.on.buffer.full = true
    retry.backoff.ms = 100
    buffer.memory = 33554432
    batch.size = 16384
    metrics.sample.window.ms = 30000
    metadata.max.age.ms = 300000
    receive.buffer.bytes = 32768
    timeout.ms = 30000
    max.in.flight.requests.per.connection = 5
    bootstrap.servers = [cno-d-igoberman2:9092]
    metric.reporters = []
    client.id =
    compression.type = none
    retries = 0
    max.request.size = 1048576
    send.buffer.bytes = 131072
    acks = 1
    reconnect.backoff.ms = 10
    linger.ms = 0
    metrics.num.samples = 2
    metadata.fetch.timeout.ms = 60000

Sending 0
0
The offset of the record we just sent is: 12 0
Sending 1
The offset of the record we just sent is: 13 1
Sending 2
The offset of the record we just sent is: 14 2
Sending 3
The offset of the record we just sent is: 15 3
Sending 4
The offset of the record we just sent is: 16 4
Sending 5
The offset of the record we just sent is: 17 5
Sending 6
The offset of the record we just sent is: 18 6
Sending 7
The offset of the record we just sent is: 19 7
Sending 8
The offset of the record we just sent is: 20 8
Sending 9
The offset of the record we just sent is: 21 9
2015-11-10 10:23:25 WARN  Selector:276 - Error in I/O with cno-d-igoberman2/10.83.55.13
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
    at java.lang.Thread.run(Thread.java:745)

....

Sending 10
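
One partial approach to #1, sketched below (an assumption-laden sketch, not part of the original test program): record the first failure reported by any callback in a flag and stop the send loop as soon as it is set. This detects the outage and stops publication, but it only bounds the gap in #2 rather than eliminating it - records already buffered or in flight when the flag flips can still fail.

import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class StopOnFirstError {
    public static void main(String[] args) {
        Properties props = new Properties(); // same settings as the test program
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "cno-d-igoberman2:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "1");

        // Holds the first send failure reported by any callback.
        final AtomicReference<Exception> firstError = new AtomicReference<Exception>();

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (long i = 0; firstError.get() == null; i++) {
            producer.send(new ProducerRecord<String, String>("test", Long.toString(i), "payload"),
                    new Callback() {
                        public void onCompletion(RecordMetadata metadata, Exception e) {
                            if (e != null) {
                                firstError.compareAndSet(null, e); // remember the first failure
                            }
                        }
                    });
        }
        producer.close(); // flushes whatever is still buffered
        if (firstError.get() != null) {
            firstError.get().printStackTrace(); // generate the alert here
        }
    }
}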



Re: question about async publisher blocking when broker is down.

Posted by Damian Guy <da...@gmail.com>.
Hi,
If you are using the Scala producer then yes, it will drop messages. It
will try up to the configured number of retries and then throw a
FailedToSendMessageException. This is caught in the ProducerSendThread
and logged; you'd see something like:
"Error in handling batch of 10 events ..."

If you don't want to drop messages (who does?) then I suggest using the
sync producer and doing your own batching, as in the sketch below.
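
A minimal sketch of that idea (0.8.x javaapi producer; batch size, topic,
and error handling are illustrative assumptions):

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import kafka.common.FailedToSendMessageException;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SyncBatchingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "cno-d-igoberman2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "sync"); // sync: send() throws on failure
        props.put("request.required.acks", "1");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        List<KeyedMessage<String, String>> batch =
                new ArrayList<KeyedMessage<String, String>>(200);
        for (long i = 0; i < 1000000; i++) {
            batch.add(new KeyedMessage<String, String>("test", Long.toString(i), "payload " + i));
            if (batch.size() == 200) {
                try {
                    producer.send(batch); // one synchronous, batched request
                    batch.clear();
                } catch (FailedToSendMessageException e) {
                    // Broker is down: alert and stop. 'batch' still holds the
                    // unsent messages, so nothing is silently dropped.
                    e.printStackTrace();
                    break;
                }
            }
        }
        producer.close();
    }
}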

Cheers,
Damian


Re: question about async publisher blocking when broker is down.

Posted by Li Tao <ah...@gmail.com>.
Hi, according to my understanding, your scenario does not apply here.
Async does not mean the producer buffers messages when the connection is
lost (you killed the broker). If the connection is down, the producer
should detect it as an exceptional condition and throw the exception up
to the application level to handle.

Correct me if I am wrong.
