You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Jon Yeargers <jo...@cedexis.com> on 2017/01/30 19:05:30 UTC

Understanding output of KTable->KTable join

I want to do a one:many join between two streams. There should be ~ 1:100
with < 1% having no match.

My topology is relatively simple:

KTable1.join(KTable2)->to("other topic")
               \
                \---> toStream().print()

In the join it takes both Value1 and Value2 as JSON, converts them back to
Java Objects and combines them. This is returned as the JSON representation
of a new Object.

If either value was NULL or unable to convert back to its source Object an
exception would be thrown.

The output sent to the debugger looks like this for many thousands of rows

[KTABLE-TOSTREAM-0000000009]: 20bebc12136be4226b29c5d1b6183d8ed2b117c5 ,
null
[KTABLE-TOSTREAM-0000000009]: c6f038b5182b8a2409a5eeee2be71f171d54e3b4 ,
null
[KTABLE-TOSTREAM-0000000009]: f4b0aa0516c37c2725ce409cc5766df9a942950f ,
null
[KTABLE-TOSTREAM-0000000009]: e7d8912ac1b660d21d1dd94955386fb9561abbab ,
null

Then I will get many more that are matched.

Questions:

1. Im assuming the ",null" indicates no match was found. This is a problem.
The source of the data is well understood and is < 1% unmatched. If either
object is null it throws an exception - which is doesn't.
2. Is this the appropriate way to do a one:many join?

Re: Understanding output of KTable->KTable join

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Yes.

See my answer below: it highlights the difference between joining two
KTables (only primary key join) and KStream-KTable join (even if I did
mention KStream-GlobalKTable join for this case).

However, KStream-KTable joins are not symmetric, because updates to
KTable do not trigger a join computation. Only arriving KStream records
can trigger a result. Thus, it's not exactly the same thing as a
one-to-many join, even if it has similar properties.

I don't want to say it will not work for you -- just be aware of the
provided semantics.


-Matthias

