You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Pranav Nakhe (JIRA)" <ji...@apache.org> on 2016/12/15 09:23:58 UTC

[jira] [Updated] (KAFKA-4547) Consumer.position returns incorrect results for Kafka 0.10.1.0 client

     [ https://issues.apache.org/jira/browse/KAFKA-4547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pranav Nakhe updated KAFKA-4547:
--------------------------------
    Description: 
Consider the following code -

		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
		List<TopicPartition> listOfPartitions = new ArrayList();
		for (int i = 0; i < consumer.partitionsFor("IssueTopic").size(); i++) {
			listOfPartitions.add(new TopicPartition("IssueTopic", i));
		}
		consumer.assign(listOfPartitions);		
		consumer.pause(listOfPartitions);
		consumer.seekToEnd(listOfPartitions);
//		consumer.resume(listOfPartitions); -- commented out
		for(int i = 0; i < listOfPartitions.size(); i++) {
			System.out.println(consumer.position(listOfPartitions.get(i)));
		}
		
I have created a topic IssueTopic with 3 partitions with a single replica on my single node kafka installation (0.10.1.0)

The behavior noticed for Kafka client 0.10.1.0 as against Kafka client 0.10.0.1

A) Initially when there are no messages on IssueTopic running the above program returns
0.10.1.0                   
0                              
0                              
0           

0.10.0.1
0
0
0

B) Next I send 6 messages and see that the messages have been evenly distributed across the three partitions. Running the above program now returns 
0.10.1.0                   
0                              
0                              
2                              

0.10.0.1
2
2
2

Clearly there is a difference in behavior for the 2 clients.

Now after seekToEnd call if I make a call to resume (uncomment the resume call in code above) then the behavior is

0.10.1.0                   
2                              
2                              
2                              

0.10.0.1
2
2
2

This is an issue I came across when using the spark kafka integration for 0.10. When I use kafka 0.10.1.0 I started seeing this issue. I had raised a pull request to resolve that issue [SPARK-18779] but when looking at the kafka client implementation/documentation now it seems the issue is with kafka and not with spark. There does not seem to be any documentation which specifies/implies that we need to call resume after seekToEnd for position to return the correct value. Also there is a clear difference in the behavior in the two kafka client implementations. 


  was:
Consider the following code -

		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
		List<TopicPartition> listOfPartitions = new ArrayList();
		for (int i = 0; i < consumer.partitionsFor("IssueTopic").size(); i++) {
			listOfPartitions.add(new TopicPartition("IssueTopic", i));
		}
		consumer.assign(listOfPartitions);		
		consumer.pause(listOfPartitions);
		consumer.seekToEnd(listOfPartitions);
//		consumer.resume(listOfPartitions); -- commented out
		for(int i = 0; i < listOfPartitions.size(); i++) {
			System.out.println(consumer.position(listOfPartitions.get(i)));
		}
		
I have created a topic IssueTopic with 3 partitions with a single replica on my single node kafka installation (0.10.1.0)

The behavior noticed for Kafka client 0.10.1.0 as against Kafka client 0.10.0.1

A) Initially when there are no messages on IssueTopic running the above program returns
0.10.1.0		0.10.0.1
0				0
0				0
0				0

B) Next I send 6 messages and see that the messages have been evenly distributed across the three partitions. Running the above program now returns 
0.10.1.0		0.10.0.1
0				2
0				2
2				2

Clearly there is a difference in behavior for the 2 clients.

Now after seekToEnd call if I make a call to resume (uncomment the resume call in code above) then the behavior is

0.10.1.0		0.10.0.1
2				2
2				2
2				2

This is an issue I came across when using the spark kafka integration for 0.10. When I use kafka 0.10.1.0 I started seeing this issue. I had raised a pull request to resolve that issue [SPARK-18779] but when looking at the kafka client implementation/documentation now it seems the issue is with kafka and not with spark. There does not seem to be any documentation which specifies/implies that we need to call resume after seekToEnd for position to return the correct value. Also there is a clear difference in the behavior in the two kafka client implementations. 



> Consumer.position returns incorrect results for Kafka 0.10.1.0 client
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-4547
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4547
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.1.0
>         Environment: Windows Kafka 0.10.1.0
>            Reporter: Pranav Nakhe
>              Labels: clients
>
> Consider the following code -
> 		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
> 		List<TopicPartition> listOfPartitions = new ArrayList();
> 		for (int i = 0; i < consumer.partitionsFor("IssueTopic").size(); i++) {
> 			listOfPartitions.add(new TopicPartition("IssueTopic", i));
> 		}
> 		consumer.assign(listOfPartitions);		
> 		consumer.pause(listOfPartitions);
> 		consumer.seekToEnd(listOfPartitions);
> //		consumer.resume(listOfPartitions); -- commented out
> 		for(int i = 0; i < listOfPartitions.size(); i++) {
> 			System.out.println(consumer.position(listOfPartitions.get(i)));
> 		}
> 		
> I have created a topic IssueTopic with 3 partitions with a single replica on my single node kafka installation (0.10.1.0)
> The behavior noticed for Kafka client 0.10.1.0 as against Kafka client 0.10.0.1
> A) Initially when there are no messages on IssueTopic running the above program returns
> 0.10.1.0                   
> 0                              
> 0                              
> 0           
> 0.10.0.1
> 0
> 0
> 0
> B) Next I send 6 messages and see that the messages have been evenly distributed across the three partitions. Running the above program now returns 
> 0.10.1.0                   
> 0                              
> 0                              
> 2                              
> 0.10.0.1
> 2
> 2
> 2
> Clearly there is a difference in behavior for the 2 clients.
> Now after seekToEnd call if I make a call to resume (uncomment the resume call in code above) then the behavior is
> 0.10.1.0                   
> 2                              
> 2                              
> 2                              
> 0.10.0.1
> 2
> 2
> 2
> This is an issue I came across when using the spark kafka integration for 0.10. When I use kafka 0.10.1.0 I started seeing this issue. I had raised a pull request to resolve that issue [SPARK-18779] but when looking at the kafka client implementation/documentation now it seems the issue is with kafka and not with spark. There does not seem to be any documentation which specifies/implies that we need to call resume after seekToEnd for position to return the correct value. Also there is a clear difference in the behavior in the two kafka client implementations. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)