Posted to users@kafka.apache.org by Murad Mamedov <ma...@muradm.net> on 2017/04/26 12:37:03 UTC

Time synchronization between streams

Hi,

Suppose we have two topics: in one, each event is 100 bytes; in the
other, each event is 5000 bytes. For 3 days, producers write timestamped
events to both topics, the same number of events to each; let's assume
100000 events per topic.

Kafka Client API consumers will obviously need different amounts of time
to consume the two topics.

Now, after 3 days, an application starts consuming these topics as
KStreams with the high-level API.

If we need to join these two streams based on time, say with a 15-minute
window, how would it behave?

Obviously, the stream with smaller events will be consumed faster than
the stream with larger events. Stream100 could already be reading data
from 1 day ago while stream5000 is still reading events from 3 days ago.
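To put rough numbers on "consumed faster", here is a back-of-the-envelope Java sketch. It assumes the following, none of which comes from the thread. Each fetch returns at most the same number of bytes per partition for both topics; the sketch uses 1 MiB, the default of the consumer setting max.partition.fetch.bytes. Per-record overhead is ignored, and the 100000 events are evenly spaced over the 3 days. FetchSkew is a hypothetical name.

```java
// Back-of-the-envelope sketch: event time covered by one fetch per topic.
// Assumptions: equal byte budget per fetch (1 MiB), no per-record overhead,
// 100000 events evenly spaced over 3 days in each topic.
final class FetchSkew {
    static final long FETCH_BYTES = 1_048_576L;          // assumed bytes per fetch
    static final double PERIOD_SECONDS = 3 * 24 * 3600;  // 3 days of production
    static final long EVENTS_PER_TOPIC = 100_000L;

    /** Number of whole records of the given size that fit into one fetch. */
    static long recordsPerFetch(long recordBytes) {
        return FETCH_BYTES / recordBytes;
    }

    /** Seconds of event time covered by one fetch of a topic with this record size. */
    static double eventSecondsPerFetch(long recordBytes) {
        double secondsBetweenEvents = PERIOD_SECONDS / EVENTS_PER_TOPIC;
        return recordsPerFetch(recordBytes) * secondsBetweenEvents;
    }

    public static void main(String[] args) {
        for (long size : new long[] {100, 5000}) {
            System.out.printf("%4d-byte events: %5d records per fetch, %.1f s of event time%n",
                    size, recordsPerFetch(size), eventSecondsPerFetch(size));
        }
    }
}
```

Under these assumptions, one fetch of the 100-byte topic covers about 7.5 hours of event time. One fetch of the 5000-byte topic covers roughly 9 minutes. This is why a consumer that simply drains both topics drifts apart in event time.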

Is there any global time synchronization between streams in the Kafka
Streams API, so that it would not consume more events from one stream
while the other is still behind in time? Or, perhaps better phrased: is
there a global event ordering based on event timestamps?

Another option could be to join the streams in a window, but the same
question arises: if one stream is days behind the other, will a 15-minute
join window ever work?

I'm trying to understand how to design the replay of long periods of time
for an application with multiple topics/streams, especially when combined
with low-level API processors and transformers that rely on each other
via GlobalKTable or KTable stores on these streams. For instance, the
smaller topic could have the following sequence of events:

T1 - (k1, v1)
T1 + 10 minutes - (k1, null)
T1 + 20 minutes - (k1, v2)

While the topic with larger events has:

T1 - (k1, vt1)
T1 + 5 minutes - (k1, null)
T1 + 15 minutes - (k1, vt2)

If one joined or looked up these streams in real time (event timestamp
approximately equal to wall-clock time), the result would be:

T1 - topic_small (k1, v1) - topic_large (k1, vt1)
T1 + 5 minutes - topic_small (k1, v1) - topic_large (k1, null)
T1 + 10 minutes - topic_small (k1, null) - topic_large (k1, null)
T1 + 15 minutes - topic_small (k1, null) - topic_large (k1, vt2)
T1 + 20 minutes - topic_small (k1, v2) - topic_large (k1, vt2)
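The real-time table above is what you get by applying both changelogs in timestamp order. Below is a minimal Java sketch of that. It assumes a single key k1, timestamps in minutes after T1, ties broken in favour of topic_small, and a null value meaning deletion. TimestampOrderedView and its methods are hypothetical names, not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch: merge both changelogs for key k1 by timestamp and snapshot the
// latest value on each side after every distinct timestamp. Timestamps are
// minutes after T1; a null value stands for the (k1, null) tombstone.
final class TimestampOrderedView {
    record Change(long minute, String topic, String value) {}

    static List<String> replay(List<Change> small, List<Change> large) {
        List<Change> merged = new ArrayList<>(small);
        merged.addAll(large);
        // List.sort is stable, so on equal timestamps topic_small stays first.
        merged.sort(Comparator.comparingLong(Change::minute));

        String smallValue = null;
        String largeValue = null;
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < merged.size(); i++) {
            Change c = merged.get(i);
            if (c.topic().equals("topic_small")) {
                smallValue = c.value();
            } else {
                largeValue = c.value();
            }
            boolean lastAtThisMinute = i + 1 == merged.size()
                    || merged.get(i + 1).minute() != c.minute();
            if (lastAtThisMinute) {
                rows.add("T1+" + c.minute() + " small=" + smallValue + " large=" + largeValue);
            }
        }
        return rows;
    }

    /** The event sequences from the example above. */
    static List<String> threadExample() {
        List<Change> small = List.of(new Change(0, "topic_small", "v1"),
                new Change(10, "topic_small", null), new Change(20, "topic_small", "v2"));
        List<Change> large = List.of(new Change(0, "topic_large", "vt1"),
                new Change(5, "topic_large", null), new Change(15, "topic_large", "vt2"));
        return replay(small, large);
    }

    public static void main(String[] args) {
        threadExample().forEach(System.out::println);
    }
}
```

Running it prints one line per row of the table above. Suppose instead that all of topic_small were applied before topic_large. Then every row would show small=v2, which is the replay problem described in the next paragraph.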

However, when replaying the streams from the beginning, the topic with
large events would see the topic with small events as (k1, v2),
completely missing the v1 and null states in the case of a
GlobalKTable/KTable representation, or the v1 and null events in the case
of a KStream-KStream windowed join.

Am I missing something here? Should the application be responsible for
global synchronization between topics, or does (or can) Kafka Streams do
that? If the application should, what approach could solve it?

I hope I have explained myself clearly.

Thanks in advance

Re: Re: Time synchronization between streams

Posted by Murad Mamedov <ma...@muradm.net>.
Hi,

Thanks for clarifying. So a Kafka Streams Task attempts, on a best-effort
basis, to keep the timestamps of its topics in sync.

Then users of the plain Kafka Clients API have to implement similar
functionality themselves to preserve such behaviour.

Does this mean that if we have two streams with the same number of
events, and there is no outside influence on the business logic, applying
the same business logic from offset 0 on both streams will always produce
the same result?

Thanks

On 27 Apr 2017 14:29, "Damian Guy" <da...@gmail.com> wrote:

Re: Re: Time synchronization between streams

Posted by Damian Guy <da...@gmail.com>.
As I said in my previous email:
> Yes. When streams are joined, the corresponding partitions from the joined
> streams are grouped together into a single Task. Each Task maintains a
> record buffer for all of the topics it is consuming from. When it is time
> to process a record, it will choose a record from the partition that has
> the smallest timestamp. In this way it makes a best effort to keep the
> streams in sync.

So if you are joining two topics that both start from offset 0 with no
local state, they will be consumed at roughly the same rate and kept in
sync based on the time extracted from the records.


On Thu, 27 Apr 2017 at 08:02 Murad Mamedov <ma...@muradm.net> wrote:

> Hi Devs,
>
> Maybe you can shed some light.
>
> Thanks in advance

Fwd: Re: Time synchronization between streams

Posted by Murad Mamedov <ma...@muradm.net>.
Hi Devs,

Maybe you can shed some light.

Thanks in advance
---------- Forwarded message ----------
From: "mail@muradm.net" <ma...@muradm.net>
Date: 26 Apr 2017 19:53
Subject: Re: Time synchronization between streams
To: <us...@kafka.apache.org>
Cc:

Re: Time synchronization between streams

Posted by "mail@muradm.net" <ma...@muradm.net>.
Yes, basically I'm OK with how the join works, including window and
retention periods, under normal circumstances. When events are processed
in real time, as they occur, an application joining the streams will get
something like this:

T1 + 0 => topic_small (K1, V1)  => join result (None)
T1 + 1 min =>  topic_large (K1, VT1) => join result (K1, V1, VT1)
T1 + 3 mins => topic_large (K1, VT2) => join result (K1, V1, VT2)
T1 + 7 mins => topic_small (K1, V2) => join result (K1, V2, VT2)

According to Windowed<K> and WindowedSerializer, only the window start is
kept together with the key when storing into the state store. Assuming the
window start time is the same for both topics/KStreams (not sure yet, I'm
still reading the source), though even if it isn't, the state store
actions of Kafka Streams will be like this:

join_left_side_store.put ( K1-W1, V1 )
join_right_side_store.put ( K1-W1, VT1 )
join_left_side_store.put ( K1-W1, V2 )
join_right_side_store.put ( K1-W1, VT2 )
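The "K1-W1" slot in the put calls above can be pictured as a composite key. Below is a minimal Java sketch of such a layout, following the message's own reading of WindowedSerializer: key bytes followed only by the window start. WindowKeySketch is a hypothetical class, not Kafka's serializer. UTF-8 string keys and an 8-byte big-endian start timestamp are assumptions.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical "key + window start" store-key layout, following the reading of
// WindowedSerializer in the message: record key bytes followed by the window
// start as an 8-byte big-endian long. The window end is not part of the key.
final class WindowKeySketch {
    static byte[] storeKey(String key, long windowStartMs) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(windowStartMs)
                .array();
    }

    /** Recovers the record key: everything before the trailing 8 bytes. */
    static String key(byte[] storeKey) {
        return new String(storeKey, 0, storeKey.length - Long.BYTES, StandardCharsets.UTF_8);
    }

    /** Recovers the window start from the trailing 8 bytes. */
    static long windowStart(byte[] storeKey) {
        return ByteBuffer.wrap(storeKey).getLong(storeKey.length - Long.BYTES);
    }

    public static void main(String[] args) {
        long w1 = 600_000L; // arbitrary example window start (ms)
        byte[] k = storeKey("K1", w1);
        System.out.println(key(k) + "-" + windowStart(k) + " (" + k.length + " bytes)");
    }
}
```

In this layout, any two records for K1 whose windows start at the same time map to the same bytes. That is what makes the overwrite sequence above plausible under the message's model.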

However, when the same application consumes the same topics from the
beginning, from scratch (no local application state stores), over a long
period of time (greater than the window period, but less than the
retention period), the join result for a 10-minute window will be
different, like this:

join result (None)
join result (K1, V2, VT1)
join result (K1, V2, VT2)

Because topic_large's stream is read more slowly, the value of
topic_small in the window will change from V1 to V2 before Kafka Streams
receives VT1.

I.e., the state store actions of Kafka Streams will be like this:

join_left_side_store.put ( K1-W1, V1 )
join_left_side_store.put ( K1-W1, V2 )
join_right_side_store.put ( K1-W1, VT1 )
join_right_side_store.put ( K1-W1, VT2 )
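The two sequences of put calls can be replayed against the overwrite model the message assumes. Below is a minimal Java sketch, where "left" stands for topic_small and "right" for topic_large. OverwriteJoinModel is a hypothetical name. Whether Kafka Streams' join stores really overwrite per key and window is exactly the open question here, so treat this only as a model of the argument.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the "latest value per side" model used in the message: each side
// keeps only its most recent value for K1-W1 (a put overwrites), and every
// arriving record is joined with what the other side currently holds.
// This mirrors the thread's reasoning; it is not Kafka Streams' join code.
final class OverwriteJoinModel {
    record Arrival(String side, String value) {}

    static List<String> run(List<Arrival> arrivals) {
        String left = null;   // latest topic_small value for K1-W1
        String right = null;  // latest topic_large value for K1-W1
        List<String> results = new ArrayList<>();
        for (Arrival a : arrivals) {
            if (a.side().equals("left")) {
                left = a.value();
            } else {
                right = a.value();
            }
            results.add(left == null || right == null
                    ? "None"
                    : "(K1, " + left + ", " + right + ")");
        }
        return results;
    }

    // Arrival order when events are processed as they occur.
    static final List<Arrival> REAL_TIME = List.of(
            new Arrival("left", "V1"), new Arrival("right", "VT1"),
            new Arrival("right", "VT2"), new Arrival("left", "V2"));

    // Arrival order during a replay where topic_small runs ahead.
    static final List<Arrival> REPLAY = List.of(
            new Arrival("left", "V1"), new Arrival("left", "V2"),
            new Arrival("right", "VT1"), new Arrival("right", "VT2"));

    public static void main(String[] args) {
        System.out.println("real time: " + run(REAL_TIME));
        System.out.println("replay:    " + run(REPLAY));
    }
}
```

Under this model, the real-time order yields (K1, V1, VT1), (K1, V1, VT2), (K1, V2, VT2). The replay order never pairs V1 with anything. It reproduces the replay results listed above, with one extra None for the second topic_small record.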

Is that right?

On Wed, Apr 26, 2017 at 6:50 PM, Damian Guy <da...@gmail.com> wrote:

Re: Time synchronization between streams

Posted by Damian Guy <da...@gmail.com>.
Hi Murad,

On Wed, 26 Apr 2017 at 13:37 Murad Mamedov <ma...@muradm.net> wrote:

> Is there any global time synchronization between streams in the Kafka
> Streams API, so that it would not consume more events from one stream
> while the other is still behind in time? Or, perhaps better phrased: is
> there a global event ordering based on event timestamps?
>

Yes. When streams are joined, the corresponding partitions from the joined
streams are grouped together into a single Task. Each Task maintains a
record buffer for all of the topics it is consuming from. When it is time
to process a record, it will choose a record from the partition that has
the smallest timestamp. In this way it makes a best effort to keep the
streams in sync.
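The selection rule described here can be sketched in a few lines of Java. This is a simplified model, not the Kafka Streams source. The class, record and partition names are hypothetical, and ties go to the buffer registered first. The case where one buffer runs empty is not modelled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the behaviour described above: one buffer per input
// partition, and the next record processed is always the head record with
// the smallest timestamp. Names are hypothetical.
final class SmallestTimestampPicker {
    record Rec(String partition, long timestamp) {}

    static List<Rec> processingOrder(Map<String, Deque<Rec>> buffers) {
        List<Rec> order = new ArrayList<>();
        while (true) {
            Deque<Rec> next = null;
            for (Deque<Rec> buffer : buffers.values()) {
                // Strict "<" keeps the earlier-registered buffer on ties.
                if (!buffer.isEmpty() && (next == null
                        || buffer.peekFirst().timestamp() < next.peekFirst().timestamp())) {
                    next = buffer;
                }
            }
            if (next == null) {
                return order; // every buffer is drained
            }
            order.add(next.pollFirst());
        }
    }

    /** Buffers holding the timestamps (minutes after T1) from the original question. */
    static Map<String, Deque<Rec>> threadExample() {
        Map<String, Deque<Rec>> buffers = new LinkedHashMap<>();
        buffers.put("topic_small-0", buffer("topic_small-0", 0, 10, 20));
        buffers.put("topic_large-0", buffer("topic_large-0", 0, 5, 15));
        return buffers;
    }

    private static Deque<Rec> buffer(String partition, long... timestamps) {
        Deque<Rec> deque = new ArrayDeque<>();
        for (long ts : timestamps) {
            deque.addLast(new Rec(partition, ts));
        }
        return deque;
    }

    public static void main(String[] args) {
        processingOrder(threadExample()).forEach(System.out::println);
    }
}
```

With the timestamps from the original question, the records come out in the order 0, 0, 5, 10, 15, 20. In this model the choice depends only on timestamps, not on how many bytes each topic's records take.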


>
> Another option could be to join the streams in a window, but the same
> question arises: if one stream is days behind the other, will a 15-minute
> join window ever work?
>
>
If the data arrives much later, you can use
JoinWindows.until(SOME_TIME_PERIOD) to keep the data around. In this case
the streams will still join. Once SOME_TIME_PERIOD has expired, the
streams will no longer be able to join.
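To make the interplay between the join window and the retention period concrete, here is a minimal Java sketch under a simplified model. The model is an assumption, not taken from the Kafka Streams source. A buffered record can join a new record only if their timestamps are within the window and the buffered record has not yet been dropped by retention. "Stream time" is taken here to be the largest timestamp seen so far, which is also an assumption. JoinRetentionModel and canJoin are hypothetical names.

```java
import java.time.Duration;

// Simplified model (an assumption for illustration, not Kafka Streams' exact
// logic) of the two conditions discussed above:
//  1. the two records must lie within the join window of each other, and
//  2. the buffered record must still be retained, i.e. stream time is no more
//     than the retention period past it.
// In the DSL of that time the two durations would correspond to the values
// passed to JoinWindows.of(...) and JoinWindows.until(...).
final class JoinRetentionModel {
    static boolean canJoin(long arrivingTsMs, long bufferedTsMs, long streamTimeMs,
                           Duration window, Duration retention) {
        boolean withinWindow = Math.abs(arrivingTsMs - bufferedTsMs) <= window.toMillis();
        boolean stillRetained = streamTimeMs - bufferedTsMs <= retention.toMillis();
        return withinWindow && stillRetained;
    }

    public static void main(String[] args) {
        long buffered = 0L;                                 // record buffered at T1
        long arriving = Duration.ofMinutes(10).toMillis();  // partner at T1 + 10 min
        long streamTime = Duration.ofDays(2).toMillis();    // other side ran 2 days ahead
        Duration window = Duration.ofMinutes(15);
        System.out.println(canJoin(arriving, buffered, streamTime, window, Duration.ofDays(1)));
        System.out.println(canJoin(arriving, buffered, streamTime, window, Duration.ofDays(3)));
    }
}
```

Take a 15-minute window, a record buffered at T1, a partner at T1 + 10 minutes, and stream time 2 days ahead. With a 1-day retention the two no longer join. With a 3-day retention they still do, which is the effect of choosing SOME_TIME_PERIOD large enough.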


> I'm trying to understand how to design the replay of long periods of time
> for an application with multiple topics/streams, especially when combined
> with low-level API processors and transformers that rely on each other
> via GlobalKTable or KTable stores on these streams. For instance, the
> smaller topic could have the following sequence of events:
>
> T1 - (k1, v1)
> T1 + 10 minutes - (k1, null)
> T1 + 20 minutes - (k1, v2)
>
> While the topic with larger events has:
>
> T1 - (k1, vt1)
> T1 + 5 minutes - (k1, null)
> T1 + 15 minutes - (k1, vt2)
>
> If one joined or looked up these streams in real time (event timestamp
> approximately equal to wall-clock time), the result would be:
>
> T1 - topic_small (k1, v1) - topic_large (k1, vt1)
> T1 + 5 minutes - topic_small (k1, v1) - topic_large (k1, null)
> T1 + 10 minutes - topic_small (k1, null) - topic_large (k1, null)
> T1 + 15 minutes - topic_small (k1, null) - topic_large (k1, vt2)
> T1 + 20 minutes - topic_small (k1, v2) - topic_large (k1, vt2)
>
> However, when replaying the streams from the beginning, the topic with
> large events would see the topic with small events as (k1, v2),
> completely missing the v1 and null states in the case of a
> GlobalKTable/KTable representation, or the v1 and null events in the case
> of a KStream-KStream windowed join.
>
>
I don't really follow here. In the case of a GlobalKTable it will be
initialized with all of the existing data before the rest of the streams
start processing.
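Damian's point about GlobalKTable can be illustrated with a small Java model. It assumes a GlobalKTable built from topic_small, a stream from topic_large, and the example events from the question. It sketches only the bootstrap-then-process order, not the GlobalKTable implementation. BootstrappedLookup is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of "bootstrap the table first, then process the stream": all of
// topic_small is applied to an in-memory table before any topic_large record
// is processed, so each lookup sees the table's final state. Timestamps are
// minutes after T1; a null value deletes the key. Names are hypothetical.
final class BootstrappedLookup {
    record Change(long minute, String key, String value) {}

    static List<String> lookups(List<Change> tableTopic, List<Change> streamTopic) {
        Map<String, String> table = new HashMap<>();
        for (Change c : tableTopic) {            // bootstrap phase: read to the end
            if (c.value() == null) {
                table.remove(c.key());
            } else {
                table.put(c.key(), c.value());
            }
        }
        List<String> results = new ArrayList<>();
        for (Change c : streamTopic) {           // processing phase: look up each record
            results.add("T1+" + c.minute() + " large=" + c.value()
                    + " small=" + table.get(c.key()));
        }
        return results;
    }

    static List<String> threadExample() {
        List<Change> small = List.of(new Change(0, "k1", "v1"),
                new Change(10, "k1", null), new Change(20, "k1", "v2"));
        List<Change> large = List.of(new Change(0, "k1", "vt1"),
                new Change(5, "k1", null), new Change(15, "k1", "vt2"));
        return lookups(small, large);
    }

    public static void main(String[] args) {
        threadExample().forEach(System.out::println);
    }
}
```

In this model, every topic_large record sees (k1, v2), which is exactly the view described in the question. The bootstrap makes the table complete, but not aligned in time with the stream.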


> Am I missing something here? Should the application be responsible for
> global synchronization between topics, or does (or can) Kafka Streams do
> that? If the application should, what approach could solve it?
>
> I hope I have explained myself clearly.
>
> Thanks in advance
>