On 1/31/17 5:12 AM, Jon Yeargers wrote:
> If I KStream.leftJoin(Ktable) this article (
> https://www.confluent.io/blog/distributed-real-time-joins-and-aggregations-on-user-activity-events-using-kafka-streams/)
> seems to suggest that I could have one:many.   (ktable:kstream)
> 
> Accurate?
> 
> On Mon, Jan 30, 2017 at 4:35 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> If you join two KTables, one-to-many join is currently not supported
>> (only one-to-one, ie, primary key join).
>>
>> In upcoming 0.10.2 there will be global-KTables that allow something
>> similar to one-to many joins -- however, only for KStream-GlobalKTable
>> joins, so not sure if this can help you.
>>
>> About <key:null>: yes, it indicates that there was no join computed,
>> because no matching key was found. Cf.
>> https://cwiki.apache.org/confluence/display/KAFKA/
>> Kafka+Streams+Join+Semantics
>>
>> Not sure what your keys are, the output you shared is hard to read...
>> (eg., 20bebc12136be4226b29c5d1b6183d8ed2b117c5)
>>
>>
>> We might add one-to-many KTable-GlobalKTable joins in 0.10.3 though. For
>> now, you would need to build a custom Processor and implement the join
>> by yourself.
>>
>> There is another JIRA for foreign-key join feature (unrelated to
>> GlobalKTable): https://issues.apache.org/jira/browse/KAFKA-3705
>>
>> Maybe the discussion helps you do implement you own join.
>>
>>
>> -Matthias
>>
>> On 1/30/17 11:05 AM, Jon Yeargers wrote:
>>> I want to do a one:many join between two streams. There should be ~ 1:100
>>> with < 1% having no match.
>>>
>>> My topology is relatively simple:
>>>
>>> KTable1.join(KTable2)->to("other topic")
>>>                \
>>>                 \---> toStream().print()
>>>
>>> In the join it takes both Value1 and Value2 as JSON, converts them back
>> to
>>> Java Objects and combines them. This is returned as the JSON
>> representation
>>> of a new Object.
>>>
>>> If either value was NULL or unable to convert back to its source Object
>> an
>>> exception would be thrown.
>>>
>>> The output sent to the debugger looks like this for many thousands of
>> rows
>>>
>>> [KTABLE-TOSTREAM-0000000009]: 20bebc12136be4226b29c5d1b6183d8ed2b117c5 ,
>>> null
>>> [KTABLE-TOSTREAM-0000000009]: c6f038b5182b8a2409a5eeee2be71f171d54e3b4 ,
>>> null
>>> [KTABLE-TOSTREAM-0000000009]: f4b0aa0516c37c2725ce409cc5766df9a942950f ,
>>> null
>>> [KTABLE-TOSTREAM-0000000009]: e7d8912ac1b660d21d1dd94955386fb9561abbab ,
>>> null
>>>
>>> Then I will get many more that are matched.
>>>
>>> Questions:
>>>
>>> 1. Im assuming the ",null" indicates no match was found. This is a
>> problem.
>>> The source of the data is well understood and is < 1% unmatched. If
>> either
>>> object is null it throws an exception - which is doesn't.
>>> 2. Is this the appropriate way to do a one:many join?
>>>
>>
>>
> 


Re: Understanding output of KTable->KTable join

Posted by Jon Yeargers <jo...@cedexis.com>.
If I KStream.leftJoin(Ktable) this article (
https://www.confluent.io/blog/distributed-real-time-joins-and-aggregations-on-user-activity-events-using-kafka-streams/)
seems to suggest that I could have one:many.   (ktable:kstream)

Accurate?

On Mon, Jan 30, 2017 at 4:35 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> If you join two KTables, one-to-many join is currently not supported
> (only one-to-one, ie, primary key join).
>
> In upcoming 0.10.2 there will be global-KTables that allow something
> similar to one-to many joins -- however, only for KStream-GlobalKTable
> joins, so not sure if this can help you.
>
> About <key:null>: yes, it indicates that there was no join computed,
> because no matching key was found. Cf.
> https://cwiki.apache.org/confluence/display/KAFKA/
> Kafka+Streams+Join+Semantics
>
> Not sure what your keys are, the output you shared is hard to read...
> (eg., 20bebc12136be4226b29c5d1b6183d8ed2b117c5)
>
>
> We might add one-to-many KTable-GlobalKTable joins in 0.10.3 though. For
> now, you would need to build a custom Processor and implement the join
> by yourself.
>
> There is another JIRA for foreign-key join feature (unrelated to
> GlobalKTable): https://issues.apache.org/jira/browse/KAFKA-3705
>
> Maybe the discussion helps you do implement you own join.
>
>
> -Matthias
>
> On 1/30/17 11:05 AM, Jon Yeargers wrote:
> > I want to do a one:many join between two streams. There should be ~ 1:100
> > with < 1% having no match.
> >
> > My topology is relatively simple:
> >
> > KTable1.join(KTable2)->to("other topic")
> >                \
> >                 \---> toStream().print()
> >
> > In the join it takes both Value1 and Value2 as JSON, converts them back
> to
> > Java Objects and combines them. This is returned as the JSON
> representation
> > of a new Object.
> >
> > If either value was NULL or unable to convert back to its source Object
> an
> > exception would be thrown.
> >
> > The output sent to the debugger looks like this for many thousands of
> rows
> >
> > [KTABLE-TOSTREAM-0000000009]: 20bebc12136be4226b29c5d1b6183d8ed2b117c5 ,
> > null
> > [KTABLE-TOSTREAM-0000000009]: c6f038b5182b8a2409a5eeee2be71f171d54e3b4 ,
> > null
> > [KTABLE-TOSTREAM-0000000009]: f4b0aa0516c37c2725ce409cc5766df9a942950f ,
> > null
> > [KTABLE-TOSTREAM-0000000009]: e7d8912ac1b660d21d1dd94955386fb9561abbab ,
> > null
> >
> > Then I will get many more that are matched.
> >
> > Questions:
> >
> > 1. Im assuming the ",null" indicates no match was found. This is a
> problem.
> > The source of the data is well understood and is < 1% unmatched. If
> either
> > object is null it throws an exception - which is doesn't.
> > 2. Is this the appropriate way to do a one:many join?
> >
>
>

Re: Understanding output of KTable->KTable join

Posted by "Matthias J. Sax" <ma...@confluent.io>.
If you join two KTables, one-to-many join is currently not supported
(only one-to-one, ie, primary key join).

In upcoming 0.10.2 there will be global-KTables that allow something
similar to one-to many joins -- however, only for KStream-GlobalKTable
joins, so not sure if this can help you.

About <key:null>: yes, it indicates that there was no join computed,
because no matching key was found. Cf.
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics

Not sure what your keys are, the output you shared is hard to read...
(eg., 20bebc12136be4226b29c5d1b6183d8ed2b117c5)


We might add one-to-many KTable-GlobalKTable joins in 0.10.3 though. For
now, you would need to build a custom Processor and implement the join
by yourself.

There is another JIRA for foreign-key join feature (unrelated to
GlobalKTable): https://issues.apache.org/jira/browse/KAFKA-3705

Maybe the discussion helps you do implement you own join.


-Matthias

On 1/30/17 11:05 AM, Jon Yeargers wrote:
> I want to do a one:many join between two streams. There should be ~ 1:100
> with < 1% having no match.
> 
> My topology is relatively simple:
> 
> KTable1.join(KTable2)->to("other topic")
>                \
>                 \---> toStream().print()
> 
> In the join it takes both Value1 and Value2 as JSON, converts them back to
> Java Objects and combines them. This is returned as the JSON representation
> of a new Object.
> 
> If either value was NULL or unable to convert back to its source Object an
> exception would be thrown.
> 
> The output sent to the debugger looks like this for many thousands of rows
> 
> [KTABLE-TOSTREAM-0000000009]: 20bebc12136be4226b29c5d1b6183d8ed2b117c5 ,
> null
> [KTABLE-TOSTREAM-0000000009]: c6f038b5182b8a2409a5eeee2be71f171d54e3b4 ,
> null
> [KTABLE-TOSTREAM-0000000009]: f4b0aa0516c37c2725ce409cc5766df9a942950f ,
> null
> [KTABLE-TOSTREAM-0000000009]: e7d8912ac1b660d21d1dd94955386fb9561abbab ,
> null
> 
> Then I will get many more that are matched.
> 
> Questions:
> 
> 1. Im assuming the ",null" indicates no match was found. This is a problem.
> The source of the data is well understood and is < 1% unmatched. If either
> object is null it throws an exception - which is doesn't.
> 2. Is this the appropriate way to do a one:many join?
>