Posted to users@kafka.apache.org by "M. Manna" <ma...@gmail.com> on 2021/01/23 11:14:27 UTC

Kafka Consumer Consumption based on TimeStamp-based position

Hello,

We know that the KafkaConsumer API lets us replay messages from specific
offsets. However, we are not sure whether we can specify a timestamp from
which to replay messages.

Does anyone know if this is possible?

Regards,

Re: Kafka Consumer Consumption based on TimeStamp-based position

Posted by "M. Manna" <ma...@gmail.com>.
Thanks. Just realised that it was in the API since 0.11.0. Thanks Steve.

On Sat, 23 Jan 2021 at 12:42, Steve Howard <st...@confluent.io>
wrote:

> Hi,
>
> Yes, you can use the offsetsForTimes() method.  See below for a simple
> example that should get you started...

Re: Kafka Consumer Consumption based on TimeStamp-based position

Posted by Steve Howard <st...@confluent.io>.
Hi,

Yes, you can use the offsetsForTimes() method.  See below for a simple
example that should get you started...

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
import java.text.*;
import java.time.Duration;
import java.util.*;

public class searchByTime {

  public static void main(String[] args) throws Exception {
    // args[0] is the timestamp to replay from, in milliseconds since the epoch.
    long timestamp = Long.parseLong(args[0]);
    // The topic is a plain variable; "topic" is not a consumer config key.
    String topic = "yourtopicname";

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("max.poll.records", 1);
    props.put("group.id", UUID.randomUUID().toString());
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> c = new KafkaConsumer<>(props)) {
      // Assign the partitions explicitly instead of subscribing, so that
      // seek() can be called without waiting for a group rebalance.
      List<TopicPartition> partitions = new ArrayList<>();
      for (PartitionInfo p : c.partitionsFor(topic)) {
        partitions.add(new TopicPartition(topic, p.partition()));
      }
      c.assign(partitions);
      System.out.println("assigned partitions " + partitions);

      // Request the same timestamp for every partition.
      Map<TopicPartition, Long> partitionOffsetsRequest =
          new HashMap<>(partitions.size());
      for (TopicPartition partition : partitions) {
        partitionOffsetsRequest.put(partition, timestamp);
      }

      // For each partition, offsetsForTimes() returns the earliest offset
      // whose timestamp is >= the requested one, or null if there is none.
      Map<TopicPartition, OffsetAndTimestamp> result =
          c.offsetsForTimes(partitionOffsetsRequest);
      System.out.println(result);

      for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
          result.entrySet()) {
        // Skip partitions with no match; they keep the default position
        // given by auto.offset.reset.
        if (entry.getValue() != null) {
          c.seek(entry.getKey(), entry.getValue().offset());
        }
      }

      System.out.println("trying to get records...");
      DateFormat formatter = new SimpleDateFormat("HH:mm:ss.SSS");
      formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
      ConsumerRecords<String, String> records = c.poll(Duration.ofSeconds(1));
      for (ConsumerRecord<String, String> record : records) {
        String dateFormatted = formatter.format(new Date(record.timestamp()));
        System.out.println("Received message: (" + record.key() + ", "
            + record.value() + ") at offset " + record.offset()
            + " at time " + dateFormatted);
      }
    }
  }
}
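
The example above takes its timestamp argument as milliseconds since the
epoch, which is not convenient to type by hand. As a minimal sketch, a
human-readable UTC time could be converted first with java.time. The class
name TimestampArg and the method toEpochMillis are hypothetical helpers for
illustration only; they are not part of the Kafka API.

```java
import java.time.Instant;

// Hypothetical helper for illustration; not part of the Kafka client API.
public class TimestampArg {

  // Converts an ISO-8601 UTC instant such as "2021-01-23T11:14:27Z" into
  // milliseconds since the epoch, the unit used by Kafka record timestamps.
  static long toEpochMillis(String isoInstant) {
    return Instant.parse(isoInstant).toEpochMilli();
  }

  public static void main(String[] args) {
    // Fall back to the current time when no argument is given.
    String iso = args.length > 0 ? args[0] : Instant.now().toString();
    System.out.println(toEpochMillis(iso));
  }
}
```

The printed value can then be passed as the single argument to the
searchByTime example.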

Thanks,

Steve

