Posted to users@kafka.apache.org by Jan Hruban <ja...@altworx.com> on 2019/10/09 11:54:31 UTC

LogAppendTime handling in consumer

Hi,

I have a Kafka topic configured with:

  message.timestamp.type=LogAppendTime

I'm using the "brod" [1] Kafka client and I have noticed that it
returns the CreateTime instead of the LogAppendTime when fetching
messages.

I have tracked down that the "kafka_protocol" library (used by the brod
client) always uses the firstTimestamp from the Record Batch and
timestampDelta from the Record to compute each record's timestamp [2].
This always gives the CreateTime.

In the official Java client, it looks like when LogAppendTime is in
effect (determined by the attribute timestampType in the Record Batch),
it uses the maxTimestamp from the Record Batch [3] to set the timestamp
in each Record [4].
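
To make the difference concrete, here is a minimal Python sketch of the
two behaviours described above, as I read them from [2], [3] and [4].
It is an illustration only, not code from either client: the names
BatchHeader, is_log_append_time and record_timestamp are hypothetical,
and the assumption that timestampType is bit 3 (mask 0x08) of the batch
attributes is taken from the Java client source, not from the protocol
documentation.

```python
from dataclasses import dataclass

# Assumption: in the v2 record batch, bit 3 of the attributes field
# carries the timestamp type (0 = CreateTime, 1 = LogAppendTime).
TIMESTAMP_TYPE_MASK = 0x08


@dataclass
class BatchHeader:
    """Hypothetical holder for the batch fields relevant here."""
    attributes: int
    first_timestamp: int  # firstTimestamp (ms since epoch)
    max_timestamp: int    # maxTimestamp (ms since epoch)


def is_log_append_time(attributes: int) -> bool:
    """Return True if the batch attributes signal LogAppendTime."""
    return bool(attributes & TIMESTAMP_TYPE_MASK)


def record_timestamp(batch: BatchHeader, timestamp_delta: int) -> int:
    """Resolve a record's timestamp the way the Java client appears to.

    With LogAppendTime, every record in the batch gets maxTimestamp.
    Otherwise the timestamp is firstTimestamp + timestampDelta, which
    is the only computation kafka_protocol performs at [2].
    """
    if is_log_append_time(batch.attributes):
        return batch.max_timestamp
    return batch.first_timestamp + timestamp_delta
```

For example, with firstTimestamp=1000, maxTimestamp=2000 and
timestampDelta=3, record_timestamp gives 1003 when the attributes are 0
and 2000 when the attributes are 0x08.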

Is this the exact behaviour that clients are expected to follow?
I've only come across a few resources, which gave me some hints:

  * KIP-32 [5], which just talks about the Message format with magic < 2.

  * KAFKA-5353 [6], which changed the baseTimeStamp to always be the
    create timestamp.

On the other hand, the documentation does not give a clue that clients
should use the maxTimestamp when LogAppendTime is in use:

  * The Record Batch documentation [7] does not explain the semantics
    of the individual fields.

  * The wiki page "A Guide To The Kafka Protocol" [8] is more detailed
    on the FirstTimestamp, TimestampDelta and MaxTimestamp, but does
    not mention what implications the timestamp type has for those
    fields.

From my point of view, this is either a deficiency in Kafka, which
should always provide the correct authoritative timestamp to
consumers, or, if this logic is indeed expected to be handled by
clients, it should be explicitly described in the official
documentation.


For the record, here's a pull request [9] to the kafka_protocol
library.




[1] https://github.com/klarna/brod

[2] https://github.com/klarna/kafka_protocol/blob/cc13902191b9ca3970a65388697c1069ae68fd2a/src/kpro_batch.erl#L249

[3] https://github.com/apache/kafka/blob/1f1179ea64bbaf068d759aae988bd2a6fe966161/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L558

[4] https://github.com/apache/kafka/blob/1f1179ea64bbaf068d759aae988bd2a6fe966161/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L330-L331

[5] https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message

[6] https://issues.apache.org/jira/browse/KAFKA-5353

[7] http://kafka.apache.org/documentation/#recordbatch

[8] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets

[9] https://github.com/klarna/kafka_protocol/pull/60


-- 
Jan Hruban