Posted to users@kafka.apache.org by ankit tyagi <an...@gmail.com> on 2015/02/16 21:11:23 UTC

CallBackHandler is not being called after successful delivery of message

Hey,

I am doing a POC on Kafka 0.8.2.0. Currently I am using the 0.8.2.0
kafka-clients library to produce messages asynchronously with a callback
handler.

I am using batch.size = 1 in my producer client. As per my understanding,
this should behave like a sync client, though messages can be published from
different threads. The problem is that my callback handler is not being
called after a message is sent successfully to the server.

Below is the code of my KafkaProducer class:

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProducer
{
    private static Logger LOG = LoggerFactory.getLogger(KafkaProducer.class);

    private Properties kafkaProducerProp = new Properties();
    // Fully qualified because this wrapper class shares the client's class name.
    private org.apache.kafka.clients.producer.KafkaProducer<String, String> myProducer;

    public void initializeProperty()
    {
        kafkaProducerProp.put("bootstrap.servers", "localhost:9092");
        kafkaProducerProp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProducerProp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProducerProp.put("client.id", "ankitLocalTest");
        kafkaProducerProp.put("batch.size", 1);
    }

    public void initialize()
    {
        myProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(kafkaProducerProp);
    }

    // Asynchronous send: returns immediately, the callback fires later.
    public void sendMessage(String myEventKey, String myEventValue)
    {
        ProducerRecord<String, String> myProducerRecord =
            new ProducerRecord<String, String>("testTopic2", myEventKey, myEventValue);
        myProducer.send(myProducerRecord, new ProducerCallBackHandler());
    }

    // Blocking send: waits on the returned future before continuing.
    public void sendBlockingMessage(String myEventKey, String myEventValue)
    {
        ProducerRecord<String, String> myProducerRecord =
            new ProducerRecord<String, String>("testTopic", myEventKey, myEventValue);
        try {
            RecordMetadata recordMetaData = myProducer.send(myProducerRecord).get();
            LOG.info("offset :{} and partition : {} of last published message in blocking mode",
                recordMetaData.offset(), recordMetaData.partition());
        }
        catch (Exception e) {
            LOG.info("Exception : {} occurred while publishing event", e.getCause());
        }
    }

}




import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerCallBackHandler implements Callback
{

    Logger LOG = LoggerFactory.getLogger(ProducerCallBackHandler.class);

    @Override
    public void onCompletion(RecordMetadata recordMetaData, Exception exceptionOccured)
    {
        // Check the exception first so that its cause is logged on failure.
        if (exceptionOccured != null) {
            LOG.info("Exception : {} occurred while publishing event on Kafka", exceptionOccured.getCause());
            return;
        }
        if (recordMetaData == null) {
            LOG.info("Error occurred while publishing event on Kafka");
            return;
        }

        LOG.info("Message published on kafka with offset : {} and  partition : {}",
            recordMetaData.offset(), recordMetaData.partition());
    }

}



I wrote a JUnit test case to publish around 1000 messages to the topic.

import org.junit.Ignore;
import org.junit.Test;

public class KafkaProducerTest
{

    @Test
    @Ignore
    public void ProducerBlockingTest()
    {
        KafkaProducer producer = new KafkaProducer();
        producer.initializeProperty();
        producer.initialize();
        for (int i = 0; i < 1000; i++) {
            producer.sendBlockingMessage("firstMessageKey", "firstMessageValue");
        }
    }

    @Test
    public void ProducerNonBlockingTest()
    {
        KafkaProducer producer = new KafkaProducer();
        producer.initializeProperty();
        producer.initialize();
        for (int i = 0; i < 1000; i++) {
            producer.sendMessage("firstMessageKey", "firstMessageValue");
        }

    }

}


When I looked at my logs, they show that the callback handler was called for
messages up to offset 48, but not after that:

[2015-02-17 01:30:26.068][kafka-producer-network-thread | ankitLocalTest][INFO][ProducerCallBackHandler:25] Message published on kafka with offset : 46 and  partition : 0
[2015-02-17 01:30:26.071][kafka-producer-network-thread | ankitLocalTest][INFO][ProducerCallBackHandler:25] Message published on kafka with offset : 47 and  partition : 0
[2015-02-17 01:30:26.072][kafka-producer-network-thread | ankitLocalTest][INFO][ProducerCallBackHandler:25] Message published on kafka with offset : 48 and  partition : 0


*Am I doing something wrong??*

Re: CallBackHandler is not being called after successful delivery of message

Posted by Jay Kreps <ja...@gmail.com>.
i. We are debating adding a flush() call to ensure all message sends are
complete. At the moment you can achieve this by just checking the futures
but that is a bit less convenient.
ii. If you set retries > 0 those retries will occur before the callback is
invoked. The callback is invoked when the request succeeds or all retries
have failed (in which case we give you the error). So you shouldn't need to
do anything special to retry other than change the config.
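
Checking the futures, as described in (i), might look roughly like the sketch
below. It uses only java.util.concurrent so that it runs without a broker:
the send() method here is a hypothetical stand-in for the producer's send(),
which likewise returns a Future. The class and method names are assumptions
made for illustration and are not part of the Kafka client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a producer: send() hands work to a background
// thread and returns a Future, similar in shape to an asynchronous send.
// Intended to be driven from a single calling thread.
class FutureDrainSketch {
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
    private final List<Future<Long>> pending = new ArrayList<>();
    private long nextOffset = 0;

    // Queue a message and remember its Future so it can be checked later.
    Future<Long> send(String value) {
        final long offset = nextOffset++;
        Future<Long> f = ioThread.submit(() -> offset);
        pending.add(f);
        return f;
    }

    // Block until every remembered send has completed; returns how many did.
    int awaitAll() {
        int done = 0;
        for (Future<Long> f : pending) {
            try {
                f.get(); // fails with ExecutionException if that send failed
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException("send did not complete", e);
            }
            done++;
        }
        pending.clear();
        return done;
    }

    void close() {
        ioThread.shutdown();
    }
}
```

Calling awaitAll() from a shutdown hook, or at the end of a test, is one way
to be sure nothing is still sitting in the buffer before the process exits.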

-Jay
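
For point (ii), enabling retries is a configuration change only. A minimal
sketch follows, assuming the "retries" and "retry.backoff.ms" producer
settings; the class name and the chosen values are illustrative assumptions,
not recommendations.

```java
import java.util.Properties;

// Builds producer settings with retries enabled; values are illustrative.
class RetryConfigSketch {
    static Properties withRetries(int retries) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Retries are performed by the client before the callback is invoked.
        props.put("retries", retries);
        // Pause between attempts, in milliseconds (assumed value).
        props.put("retry.backoff.ms", 100);
        return props;
    }
}
```

The resulting Properties object would be passed to the producer constructor
in place of the one built in initializeProperty().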

On Tue, Feb 17, 2015 at 4:34 AM, ankit tyagi <an...@gmail.com>
wrote:

> Hi Jay,
>
> I have debugged further and it looks like the problem was with my test case.
> After calling the send method for some time, my main thread was finishing,
> and I was not waiting for the *kafka-producer-network-thread* to send all
> the messages from the buffer, hence my test case was failing.
>
>
> I have a few questions now:
>  i) Is there any parameter or flag to find out whether all messages in the
> buffer have been published? I want to attach a shutdown hook so that I can
> make sure all my messages have been published at the time of restart.
>
>  ii) If I am using the async producer with a callback handler, will the
> retry logic in case of a rebalance exception or any other exception be
> handled by the kafka-producer-network-thread, or does that logic have to be
> implemented in the callback handler?

Re: CallBackHandler is not being called after successful delivery of message

Posted by ankit tyagi <an...@gmail.com>.
Hi Jay,

I have debugged further and it looks like the problem was with my test case.
After calling the send method for some time, my main thread was finishing,
and I was not waiting for the *kafka-producer-network-thread* to send all
the messages from the buffer, hence my test case was failing.
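
One way to keep a test from ending early is to have the callback count down a
latch that the test thread waits on. The sketch below shows that pattern with
plain java.util.concurrent classes so that it runs without a broker;
AsyncSender and its send() method are hypothetical stand-ins for the producer
and its network thread, not Kafka APIs. With the real client, calling close()
on the producer at the end of the test should have a similar effect, assuming
it blocks until in-flight requests complete.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an async producer: callbacks run on a
// background daemon thread, which does not keep the JVM alive.
class AsyncSender {
    private final ExecutorService networkThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-network-thread");
        t.setDaemon(true);
        return t;
    });

    // Returns immediately; onDone runs later on the background thread.
    void send(String value, Runnable onDone) {
        networkThread.execute(onDone);
    }
}

class LatchWaitSketch {
    // Sends `count` messages and waits until every callback has fired.
    static boolean sendAndWait(int count, long timeoutSeconds) {
        AsyncSender sender = new AsyncSender();
        CountDownLatch allAcked = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            sender.send("firstMessageValue", allAcked::countDown);
        }
        try {
            // Without this wait the caller could return before the callbacks run.
            return allAcked.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In a JUnit test the boolean result could be asserted directly, so a missing
callback shows up as a test failure instead of a silently truncated log.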


I have a few questions now:
 i) Is there any parameter or flag to find out whether all messages in the
buffer have been published? I want to attach a shutdown hook so that I can
make sure all my messages have been published at the time of restart.

 ii) If I am using the async producer with a callback handler, will the retry
logic in case of a rebalance exception or any other exception be handled by
the kafka-producer-network-thread, or does that logic have to be implemented
in the callback handler?







On Tue, Feb 17, 2015 at 4:42 AM, Jay Kreps <ja...@gmail.com> wrote:

> Sounds like a potential bug, and it sounds like you can easily reproduce
> it. Can you post your test code and a description of the server version and
> how you started/configured it, and what you expect to see from your test
> and what you actually see:
> https://issues.apache.org/jira/browse/KAFKA/
>
> This will help make the debugging a bit easier on our end and avoid
> confusion.
>
> As Steven mentions, your description of batch.size isn't quite right. The
> batch size is the buffer size, in bytes, used for batching messages
> together; that is, it is the target size the client tries to batch together
> for a partition (it won't batch together more than that). Setting it to 1
> won't affect the sync/async aspects of the send, but it will ensure that
> each message is sent in a request with no other messages from that
> partition (no reason you would want that). So setting this to 1 is maybe
> weird, but you should still see all the callbacks fire.
>
> -Jay

Re: CallBackHandler is not being called after successful delivery of message

Posted by Jay Kreps <ja...@gmail.com>.
Sounds like a potential bug, and it sounds like you can easily reproduce
it. Can you post your test code and a description of the server version and
how you started/configured it, and what you expect to see from your test
and what you actually see:
https://issues.apache.org/jira/browse/KAFKA/

This will help make the debugging a bit easier on our end and avoid
confusion.

As Steven mentions, your description of batch.size isn't quite right. The
batch size is the buffer size, in bytes, used for batching messages together;
that is, it is the target size the client tries to batch together for a
partition (it won't batch together more than that). Setting it to 1 won't
affect the sync/async aspects of the send, but it will ensure that each
message is sent in a request with no other messages from that partition (no
reason you would want that). So setting this to 1 is maybe weird, but you
should still see all the callbacks fire.
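
As a rough illustration of the point about bytes, a batching configuration
might be sketched as below. The 16384 value is the default quoted elsewhere
in this thread; the "linger.ms" setting and its value, along with the class
name, are assumptions added for illustration.

```java
import java.util.Properties;

// Illustrative batching settings; only the keys matter to the producer.
class BatchConfigSketch {
    static Properties batching() {
        Properties props = new Properties();
        // Upper bound, in bytes, on a per-partition batch (not a message count).
        props.put("batch.size", 16384);
        // Optionally wait this many milliseconds for more records to join a batch.
        props.put("linger.ms", 5);
        return props;
    }
}
```

Neither setting changes whether send() is asynchronous; they only influence
how records are grouped into requests.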

-Jay


Re: CallBackHandler is not being called after successful delivery of message

Posted by Steven Wu <st...@gmail.com>.
I don't know whether this is the cause of your issue, but "batch.size" is
measured in bytes (not in number of messages). The default is 16384.
