Posted to users@kafka.apache.org by Shahar Cizer Kobrinsky <sh...@stripe.com.INVALID> on 2023/08/16 20:51:57 UTC

list offsets of compacted topics

Hey Folks,

I have a process, written in Scala, that aims to measure time lag.

When the process bootstraps, it looks at the history of offsets and
collects the offset that existed at different timestamps (7 days ago, 6
days ago, etc., sampled more frequently as it gets closer to *now*). To do
that it uses the "consumer.offsetsForTimes" method and keeps the results
in memory. I observed that in some cases, mainly on *some* partitions of
compacted topics (seen on a few partitions of __consumer_offsets and
__transaction_state), the resulting offsets aren't monotonically
increasing as the timestamps get closer to now.
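The exact sampling intervals aren't given here, so the schedule below is
only a hypothetical illustration of "7 days ago, 6 days ago ... more
frequent closer to now": hourly points for the last day, then daily points
back to 7 days. `BootstrapSchedule` and its step sizes are made-up names
and values, not the real `BootstrapTimes`. Durations are ordered newest
first, so reversing them walks from old to new time.

```scala
import scala.concurrent.duration._

object BootstrapSchedule {
  // Hypothetical step sizes: denser sampling near "now", sparser further back.
  val recentStep: FiniteDuration = 1.hour
  val olderStep: FiniteDuration = 1.day

  /** Durations before "now", smallest first (i.e. newest timestamp first). */
  def durations: List[FiniteDuration] = {
    val lastDay   = (1 to 23).map(h => recentStep * h.toLong).toList // 1h .. 23h ago
    val olderDays = (1 to 7).map(d => olderStep * d.toLong).toList   // 1d .. 7d ago
    lastDay ++ olderDays
  }

  /** Target timestamps in epoch millis, oldest first, as a bootstrap loop would visit them. */
  def targetTimes(nowMillis: Long): List[Long] =
    durations.reverse.map(d => nowMillis - d.toMillis)
}
```

Each target time would then be passed to offsetsForTimes; with a fixed
`nowMillis` the targets are strictly increasing, which is why one would
expect the returned offsets to be non-decreasing as well.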

In these specific cases (again, not all partitions) I see, for example,
the following (a few data points out of many):

    time (epoch seconds)   offset
    1691512025             16101908
    1691550724             15121538
    1691645078             15473125
    1691789229             15473125
    1692104539             16078952
    1692116770             16101908
    1692116809             16101908
    1692116833             16101908
    ...
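To make the regression explicit, here is a small Scala check over the data
points above (values copied as listed; the object and method names are
hypothetical). It sorts the samples by time and reports each adjacent pair
where the offset goes down while the timestamp goes up.

```scala
object MonotonicityCheck {
  // (epoch seconds, offset) pairs, copied from the observations listed above.
  val observed: List[(Long, Long)] = List(
    1691512025L -> 16101908L,
    1691550724L -> 15121538L,
    1691645078L -> 15473125L,
    1691789229L -> 15473125L,
    1692104539L -> 16078952L,
    1692116770L -> 16101908L,
    1692116809L -> 16101908L,
    1692116833L -> 16101908L
  )

  /** Adjacent pairs, ordered by time, where a later timestamp maps to a smaller offset. */
  def regressions(points: List[(Long, Long)]): List[((Long, Long), (Long, Long))] = {
    val byTime = points.sortBy(_._1)
    byTime.zip(byTime.drop(1)).filter { case ((_, o1), (_, o2)) => o2 < o1 }
  }
}
```

On these points it reports a single regression, between the first two
samples: the earliest timestamp maps to 16101908, the same offset returned
for the three latest timestamps.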

The code looks like this:
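    import scala.jdk.CollectionConverters._ // asJava / asScala (assuming Scala 2.13+)

    BootstrapTimes.reverse // bootstrap from old to new time
      .foreach { duration =>
        val time = now - duration
        // Per partition, ask for the earliest offset whose timestamp is >= `time`
        val lookup = partitions
          .map {
            _ -> (time.inMillis: java.lang.Long)
          }
          .toMap
          .asJava
        val offsets =
          try consumer.offsetsForTimes(lookup).asScala.toMap
          catch KafkaConsumerException.rethrow // handler defined outside this snippet
        // A null value means the partition has no record with timestamp >= `time`
        val partitionToOffset =
          offsets.collect { case (partition, value) if value != null =>
            partition -> value.offset
          }
        // partitionToOffset is then kept in memory (storage code omitted)
      }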
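How the collected samples are turned into a lag figure isn't shown above.
One plausible approach, sketched here under that assumption (the
`TimeLagSketch` name and its logic are hypothetical, and plain Scala data
stands in for a live consumer): since offsetsForTimes returns the earliest
offset whose timestamp is at or after the target, a consumer whose
committed offset has reached a sample's offset can be treated as caught up
to that sample's time.

```scala
object TimeLagSketch {
  /**
   * Hypothetical lag estimate for one partition.
   * `samples` are (timestampMillis, offset) pairs gathered at bootstrap.
   * Returns None when the committed offset is behind every sample.
   */
  def lagMillis(samples: Seq[(Long, Long)], committedOffset: Long, nowMillis: Long): Option[Long] =
    samples
      .collect { case (ts, offset) if offset <= committedOffset => ts } // samples already reached
      .maxOption                                                      // latest such timestamp
      .map(caughtUpTo => nowMillis - caughtUpTo)
}
```

A lookup like this implicitly assumes that later timestamps never map to
smaller offsets, which is exactly the property the data points above
violate.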

So no later data point has a higher offset than the one returned for the
earliest time, which seems odd. What could the reason be? Can you help me
understand why this would be observed mostly with compacted topics (but
once in a while with other topics too)?