Posted to users@kafka.apache.org by "Thaler, Michael" <mi...@forcepoint.com> on 2017/11/09 22:53:48 UTC

How do I gracefully handle stream joins where the other side never appears?

Hi all,

So let's say I have 2 topics coming in that I want to join using KStream#join. I set them up like so:

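import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> a = builder.stream(TOPIC_A);
KStream<String, String> b = builder.stream(TOPIC_B);

// Inner join: emits only when same-key records have timestamps within 1 hour of each other
a.join(b, (msgA, msgB) -> msgA + msgB, JoinWindows.of(TimeUnit.HOURS.toMillis(1)))
  .print();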

So this works fine and joins the messages together. But what happens to messages that don't find a join partner in the other topic within the window? If I get a message in topic A and its partner doesn't occur in B, when and how does the message get consumed? Is there a way to write my application so that this is caught somehow and handled?

I'm aware that I could use a leftJoin instead, but that would call the merge function twice, once with (msgA, null) and the second time with (msgA, msgB). I'm trying to find a solution that only calls one or the other.

Is there a way to do this cleanly?

Thanks!
--Michael Thaler

Re: How do I gracefully handle stream joins where the other side never appears?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Yes. That is guaranteed.

It can also happen that you get only one result, namely (msgA, msgB). If
msgB is processed first, it will already be in the window buffer when
msgA arrives, and thus msgA will just join with msgB and there won't be
a (msgA, null) result.
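To illustrate both cases, here is a toy model in plain Java. It is a sketch, not Kafka's code: the class name and the "A:"/"B:" record encoding are made up for the example, it assumes a single key, and it ignores timestamps, windows and retention. It follows the eager emission described in this thread: a left record with no buffered partner emits (msgA, null) immediately, while a right record only emits when it finds a buffered left partner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy, single-key model of the eager leftJoin behaviour described in this thread.
// It ignores timestamps, windows and retention; it is NOT Kafka's implementation.
class LeftJoinOrderDemo {

    // Each input record is "A:<value>" (left stream) or "B:<value>" (right stream),
    // processed in the given order. Returns the join results in emission order.
    static List<String> process(List<String> records) {
        List<String> bufferA = new ArrayList<>();
        List<String> bufferB = new ArrayList<>();
        List<String> results = new ArrayList<>();
        for (String record : records) {
            boolean isLeft = record.startsWith("A:");
            String value = record.substring(2);
            if (isLeft) {
                bufferA.add(value);
                if (bufferB.isEmpty()) {
                    // No partner buffered yet: the left side emits (msgA, null) right away
                    results.add("(" + value + ", null)");
                } else {
                    for (String b : bufferB) {
                        results.add("(" + value + ", " + b + ")");
                    }
                }
            } else {
                bufferB.add(value);
                // The right side only emits when it finds a buffered left partner
                for (String a : bufferA) {
                    results.add("(" + a + ", " + value + ")");
                }
            }
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList("A:msgA", "B:msgB"))); // [(msgA, null), (msgA, msgB)]
        System.out.println(process(Arrays.asList("B:msgB", "A:msgA"))); // [(msgA, msgB)]
    }
}
```

Processing msgA first yields (msgA, null) followed by (msgA, msgB); processing msgB first yields only (msgA, msgB). In this model (msgA, null) can only be emitted when msgA arrives, so it never follows a (msgA, msgB) result.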


-Matthias


On 11/14/17 1:43 PM, Thaler, Michael wrote:
> Thanks, Matthias!
> 
> One more question then. If I use a leftJoin, am I guaranteed to see the messages in the order of (msgA, null) and then (msgA, msgB), and not the other way around? I would assume so, from my understanding of how the join works, but I'm just confirming.
> 
> --Michael


Re: Re: How do I gracefully handle stream joins where the other side never appears?

Posted by "Thaler, Michael" <mi...@forcepoint.com>.
Thanks, Matthias!

One more question then. If I use a leftJoin, am I guaranteed to see the messages in the order of (msgA, null) and then (msgA, msgB), and not the other way around? I would assume so, from my understanding of how the join works, but I'm just confirming.

--Michael



From: Matthias J. Sax <ma...@confluent.io>
Sent: Friday, November 10, 2017 2:52 PM
To: users@kafka.apache.org
Subject: EXTERNAL: Re: How do I gracefully handle stream joins where the other side never appears?
  


Re: How do I gracefully handle stream joins where the other side never appears?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Messages that don't find a join partner are dropped.

For each incoming message, we do the following:
 1. insert it into its own window store
 2. look up the other window store for matching records
    a) if matching records are found, compute the join and emit the result

Note that we maintain all records in the window store until the retention
time passes. Thus, if there is no matching join record, a record
will eventually be dropped.
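The steps above, plus retention, can be sketched as a toy in-memory model. The class and method names are hypothetical; real Kafka Streams uses segmented, fault-tolerant window stores, and this sketch simply treats stream time as the maximum timestamp seen so far. It shows that an unmatched record sits in its store until retention expires and is then removed without any notification.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model of a windowed KStream-KStream inner join following the steps above.
// Hypothetical names; Kafka Streams uses persistent, segmented window stores instead.
class InnerJoinSketch {

    static final class Rec {
        final String key; final String value; final long ts;
        Rec(String key, String value, long ts) { this.key = key; this.value = value; this.ts = ts; }
    }

    private final long windowMs;     // max |tsA - tsB| for two records to join
    private final long retentionMs;  // how long records stay in a window store
    private final List<Rec> storeA = new ArrayList<>();
    private final List<Rec> storeB = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;
    final List<String> output = new ArrayList<>();

    InnerJoinSketch(long windowMs, long retentionMs) {
        this.windowMs = windowMs;
        this.retentionMs = retentionMs;
    }

    void processA(Rec r) { process(r, storeA, storeB, true); }
    void processB(Rec r) { process(r, storeB, storeA, false); }

    private void process(Rec r, List<Rec> own, List<Rec> other, boolean isLeft) {
        streamTime = Math.max(streamTime, r.ts);
        own.add(r);                                  // 1. insert into own window store
        for (Rec o : other) {                        // 2. look up the other window store
            if (o.key.equals(r.key) && Math.abs(o.ts - r.ts) <= windowMs) {
                // 2a. match found: compute the join and emit it (left value first)
                output.add(isLeft ? r.value + o.value : o.value + r.value);
            }
        }
        evictExpired(storeA);
        evictExpired(storeB);
    }

    // Records older than (streamTime - retention) are removed silently:
    // an unmatched record just disappears, nothing is notified.
    private void evictExpired(List<Rec> store) {
        Iterator<Rec> it = store.iterator();
        while (it.hasNext()) {
            if (it.next().ts < streamTime - retentionMs) it.remove();
        }
    }

    int storedRecords() { return storeA.size() + storeB.size(); }
}
```

For example, with a 1-hour window and 24-hour retention, a1 (key k1, t=0) and b1 (k1, t=1s) join to "a1b1", while a2 (k2, t=2s) never finds a partner. Once a record with t=25h advances stream time, a1, b1 and the unmatched a2 are all evicted, and a2 never produced any output.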

There is no API to be notified about this.

With regard to left/outer join: your observation is correct -- we need
to implement it this way, as it's unclear how long to delay the
computation and wait for a matching record. Note that Streams is
able to handle late data; thus, the only "safe" way to avoid a "double"
call would be to change the implementation to wait until the retention time
is over (default is 24h) -- this implies way too high latency and would also
result in out-of-order results.

To tackle this issue, you could implement a "de-duplication" operator
that consumes the join output stream. This stateful `Transformer` could
buffer all "early" (msg,null) and (null,msg) records for some time to see
if there will be a "proper" join result later. Using punctuation, you can
emit the (msg,null)/(null,msg) join result if you think(!) there will be no
"proper" join result anymore.

Note that there might always be a late join result, and thus, this
approach has its own issues (of course, you could drop a late "proper"
join result in case you already emitted a (msg,null)/(null,msg)).
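The de-duplication idea can be sketched in plain Java. This is a model under stated assumptions, not a Kafka `Transformer`: `LeftJoinDeduplicator`, `onJoinResult` and `punctuate` are hypothetical names, only the left-join (msg,null) case is handled, at most one left record per key is assumed, and the "drop a late proper result" policy mentioned above is applied. In a real application the maps would be state stores and `punctuate` would be driven by a scheduled punctuation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java model of a de-duplication step on left-join output.
// Hypothetical names; assumes at most one left record per key.
class LeftJoinDeduplicator {

    private static final class Pending {
        final String left; final long seenAt;
        Pending(String left, long seenAt) { this.left = left; this.seenAt = seenAt; }
    }

    private final long timeoutMs;  // how long to wait for a "proper" result
    private final Map<String, Pending> pending = new LinkedHashMap<>();
    // Unbounded here; a real state store would need to expire these entries.
    private final Set<String> emittedWithNull = new HashSet<>();
    final List<String> emitted = new ArrayList<>();

    LeftJoinDeduplicator(long timeoutMs) { this.timeoutMs = timeoutMs; }

    // Called for every record coming out of the left join.
    void onJoinResult(String key, String left, String right, long now) {
        if (right == null) {
            // Early (msg, null): hold it back instead of forwarding it
            if (!emittedWithNull.contains(key)) pending.put(key, new Pending(left, now));
        } else if (emittedWithNull.contains(key)) {
            // Late proper result after (msg, null) was already emitted: drop it
        } else {
            pending.remove(key);  // cancel the buffered (msg, null), if any
            emitted.add(key + ": (" + left + ", " + right + ")");
        }
    }

    // Called periodically (the punctuation): stop waiting for old entries.
    void punctuate(long now) {
        Iterator<Map.Entry<String, Pending>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Pending> e = it.next();
            if (now - e.getValue().seenAt >= timeoutMs) {
                emitted.add(e.getKey() + ": (" + e.getValue().left + ", null)");
                emittedWithNull.add(e.getKey());
                it.remove();
            }
        }
    }
}
```

With a 1-second timeout: k1's early (a1, null) at t=0 is replaced by (a1, b1) at t=500ms; k2's (a2, null), buffered at t=600ms, is emitted by the punctuation at t=1600ms, and a late (a2, b2) at t=2s is dropped. The timeout is exactly the "if you think(!)" trade-off: a longer timeout drops fewer proper results but adds latency.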

Hope this helps.


-Matthias


