Posted to users@kafka.apache.org by Snehalata Nagaje <sn...@harbingergroup.com> on 2016/06/21 11:38:45 UTC

How to get earliest valid offset


Hi All, 


We are using the new Kafka version 2.10_0.9, but on the consumer side we still use the old high-level and low-level APIs.

I am trying to fetch the earliest valid offset for a topic, but the call returns the latest offset once the data (log) has been deleted after the retention interval configured in the server properties.

In the previous version, even after the data was deleted, we could still get the original earliest valid offset. I am using the code below to get the offset.

Suppose I have added 2 messages at offsets 40 and 41. The log file is then deleted, and a new message is inserted at offset 42, which creates a new log file. When I fetch the earliest valid offset, I get 42, but I was expecting 40.
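To make the scenario above concrete, here is a minimal sketch in plain Java. It is a toy in-memory model, not Kafka's real classes: `ToyPartitionLog`, `deleteAllSegments`, and `earliestValidOffset` are hypothetical names, and the model assumes that retention removes whole segments and that the earliest valid offset is the base offset of the oldest remaining segment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Toy model of one partition's log (hypothetical, not a Kafka class). */
public class ToyPartitionLog {
    // Base offset of each segment -> messages stored in that segment.
    private final TreeMap<Long, List<String>> segments = new TreeMap<>();
    // Offset that the next appended message will receive.
    private long nextOffset;

    public ToyPartitionLog(long startOffset) {
        this.nextOffset = startOffset;
    }

    /** Appends a message, opening a new segment if none exists; returns its offset. */
    public long append(String message) {
        if (segments.isEmpty()) {
            segments.put(nextOffset, new ArrayList<>());
        }
        segments.lastEntry().getValue().add(message);
        return nextOffset++;
    }

    /** Simulates retention removing every existing segment (assumed behavior). */
    public void deleteAllSegments() {
        segments.clear();
    }

    /** Base offset of the oldest remaining segment, or the next offset if the log is empty. */
    public long earliestValidOffset() {
        return segments.isEmpty() ? nextOffset : segments.firstKey();
    }

    public static void main(String[] args) {
        ToyPartitionLog log = new ToyPartitionLog(40L);
        log.append("first");   // stored at offset 40
        log.append("second");  // stored at offset 41
        System.out.println(log.earliestValidOffset()); // prints 40

        log.deleteAllSegments(); // the old log file is removed
        log.append("third");     // stored at offset 42, in a new segment
        System.out.println(log.earliestValidOffset()); // prints 42
    }
}
```

In this model, offsets 40 and 41 no longer exist anywhere once their segment is gone, so the earliest offset that can still be read is 42.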


I am not committing offsets anywhere, I do not want to do that. 

// Ask the broker for the earliest available offset of the partition (at most 10 offsets returned).
PartitionOffsetRequestInfo partitionOffsetRequestInfo =
        new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 10);
Map<TopicAndPartition, PartitionOffsetRequestInfo> map =
        new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
map.put(topicAndPartition, partitionOffsetRequestInfo);
OffsetRequest request = new OffsetRequest(map, (short) 0, CLIENT_ID);
OffsetResponse startOffsets = simpleConsumer.getOffsetsBefore(request);

// Read the returned offsets once and return the first one, if any came back.
long[] offsets = startOffsets.offsets(topicName, partition);
if (offsets.length > 0) {
    long validoffset = offsets[0];
    simpleConsumer.close();
    return validoffset;
}
simpleConsumer.close();


Can you help me with this?


Thanks, 
Snehalata