Posted to users@kafka.apache.org by Ivan Ilichev <iv...@gmail.com> on 2016/12/01 12:42:14 UTC

Kafka Streams question - KStream.leftJoin(KTable)

Hi Guys,

I am implementing a stream processor where I aggregate a stream of events
by key into a KTable, tableA, and then “enrich” another stream, streamB,
with the values of tableA.

So essentially I have this:

streamC = streamB
  .selectKey(..)
  .leftJoin(tableA);

This works great; however, I also need to produce a stream of the records
from streamB that are the inverse, in other words the records that failed
the join (their key was null). This is similar to what the “branch” API does
for filtering on multiple predicates: when the leftJoin fails, I need to do
something else with the record, potentially another enrichment by join.
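To make the two possible meanings of “failed the join” concrete, here is a minimal plain-Java simulation of a stream-table left join. It is not the Kafka Streams API: the Map standing in for tableA, the Rec type and the leftJoin helper are hypothetical. It assumes, as the reply below reads the question, that records with a null key are skipped by the join, while a non-null key with no entry in the table still produces output with a null table value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java simulation (NOT the Kafka Streams API) of stream-table leftJoin
// semantics. The table is modeled as a Map holding the latest value per key.
public class LeftJoinSketch {

    // A stream record whose key may be null (e.g. after a re-keying step).
    static final class Rec {
        final String key;
        final String value;
        Rec(String key, String value) { this.key = key; this.value = value; }
    }

    // For each stream record, look up the table's current value by key and
    // apply the joiner. A key with no table entry still produces output, with
    // tableValue == null. A null key is skipped entirely (ASSUMED to mirror
    // how the stream-table join treats null keys).
    static List<String> leftJoin(List<Rec> stream, Map<String, String> table,
                                 BiFunction<String, String, String> joiner) {
        List<String> out = new ArrayList<>();
        for (Rec r : stream) {
            if (r.key == null) {
                continue; // dropped: never appears in the joined output
            }
            out.add(joiner.apply(r.value, table.get(r.key)));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> tableA = new HashMap<>();
        tableA.put("a", "aggA");
        List<Rec> streamB = Arrays.asList(
                new Rec("a", "b1"),   // key present in tableA
                new Rec("x", "b2"),   // key missing from tableA
                new Rec(null, "b3")); // null key
        List<String> streamC = leftJoin(streamB, tableA, (b, a) -> b + "|" + a);
        System.out.println(streamC); // prints [b1|aggA, b2|null]
    }
}
```

In the sample run, b1 is enriched, b2 comes through with a null table value (which a later filter on the joined value could catch), and b3, with its null key, never reaches streamC.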

Is this something that can be accomplished directly with the Kafka Streams
DSL, or do I need to implement my own processor that does this branching?

In that case, I would have to query the state store directly, which should
not be a problem. However, would that not cause problems with the
partitioning of the state store (for tableA) and the selectKey operation on
streamB? In other words, if two streams use the same partitioning on the
same key, their partitions should be visible to the same instances, correct?
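A minimal sketch of the co-partitioning idea behind this question. Assumption: Arrays.hashCode over the UTF-8 key bytes stands in for the producer's real default hash (Kafka's default partitioner uses murmur2 on the serialized key), and partitionFor is a hypothetical helper. What carries over is the shape: a deterministic hash of the serialized key, made non-negative, modulo the partition count. So if tableA's topic and the topic holding the re-keyed streamB have the same number of partitions and use the same partitioner, a given key lands in the same partition number in both, and Kafka Streams assigns partition i of co-partitioned join inputs to the same task.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch of why co-partitioned topics line up. ASSUMPTION: Arrays.hashCode is
// a stand-in for the producer's real hash of the serialized key; only the
// structure (deterministic hash, non-negative, modulo partition count) matters.
public class CoPartitionSketch {

    // Partition index for a key: same key bytes + same partition count
    // always give the same index, whichever topic the record is written to.
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(bytes) & 0x7fffffff; // force non-negative
        return hash % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 4; // both topics are assumed to have the same count
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            int inTableTopic = partitionFor(key, partitions);    // topic backing tableA
            int inRekeyedTopic = partitionFor(key, partitions);  // re-keyed streamB
            // Equal indexes: the task owning partition i of both topics sees
            // the stream record and the matching table entry in its local store.
            System.out.println(key + " -> " + inTableTopic + " / " + inRekeyedTopic);
        }
    }
}
```

The sketch only models the hash/modulo step; with a different partition count on one side, the same key could map to a different partition number and the join would no longer find it locally.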

Using Kafka Streams 0.10.1.0 here.

Regards,
-Ivan

Re: Kafka Streams question - KStream.leftJoin(KTable)

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi Ivan,

If I understand you correctly, the issue with the leftJoin is that your
stream contains records with key == null, and those records get
dropped?

What about this:

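streamBB = streamB.selectKey(..);                    // re-key once, reuse for both paths
streamC = streamBB.leftJoin(tableA);                 // null-key records are dropped here
streamBNull = streamBB.filter((k, v) -> k == null);  // exactly those dropped records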

Thus streamBNull contains all the records that are dropped by the join and
are therefore not contained in streamC.
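The split above behaves like the “branch” operation Ivan mentions: each record goes to the first predicate it matches. Below is a plain-Java simulation of that behavior (the Rec type and branch helper are hypothetical, not the Kafka Streams classes), using the predicates k != null and k == null so that every re-keyed record ends up either on the join path or in streamBNull, never both.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// Plain-Java simulation (NOT Kafka Streams) of splitting a re-keyed stream
// into records that can take part in the join and records with a null key.
public class NullKeySplitSketch {

    static final class Rec {
        final String key;
        final String value;
        Rec(String key, String value) { this.key = key; this.value = value; }
        @Override public String toString() { return key + "=" + value; }
    }

    // Mimics branch-style routing: each record goes to the FIRST predicate
    // that matches; records matching no predicate are discarded.
    @SafeVarargs
    static List<List<Rec>> branch(List<Rec> in, BiPredicate<String, String>... preds) {
        List<List<Rec>> out = new ArrayList<>();
        for (int i = 0; i < preds.length; i++) {
            out.add(new ArrayList<>());
        }
        for (Rec r : in) {
            for (int i = 0; i < preds.length; i++) {
                if (preds[i].test(r.key, r.value)) {
                    out.get(i).add(r);
                    break;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> streamBB = Arrays.asList(
                new Rec("a", "b1"), new Rec(null, "b2"), new Rec("x", "b3"));
        List<List<Rec>> parts = branch(streamBB,
                (k, v) -> k != null,   // candidates for leftJoin(tableA)
                (k, v) -> k == null);  // streamBNull: would be dropped by the join
        System.out.println("to join:  " + parts.get(0));
        System.out.println("null key: " + parts.get(1));
    }
}
```

Running main puts the two keyed records on the join path and the null-key record on the streamBNull path. In the DSL itself, KStream#branch with the same two predicates should give the same split in a single call, as an alternative to the separate filter.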

Does this help?


-Matthias


