Posted to users@kafka.apache.org by Bartłomiej Kępa <ba...@gmail.com> on 2019/09/08 12:28:04 UTC

ValueJoiner apply() called with nulls???

Hi All,
For some time I have been involved in the development of an application that uses the Kafka Streams API, and I’m facing a problem with joining two Kafka topics. The problem is illustrated by a simple test that was prepared based on our production code. It is available here: https://bitbucket.org/b_a_r_t_k/streams-join-problem/
As seen in the class JoinStreamBuilder:

val builder = StreamsBuilder()

val reducedLookupStoreName = "reduced_$lookupTableTopicName$streamId"

val streamToJoin = builder.stream(mainTopicName, Consumed.with(Serdes.String(), genericAvroSerde))
        .selectKey(MainKeySelector())

val lookupTable = builder.stream(lookupTableTopicName, Consumed.with(Serdes.String(), genericAvroSerde))
        .selectKey(LookupKeySelector())
        .groupByKey(Serialized.with(Serdes.String(), genericAvroSerde))
        .reduce({ _, new -> new },
                Materialized.`as`<String, GenericRecord, KeyValueStore<Bytes, ByteArray>>(reducedLookupStoreName).withKeySerde(Serdes.String()).withValueSerde(genericAvroSerde))

streamToJoin
        .leftJoin(lookupTable, Joiner(streamId), Joined.with(Serdes.String(), genericAvroSerde, genericAvroSerde))
        .to(targetTopicName, Produced.with(Serdes.String(), genericAvroSerde))
val topology = builder.build()

It is a simple stream-to-lookup-table join. The Joiner implementation looks as follows:

class Joiner(private val streamId: Int) : ValueJoiner<GenericRecord, GenericRecord, GenericRecord> {
    override fun apply(main: GenericRecord?, lookup: GenericRecord?): GenericRecord {
        if (main == null) LOG.warn("for streamId: $streamId record from main is null")
        if (lookup == null) LOG.warn("for streamId: $streamId record from lookup is null")

        return GenericData.Record(MySampleData.schema)
                .apply {
                    put(MySampleData::stringField.name, main?.get(MySampleData::stringField.name))
                    put(MySampleData::booleanField.name, main?.get(MySampleData::booleanField.name))
                    put(MySampleData::intField.name, lookup?.get(MySampleData::intField.name))
                }
    }
}

The problem is that, in a non-deterministic way, Joiner’s apply() method sometimes gets null for the lookup parameter, while in other cases the parameter is not null, as expected.
The repo I referred to above contains a test that uses that topology. It iterates 10 times, building a new instance of the topology each time, and then feeds the two topics with sample data (10 records for each topic), expecting a 1-to-1 join to be performed for each pair of records.
As seen in log output:
2019-09-08 13:49:09,634 [main] INFO  com.example.demo.JoinStreamTest [tenantId=]  - Number of not properly joined per iteration (iteration number -> number of errors): {0=1, 1=1, 2=1, 3=1, 4=0, 5=1, 6=1, 7=1, 8=1, 9=0}. Total errors: 8 

Some of the iterations produce no errors, while most of them do.

Any help is welcome. At this point I have no clue what may cause such behaviour.
Best regards
BK

Re: ValueJoiner apply() called with nulls???

Posted by "Matthias J. Sax" <ma...@confluent.io>.
@Alex:

Yes, if the table state is small and static, using a GlobalKTable is a
good fit. However, I would assume that most use cases require
event-time-based join semantics (otherwise your application is
non-deterministic) and that the table state is large and must be sharded.
Hence, I would expect a GlobalKTable to be a fit for only a limited
number of use cases.


-Matthias



On 9/10/19 3:31 AM, Bartłomiej Kępa wrote:
> Hi all,
> I think I found the root cause of the problem. In the code I’ve provided, I’m feeding both input topics with data without keys. The keys are actually selected from the message contents in a step that is part of the declared topology. Given that, even if I make sure that the input topics are populated upfront and the lookup topic is populated before the stream data (as I believe Alex suggested), there is no guarantee that the same order will hold in the intermediate topics created internally by Kafka Streams when it builds the topology.
> I’ve updated my GitHub example with the working topology and a corresponding test, which seems to prove what I’ve just described.
> Anyway, thank you for all the suggestions.
> Best regards,
> BK


Re: ValueJoiner apply() called with nulls???

Posted by Bartłomiej Kępa <ba...@gmail.com>.
Hi all,
I think I found the root cause of the problem. In the code I’ve provided, I’m feeding both input topics with data without keys. The keys are actually selected from the message contents in a step that is part of the declared topology. Given that, even if I make sure that the input topics are populated upfront and the lookup topic is populated before the stream data (as I believe Alex suggested), there is no guarantee that the same order will hold in the intermediate topics created internally by Kafka Streams when it builds the topology.
I’ve updated my GitHub example with the working topology and a corresponding test, which seems to prove what I’ve just described.
Anyway, thank you for all the suggestions.
Best regards,
BK


> Message written by Alex Brekken <br...@gmail.com> on 10.09.2019, at 03:25:
> 
> Matthias: if the data you're joining against is relatively small and
> unchanging, would a global KTable be a reasonable choice since you get the
> guarantee that it will get loaded first on startup?  Not sure if that's the
> case for Bartlomiej, but just want to make sure that that kind of scenario
> is a good fit for a global KTable.  (thanks for the info on the changes to
> timestamp synchronization - I missed that)
> 
> Alex


Re: ValueJoiner apply() called with nulls???

Posted by Alex Brekken <br...@gmail.com>.
Matthias: if the data you're joining against is relatively small and
unchanging, would a global KTable be a reasonable choice since you get the
guarantee that it will get loaded first on startup?  Not sure if that's the
case for Bartlomiej, but just want to make sure that that kind of scenario
is a good fit for a global KTable.  (thanks for the info on the changes to
timestamp synchronization - I missed that)

Alex

On Mon, Sep 9, 2019 at 7:29 PM Matthias J. Sax <ma...@confluent.io>
wrote:

> Using a GlobalKTable has many implications and I would not necessarily
> recommend it. It makes processing non-deterministic because there is no
> synchronization between the main processing thread and the global-threads.
>
> Note that data is processed based on its timestamps. Hence, if you
> pre-populate both topics, you need to ensure that the data in the KTable
> topic has smaller timestamps than the data in the KStream topic -- that
> way, the KTable will be "loaded" before any KStream record is processed.
>
> Note that before the 2.1 release, timestamp synchronization was best
> effort only -- hence, you should use 2.1 or newer for Kafka Streams
> (broker version does not matter).
>
> Also consider the `max.task.idle.ms` configuration parameter, which can
> "block" one side from processing for some time in case you write the
> data after the application was started (i.e., a tradeoff between latency
> and better ordering guarantees).
>
>
> -Matthias

Re: ValueJoiner apply() called with nulls???

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Using a GlobalKTable has many implications and I would not necessarily
recommend it. It makes processing non-deterministic because there is no
synchronization between the main processing thread and the global-threads.

Note that data is processed based on its timestamps. Hence, if you
pre-populate both topics, you need to ensure that the data in the KTable
topic has smaller timestamps than the data in the KStream topic -- that
way, the KTable will be "loaded" before any KStream record is processed.
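
To illustrate the point about timestamps, here is a minimal sketch in plain Kotlin. It is a simplified model and not Kafka Streams code: the names Rec and leftJoinByTimestamp are hypothetical, each input list is assumed to be sorted by timestamp, and letting the table side win ties is a choice of this sketch rather than documented behaviour.

```kotlin
// Simplified model of a stream-table left join driven by record timestamps.
data class Rec(val key: String, val value: String, val timestamp: Long)

// Consumes both inputs in timestamp order and returns one (streamValue, lookupValue)
// pair per stream record. lookupValue is null when the table has no entry for the
// key at the moment the stream record is processed.
fun leftJoinByTimestamp(table: List<Rec>, stream: List<Rec>): List<Pair<String, String?>> {
    val store = mutableMapOf<String, String>()      // plays the role of the KTable state
    val results = mutableListOf<Pair<String, String?>>()
    var t = 0
    var s = 0
    while (s < stream.size) {
        val tableIsNext = t < table.size && table[t].timestamp <= stream[s].timestamp
        if (tableIsNext) {
            store[table[t].key] = table[t].value    // table update, produces no output
            t++
        } else {
            results += stream[s].value to store[stream[s].key]   // lookup may miss
            s++
        }
    }
    return results
}
```

With lookup records at timestamps 10 and 11 and stream records at 100 and 101, every stream record finds its match. If the second lookup record instead carried timestamp 500, the stream record for the same key would be joined with null.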

Note that before the 2.1 release, timestamp synchronization was best
effort only -- hence, you should use 2.1 or newer for Kafka Streams
(broker version does not matter).

Also consider the `max.task.idle.ms` configuration parameter, which can
"block" one side from processing for some time in case you write the
data after the application was started (i.e., a tradeoff between latency
and better ordering guarantees).
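
A minimal sketch of how this setting could be supplied, assuming the application is configured through a java.util.Properties object that is later passed to KafkaStreams. The function name streamsProperties and the values in the usage note are placeholders; only the three config keys are real Kafka Streams settings.

```kotlin
import java.util.Properties

// Builds Kafka Streams settings with task idling enabled.
// The function name and parameters are hypothetical; the keys are real config names.
fun streamsProperties(applicationId: String, bootstrapServers: String, maxTaskIdleMs: Long): Properties =
    Properties().apply {
        put("application.id", applicationId)
        put("bootstrap.servers", bootstrapServers)
        // Upper bound on how long a task stays idle while some of its input
        // partitions have no buffered records (higher latency, better ordering).
        put("max.task.idle.ms", maxTaskIdleMs.toString())
    }
```

For example, streamsProperties("join-demo", "localhost:9092", 1000L) asks each task to wait up to one second for data on all of its input partitions before it processes the records it already has.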


-Matthias

On 9/9/19 1:54 PM, Alex Brekken wrote:
> Just to be clear, the timing issue I was referring to was with consuming
> the data, not publishing.  In order for your join to work correctly (and
> consistently), all the data in the lookupTable needs to be there BEFORE the
> streamToJoin data starts processing, right?  Your topology won't wait for
> the lookupTable to get fully populated before processing data, which means
> there might be cases where streamToJoin is trying to find a match in the
> lookupTable but it isn't there yet because the lookupTable hasn't fully
> consumed its topic.  This is why I think using a GlobalKTable might solve
> your problem, since it bootstraps the global KTable first.  You might have
> already understood what I meant, just making sure.  Good luck!
> 
> Alex
> 
> On Mon, Sep 9, 2019 at 5:27 AM Bartłomiej Kępa <ba...@gmail.com>
> wrote:
> 
>> Hi Alex,
>> Thank you for your quick response. Unfortunately, it seems that it is not
>> a timing issue, at least not in that sense. I modified the test in such a
>> way that it ensures that the messages were committed to both topics before
>> I actually start the topology. Still no improvement with regard to the
>> expected result. It seems I need to investigate the second option, the
>> GlobalKTable.
>> Best regards,
>> BK
>>
>>> Message written by Alex Brekken <br...@gmail.com> on 08.09.2019, at 14:49:
>>>
>>> The non-deterministic behavior you're seeing might be the result of a
>>> timing issue.  In other words, in some cases your KTable is fully
>>> populated by the time data in "streamToJoin" is trying to find a match in
>>> "lookupTable" and in other cases it isn't. If you haven't already, you
>>> might want to take a look at using a GlobalKTable to see if that will
>>> work for your use case.  On startup, I believe Kafka Streams will wait
>>> until the GlobalKTable has fully consumed the topic before data starts
>>> flowing. There are downsides to GlobalKTables (check the documentation),
>>> but if this is just a lookup table where the data is fairly static then
>>> it might make sense.
>>>
>>> Alex
>>>
>>> On Sun, Sep 8, 2019 at 7:28 AM Bartłomiej Kępa <
>> bartlomiej.kepa@gmail.com>
>>> wrote:
>>>
>>>> Hi All,
>>>> Since some time I’m involved in development of application that
>>>> incorporates Kafka Streams API, I’m facing the problem with joining two
>>>> Kafka topics. The problem is illustrated in simple test that ws prepared
>>>> based on our production code. It is available here:
>>>> https://bitbucket.org/b_a_r_t_k/streams-join-problem/ <
>>>> https://bitbucket.org/b_a_r_t_k/streams-join-problem/>
>>>> As seen in the class JoinStreamBuilder:
>>>>
>>>> val builder = StreamsBuilder()
>>>>
>>>> val reducedLookupStoreName = "reduced_$lookupTableTopicName$streamId"
>>>>
>>>> val streamToJoin = builder.stream(mainTopicName,
>>>> Consumed.with(Serdes.String(), genericAvroSerde))
>>>>        .selectKey(MainKeySelector())
>>>>
>>>> val lookupTable = builder.stream(lookupTableTopicName,
>>>> Consumed.with(Serdes.String(), genericAvroSerde))
>>>>        .selectKey(LookupKeySelector())
>>>>        .groupByKey(Serialized.with(Serdes.String(), genericAvroSerde))
>>>>        .reduce({ _, new -> new },
>>>>                Materialized.`as`<String, GenericRecord,
>>>> KeyValueStore<Bytes,
>>>>
>> ByteArray>>(reducedLookupStoreName).withKeySerde(Serdes.String()).withValueSerde(genericAvroSerde))
>>>>
>>>> streamToJoin
>>>>        .leftJoin(lookupTable, Joiner(streamId),
>>>> Joined.with(Serdes.String(), genericAvroSerde, genericAvroSerde))
>>>>        .to(targetTopicName, Produced.with(Serdes.String(),
>>>> genericAvroSerde))
>>>> val topology = builder.build()
>>>>
>>>> It is simple kind of lookup table to stream join. The Joiner
>>>> implementation looks as follows
>>>>
>>>> class Joiner(private val streamId: Int) : ValueJoiner<GenericRecord,
>>>> GenericRecord, GenericRecord> {
>>>>    override fun apply(main: GenericRecord?, lookup: GenericRecord?):
>>>> GenericRecord {
>>>>        if (main == null) LOG.warn("for streamId: $streamId record from
>>>> main is null")
>>>>        if (lookup == null) LOG.warn("for streamId: $streamId record from
>>>> lookup is null")
>>>>
>>>>        return GenericData.Record(MySampleData.schema)
>>>>                .apply {
>>>>                    put(MySampleData::stringField.name,
>>>> main?.get(MySampleData::stringField.name))
>>>>                    put(MySampleData::booleanField.name,
>>>> main?.get(MySampleData::booleanField.name))
>>>>                    put(MySampleData::intField.name,
>>>> lookup?.get(MySampleData::intField.name))
>>>>                }
>>>>    }
>>>> }
>>>>
>>>> The problem is that sometimes in not deterministic way Joiner’s apply()
>>>> method gets null for lookup parameter, while in some cases the
>> parameter is
>>>> not null - as expected.
>>>> The repo I referred above contains a test that is supposed to use that
>>>> topology. It iterates 10 times building new instance of the topology
>> each
>>>> time and then it feeds two topics with sample data (10 records for each
>>>> topic) expecting 1 to 1 join will be performed for each records pair.
>>>> As seen in log output:
>>>> 2019-09-08 13:49:09,634 [main] INFO  com.example.demo.JoinStreamTest
>>>> [tenantId=]  - Number of not properly joined per iteration (iteration
>>>> number -> number of errors): {0=1, 1=1, 2=1, 3=1, 4=0, 5=1, 6=1, 7=1,
>> 8=1,
>>>> 9=0}. Total errors: 8
>>>>
>>>> Some of of the iteration produce no errors, while most of them does.
>>>>
>>>> Any help welcome. At this point I have no clue what may clause such
>>>> behaviour.
>>>> Best regards
>>>> BK
>>
>>
> 


Re: ValueJoiner apply() called with nulls???

Posted by Alex Brekken <br...@gmail.com>.
Just to be clear, the timing issue I was referring to is on the consuming
side, not the publishing side. For your join to work correctly (and
consistently), all the data in lookupTable needs to be there BEFORE the
streamToJoin data starts processing, right? Your topology won't wait for
lookupTable to be fully populated before processing data, which means
there might be cases where a streamToJoin record looks for a match in
lookupTable that isn't there yet, because the table hasn't fully consumed
its topic. This is why I think using a GlobalKTable might solve your
problem, since Kafka Streams bootstraps the global table first. You might
have already understood what I meant, just making sure. Good luck!
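
To make the ordering effect concrete, here is a toy model in plain Kotlin. It does not use Kafka at all: the Event, TableUpdate and StreamRecord types and the leftJoin function are hypothetical names invented for this illustration. It only shows that a left join against a table that is still being filled depends on the order in which records are processed.

```kotlin
// Toy model of a stream-table left join: no Kafka involved, only the ordering effect.
sealed class Event
data class TableUpdate(val key: String, val value: Int) : Event()
data class StreamRecord(val key: String, val payload: String) : Event()

// Processes events in arrival order. Each stream record is joined against
// whatever the table holds at that moment (null if the key is not there yet).
fun leftJoin(events: List<Event>): List<Pair<String, Int?>> {
    val table = mutableMapOf<String, Int>()
    val joined = mutableListOf<Pair<String, Int?>>()
    for (event in events) {
        when (event) {
            is TableUpdate -> table[event.key] = event.value
            is StreamRecord -> joined += event.payload to table[event.key]
        }
    }
    return joined
}

fun main() {
    val tableFirst = listOf(TableUpdate("k1", 42), StreamRecord("k1", "a"))
    val streamFirst = listOf(StreamRecord("k1", "a"), TableUpdate("k1", 42))
    println(leftJoin(tableFirst))  // [(a, 42)]
    println(leftJoin(streamFirst)) // [(a, null)]
}
```

Both inputs contain the same two records. Only the processing order differs, which is the kind of variation that could explain a null lookup value in some runs and not in others.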

Alex


Re: ValueJoiner apply() called with nulls???

Posted by Bartłomiej Kępa <ba...@gmail.com>.
Hi Alex
Thank you for your quick response. Unfortunately, it seems that it is not a timing issue, at least as I understand it. I modified the test in a way that ensures the messages are committed to both topics before I actually start the topology. Still no improvement with regard to the expected result. It seems I need to investigate the second option, GlobalKTable.
Best regards,
BK



Re: ValueJoiner apply() called with nulls???

Posted by Alex Brekken <br...@gmail.com>.
The non-deterministic behavior you're seeing might be the result of a
timing issue.  In other words, in some cases your KTable is fully populated
by the time data in "streamToJoin" is trying to find a match in
"lookupTable" and in other cases it isn't. If you haven't already, you
might want to take a look at using a GlobalKTable to see if that will work
for your use-case.  On startup, I believe Kafka Streams will wait until the
GlobalKTable has fully consumed the topic before data starts flowing.
There are downsides to GlobalKTables (check the documentation), but if
this is just a lookup table where the data is fairly static then it might
make sense.
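
A rough sketch of what the GlobalKTable variant could look like, under some assumptions that are not from your code: plain String serdes stand in for your genericAvroSerde, the function name buildGlobalTableJoin is made up, and the lookup topic is assumed to be keyed by the join key already. A GlobalKTable is read straight from its topic, so the selectKey/groupByKey/reduce steps from your topology have no direct equivalent here. The KeyValueMapper derives the lookup key from each stream record instead.

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.KeyValueMapper
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.kstream.ValueJoiner

// Hypothetical helper: builds a stream-to-GlobalKTable left join.
// Assumption: String keys and values, lookup topic already keyed by the join key.
fun buildGlobalTableJoin(mainTopic: String, lookupTopic: String, targetTopic: String): Topology {
    val builder = StreamsBuilder()
    val strings = Serdes.String()

    // The global table is materialized from the whole lookup topic.
    val lookupTable = builder.globalTable(lookupTopic, Consumed.with(strings, strings))

    builder.stream(mainTopic, Consumed.with(strings, strings))
        .leftJoin(
            lookupTable,
            // Maps each stream record to the key used for the table lookup.
            KeyValueMapper<String, String, String> { key, _ -> key },
            // With a left join the lookup side can still be null when no match exists.
            ValueJoiner<String, String?, String> { main, lookup -> "$main|${lookup ?: "no-match"}" }
        )
        .to(targetTopic, Produced.with(strings, strings))

    return builder.build()
}
```

The joiner still has to handle a null lookup value, because a left join emits a result even when the key is genuinely absent from the table.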

Alex
