Posted to users@kafka.apache.org by Mattia Barbon <ma...@booking.com.INVALID> on 2019/01/29 12:54:06 UTC

Interaction between transactions, CreateTime topics and offsetsForTimes

Hi,

tl;dr: transaction markers are added to the time index with the broker's
timestamp, and this makes offsetsForTimes confusing/unreliable for
topics set up with CreateTime timestamps

Longer version

I am working on a system where we are taking multiple streams of messages
from Kafka, applying some filtering/transformation and then writing the
results back to the same Kafka cluster.

As an additional constraint, I want to be able to fetch the transformation
results based on the time of the source messages (with second granularity).
For example, if the source messages are

source = ..., A(t=0.9), B(t=1.1), C(t=1.7), D(t=2.1), ...

and the respective results are

destination = ..., A', B', C', D', ...

I want a way to seek to the offset corresponding to time=1 and fetch B',
C', ...

The destination topic is set up with message.timestamp.type=CreateTime and
message.timestamp.difference.max.ms=86400000. I am copying the source
timestamp into the transformed message, and I am using transactions in the
transformer. The destination topic actually looks like this (checked with
kafka-dump-log.sh):

destination = ..., tx1(t=2.2), A'(t=0.9), tx2(t=3.3), B'(t=1.1),
C'(t=1.7), tx3(t=4.2), D'(t=2.1), ...

In this situation, offsetsForTimes(t=2.0) returns the offset of tx1, while
I would have expected it to return the offset for D'.
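
To make the mismatch concrete, here is a small Python sketch that models
the lookup as "first entry whose timestamp is greater than or equal to the
target". This is only a simplified model of the behavior I observed, not
the broker's actual index code; the offsets 0..6, the Entry type and the
first_at_or_after helper are made up for illustration.

```python
from typing import List, NamedTuple, Optional


class Entry(NamedTuple):
    offset: int        # hypothetical offset, chosen for illustration
    name: str
    timestamp: float   # seconds, as in the example in this message
    is_marker: bool    # True for transaction markers (broker timestamp)


# The destination log as shown by kafka-dump-log.sh in the example above.
LOG = [
    Entry(0, "tx1", 2.2, True),
    Entry(1, "A'", 0.9, False),
    Entry(2, "tx2", 3.3, True),
    Entry(3, "B'", 1.1, False),
    Entry(4, "C'", 1.7, False),
    Entry(5, "tx3", 4.2, True),
    Entry(6, "D'", 2.1, False),
]


def first_at_or_after(log: List[Entry], target: float,
                      skip_markers: bool = False) -> Optional[Entry]:
    """Return the first entry with timestamp >= target, or None."""
    for entry in log:
        if skip_markers and entry.is_marker:
            continue
        if entry.timestamp >= target:
            return entry
    return None


# Markers take part in the lookup: this matches what I observe.
observed = first_at_or_after(LOG, 2.0)
# Markers are ignored: this is what I would have expected.
expected = first_at_or_after(LOG, 2.0, skip_markers=True)

print(observed.name, observed.offset)
print(expected.name, expected.offset)
```

With markers included the lookup for t=2.0 stops at tx1, because its
broker-assigned timestamp (2.2) is the first one at or after the target;
with markers skipped it lands on D'.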

I think the current behavior is a bug and should be fixed, but at the very
least it warrants a mention in the documentation.

I will open a JIRA ticket as a follow-up, unless there is consensus this is
not a bug.

Thanks in advance,
Mattia