Posted to users@kafka.apache.org by srimugunthan dhandapani <sr...@gmail.com> on 2017/09/30 12:17:45 UTC

fetchOffsetByTime: reading from multiple partitions

Hi all,
I am using Kafka 0.10.2 and have a Kafka cluster with topics configured
for 5 partitions. I am trying to use Kafka consumers assigned manually to
multiple partitions. In the consumer poll loop I am getting records from
multiple partitions, but the number of records varies every time I run the
program: one run gives 400 records, another run gives 200. Kafka retention
times are configured for more than a day. I don't know if I am using the
APIs wrongly. The code I use is below:



import java.util.*;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class ConsumerGroup {

   public static OffsetAndTimestamp fetchOffsetByTime(KafkaConsumer<Long, String> consumer,
                                                      TopicPartition partition, long startTime) {
      Map<TopicPartition, Long> query = new HashMap<>();
      query.put(partition, startTime);

      // For each queried partition, offsetsForTimes returns the earliest offset
      // whose timestamp is >= startTime, or a null entry if there is none.
      final Map<TopicPartition, OffsetAndTimestamp> offsetResult = consumer.offsetsForTimes(query);
      if (offsetResult == null || offsetResult.isEmpty()) {
         // Don't dereference offsetResult here; it may be null.
         System.out.println("No Offset to Fetch");
         return null;
      }
      final OffsetAndTimestamp offsetTimestamp = offsetResult.get(partition);
      if (offsetTimestamp == null) {
         System.out.println("No Offset Found for partition : " + partition.partition());
      }
      return offsetTimestamp;
   }

   public static KafkaConsumer<Long, String> assignOffsetToConsumer(KafkaConsumer<Long, String> consumer,
                                                                    String topic, long startTime) {
      final List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
      System.out.println("Number of Partitions : " + partitionInfoList.size());

      final List<TopicPartition> topicPartitions = new ArrayList<>();
      for (PartitionInfo pInfo : partitionInfoList) {
         topicPartitions.add(new TopicPartition(topic, pInfo.partition()));
      }
      // Manual assignment: this consumer owns every partition, no group rebalancing.
      consumer.assign(topicPartitions);

      for (TopicPartition partition : topicPartitions) {
         OffsetAndTimestamp offSetTs = fetchOffsetByTime(consumer, partition, startTime);
         if (offSetTs == null) {
            // No record at or after startTime in this partition: start at the log end.
            System.out.println("No Offset Found for partition : " + partition.partition());
            consumer.seekToEnd(Arrays.asList(partition));
         } else {
            System.out.println("Offset Found for partition : " + offSetTs.offset() + " " + partition.partition());
            System.out.println("FETCH offset success" +
                    " Offset " + offSetTs.offset() +
                    " offSetTs " + offSetTs);
            consumer.seek(partition, offSetTs.offset());
         }
      }
      return consumer;
   }

   public static void main(String[] args) throws Exception {
      if (args.length < 3) {  // topic, group name and start timestamp are all required
         System.out.println("Usage: consumer <topic> <groupname> <startTimestamp>");
         return;
      }

      String topic = args[0];
      String group = args[1];
      long startTimeStamp = Long.parseLong(args[2]);

      Properties props = new Properties();
      props.put("bootstrap.servers",
              "198.18.134.4:9092,198.18.134.6:9092,198.18.134.7:9092,198.18.134.8:9092,198.18.134.12:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      // Keys are typed as Long, so they need the Long deserializer, not the String one.
      props.put("key.deserializer",
              "org.apache.kafka.common.serialization.LongDeserializer");
      props.put("value.deserializer",
              "org.apache.kafka.common.serialization.StringDeserializer");

      KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
      assignOffsetToConsumer(consumer, topic, startTimeStamp);


      System.out.println("Subscribed to topic " + topic);
      int i = 0;

      int arr[] = {0,0,0,0,0};
      while (true) {
         ConsumerRecords<Long, String> records = consumer.poll(1000);
         int count= 0;
            for (ConsumerRecord<Long, String> record : records) {

               count++;

               if(arr[record.partition()] == 0){
                  arr[record.partition()] =1;
               }

               System.out.println("record=>"+" offset="
                       +record.offset()
                       + " timestamp="+record.timestamp()
                        + " :"+record);
               System.out.println("recordcount = "+count+"
bitmap"+Arrays.toString(arr));

            }


      }
   }



}
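
One sanity check I can think of (a sketch only, assuming the same 0.10.2
consumer; the class and method names below are just illustrative): compare
the offsets returned by offsetsForTimes against endOffsets to compute how
many records each partition should deliver from the start timestamp. If
these expected totals are stable across runs while the polled totals vary,
the variation is not coming from the offset lookup itself.

import java.util.*;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class ExpectedCountCheck {

   // Prints, per partition, how many records lie between the offset for
   // startTime (from offsetsForTimes) and the current log-end offset
   // (from endOffsets). Stable numbers here across runs mean the offset
   // lookup is deterministic.
   public static void printExpectedCounts(KafkaConsumer<Long, String> consumer,
                                          String topic, long startTime) {
      List<TopicPartition> partitions = new ArrayList<>();
      for (PartitionInfo p : consumer.partitionsFor(topic)) {
         partitions.add(new TopicPartition(topic, p.partition()));
      }

      Map<TopicPartition, Long> query = new HashMap<>();
      for (TopicPartition tp : partitions) {
         query.put(tp, startTime);
      }
      Map<TopicPartition, OffsetAndTimestamp> startOffsets = consumer.offsetsForTimes(query);
      Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

      long total = 0;
      for (TopicPartition tp : partitions) {
         OffsetAndTimestamp start = startOffsets.get(tp);
         long end = endOffsets.get(tp);
         // A null entry means no record in this partition has timestamp >= startTime.
         long expected = (start == null) ? 0 : end - start.offset();
         System.out.println("partition " + tp.partition() + " expects " + expected + " records");
         total += expected;
      }
      System.out.println("expected total : " + total);
   }
}

This only asks the broker what should be readable between the two offsets;
it never polls, so if the expected totals are steady but the polled counts
are not, the problem would be in the poll loop or the seek/commit
interaction rather than in offsetsForTimes.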

What am I missing here?
thanks,
mugunthan