Posted to dev@kafka.apache.org by mishadoff <mi...@gmail.com> on 2016/06/29 16:02:05 UTC

[Kafka Streams] Source Nodes

Hey,

I am trying to understand Kafka Streams and am building a simple prototype that joins a KStream with a KTable, but I am stuck on this error:

Invalid topology building: KSTREAM-MAP-0000000001 and KSTREAM-AGGREGATE-0000000004 are not joinable

I tracked the error down to the check that throws when either the stream or the table has its sourceNodes set to null, and I confirmed that both are null in my application.

Interestingly, when I initially read the Kafka topic into a KStream, sourceNodes is not null, but after a simple map operation (to exclude unneeded fields and apply a conversion), the source nodes are gone.

Could someone clarify what sourceNodes are needed for and why they are erased after map?

Thanks!

— Misha

Re: [Kafka Streams] Source Nodes

Posted by mishadoff <mi...@gmail.com>.
Thanks Matthias!

Got it working with through(...); I am still on version 0.10.0.0.

> On Jun 29, 2016, at 22:42, Matthias J. Sax <ma...@confluent.io> wrote:


Re: [Kafka Streams] Source Nodes

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi,

for joins, the data of both inputs must be co-located, i.e., partitioned on
the same key and with the same number of partitions:

See "Note" box at:
http://docs.confluent.io/3.0.0/streams/developer-guide.html?highlight=join#joining-streams
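A minimal sketch of the co-location rule, under simplifying assumptions: a join task that owns partition p reads partition p of both inputs, so a key can only be joined if it hashes to the same partition index on each side. The class CoPartitioning and its methods are hypothetical, and String.hashCode() stands in for Kafka's default partitioner (which actually hashes the serialized key bytes with murmur2); the argument holds for any deterministic hash.

```java
import java.util.List;

/**
 * Simplified model (ASSUMPTION: not Kafka code) of the co-location rule:
 * the join task that owns partition p reads partition p of BOTH inputs,
 * so a key must hash to the same partition index on each side.
 */
final class CoPartitioning {

    /** ASSUMPTION: String.hashCode() stands in for Kafka's murmur2-based default partitioner. */
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** True if a stream record and a table record with this key end up in the same join task. */
    static boolean meetsInSameTask(String key, int streamPartitions, int tablePartitions) {
        return partitionFor(key, streamPartitions) == partitionFor(key, tablePartitions);
    }

    public static void main(String[] args) {
        for (String key : List.of("a", "b", "c")) {
            // Equal partition counts: every key lines up.
            // Different counts: some keys (here "c") land on different indices.
            System.out.printf("key %s: same count (4/4) -> %b, different count (4/3) -> %b%n",
                    key, meetsInSameTask(key, 4, 4), meetsInSameTask(key, 4, 3));
        }
    }
}
```

Running main shows that with 4 partitions on both sides every key lines up, while with 4 vs. 3 partitions the key "c" (hash 99) lands in partition 3 of the stream but partition 0 of the table.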

From an older email thread about the same issue:

>>> If you change the key, your partitioning changes, i.e., it is no longer valid.
>>> Thus, the join (which assumes co-located data) cannot be performed
>>> (this is the reason why sources get set to null). You can write to an
>>> intermediate topic via .through(...) to get a valid partitioning:
>>>
>>> KStream dataStream = builder.stream(...).map(...).through(...);
>>>
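The older explanation quoted above can be illustrated with a simplified in-memory model (an assumption for illustration, not Kafka Streams code): map(...) rewrites the key but leaves each record in the partition it was read from, while writing to an intermediate topic, which is what through(...) does, lets the producer place each record by its new key. Rec, sampleInput, rekeyAll, repartitionAll and correctlyPartitioned are hypothetical names, and String.hashCode() again stands in for the real partitioner.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified in-memory model (ASSUMPTION: not Kafka Streams code) of why a
 * key-changing map(...) breaks co-partitioning and why writing the result to an
 * intermediate topic, as through(...) does, restores it. All names are hypothetical.
 */
final class RepartitionSketch {

    /** A key/value record plus the partition it currently lives in. */
    static final class Rec {
        final String key;
        final String value;
        final int partition;

        Rec(String key, String value, int partition) {
            this.key = key;
            this.value = value;
            this.partition = partition;
        }
    }

    /** ASSUMPTION: String.hashCode() stands in for Kafka's murmur2-based default partitioner. */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Source topic keyed by (hypothetical) order id: each record starts in its own key's partition. */
    static List<Rec> sampleInput(int numPartitions) {
        List<Rec> recs = new ArrayList<>();
        for (String orderId : List.of("a", "b", "c")) {
            recs.add(new Rec(orderId, "order-" + orderId, partitionFor(orderId, numPartitions)));
        }
        return recs;
    }

    /** Like map(...): the key changes, but each record stays in the partition it was read from. */
    static List<Rec> rekeyAll(List<Rec> recs, String newKey) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : recs) {
            out.add(new Rec(newKey, r.value, r.partition));
        }
        return out;
    }

    /** Like producing to an intermediate topic and reading it back: records move to the new key's partition. */
    static List<Rec> repartitionAll(List<Rec> recs, int numPartitions) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : recs) {
            out.add(new Rec(r.key, r.value, partitionFor(r.key, numPartitions)));
        }
        return out;
    }

    /** True if every record sits where its current key hashes to, so a co-partitioned table lookup finds it. */
    static boolean correctlyPartitioned(List<Rec> recs, int numPartitions) {
        for (Rec r : recs) {
            if (r.partition != partitionFor(r.key, numPartitions)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int n = 4;
        List<Rec> input = sampleInput(n);
        // Every order is re-keyed to the same (hypothetical) customer id "x".
        List<Rec> mapped = rekeyAll(input, "x");
        List<Rec> repartitioned = repartitionAll(mapped, n);
        System.out.println("input ok:         " + correctlyPartitioned(input, n));
        System.out.println("after map ok:     " + correctlyPartitioned(mapped, n));
        System.out.println("after through ok: " + correctlyPartitioned(repartitioned, n));
    }
}
```

With 4 partitions, orders "a", "b" and "c" start in partitions 1, 2 and 3; after all of them are re-keyed to "x" they stay there, although "x" hashes to partition 0, so correctlyPartitioned returns false until repartitionAll moves them.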

Btw: this problem has already been fixed; if you use the current trunk version,
you do not need the additional call to through(...).

-Matthias

