Posted to users@kafka.apache.org by Sachin Mittal <sj...@gmail.com> on 2019/12/07 06:38:35 UTC

How are custom keys compared during joins

Hi,
I am facing some weird problems when joining two streams or a stream/table.
The joined stream does not contain all the joined records.

Also note that my keys are custom keys for which I have implemented the
equals and hashCode methods.
Is there something else I need to do to ensure that key1 and key2 are
treated as equal?

Let me illustrate it with an example:
// create a new stream with a new custom key
stream1Mapped = stream1.map((k, v) -> ...)
// checked the repartition topic data: it has 2 records

// create a new stream with a new custom key and convert it to a table
// in the standard way
table2Mapped = stream2.map((k, v) -> ...).groupByKey().reduce((av, nv) -> nv)
// checked the repartition and changelog topic data: it has 3 records

// now join the stream with the table on the new custom key
joinedStream = stream1Mapped.join(table2Mapped, (lv, rv) -> ...)

// print the data for the joined stream
joinedStream.peek((k, v) -> print(v))
// is called only once ??

This should be called twice, since the keys of both records in the stream
are present in the table too.

Please let me know if I understood the case well enough and if there is
anything I can do to debug this problem better.

Thanks
Sachin
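
On the question in the subject line: to my understanding, Kafka Streams
does not call equals() or hashCode() on keys when it repartitions records
or looks them up in a state store. It works on the bytes produced by the
key serde, so two keys only match if they serialize to identical bytes.
The sketch below illustrates that requirement in plain Java, without the
Kafka libraries. CustomKey, its fields and the serialize() helper are
hypothetical names made up for this illustration, not part of any API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

public class KeyBytesSketch {
    /** Hypothetical composite key; the field names are illustrative only. */
    static final class CustomKey {
        final String tenant;
        final long id;

        CustomKey(String tenant, long id) {
            this.tenant = tenant;
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof CustomKey)) return false;
            CustomKey other = (CustomKey) o;
            return id == other.id && tenant.equals(other.tenant);
        }

        @Override
        public int hashCode() {
            return Objects.hash(tenant, id);
        }
    }

    /** Deterministic encoding: fixed field order, fixed charset, length prefix. */
    static byte[] serialize(CustomKey key) {
        byte[] tenant = key.tenant.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + tenant.length + Long.BYTES);
        buf.putInt(tenant.length).put(tenant).putLong(key.id);
        return buf.array();
    }

    public static void main(String[] args) {
        CustomKey a = new CustomKey("acme", 42L);
        CustomKey b = new CustomKey("acme", 42L);
        System.out.println(a.equals(b));                               // true
        System.out.println(Arrays.equals(serialize(a), serialize(b))); // true
    }
}
```

If a key serializer includes anything that can differ between two equal
objects (unordered map fields, timestamps, default Java serialization of
changing classes), equal keys may land in different partitions or miss
each other in the store, so it is worth checking the serialized bytes
when debugging a join.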

Re: How are custom keys compared during joins

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Sachin,

Thanks for the detailed description.

Your finding is right: for a stream-table join, table-side updates do not
trigger a join, since stream records are not "materialized" or buffered
during processing. The community has asked for these semantics to be
improved to resemble table-table joins, and it is indeed on the roadmap to
discuss this soon.

At the moment I think you can consider two options. If your table records
are just late, but their timestamps are still earlier than those of the
stream records they should be joined with, you can consider configuring
max.task.idle.ms (details can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization,
but note there is a minor bug in this area that was just fixed in trunk, so
you may want to cherry-pick:
https://github.com/apache/kafka/commit/6d8da96ba8c892aab53ec9e1d5a074c5043c2092).
If a table record's timestamp may even be later than that of the stream
record, but you still want that table record to be processed before the
corresponding stream record in order to fulfill the join, then you'd need
to consider shifting the table record's timestamp, so that when Streams
synchronizes the joining streams / tables, those records are picked first.
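
For the first option, a minimal sketch of the configuration might look
like the following. The setting introduced by KIP-353 is named
max.task.idle.ms in the Streams configuration. The application id, the
broker address and the 5000 ms value are placeholders chosen for this
illustration only, and the class name is hypothetical.

```java
import java.util.Properties;

public class IdleConfigSketch {
    /** Builds a Streams configuration; every value here is a placeholder. */
    static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "join-example");       // hypothetical name
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker
        // Let a task stay idle for up to 5 seconds while one of its input
        // partitions has no buffered records, so that a late table record
        // with an earlier timestamp can still be picked before the stream
        // record it should be joined with.
        props.put("max.task.idle.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig().getProperty("max.task.idle.ms")); // 5000
    }
}
```

The resulting Properties object would be passed to the KafkaStreams
constructor as usual. A larger value tolerates more lateness on the table
side but adds latency whenever one input is empty.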


Guozhang




(In reply to Sachin Mittal's follow-up of Sat, Dec 7, 2019 at 10:05 PM,
which appears below.)


-- 
-- Guozhang

Re: How are custom keys compared during joins

Posted by Sachin Mittal <sj...@gmail.com>.
I figured out the issue.
Basically, table2Mapped, which is created from stream2, has some messages
that arrive later than the messages for the same key arrive in stream1.
After checking the stream-table join semantics, I found that a left-side
record is joined only with the record that exists for that key on the
right side at that time.
It will not join if the record arrives on the right side at a later time,
since windowed joins are not applicable to stream-table joins.
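
The behaviour described above can be reproduced with a small plain-Java
simulation. This is only a sketch of the semantics, not Kafka Streams
code: the Event class, the join() helper and the sample keys and values
are all made up for illustration, and events are simply processed in
list order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamTableJoinSketch {
    /** One input event: which side it arrived on, plus key and value. */
    static final class Event {
        final boolean fromTable;
        final String key;
        final String value;

        Event(boolean fromTable, String key, String value) {
            this.fromTable = fromTable;
            this.key = key;
            this.value = value;
        }
    }

    /** Processes events in order and returns the inner-join results emitted. */
    static List<String> join(List<Event> events) {
        Map<String, String> table = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            if (e.fromTable) {
                // A table update only changes state; it never emits a result.
                table.put(e.key, e.value);
            } else {
                // A stream record is looked up against the table as it is now.
                String right = table.get(e.key);
                if (right != null) {
                    out.add(e.key + ":" + e.value + "+" + right);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(true, "k1", "t1"),   // table record for k1 arrives first
                new Event(false, "k1", "s1"),  // stream record for k1: joins
                new Event(false, "k2", "s2"),  // stream record for k2: no table entry yet
                new Event(true, "k2", "t2"));  // table record for k2 arrives too late
        System.out.println(join(events)); // [k1:s1+t1]
    }
}
```

Both keys end up in the table, yet only one join result is produced,
which matches the single peek() call seen in the original example.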

Anyway, one question here: would a windowed stream-table join make sense
in a case like this?

The reason I mapped stream2 to table2Mapped is that stream2 usually has
only one record per key, but in some cases it may have multiple records
with the same value for the same key.
Hence converting it to a table made sense, as I am only interested in the
latest record for a key.

But I guess if that record arrives later than a record in stream1 with the
same key, they won't get joined.

So now I have switched back to a windowed stream-stream join.
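
To show why the windowed stream-stream join catches the late record, here
is a rough plain-Java simulation. It is a sketch under simplifying
assumptions (no retention, no grace period, everything kept in memory)
and not Kafka Streams code. The Rec class, the join() helper and the
sample timestamps are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowedJoinSketch {
    /** One record on either input stream. */
    static final class Rec {
        final boolean left;  // true = stream1 side, false = stream2 side
        final String key;
        final String value;
        final long ts;       // event timestamp in milliseconds

        Rec(boolean left, String key, String value, long ts) {
            this.left = left;
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    /** Joins records whose keys match and whose timestamps differ by at most windowMs. */
    static List<String> join(List<Rec> arrivals, long windowMs) {
        List<Rec> seen = new ArrayList<>(); // both sides are buffered
        List<String> out = new ArrayList<>();
        for (Rec r : arrivals) {
            for (Rec other : seen) {
                boolean match = other.left != r.left
                        && other.key.equals(r.key)
                        && Math.abs(other.ts - r.ts) <= windowMs;
                if (match) {
                    Rec l = r.left ? r : other;
                    Rec rt = r.left ? other : r;
                    out.add(r.key + ":" + l.value + "+" + rt.value);
                }
            }
            seen.add(r);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> arrivals = Arrays.asList(
                new Rec(true, "k2", "s2", 1000L),   // stream1 record arrives first
                new Rec(false, "k2", "t2", 3000L)); // stream2 record arrives later
        System.out.println(join(arrivals, 5000L)); // [k2:s2+t2]
    }
}
```

Because both sides are buffered, the order of arrival no longer matters
within the window. One trade-off to keep in mind: every matching pair in
the window produces an output, so if stream2 carries several records for
the same key, the join emits one result per pair rather than only one for
the latest record.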

Let me know if there is any other way to handle such a case.

Thanks
Sachin




