Posted to users@kafka.apache.org by Gerd Behrmann <be...@gmail.com> on 2017/07/24 20:27:13 UTC

Kafka Streams internal repartitioning topics, retention time, and reprocessing old data

Hi,

While adding a new Streams-based microservice to an existing Kafka infrastructure, I have run into some issues processing older data in existing topics. I am uncertain of the exact cause, but I am looking for advice on how things are supposed to work so that I can eliminate possibilities. The following is my hypothesis of what may be happening; maybe somebody can tell me why what I describe is impossible - or whether this might be a bug. The following relates to Kafka 0.10.2.1 (Confluent distribution).



TL;DR: When (re)processing old data with Kafka Streams in a topology that causes the stream to be repartitioned, the records in the -repartition topic carry the timestamps of the original input records as extracted by the timestamp extractor. The default retention of the -repartition topic is, however, 7 days, allowing Kafka to delete data from the -repartition topic even before the Streams application has had a chance to read it back in.





The situation is basically that I have a topic with existing records dating back several months. Each record contains a timestamp and a client identifier (among other things). The task is quite simple: Produce an output topic that contains the largest timestamp for each client. Distilled this looks something like this:

    class Record {
      ...
      long client;
      long time;
      ...
    }

    builder.stream(EARLIEST, Bytes(), Record(), "input")
           .map((hash, record) -> new KeyValue<>(record.client, record.time))
           .groupByKey(Long(), Long())
           .reduce(Long::max, "store")
           .to(Long(), Long(), "output");

where Bytes(), Long(), Record() return the appropriate Serde. This takes each input record, throws the original key away, repartitions on the embedded client id, runs a reduction keeping the largest timestamp per client, and stores the result back into a topic. The repartitioning causes an internal -repartition topic to be created. This topic has cleanup.policy=delete and the broker's default retention of 7 days.
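
If that is indeed the mechanism at play, one workaround I am considering is to make the repartitioning explicit via through() and a pre-created topic whose retention I control myself. A rough, untested sketch (the topic name "clients-by-id" is made up):

    builder.stream(EARLIEST, Bytes(), Record(), "input")
           .map((hash, record) -> new KeyValue<>(record.client, record.time))
           // write through a manually created topic with a sufficiently long retention.ms;
           // groupByKey should then not have to create its own -repartition topic
           .through(Long(), Long(), "clients-by-id")
           .groupByKey(Long(), Long())
           .reduce(Long::max, "store")
           .to(Long(), Long(), "output");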


I am using the default FailOnInvalidTimestamp timestamp extractor. As far as I can determine, this causes the records in the -repartition topic to carry the same timestamps as the input records. Also, as far as I can see in the Kafka server-side code, log segments are deleted once the largest timestamp found among the records stored in the segment is older than the retention limit.
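
For what it is worth, my reading of what FailOnInvalidTimestamp effectively does (minus the negative-timestamp check) is captured by the following hypothetical extractor - the name MetadataTimeExtractor is made up, and the two-argument signature is the 0.10.2 interface as I read it:

    // Illustration only: return the record metadata timestamp; this value is
    // what the records written to the -repartition topic end up carrying.
    class MetadataTimeExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
            return record.timestamp();
        }
    }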


This is where I wonder how this is supposed to work when ingesting months-old data: it would appear that Kafka could start to delete segments of the -repartition topic aggressively, as the timestamps are several months old. This could happen even before Kafka Streams has had a chance to read the data back in for the reduce operation. The Kafka server log would seem to support that this happens, as I see several segments being created *and deleted* right after the application was started:

    [2017-07-24 09:37:54,735] INFO Rolled new log segment for 'xxx-repartition-2' in 1 ms. (kafka.log.Log)
    [2017-07-24 09:37:54,735] INFO Scheduling log segment 0 for log xxx-repartition-2 for deletion. (kafka.log.Log)
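
Put differently, my understanding of the broker's time-based retention decision boils down to something like the following paraphrase (the actual code is Scala in kafka.log; this is not it) - with months-old event times the condition is satisfied as soon as a segment is rolled:

    // Paraphrase, for illustration: a segment becomes eligible for deletion once
    // its largest record timestamp is older than retention.ms.
    static boolean segmentEligibleForDeletion(long largestTimestampMs, long nowMs, long retentionMs) {
        return nowMs - largestTimestampMs > retentionMs;
    }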


This repeats quite a number of times for the first half hour or so - presumably until the computation has caught up with newer data that did not get deleted right away. The client-side log also seemed to indicate that something like this was happening:


    2017-07-24 09:45:02,550  INFO StreamThread stream-thread [StreamThread-1] no custom setting defined for topic xxx-repartition using original config earliest for offset reset


This message, too, repeated quite a number of times for the first half hour or so. Looking at the Kafka Streams code, this message gets logged when the consumer fails due to an invalid offset.
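
If I read that correctly, there is no per-source reset policy for the internal -repartition topic (the EARLIEST passed to builder.stream() only covers "input"), so Streams falls back to the consumer-level setting from the application configuration - something like the following, where the application id and broker address are made up:

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "xxx");              // made-up application id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // made-up broker address
    // the "original config earliest" referred to in the log line above:
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");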


Assuming my theory is correct, I could probably solve this problem by using a WallclockTimestampExtractor (something I will test tomorrow). However, one of the use cases often repeated in the Kafka material is that one can reprocess old data - surely it must be possible to reprocess using a timestamp extractor that reflects the original event time, not the current processing time?
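
Concretely, what I intend to try is switching the extractor in the Streams configuration - assuming I have the 0.10.2 config key right:

    // Stamp records with the current wall-clock time instead of the original
    // (months-old) event time, so the -repartition segments are not immediately
    // past their retention:
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
              WallclockTimestampExtractor.class.getName());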


I tried to Google for information about retention time and internal topics. The closest thing I could find is that Kafka 0.11 has gained support for applications requesting that records before a particular offset be deleted; a future Kafka Streams version could use this to eliminate the need for a retention time on the internal topic and thus resolve the problem.



Cheers,

Gerd