Posted to users@kafka.apache.org by prateek arora <pr...@gmail.com> on 2015/10/14 19:29:04 UTC

Facing Issue to create asyn producer in kafka 0.8.2

Hi,

I want to create an async producer so that I can buffer messages in a queue
and send them every 5 seconds.

My Kafka version is 0.8.2.0,

and I am using kafka-clients 0.8.2.0 to create the Kafka producer in Java.


Below is my sample code:

package com.intel.labs.ive.cloud.testKafkaProducerJ;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestKafkaProducer {

    public static void main(String[] args) {
        // Placeholder values: the original post did not show how these were set.
        String metadataBroker = "localhost:9092";
        String topicName = "images";
        String recordKey = "image-key";
        byte[] imageBytes = new byte[77 * 1024]; // each message is around 77K

        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, metadataBroker);
        props.put("producer.type", "async");
        props.put("queue.buffering.max.ms", "5000");

        Serializer<String> keySerializer = new StringSerializer();
        Serializer<byte[]> valueSerializer = new ByteArraySerializer();

        Producer<String, byte[]> producer =
                new KafkaProducer<String, byte[]>(props, keySerializer, valueSerializer);

        // Send the same record in a loop, as in the original test.
        while (true) {
            ProducerRecord<String, byte[]> imageRecord =
                    new ProducerRecord<String, byte[]>(topicName, recordKey, imageBytes);
            producer.send(imageRecord);
        }
    }
}

Each of my messages is around 77K.

But it behaves like a synchronous producer: it sends every message to the
broker immediately instead of buffering messages in the queue and sending
them after 5 seconds.


Please help me find a solution.


Regards
Prateek

Re: Facing Issue to create asyn producer in kafka 0.8.2

Posted by prateek arora <pr...@gmail.com>.
Thanks, it works.

Regards
Prateek

On Wed, Oct 14, 2015 at 11:46 AM, Grant Henke <gh...@cloudera.com> wrote:

> Looks like you may be mixing the new producer with old producer configs.
> See the new config documentation here:
> http://kafka.apache.org/documentation.html#newproducerconfigs. You will
> likely want to set the "batch.size" and "linger.ms" to achieve your goal.
>
> Thanks,
> Grant

Re: Facing Issue to create asyn producer in kafka 0.8.2

Posted by Grant Henke <gh...@cloudera.com>.
Looks like you may be mixing the new producer with old producer configs.
See the new config documentation here:
http://kafka.apache.org/documentation.html#newproducerconfigs. You will
likely want to set the "batch.size" and "linger.ms" to achieve your goal.

Thanks,
Grant
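
As a rough sketch of the suggestion above, the settings might be assembled
like this. The class and method names (ProducerSettings, build) and the broker
address are hypothetical, and the values are only illustrative; the property
keys are the new producer's config names. The snippet uses only
java.util.Properties so it compiles without kafka-clients; the result could
then be passed to new KafkaProducer<String, byte[]>(props).

```java
import java.util.Properties;

public class ProducerSettings {

    // Builds settings for the new (org.apache.kafka.clients) producer.
    // lingerMs: how long the producer may wait for more records before sending a batch.
    // batchSizeBytes: upper bound on the size of one per-partition batch.
    public static Properties build(String bootstrapServers, int lingerMs, int batchSizeBytes) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("linger.ms", Integer.toString(lingerMs));
        props.put("batch.size", Integer.toString(batchSizeBytes));
        return props;
    }

    public static void main(String[] args) {
        // Assumed values: wait up to 5 seconds, allow batches of up to 1 MB.
        Properties props = build("localhost:9092", 5000, 1024 * 1024);
        System.out.println(props.getProperty("linger.ms"));
        System.out.println(props.getProperty("batch.size"));
    }
}
```

A batch goes out as soon as it fills or linger.ms elapses, whichever comes
first, so batch.size has to be large enough to hold about five seconds of
traffic if the 5-second interval is meant to be the trigger.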

On Wed, Oct 14, 2015 at 1:29 PM, prateek arora <pr...@gmail.com>
wrote:

> Hi,
>
> Thanks for the help.
>
> But I see the same behavior even after changing batch.size.
>
> I have changed the batch.size value to 33554432:
>  props.put("batch.size","33554432");



-- 
Grant Henke
Software Engineer | Cloudera
grant@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke

Re: Facing Issue to create asyn producer in kafka 0.8.2

Posted by prateek arora <pr...@gmail.com>.
Hi,

Thanks for the help.

But I see the same behavior even after changing batch.size.

I have changed the batch.size value to 33554432:
 props.put("batch.size","33554432");



On Wed, Oct 14, 2015 at 11:09 AM, Zakee <kz...@netzero.net> wrote:

> Hi Prateek,
>
> Looks like you are using the default batch.size, which is ~16K, and it forces
> messages to be sent immediately because your single message is larger than
> that. Try using a larger batch.size.
>
> Thanks
> Zakee

Re: Facing Issue to create asyn producer in kafka 0.8.2

Posted by Zakee <kz...@netzero.net>.
Hi Prateek,

Looks like you are using the default batch.size, which is ~16K, and it forces messages to be sent immediately because your single message is larger than that. Try using a larger batch.size.

Thanks
Zakee
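
The arithmetic behind this can be checked with a small sketch. The class and
method names are hypothetical, the 77K message is taken as 77 * 1024 bytes,
and per-record overhead is ignored, so the numbers are approximate.

```java
public class BatchMath {

    // Roughly how many records of the given size fit in one batch (overhead ignored).
    public static int recordsPerBatch(int batchSizeBytes, int recordSizeBytes) {
        return batchSizeBytes / recordSizeBytes;
    }

    public static void main(String[] args) {
        int recordSize = 77 * 1024; // 78848 bytes
        System.out.println(recordsPerBatch(16384, recordSize));    // default batch.size, prints 0
        System.out.println(recordsPerBatch(33554432, recordSize)); // 32 MB batch.size, prints 425
    }
}
```

A result of 0 for the 16384-byte default means a single 77K message cannot
share a batch with any other message.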



