Posted to users@kafka.apache.org by Jeff Klukas <jk...@simple.com> on 2016/06/08 14:56:26 UTC

Handling of nulls in KTable groupBy

I have a seemingly simple case where I want to join two KTables to produce
a new table with a different key, but I am getting NPEs. My understanding
is that to change the key of a KTable, I need to do a groupBy and a reduce.

What I believe is going on is that the inner join operation is emitting
nulls in the case that no matching record is found in one of the source
KTables. The groupBy operation then receives null inputs that it's not
expecting.

Here is the snippet of code where I define the join and the groupBy:

customerIdToAccountIdLookup
        // Inner join on customerId; pair up the two looked-up values.
        .join(customerIdToUserIdLookup,
              (Integer accountId, String userId) -> new KeyValue<>(accountId, userId))
        // Re-key the joined table: accountId becomes the key, userId the value.
        .groupBy((Integer customerId, KeyValue<Integer, String> kv) -> kv,
                 Serdes.Integer(), Serdes.String())

This produces the following exception:

! java.lang.NullPointerException: null
! at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:88)
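
The suspected failure mode can be sketched in plain Java without any Kafka
dependency. This is only a model of the hypothesis, not the Kafka Streams
implementation: the class NullGuardSketch and its methods are hypothetical,
Map.Entry stands in for KeyValue, and the assumption is that whatever sits
downstream of the groupBy selector dereferences the pair the selector returns.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical model of the suspected problem; not Kafka Streams code.
final class NullGuardSketch {

    // Stand-in for the groupBy selector above: returns the joined pair unchanged.
    static final BiFunction<Integer, Map.Entry<Integer, String>, Map.Entry<Integer, String>>
            SELECTOR = (customerId, kv) -> kv;

    // Assumed downstream behavior: dereferences the selected pair without a null check.
    static String forwardUnguarded(Integer customerId, Map.Entry<Integer, String> joined) {
        Map.Entry<Integer, String> selected = SELECTOR.apply(customerId, joined);
        return selected.getKey() + "=" + selected.getValue(); // NPE if joined is null
    }

    // Guarded variant: a null pair (no join match) is skipped instead of dereferenced.
    static String forwardGuarded(Integer customerId, Map.Entry<Integer, String> joined) {
        Map.Entry<Integer, String> selected = SELECTOR.apply(customerId, joined);
        if (selected == null || selected.getKey() == null || selected.getValue() == null) {
            return null; // nothing to forward
        }
        return selected.getKey() + "=" + selected.getValue();
    }

    public static void main(String[] args) {
        Map.Entry<Integer, String> matched = new SimpleEntry<>(7, "u1");
        System.out.println(forwardGuarded(42, matched));
        System.out.println(forwardGuarded(42, null));
        try {
            forwardUnguarded(42, null);
        } catch (NullPointerException e) {
            System.out.println("unguarded path threw NullPointerException");
        }
    }
}
```

If the model is right, a null join result reaching the selector would be
enough to trigger the exception, regardless of what the selector itself does.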

Am I approaching this incorrectly, or is this a bug? Should a
KTable-KTable inner join emit records when no match is found?