Posted to users@kafka.apache.org by Miguel González <mi...@klar.mx> on 2021/11/29 17:21:32 UTC

Kafka Streams - left join behavior

Hello

I have been developing a Kafka Streams app that reads two input topics as
KStreams, processes them in some way, joins them, and sends the combined
message to an output topic.

Here's some code:

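import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.ValueJoiner;

// Serdes for the key and for both value types held in the join's state stores
final StreamJoined<String, TransactionEvent, BalanceEvent> joinParams =
    StreamJoined.with(
        STRING_SERDE,
        StreamSerdeConstants.TRANSACTION_EVENT_SERDE,
        StreamSerdeConstants.BALANCE_EVENT_SERDE);

// Records join if their timestamps are within the window; grace admits late records
JoinWindows joinWindows = JoinWindows
    .of(Duration.ofSeconds(streamsProperties.getJoinWindowDuration()))
    .grace(Duration.ofSeconds(streamsProperties.getJoinGraceDuration()));

ValueJoiner<TransactionEvent, BalanceEvent, BalanceHistoryEvent> valueJoiner =
    (transactionEvent, balanceEvent) -> buildMessage(balanceEvent, transactionEvent);

transactions
    // TODO: change to leftJoin
    .join(beWithTransaction, valueJoiner, joinWindows, joinParams);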


It's pretty simple, but for my use case I also need to handle the messages
that are not joined, so I thought I could use a LEFT JOIN. But according to
my tests and this documentation
https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

I have seen that I can end up with both the combined message, as the regular
inner join produces, and the message with one side NULL, for example (A,B)
and (A, null).

I thought the join window would force the left join to output just (A,B)
when it found a match, not both. Maybe I have a bug in my window
configuration.
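To see why the window alone does not suppress the extra record, here is a simplified, in-memory model in plain Java (not Kafka Streams code; the class and method names are hypothetical). It assumes the behavior of versions before 3.1 as I understand it: when a left record finds no right record in the window yet, (A, null) is emitted immediately, and a right record that arrives later within the window still produces (A, B).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a windowed KStream-KStream left join
// WITHOUT the spurious-result fix. Plain Java, not the Kafka Streams API.
public class EagerLeftJoinModel {
    record Rec(String key, String value, long ts) {}

    private final long windowMs; // symmetric window, similar to JoinWindows.of(...)
    // Buffers never expire in this sketch, for brevity.
    private final List<Rec> leftBuffer = new ArrayList<>();
    private final List<Rec> rightBuffer = new ArrayList<>();
    final List<String> output = new ArrayList<>();

    EagerLeftJoinModel(long windowMs) { this.windowMs = windowMs; }

    // Two records can join if keys match and |tsLeft - tsRight| <= windowMs.
    private boolean inWindow(Rec left, Rec right) {
        return left.key().equals(right.key()) && Math.abs(left.ts() - right.ts()) <= windowMs;
    }

    void onLeft(Rec left) {
        leftBuffer.add(left);
        boolean matched = false;
        for (Rec right : rightBuffer) {
            if (inWindow(left, right)) {
                output.add("(" + left.value() + "," + right.value() + ")");
                matched = true;
            }
        }
        // Eager behavior: no match *yet*, so the null-padded result goes out right away.
        if (!matched) {
            output.add("(" + left.value() + ",null)");
        }
    }

    // In a left join, a right record only produces output when it matches a left record.
    void onRight(Rec right) {
        rightBuffer.add(right);
        for (Rec left : leftBuffer) {
            if (inWindow(left, right)) {
                output.add("(" + left.value() + "," + right.value() + ")");
            }
        }
    }

    public static void main(String[] args) {
        EagerLeftJoinModel join = new EagerLeftJoinModel(10_000);
        join.onLeft(new Rec("k1", "A", 1_000));  // no right record yet
        join.onRight(new Rec("k1", "B", 5_000)); // arrives within the window
        System.out.println(join.output);         // [(A,null), (A,B)]
    }
}
```

In this model the window only decides which pairs can match; it does not delay the (A, null) result, so both records appear.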

Is there a way to force the behavior I need, meaning: using a left join and
a JoinWindows, output only one message, either (A,B) or (A, null)?

regards
- Miguel

Re: Kafka Streams - left join behavior

Posted by Luke Chen <sh...@gmail.com>.
Hi Miguel,

Kafka v3.1.0 has already reached code freeze and is dealing with some
blocker issues. It should be released soon.

As for this feature's status in v3.0.0, I think Matthias knows it best.
As far as I know, it was originally ready for v3.0.0, but a regression bug
was found (KAFKA-13216
<https://issues.apache.org/jira/browse/KAFKA-13216>), so the feature was
disabled.
I'm not sure whether the feature still works as designed in v3.0.0 if you
enable it manually.

I still think that waiting for the v3.1.0 release is the better choice (if
possible).

Thank you.
Luke

On Tue, Dec 14, 2021 at 1:34 AM Miguel González <mi...@klar.mx>
wrote:


Re: Kafka Streams - left join behavior

Posted by Miguel González <mi...@klar.mx>.
Hi Matthias

Do you know when the 3.1 version is going to be released?

I noticed the JoinWindows class has a boolean property called
enableSpuriousResultFix

If I extend the class and set that flag to true, will it eliminate spurious
messages in Kafka Streams 3.0.0?


thanks
- Miguel


On Mon, Dec 6, 2021 at 2:49 PM Matthias J. Sax <mj...@apache.org> wrote:

> It's fixed in upcoming 3.1 release.
>
> Cf https://issues.apache.org/jira/browse/KAFKA-10847
>
>
> A stream-(global)table join has different semantics, so I am not sure if
> it would help.
>
> One workaround would be to apply a stateful` faltTransformValues()`
> after the join to "buffer" all NULL-results and only emit them after you
> know no consecutive inner-join result will happen. It's tricky to build
> though.
>
> I would recommend to wait and upgrade to 3.1 after it was releases.
>
>
> -Matthias
>
> On 11/30/21 12:59 AM, Luke Chen wrote:
> > Hi Miguel,
> >> Is there a way to force the behavior I need, meaning... using left join
> > and
> > a JoinWindows output only one message (A,B) or (A, null)
> >
> > I think you can try to achieve it by using *KStream-GlobalKTable left
> join*,
> > where the GlobalKTable should read all records at the right topic, and
> then
> > doing the left join operation. This should then output either (A,B), or
> (A,
> > null).
> >
> > Thank you.
> > Luke
> >
> > On Tue, Nov 30, 2021 at 1:23 AM Miguel González <miguel.gonzalez@klar.mx
> >
> > wrote:
> >
> >> Hello
> >>
> >> I have been developing a Kafka Streams app that takes as input two
> topics
> >> as KStreams, processes them in some way and joins them and sends the
> >> combined message to an output topic.
> >>
> >> Here's some code,
> >>
> >> final StreamJoined<String, TransactionEvent, BalanceEvent> joinParams =
> >>      StreamJoined.with(
> >>          STRING_SERDE,
> >>          StreamSerdeConstants.TRANSACTION_EVENT_SERDE,
> >>          StreamSerdeConstants.BALANCE_EVENT_SERDE);
> >>
> >> JoinWindows joinWindows = JoinWindows
> >>      .of(Duration.ofSeconds(streamsProperties.getJoinWindowDuration()))
> >>
> .grace(Duration.ofSeconds(streamsProperties.getJoinGraceDuration()));
> >>
> >> ValueJoiner<TransactionEvent, BalanceEvent, BalanceHistoryEvent>
> >> valueJoiner =
> >>      (transactionEvent, balanceEvent) -> buildMessage(balanceEvent,
> >> transactionEvent);
> >>
> >>
> >> transactions
> >>      // TODO: change to leftJoin
> >>      .join(beWithTransaction, valueJoiner, joinWindows, joinParams)
> >>
> >>
> >> It's pretty simple, but for my use case I need to process in some way
> the
> >> messages that are not joined, so I thought I could use a LEFT JOIN. But
> >> according to my tests and this documentation
> >> https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
> >>
> >> I have seen in the end I could end up with both the combined message as
> the
> >> regular inner join performs and the message with one side as NULL, for
> >> example (A,B) and (A, null)
> >>
> >> I thought the JOIN Window could force the output of the left join to
> just
> >> output if it found a match to just (A,B) not both. Maybe I have a bug
> in my
> >> Window configuration
> >>
> >> Is there a way to force the behavior I need, meaning... using left join
> and
> >> a JoinWindows output only one message (A,B) or (A, null)
> >>
> >> regards
> >> - Miguel
> >>
> >
>

Re: Kafka Streams - left join behavior

Posted by "Matthias J. Sax" <mj...@apache.org>.
It's fixed in the upcoming 3.1 release.

Cf https://issues.apache.org/jira/browse/KAFKA-10847


A stream-(global)table join has different semantics, so I am not sure
it would help.

One workaround would be to apply a stateful `flatTransformValues()`
after the join to "buffer" all NULL results and only emit them once you
know that no subsequent inner-join result will happen. It's tricky to build,
though.
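A rough sketch of that buffering logic, in plain Java rather than the Kafka Streams API. Assumptions (all hypothetical): each left-join result arrives with its key, the left value, the right value (null when unmatched) and its timestamp; there is at most one pending left record per key; and an unmatched result can be emitted once stream time passes its timestamp plus the window's "after" size plus grace (`closeAfterMs`). In a real topology the map would be a state store used from `flatTransformValues()`, and flushing would likely also need a punctuator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "buffer the NULL results after the join", in plain Java.
// An in-memory map stands in for a state store; nothing here is Kafka API.
public class NullResultBuffer {
    private record Pending(String leftValue, long deadline) {}

    private final long closeAfterMs; // assumed: join window "after" size + grace
    private final Map<String, Pending> pending = new LinkedHashMap<>();
    private long streamTime = Long.MIN_VALUE;

    NullResultBuffer(long closeAfterMs) { this.closeAfterMs = closeAfterMs; }

    // Called once per left-join result; rightValue == null marks an unmatched result.
    List<String> process(String key, String leftValue, String rightValue, long ts) {
        List<String> emitted = new ArrayList<>();
        streamTime = Math.max(streamTime, ts);
        if (rightValue == null) {
            // Hold (A, null) until no inner-join result can follow for it.
            pending.put(key, new Pending(leftValue, ts + closeAfterMs));
        } else {
            // An inner-join result arrived: the buffered (A, null) was spurious, drop it.
            pending.remove(key);
            emitted.add("(" + leftValue + "," + rightValue + ")");
        }
        emitted.addAll(flushExpired());
        return emitted;
    }

    // Emit buffered (A, null) results whose deadline stream time has passed.
    private List<String> flushExpired() {
        List<String> flushed = new ArrayList<>();
        Iterator<Map.Entry<String, Pending>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Pending p = it.next().getValue();
            if (streamTime > p.deadline()) {
                flushed.add("(" + p.leftValue() + ",null)");
                it.remove();
            }
        }
        return flushed;
    }

    public static void main(String[] args) {
        NullResultBuffer buffer = new NullResultBuffer(15_000);
        System.out.println(buffer.process("k1", "A", null, 1_000));  // [] (buffered)
        System.out.println(buffer.process("k1", "A", "B", 5_000));   // [(A,B)]
        System.out.println(buffer.process("k2", "C", null, 6_000));  // [] (buffered)
        System.out.println(buffer.process("k3", "D", "E", 30_000));  // [(D,E), (C,null)]
    }
}
```

The (A, null) for k1 is never emitted because (A,B) arrived first; the (C, null) for k2 only appears once stream time (30000) has moved past its deadline (6000 + 15000). Note that flushing only happens when a new record advances stream time, which is one reason a wall-clock or stream-time punctuator would probably be needed in practice.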

I would recommend waiting and upgrading to 3.1 once it is released.


-Matthias

On 11/30/21 12:59 AM, Luke Chen wrote:

Re: Kafka Streams - left join behavior

Posted by Luke Chen <sh...@gmail.com>.
Hi Miguel,
> Is there a way to force the behavior I need, meaning... using left join and
> a JoinWindows output only one message (A,B) or (A, null)

I think you can try to achieve it with a *KStream-GlobalKTable left join*:
the GlobalKTable reads all records from the right topic, and the stream is
then left-joined against it. This should output either (A,B) or
(A, null).
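The semantic difference Matthias mentions can be illustrated with a small plain-Java model of a stream-table left join (hypothetical names, not the Kafka Streams API). It assumes the stream key is used directly as the table key, whereas the real KStream-GlobalKTable join takes a key mapper. Each stream record is looked up once against the table's current contents, so exactly one result is emitted, but a table update that arrives after the stream record never produces a late (A,B).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a stream-table left join. Plain Java, not Kafka code.
public class StreamTableLeftJoinModel {
    // Latest right-side value per key, like a table built from the right topic.
    private final Map<String, String> table = new HashMap<>();
    final List<String> output = new ArrayList<>();

    // A right-topic record only updates the table; it never triggers output.
    void onTableUpdate(String key, String value) {
        table.put(key, value);
    }

    // Each stream record yields exactly one result, based on the table *now*.
    void onStreamRecord(String key, String value) {
        String right = table.get(key); // null if the key is not (yet) in the table
        output.add("(" + value + "," + right + ")");
    }

    public static void main(String[] args) {
        StreamTableLeftJoinModel join = new StreamTableLeftJoinModel();
        join.onTableUpdate("k1", "B");
        join.onStreamRecord("k1", "A"); // table already has B -> (A,B)
        join.onStreamRecord("k2", "C"); // nothing for k2 yet   -> (C,null)
        join.onTableUpdate("k2", "D");  // too late: no new output for C
        System.out.println(join.output); // [(A,B), (C,null)]
    }
}
```

So the one-result-per-record property holds, but whether a record gets (A,B) or (A, null) depends on what the table contains at processing time, not on a time window around the record.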

Thank you.
Luke

On Tue, Nov 30, 2021 at 1:23 AM Miguel González <mi...@klar.mx>
wrote:
