Posted to users@kafka.apache.org by Sachin Mittal <sj...@gmail.com> on 2019/12/07 04:20:25 UTC

What timestamp is used by streams when doing windowed joins

Hi,
I have noticed some issues when doing stream-to-stream windowed joins.
It looks like my joined stream does not include all the records.

Say I am doing a join like this:
stream1.join(
            stream2,
            (lv, rv) -> ...,
            JoinWindows.of(Duration.ofMinutes(5)),
           ....)
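
For context, here is a simplified sketch of what the full topology looks like
on my side (topic names, value types, and the joiner below are placeholders,
not my real ones, and I assume default serdes are configured):

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

// Two input streams that share the same key type (placeholder topics)
KStream<String, String> stream1 = builder.stream("left-topic");
KStream<String, String> stream2 = builder.stream("right-topic");

// Join records that share a key and whose timestamps fall within the window
KStream<String, String> joined = stream1.join(
        stream2,
        (lv, rv) -> lv + "," + rv,
        JoinWindows.of(Duration.ofMinutes(5)));

joined.to("joined-topic");
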
From what I have checked in the docs, it will join 2 records that fall within
the specified window.
However, it's not clear what time it uses for each record. Would it be:
1. event-time, or
2. processing-time, or
3. ingestion-time?

I am right now using the default configuration:
log.message.timestamp.type = CreateTime and the default.timestamp.extractor.

From the docs I gather that in the default case it uses event-time.
So does that mean there has to be a timestamp field in the record which is to
be extracted by a custom timestamp extractor?

Also, downstream, when the streams application actually writes (produces) new
record types, do we need to provide a timestamp extractor for all such record
types so that the next process in the pipeline can pick up the timestamp to do
the windowed operations?

Also, when and how is processing-time used at all by a streams application?

Finally, say I don't want to worry about whether the timestamp is set by the
producers; is it better to simply set
log.message.timestamp.type = LogAppendTime?

Thanks
Sachin

Re: What timestamp is used by streams when doing windowed joins

Posted by John Roesler <vv...@apache.org>.
Ah,

I didn’t remember that the docs defined the terms that way. Those definitions make sense to me. 

Yes, if your topics are configured with LogAppendTime, then when we poll records from the topic, the timestamp that comes back attached to the record would be the log append time. If you’re using the default TimestampExtractor, then that’s the timestamp that Streams would use for the record.

And, yes, your description of JoinWindows seems correct. 
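
To make it concrete, here is roughly the check Streams performs (the
timestamps below are made-up values; with your config they would be the log
append times of the two records):

long windowMs = 5 * 60 * 1000L;            // JoinWindows.of(Duration.ofMinutes(5))

// Two records with the same key; timestamps are whatever the extractor returned
long leftTs  = 1_575_700_000_000L;         // made-up example value
long rightTs = leftTs + 4 * 60 * 1000;     // 4 minutes later

// The records join if their timestamps are no more than 5 minutes apart,
// in either direction
boolean joins = Math.abs(leftTs - rightTs) <= windowMs;  // true here
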

-John

On Sat, Dec 7, 2019, at 01:19, Sachin Mittal wrote:

Re: What timestamp is used by streams when doing windowed joins

Posted by Sachin Mittal <sj...@gmail.com>.
Hi John,
If I check https://docs.confluent.io/current/streams/concepts.html#time
it describes three notions of time: event-time, processing-time, and
ingestion-time.

If I check
https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-timestamp-extractor
it says that in the default case:
if log.message.timestamp.type is set to CreateTime then event-time is used;
if log.message.timestamp.type is set to LogAppendTime then ingestion-time is
used.

However, what you are saying is that under the Streams DSL we always use
event-time, which can be either CreateTime or LogAppendTime.

Both of the statements make sense to me, but it looks like they are slightly
different in how they relate the times.
One basically says:
event-time <=> CreateTime
ingestion-time <=> LogAppendTime
whereas the other says:
event-time = CreateTime or LogAppendTime (depending on your broker/topic
config).

Yes, setting log.message.timestamp.type to LogAppendTime seems to make the
windowed join behave better.

So what I understand from JoinWindows.of(Duration.ofMinutes(5)) is that when
joining two records it checks whether their LogAppendTime timestamps are
within 5 minutes of each other, and if so they get joined.
Please let me know if I got this part right.

Thanks
Sachin




On Sat, Dec 7, 2019 at 10:43 AM John Roesler <vv...@apache.org> wrote:


Re: What timestamp is used by streams when doing windowed joins

Posted by John Roesler <vv...@apache.org>.
Hi Sachin,

I'd need more information to speculate about why your records are missing, but it sounds like you're suspecting something to do with the records' timestamps, so I'll just focus on answering your questions.

Streams always uses the same timestamp for all operations, which is the timestamp returned by the timestamp extractor. Whether this is event time or ingestion time is up to the timestamp extractor you're using.

If you're using the default timestamp extractor, then Streams will use the timestamp field on the ConsumerRecord that comes back from the broker. If you're using CreateTime, then it would hold the value of the timestamp written by the producer. If you're using LogAppendTime, then it's the timestamp representing when the broker actually adds the record to the topic.
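
Under the hood, the default extractor essentially does this (a simplified
sketch, not the actual implementation, which also handles invalid timestamps):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Simplified sketch of the default behavior: return the timestamp field the
// broker handed back, which is CreateTime or LogAppendTime depending on the
// topic's timestamp type.
public class RecordMetadataTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        return record.timestamp();
    }
}

You'd only write your own extractor (registered via default.timestamp.extractor)
if the timestamp you care about lives inside the message payload instead.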

One potential point of confusion is that when we say a "record", we mean more than just the key and value that you typically manipulate using the Streams DSL. In addition to these fields, there is a separate timestamp field, which is part of the Consumer/Producer/Broker protocols. That's what we use for time tracking, so you do not need to worry about embedding and extracting the timestamp in your values.
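
For example, on the producer side that timestamp field can be set explicitly
when you build the record (topic, key, and value below are made up; if you
leave the timestamp null, the producer stamps the current wall-clock time):

import org.apache.kafka.clients.producer.ProducerRecord;

// topic, partition, timestamp, key, value -- the timestamp rides alongside
// the key and value; it is not embedded inside the value itself
ProducerRecord<String, String> record =
        new ProducerRecord<>("input-topic", null, 1575700000000L, "some-key", "some-value");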

Streams will set the timestamp field on outgoing ProducerRecords it sends to the broker, so this would just be used by default for further stages in the pipeline. You don't need to add timestamp extractors further on.

The only usage of processing time (aka "wall-clock time") is in wall-clock based punctuation, if you're using the low-level Processor API. Also, the commit interval is defined in terms of wall-clock time. If all you're considering is the semantics of the Streams DSL, processing/wall-clock time would not play any part in those semantics.
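
For completeness, that wall-clock usage looks roughly like this inside a
Processor (a sketch; the interval here is arbitrary):

import java.time.Duration;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

// Inside Processor#init(ProcessorContext context):
context.schedule(
        Duration.ofMinutes(1),                // how often to fire (arbitrary here)
        PunctuationType.WALL_CLOCK_TIME,      // fire on wall-clock time, not stream time
        timestamp -> {
            // periodic work driven by wall-clock time, e.g. emitting partial results
        });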

I know that stream processing literature in general discusses event- vs. processing- vs. ingestion-time quite a bit, but for practical purposes, event time (either CreateTime or LogAppendTime) is the one that's useful for writing programs. Both ingestion time and processing time lead to non-deterministic programs with unclear semantics. That's why we pretty much stick to event time in the Streams DSL.

Finally, yeah, if you just want to process records in the same order they appear in the topics, then LogAppendTime might be better. 
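
If you do go that way, note that it can be set as a broker-wide default or
overridden per topic (config keys only, shown here without any tooling):

# broker-wide default (server.properties)
log.message.timestamp.type=LogAppendTime

# per-topic override
message.timestamp.type=LogAppendTime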

I hope this helps clear things up a bit.

Thanks,
-John

On Fri, Dec 6, 2019, at 22:20, Sachin Mittal wrote: