Posted to users@kafka.apache.org by Sachin Mittal <sj...@gmail.com> on 2016/10/09 07:19:30 UTC

Understanding how joins work in Kafka streams

Hi,
I need some clarity on how joins actually work on a continuous stream of data.

Say I have two topics which I need to join (left join).
Data records in each topic are aggregated as (key, value) <=> (string, list).

Topic 1
key1: [A01, A02, A03, A04 ..]
key2: [A11, A12, A13, A14 ..]
....

Topic 2
key1: [B01, B02, B03, B04 ..]
key2: [B11, B12, B13, B14 ..]
....

Joined topic
key1: [A01, B01 ...]
key2: [A11, B11 ...]

Now let us say I get two records, [key1: A05] and [key1: B05].
As per the aggregation, they are appended to the lists in Topic 1 and Topic 2.

I assume this will again trigger the join operation and the records will get
appended to key1's data? Let me know if my understanding is correct here.

If I am reading the joined topic using foreach, will I again get a record
for key1 with the new data appended to the original list, so that my record
is now key1: [A01, B01..., A05, B05 ...]?

What I wanted to ask is: when reading records from a topic, if the value
for a key is modified, will that record be read again (even if it was read
before)? Or is each record read only once by the streams program?

Please let me know how such a scenario works.

Thanks
Sachin

Re: Understanding how joins work in Kafka streams

Posted by Eno Thereska <en...@gmail.com>.
Hi Sachin,

Yes, it will be called each time a key is modified, and it will keep doing this continuously until you stop the app.

Eno
> On 9 Oct 2016, at 16:50, Sachin Mittal <sj...@gmail.com> wrote:
> 
> [...]
> Please let me know if my understanding is correct. Is the callback called
> every time a key is modified, or does Streams buffer the changes and call it
> once per time span?


Re: Understanding how joins work in Kafka streams

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
It is actually a KTable-KTable join.

I have a stream (K1, A) which is aggregated as (Key, List<A>) hence it
creates a KTable.
I have another stream (K2, B) which is aggregated as (Key, List<B>) hence
it creates another KTable.
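
For reference, each aggregation looks roughly like this (a sketch against
the current Kafka Streams DSL; older releases spell these methods
differently, and Serdes.ListSerde only ships with newer Kafka versions):

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

StreamsBuilder builder = new StreamsBuilder();

// serde for List<String> values
Serde<List<String>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.String());

// collect every value seen for a key into a per-key list;
// the result is materialized as a KTable of (key, list)
KTable<String, List<String>> tableA = builder
        .<String, String>stream("topic1")
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        .aggregate(
                ArrayList::new,  // start with an empty list per new key
                (key, value, list) -> { list.add(value); return list; },
                Materialized.with(Serdes.String(), listSerde));

// tableB is built the same way from topic2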

Then I have

// merge() is my own helper; note that the ValueJoiner returns only the
// new value, the key stays the same
tableA.leftJoin(tableB, (listA, listB) -> merge(listA, listB))
    .toStream()
    .to("new-result-topic");

So what I understand is that every time the ValueJoiner is called, it picks
the latest List<A> and List<B> for a key, merges them, and then updates
new-result-topic with the new merged list for the same key.

So then when I do

builder.stream("new-result-topic")
    .foreach((key, mergedList) -> {
        // this callback is called multiple times for the same key, each time
        // with the newly modified List<AB> (as and when it gets modified by
        // the above process)
    });

Please let me know if my understanding is correct. Is the callback called
every time a key is modified, or does Streams buffer the changes and call it
once per time span?

Thanks
Sachin



On Sun, Oct 9, 2016 at 3:07 PM, Eno Thereska <en...@gmail.com> wrote:

> [...]
> Before going further, could you clarify if you'll have a KStream-KStream
> join or a KStream-KTable join?

Re: Understanding how joins work in Kafka streams

Posted by Eno Thereska <en...@gmail.com>.
Hi Sachin,

Some comments inline:

> On 9 Oct 2016, at 08:19, Sachin Mittal <sj...@gmail.com> wrote:
> 
> Hi,
> I need some clarity on how joins actually work on a continuous stream of data.
> 
> Say I have two topics which I need to join (left join).
> Data records in each topic are aggregated as (key, value) <=> (string, list).
> 
> Topic 1
> key1: [A01, A02, A03, A04 ..]
> key2: [A11, A12, A13, A14 ..]
> ....
> 
> Topic 2
> key1: [B01, B02, B03, B04 ..]
> key2: [B11, B12, B13, B14 ..]
> ....
> 
> Joined topic
> key1: [A01, B01 ...]
> key2: [A11, B11 ...]
> 
> Now let us say I get two records, [key1: A05] and [key1: B05].
> As per the aggregation, they are appended to the lists in Topic 1 and Topic 2.
> 
> I assume this will again trigger the join operation and the records will get
> appended to key1's data? Let me know if my understanding is correct here.

Yes, that is correct. The join operation is called each time new records are consumed from the topics, and consumption happens continuously.

> 
> If I am reading the joined topic using foreach, will I again get a record
> for key1 with the new data appended to the original list, so that my record
> is now key1: [A01, B01..., A05, B05 ...]?

Correct.
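
For example, if you read the joined topic like this (a sketch given a
StreamsBuilder named builder; the topic name and types are placeholders):

builder.stream("joined-topic")
       .foreach((key, value) -> System.out.println(key + " -> " + value));

then key1 will be printed again on every update, each time carrying the
latest full list, because each update arrives as a new record on the topic.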


> 
> What I wanted to ask is: when reading records from a topic, if the value
> for a key is modified, will that record be read again (even if it was read
> before)? Or is each record read only once by the streams program?

So this depends on how the value for a key is modified. I'm assuming a new record with the new value is produced to the topic. There are two broad options here:

- if you are doing a KStream-KStream join, the "time" when the new value is updated will matter (these kinds of joins are done with a time boundary, e.g., join everything within a time difference of 10 minutes); see the sketch below. E.g., say the join result so far is key1: [A01, B01..., A05, B05 ...]. If the value for key1 is now [B06], then the output will depend on the time of the join.
- if you are doing a KStream-KTable join, it depends on whether the value change happens on the KStream side or the KTable side.
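
To make the first case concrete, a KStream-KStream left join with a
10-minute window looks roughly like this (a sketch; the topic names are
made up, and JoinWindows takes a Duration in recent Kafka Streams versions):

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> left = builder.stream("topic1");
KStream<String, String> right = builder.stream("topic2");

// records from the two streams join only when their timestamps
// are within 10 minutes of each other
left.leftJoin(right,
        (a, b) -> a + "," + b,  // combine values; b is null if no match yet
        JoinWindows.of(Duration.ofMinutes(10)))
    .to("joined-topic");

So whether a late [B06] joins with an earlier A-record depends on whether
their timestamps fall within that window.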

Before going further, could you clarify if you'll have a KStream-KStream join or a KStream-KTable join?

Thanks
Eno

> 
> Please let me know how such a scenario works.
> 
> Thanks
> Sachin