You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Subbu Srinivasan (JIRA)" <ji...@apache.org> on 2013/07/13 03:14:46 UTC
[jira] [Created] (KAFKA-973) Messages From Producer Not being
Partitioned
Subbu Srinivasan created KAFKA-973:
--------------------------------------
Summary: Messages From Producer Not being Partitioned
Key: KAFKA-973
URL: https://issues.apache.org/jira/browse/KAFKA-973
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 0.8
Environment: Linux
Reporter: Subbu Srinivasan
Assignee: Neha Narkhede
I created a two node cluster.
2 zoo keepers
2 brokers
1 topic with replication factor (2) and no of partition 2.
my consumer group has two threads
1) From my Java client - I send few messages to the topic. I have set multiple brokers
kafka2:9092,kafka1:9092.
Only one thread in my consumer always gets the messages. It looks like producer is not
partitioning the requests properly.
2) However if I send some sample using the simple console producer, I see multiple threads getting
requests and is load balanced.
What am I doing wrong in my client?
public class KafkaProducer {
private final Properties props = new Properties();
private static AtomicLong counter = new AtomicLong(0);
kafka.javaapi.producer.Producer<Integer, String> producer = null;
public KafkaProducer()
{
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", ConfigurationUtility.getKafkaHost());
producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
}
public void sendMessage(String msg) throws Exception
{
producer.send(new KeyedMessage<Integer, String>(ConfigurationUtility.getTopicName(), msg));
}
public static void main(String arg[]) throws Exception
{
ConfigurationUtility.setKafkaHost("kafka2:9092,kafka1:9092");
ConfigurationUtility.setTopicName("dnslog");
ConfigurationUtility.setZooKeeperHost("kafka1:2181,kafka2:2181");
ConfigurationUtility.setConsumerGroupId("dnslog");
for(int i = 0 ; i < 2 ; ++i)
{
(new KafkaProducer()).sendMessage(UUID.randomUUID().toString());
}
}
}
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira