Posted to dev@kafka.apache.org by "Subbu Srinivasan (JIRA)" <ji...@apache.org> on 2013/07/13 03:14:46 UTC

[jira] [Created] (KAFKA-973) Messages From Producer Not being Partitioned

Subbu Srinivasan created KAFKA-973:
--------------------------------------

             Summary: Messages From Producer Not being Partitioned 
                 Key: KAFKA-973
                 URL: https://issues.apache.org/jira/browse/KAFKA-973
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.8
         Environment: Linux
            Reporter: Subbu Srinivasan
            Assignee: Neha Narkhede


I created a two-node cluster:

2 ZooKeeper nodes
2 brokers
1 topic with replication factor 2 and 2 partitions
1 consumer group with two threads

1) From my Java client, I send a few messages to the topic. I have set multiple brokers
in the broker list: kafka2:9092,kafka1:9092.

Only one thread in my consumer ever gets the messages. It looks like the producer is not
partitioning the requests properly.

2) However, if I send some sample messages using the simple console producer, I see multiple
threads getting requests, and the load is balanced.

What am I doing wrong in my client?


import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// ConfigurationUtility is a helper class that is not included in this report.
public class KafkaProducer {

    private final Properties props = new Properties();

    private static AtomicLong counter = new AtomicLong(0);
    kafka.javaapi.producer.Producer<Integer, String> producer = null;

    public KafkaProducer()
    {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", ConfigurationUtility.getKafkaHost());
        producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
    }

    // Sends the message to the configured topic without a key.
    public void sendMessage(String msg) throws Exception
    {
        producer.send(new KeyedMessage<Integer, String>(ConfigurationUtility.getTopicName(), msg));
    }

    public static void main(String arg[]) throws Exception
    {
        ConfigurationUtility.setKafkaHost("kafka2:9092,kafka1:9092");
        ConfigurationUtility.setTopicName("dnslog");
        ConfigurationUtility.setZooKeeperHost("kafka1:2181,kafka2:2181");
        ConfigurationUtility.setConsumerGroupId("dnslog");

        // A new producer instance is created for each message.
        for (int i = 0; i < 2; ++i)
        {
            (new KafkaProducer()).sendMessage(UUID.randomUUID().toString());
        }
    }
}
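
The client above sends every KeyedMessage without a key. If the producer picks a partition by hashing the message key (an assumption here, not something confirmed in this report), then distinct keys should spread messages across both partitions. The following is a minimal, self-contained sketch of that idea only. It does not use the Kafka API, and PartitionSketch, partitionFor and countByPartition are hypothetical names introduced for illustration.

```java
import java.util.Arrays;

public class PartitionSketch {

    // Hypothetical stand-in for a key-hash partitioner: clear the sign bit
    // of the key's hash code, then take it modulo the partition count.
    static int partitionFor(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Counts how many of the given keys land on each partition.
    static int[] countByPartition(String[] keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (String key : keys) {
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Four distinct keys over two partitions, matching the topic in this report.
        String[] keys = {"0", "1", "2", "3"};
        System.out.println(Arrays.toString(countByPartition(keys, 2)));
    }
}
```

Whether the real producer behaves this way for keyed and unkeyed messages in 0.8 would need to be checked against the producer configuration and partitioner actually in use.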



