Posted to users@kafka.apache.org by Bartłomiej Kępa <ba...@me.com.INVALID> on 2019/09/08 12:24:04 UTC

ValueJoiner called with nulls

Hi All,
For some time I have been involved in developing an application that uses the Kafka Streams API, and I am facing a problem with joining two Kafka topics. The problem is illustrated by a simple test that was prepared based on our production code. It is available here: https://bitbucket.org/b_a_r_t_k/streams-join-problem/
As seen in the class JoinStreamBuilder:

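import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Joined
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.kstream.Serialized
import org.apache.kafka.streams.state.KeyValueStore

val builder = StreamsBuilder()

val reducedLookupStoreName = "reduced_$lookupTableTopicName$streamId"

// Main stream, re-keyed so that its keys match the lookup table keys
val streamToJoin = builder.stream(mainTopicName, Consumed.with(Serdes.String(), genericAvroSerde))
        .selectKey(MainKeySelector())

// Lookup table: keeps only the latest value per (re-keyed) lookup key
val lookupTable = builder.stream(lookupTableTopicName, Consumed.with(Serdes.String(), genericAvroSerde))
        .selectKey(LookupKeySelector())
        .groupByKey(Serialized.with(Serdes.String(), genericAvroSerde))
        .reduce({ _, new -> new },
                Materialized.`as`<String, GenericRecord, KeyValueStore<Bytes, ByteArray>>(reducedLookupStoreName)
                        .withKeySerde(Serdes.String())
                        .withValueSerde(genericAvroSerde))

streamToJoin
        .leftJoin(lookupTable, Joiner(streamId), Joined.with(Serdes.String(), genericAvroSerde, genericAvroSerde))
        .to(targetTopicName, Produced.with(Serdes.String(), genericAvroSerde))
val topology = builder.build()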

It is a simple stream-to-lookup-table (KStream-KTable) left join. The Joiner implementation looks as follows:

import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.streams.kstream.ValueJoiner

// In a KStream-KTable leftJoin, `lookup` is null whenever the table has no entry for the key
class Joiner(private val streamId: Int) : ValueJoiner<GenericRecord, GenericRecord, GenericRecord> {
    override fun apply(main: GenericRecord?, lookup: GenericRecord?): GenericRecord {
        if (main == null) LOG.warn("for streamId: $streamId record from main is null")
        if (lookup == null) LOG.warn("for streamId: $streamId record from lookup is null")

        // Output record: string and boolean fields from main, int field from lookup
        return GenericData.Record(MySampleData.schema)
                .apply {
                    put(MySampleData::stringField.name, main?.get(MySampleData::stringField.name))
                    put(MySampleData::booleanField.name, main?.get(MySampleData::booleanField.name))
                    put(MySampleData::intField.name, lookup?.get(MySampleData::intField.name))
                }
    }
}
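To make the expected semantics concrete, here is a toy, in-memory model in plain Kotlin (no Kafka dependency; the names Event, JoinResult and leftJoinInOrder are hypothetical, not from the repo or from Kafka Streams). It assumes the documented KStream-KTable leftJoin behaviour: a table update only upserts the latest value for its key and emits nothing, while each stream record is joined against whatever the table holds at the moment it is processed, with null passed for the table side if the key is not there yet. Within this model, the processing order of the two inputs alone decides whether lookup is null.

```kotlin
// Toy, in-memory model of KStream-KTable leftJoin semantics (hypothetical, not Kafka code).

sealed class Event {
    data class TableUpdate(val key: String, val value: Int) : Event()
    data class StreamRecord(val key: String, val value: String) : Event()
}

data class JoinResult(val key: String, val main: String, val lookup: Int?)

// Processes events strictly in the given order.
fun leftJoinInOrder(events: List<Event>): List<JoinResult> {
    val table = mutableMapOf<String, Int>() // latest value per key, like reduce { _, new -> new }
    val out = mutableListOf<JoinResult>()
    for (e in events) {
        when (e) {
            // Table side: upsert the latest value; a table update emits no join result.
            is Event.TableUpdate -> table[e.key] = e.value
            // Stream side: look up the current table value; null if the key has not arrived yet.
            is Event.StreamRecord -> out += JoinResult(e.key, e.value, table[e.key])
        }
    }
    return out
}
```

Feeding the same two records in opposite orders yields a joined result in one case and a null lookup in the other.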

The problem is that, nondeterministically, the Joiner’s apply() method sometimes receives null for the lookup parameter, while in other cases the parameter is not null, as expected.
The repo referred to above contains a test that uses this topology. It runs 10 iterations, building a new instance of the topology each time, and then feeds both topics with sample data (10 records per topic), expecting a 1-to-1 join to be performed for each pair of records.
As seen in the log output:
2019-09-08 13:49:09,634 [main] INFO  com.example.demo.JoinStreamTest [tenantId=]  - Number of not properly joined per iteration (iteration number -> number of errors): {0=1, 1=1, 2=1, 3=1, 4=0, 5=1, 6=1, 7=1, 8=1, 9=0}. Total errors: 8 

Some of the iterations produce no errors, while most of them do.
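The varying per-iteration counts can be mimicked with a second toy model, again plain Kotlin and purely hypothetical (countUnjoined, simulate and the pLookup parameter are illustrative names, not part of the repo or of Kafka Streams). It assumes the two topics are consumed in a random interleaving that preserves order within each topic, and counts main records that are processed before their lookup key exists. It does not model how Kafka Streams actually chooses which partition to process next.

```kotlin
import kotlin.random.Random

// Toy model (hypothetical, not Kafka code): n lookup records and n main records,
// where main record k needs lookup record k. While lookup records remain, the next
// one is processed with probability pLookup; otherwise the next main record is.
// Order within each topic is preserved. Returns how many main records were
// processed before their lookup key existed (i.e. the joiner would get null).
fun countUnjoined(n: Int, pLookup: Double, rnd: Random): Int {
    val table = mutableSetOf<Int>() // keys already present in the lookup table
    var nextLookup = 0              // index of the next lookup-topic record
    var nextMain = 0                // index of the next main-topic record
    var unjoined = 0
    while (nextMain < n) {
        if (nextLookup < n && rnd.nextDouble() < pLookup) {
            table += nextLookup++   // lookup record k populates key k
        } else {
            if (nextMain !in table) unjoined++ // lookup side would be null
            nextMain++
        }
    }
    return unjoined
}

// Runs the model once per iteration and returns iteration -> error count,
// the same shape as the map printed in the test log.
fun simulate(iterations: Int, n: Int, pLookup: Double, seed: Int): Map<Int, Int> {
    val rnd = Random(seed)
    return (0 until iterations).associateWith { countUnjoined(n, pLookup, rnd) }
}
```

With pLookup = 1.0 (all lookup records processed first) the count is always 0, and with pLookup = 0.0 it is always n; values in between can give different counts from one iteration to the next.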

Any help is welcome. At this point I have no idea what may cause such behaviour.
Best regards
BK