Posted to users@kafka.apache.org by Elias Levy <fe...@gmail.com> on 2016/09/09 23:19:38 UTC

Time of derived records in Kafka Streams

The Kafka Streams documentation discusses how to assign timestamps to
records received from a source topic via TimestampExtractor.  But neither the
Kafka nor the Confluent documentation on Kafka Streams explains what
timestamp is associated with a record that has been transformed.

What timestamp is associated with records that are output by stateless
transformations like map or flatMap?

What timestamp is associated with records that are output by stateful
transformations like aggregations or joins?

What about transformations on windows?

What timestamp does the Kafka publisher use, if any, when writing to an
intermediate topic via through() or a sink via to()?

Re: Time of derived records in Kafka Streams

Posted by Eno Thereska <en...@gmail.com>.
Hi Elias,

- out-of-order records: the output timestamp is that of the out-of-order record, i.e., time sometimes goes backwards.
- joins: the same applies; the timestamp could be that of either record.
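
As an illustration of the first point, here is a minimal sketch in plain Java with no Kafka dependency. It models a running count in which every update carries the timestamp of the record that triggered it, so an out-of-order input makes the output timestamps go backwards. The names CountAggregationSketch, Update and aggregate are hypothetical and only model the behaviour described here; they are not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a running count; this is not Kafka Streams code.
final class CountAggregationSketch {
    // One emitted update: the running count and the timestamp it carries.
    static final class Update {
        final long count;
        final long timestampMs;

        Update(long count, long timestampMs) {
            this.count = count;
            this.timestampMs = timestampMs;
        }
    }

    // Emit one update per input record, stamped with that record's own timestamp
    // (assumption: the output inherits the timestamp of the triggering record).
    static List<Update> aggregate(long[] recordTimestampsMs) {
        List<Update> out = new ArrayList<>();
        long count = 0;
        for (long ts : recordTimestampsMs) {
            count++;
            out.add(new Update(count, ts));
        }
        return out;
    }

    public static void main(String[] args) {
        // The third record arrives out of order (200 after 300).
        for (Update u : aggregate(new long[] {100L, 300L, 200L})) {
            System.out.println("count=" + u.count + " ts=" + u.timestampMs);
        }
    }
}
```

With the input timestamps 100, 300, 200, the third update is stamped 200 even though an update stamped 300 was already emitted.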

We'll update the docs, thanks for your question.

Eno

> On 17 Sep 2016, at 00:43, Elias Levy <fe...@gmail.com> wrote:
> 
> On Sat, Sep 10, 2016 at 9:17 AM, Eno Thereska <en...@gmail.com>
> wrote:
> 
>> 
>> For aggregations, the timestamp will be that of the latest record being
>> aggregated.
>> 
> 
> How does that account for out of order records?
> 
> What about kstream-kstream joins?  The output from the join could be
> triggered by a record received from either stream, depending on the order
> in which they are received and processed.  If the timestamp of the output
> is just the timestamp of the latest received record, then it seems that the
> timestamp could be that of either record.  Although I suppose that the
> best-effort stream synchronization that Kafka Streams attempts means that
> usually the timestamp will be that of the later record.


Re: Time of derived records in Kafka Streams

Posted by Elias Levy <fe...@gmail.com>.
On Sat, Sep 10, 2016 at 9:17 AM, Eno Thereska <en...@gmail.com>
wrote:

>
> For aggregations, the timestamp will be that of the latest record being
> aggregated.
>

How does that account for out of order records?

What about kstream-kstream joins?  The output from the join could be
triggered by a record received from either stream, depending on the order
in which they are received and processed.  If the timestamp of the output
is just the timestamp of the latest received record, then it seems that the
timestamp could be that of either record.  Although I suppose that the
best-effort stream synchronization that Kafka Streams attempts means that
usually the timestamp will be that of the later record.

Re: Time of derived records in Kafka Streams

Posted by Eno Thereska <en...@gmail.com>.
Hi Elias,

Good question. The general answer is that each time a record is output, its timestamp is the current time of the Kafka Streams task that processes it, i.e., the internal Kafka Streams time. If the Kafka Streams task is processing records with event time, that time at any point is the smallest among the timestamps of its input stream partitions (see ProcessorContext.java, the timestamp() definition).

This might sound complicated, but some examples should help: for output by stateless transformations the timestamp will be that of the record being transformed. For aggregations, the timestamp will be that of the latest record being aggregated.
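
To make the "smallest among its input stream partition timestamps" rule concrete, here is a minimal sketch in plain Java with no Kafka dependency. The names TaskClockSketch, observe and taskTime are hypothetical; the class only models the rule as described in this message and is not the actual Kafka Streams implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the rule described above; this is not Kafka Streams code.
final class TaskClockSketch {
    // Most recent timestamp observed on each input partition (assumed bookkeeping).
    private final Map<String, Long> partitionTime = new HashMap<>();

    // Record the timestamp of the latest record seen on a partition.
    void observe(String partition, long timestampMs) {
        partitionTime.put(partition, timestampMs);
    }

    // Task time: the smallest timestamp across all input partitions, or -1 if none seen.
    long taskTime() {
        if (partitionTime.isEmpty()) {
            return -1L;
        }
        long min = Long.MAX_VALUE;
        for (long ts : partitionTime.values()) {
            min = Math.min(min, ts);
        }
        return min;
    }

    public static void main(String[] args) {
        TaskClockSketch clock = new TaskClockSketch();
        clock.observe("topicA-0", 1_000L);
        clock.observe("topicB-0", 700L);
        System.out.println(clock.taskTime()); // 700: the slower partition sets the task time
        clock.observe("topicB-0", 1_200L);
        System.out.println(clock.taskTime()); // 1000: topicA-0 is now the slowest
    }
}
```

In this model the task time only advances when the slowest partition advances, which is one way to picture why a record's output timestamp can differ from wall-clock time.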

Cheers
Eno


> On 10 Sep 2016, at 00:19, Elias Levy <fe...@gmail.com> wrote:
> 
> The Kafka Streams documentation discusses how to assign timestamps to
> records received from a source topic via TimestampExtractor.  But neither the
> Kafka nor the Confluent documentation on Kafka Streams explains what
> timestamp is associated with a record that has been transformed.
> 
> What timestamp is associated with records that are output by stateless
> transformations like map or flatMap?
> 
> What timestamp is associated with records that are output by stateful
> transformations like aggregations or joins?
> 
> What about transformations on windows?
> 
> What timestamp does the Kafka publisher use, if any, when writing to an
> intermediate topic via through() or a sink via to()?