You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Jun Rao (JIRA)" <ji...@apache.org> on 2013/09/13 17:58:59 UTC
[jira] [Closed] (KAFKA-973) Messages From Producer Not being
Partitioned
[ https://issues.apache.org/jira/browse/KAFKA-973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jun Rao closed KAFKA-973.
-------------------------
> Messages From Producer Not being Partitioned
> ---------------------------------------------
>
> Key: KAFKA-973
> URL: https://issues.apache.org/jira/browse/KAFKA-973
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.8
> Environment: Linux
> Reporter: Subbu Srinivasan
> Assignee: Neha Narkhede
> Labels: newbie
>
> I created a two node cluster.
> 2 zoo keepers
> 2 brokers
> 1 topic with replication factor (2) and no of partition 2.
> my consumer group has two threads
> 1) From my Java client - I send few messages to the topic. I have set multiple brokers
> kafka2:9092,kafka1:9092.
> Only one thread in my consumer always gets the messages. It looks like producer is not
> partitioning the requests properly.
> 2) However if I send some sample using the simple console producer, I see multiple threads getting
> requests and is load balanced.
> What am I doing wrong in my client?
> public class KafkaProducer {
>
> private final Properties props = new Properties();
> private static AtomicLong counter = new AtomicLong(0);
> kafka.javaapi.producer.Producer<Integer, String> producer = null;
>
> public KafkaProducer()
> {
> props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("metadata.broker.list", ConfigurationUtility.getKafkaHost());
> producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
> }
>
> public void sendMessage(String msg) throws Exception
> {
> producer.send(new KeyedMessage<Integer, String>(ConfigurationUtility.getTopicName(), msg));
> }
>
>
> public static void main(String arg[]) throws Exception
> {
>
> ConfigurationUtility.setKafkaHost("kafka2:9092,kafka1:9092");
> ConfigurationUtility.setTopicName("dnslog");
> ConfigurationUtility.setZooKeeperHost("kafka1:2181,kafka2:2181");
> ConfigurationUtility.setConsumerGroupId("dnslog");
>
> for(int i = 0 ; i < 2 ; ++i)
> {
> (new KafkaProducer()).sendMessage(UUID.randomUUID().toString());
> }
> }
> }
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira