You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Jun Rao (JIRA)" <ji...@apache.org> on 2013/09/13 17:58:51 UTC

[jira] [Resolved] (KAFKA-973) Messages From Producer Not being Partitioned

     [ https://issues.apache.org/jira/browse/KAFKA-973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao resolved KAFKA-973.
---------------------------

    Resolution: Not A Problem
    
> Messages From Producer Not being Partitioned 
> ---------------------------------------------
>
>                 Key: KAFKA-973
>                 URL: https://issues.apache.org/jira/browse/KAFKA-973
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8
>         Environment: Linux
>            Reporter: Subbu Srinivasan
>            Assignee: Neha Narkhede
>              Labels: newbie
>
> I created a two node cluster.
> 2 zoo keepers
> 2 brokers
> 1 topic with replication factor (2) and no of partition 2.
> my consumer group has two threads
> 1) From my Java client - I send few  messages to the topic. I have set multiple brokers
> kafka2:9092,kafka1:9092.
> Only one thread in my consumer always gets the messages. It looks like producer is not
> partitioning the requests properly.
> 2) However if I send some sample using the simple console producer, I see multiple threads getting
> requests and is load balanced.
> What am I doing wrong in my client?
> public class KafkaProducer {
> 	  
> 	  private final Properties props = new Properties();
>  	  private static AtomicLong counter = new AtomicLong(0);
>  	  kafka.javaapi.producer.Producer<Integer, String> producer = null;
> 	  
> 	  public KafkaProducer() 
> 	  {
> 	    props.put("serializer.class", "kafka.serializer.StringEncoder");
> 	    props.put("metadata.broker.list", ConfigurationUtility.getKafkaHost());
> 	    producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
> 	  } 
> 	  
> 	  public void sendMessage(String msg) throws Exception
> 	  {
> 		  producer.send(new KeyedMessage<Integer, String>(ConfigurationUtility.getTopicName(), msg));
> 	  }	  
> 	  
> 	  
> 	  public static void main(String arg[]) throws Exception
> 	  {
> 		  
> 		  ConfigurationUtility.setKafkaHost("kafka2:9092,kafka1:9092");
> 			ConfigurationUtility.setTopicName("dnslog");
> 			ConfigurationUtility.setZooKeeperHost("kafka1:2181,kafka2:2181");
> 			ConfigurationUtility.setConsumerGroupId("dnslog");
> 			
> 			for(int i = 0 ; i < 2 ; ++i)
> 			{
> 				(new KafkaProducer()).sendMessage(UUID.randomUUID().toString());
> 			}
> 	  }
> }

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira