Posted to users@kafka.apache.org by Xavier Noria <fx...@hashref.com> on 2018/02/19 15:36:52 UTC

timestamp-oriented API

In the mental model I am building of how Kafka works (I am new to this),
the broker keeps offsets per consumer group, and individual consumers
basically depend on the offset of the consumer group they join. Also,
consumer groups may opt to start from the beginning.

OK, in that mental model there is a linearization of messages per
partition. As the documentation says, there is a total order per partition,
and the order is based on the offset, unrelated to the timestamp.

But I see the Java library has timestamp-oriented methods like:


https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/Consumer.html#offsetsForTimes(java.util.Map)

How does that make sense given the model described above? How is that
implemented? Does the broker have built-in support for this? What happens
if, due to race conditions or machines with out-of-sync clocks, you have
messages with interleaved timestamps?

Could anyone reconcile that API with the intrinsic offset-based contract?
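
For concreteness, this is a rough sketch of the kind of consumer I have in
mind: a fresh group that starts from the beginning via auto.offset.reset
(the broker address, group id, and topic name are made up):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // made-up broker address
    props.put("group.id", "my-new-group");              // made-up group id
    props.put("auto.offset.reset", "earliest");         // start from the beginning if the group has no committed offset
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("events"));  // made-up topic name
    ConsumerRecords<String, String> records = consumer.poll(1000);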

Re: timestamp-oriented API

Posted by "Matthias J. Sax" <ma...@confluent.io>.
The broker maintains a timestamp index and uses it to answer
offsetsForTimes() requests.

The returned offset is the earliest offset whose record has a timestamp
equal to or larger than the requested one; no record at a smaller offset
has such a timestamp. Thus, if there are out-of-order records in the
topic and you start reading from the returned offset, you might see
records with _smaller_ timestamps than the one specified in
offsetsForTimes().

I.e., it is guaranteed that you receive _all_ records that have a
timestamp equal to or larger than the specified one (however, it is not
guaranteed that you get _only_ records with equal or larger timestamps --
there might be later records with smaller timestamps, too).
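
As a minimal sketch of what that looks like on the consumer side
(assuming a KafkaConsumer<String, String> named "consumer" that already
has its partitions assigned, and an arbitrary target timestamp of one
hour ago):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    // Ask for the earliest offsets whose records have timestamp >= one hour ago.
    long target = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);

    Map<TopicPartition, Long> query = new HashMap<>();
    for (TopicPartition tp : consumer.assignment()) {
        query.put(tp, target);
    }

    Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);

    // Rewind each partition to the returned offset; a null value means no
    // record in that partition has a timestamp >= target.
    for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : result.entrySet()) {
        OffsetAndTimestamp oat = entry.getValue();
        if (oat != null) {
            consumer.seek(entry.getKey(), oat.offset());
        }
    }

From there, poll() returns every record with a timestamp at or after the
target, possibly interleaved with out-of-order records whose timestamps
are smaller.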


-Matthias




Re: timestamp-oriented API

Posted by Steve Jang <st...@qualtrics.com>.
If you set *message.timestamp.type* (or *log.message.timestamp.type*) to
LogAppendTime, this would make sense.

I am new to Kafka, too, and if this was set to CreateTime, I don't know
what the behavior would be.  There is a *message.timestamp.difference.max.ms*
setting too, so there seems to be a certain "boundedness" to how much
clock skew is allowed between the producer and the broker, so you could
implement various types of policies (min, max, etc.) for this API.
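
A rough sketch of applying the topic-level setting, assuming a Kafka
version that ships the Java AdminClient (0.11+) and made-up broker
address and topic name:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // made-up broker address

    // (inside a method that declares "throws Exception")
    try (AdminClient admin = AdminClient.create(props)) {
        // Create a topic whose records are timestamped by the broker on append.
        NewTopic topic = new NewTopic("events", 3, (short) 1);  // made-up name/partitions/replication
        topic.configs(Collections.singletonMap(
                TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime"));
        admin.createTopics(Collections.singleton(topic)).all().get();
    }

The broker-wide default, log.message.timestamp.type, goes in
server.properties instead.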



