Posted to users@kafka.apache.org by Murilo Tavares <mu...@gmail.com> on 2019/01/29 19:10:09 UTC

Issues with KTable to KTable leftJoin

Hi
I am trying to understand why a KTable to KTable left join is being called
twice when I receive a message on the right table.
Here is my Topology:

Serde<Author> authorSerde = ...
Serde<Set<Book>> bSetSerde = ...
Serde<Set<AutorPublisherAssociation>> apSetSerde = ...

KTable<String, Author> authorTable =
    builder.table(AUTHOR_TOPIC, Consumed.with(Serdes.String(), authorSerde));
KTable<String, Set<Book>> booksByAuthorTable =
    builder.table(BOOKS_BY_AUTHOR, Consumed.with(Serdes.String(), bSetSerde));
KTable<String, Set<AutorPublisherAssociation>> apTable =
    builder.table(PUBLISHER_ASSOCIATIONS_BY_AUTHOR,
        Consumed.with(Serdes.String(), apSetSerde));

KTable<String, Author> enrichedAuthorTable = authorTable
    .leftJoin(apTable, (a, apSet) -> {
        if (apSet == null) {
            a.setPublishers(new HashSet<>());
        } else {
            a.setPublishers(apSet.stream()
                .map(ap -> ap.getPublisher())
                .collect(Collectors.toSet()));
        }
        return a;
    })
    .leftJoin(booksByAuthorTable, (a, b) -> {
        a.setBooks(b);
        return a;
    });

enrichedAuthorTable.toStream()
    .to(ENRICHED_AUTHORS, Produced.with(Serdes.String(), authorSerde));

Note I have 3 topics, all of them keyed by author:
- AUTHOR_TOPIC is keyed by authorKey and holds the Author message;
- BOOKS_BY_AUTHOR is keyed by authorKey and holds a Set of Books;
- PUBLISHER_ASSOCIATIONS_BY_AUTHOR is keyed by authorKey and holds a Set of
AutorPublisherAssociation (a POJO that links one author to one
publisher).

Also note that the if statement is there to avoid NPEs and to deal with
tombstones: if I want to delete the list of publishers associated with
an author, a tombstone on the PUBLISHER_ASSOCIATIONS_BY_AUTHOR topic should
override the list of publishers on the Author.

In my simple test case, I am not sending any updates, just one message on
each topic, in this order: author, booksByAuthor, publisherByAuthor.
When the author arrives, both ValueJoiners are called with the author
message and null for the right table.
When the set of books arrives, both joiners are called ONCE: the first
joiner receives an author and null, the second joiner receives an author
and the set of books.
The problem comes next:
When the set of AutorPublisherAssociation arrives, the first ValueJoiner
is called TWICE, once with author and apSet, and a second time
with author and null.

I don't understand why in this scenario the ValueJoiner is called twice,
the last time with null instead of the message, overriding the correct value.

Thanks
Murilo

Re: Issues with KTable to KTable leftJoin

Posted by "Matthias J. Sax" <ma...@confluent.io>.
> So, per my understanding maybe if I materialize
> the first join and then use join it with the other topic, this should
> change?

Yes. For this case, we don't need to recompute the old value, because
the downstream operator can look up the old join result in its store.

Note that the `ValueJoiner#apply()` method is called twice; however,
both the old and the new value are sent to the downstream operator in a
single message, so the order does not really matter. Furthermore, we
need to recompute the old join result, based on the old value, before we
can update the input KTable store with the new value.

About modifying input values: yes, they are expected to not be modified
(Java unfortunately does not have a mechanism to enforce this). If you
write your code differently, then it might work correctly if there is
only one join (but it's a coincidence and there is no actual guarantee
provided) -- it is still considered a bug in the ValueJoiner UDF even if
you got lucky and it works for this case.
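
As a minimal sketch of a joiner that leaves its inputs untouched (plain Java, no Kafka dependency): the nested `Author` class, its constructor, and the helper name `withPublishers` are hypothetical stand-ins for the POJO in the original topology, not the poster's actual code. The idea is only that the joiner returns a fresh object, so calling it twice with the same left value cannot corrupt an earlier result.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class NonMutatingJoiner {
    // Hypothetical stand-in for the Author POJO from the topology.
    static class Author {
        final String name;
        final Set<String> publishers;

        Author(String name, Set<String> publishers) {
            this.name = name;
            this.publishers = new HashSet<>(publishers); // defensive copy
        }
    }

    // Builds a new Author instead of calling a setter on the input value.
    static Author withPublishers(Author a, Set<String> publishers) {
        Set<String> joined = (publishers == null) ? new HashSet<>() : publishers;
        return new Author(a.name, joined);
    }

    public static void main(String[] args) {
        Author stored = new Author("a1", new HashSet<>());
        // Two calls on the same stored value, as happens for new and old results.
        Author newValue = withPublishers(stored, Collections.singleton("p1"));
        Author oldValue = withPublishers(stored, null);
        System.out.println(newValue.publishers); // [p1]
        System.out.println(oldValue.publishers); // []
        System.out.println(stored.publishers);   // [] (input left unchanged)
    }
}
```

The same body could be used as the lambda passed to `leftJoin`; whether to copy via a constructor, a builder, or `clone()` is a design choice that depends on the POJO.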

If you think the JavaDocs are not explicit enough, please open a PR and
propose a change. We are happy to improve them!


-Matthias



On 2/1/19 12:24 PM, Murilo Tavares wrote:
> Hi Matthias
> Thank you for your help. So, per my understanding maybe if I materialize
> the first join and then use join it with the other topic, this should
> change?


Re: Issues with KTable to KTable leftJoin

Posted by Murilo Tavares <mu...@gmail.com>.
Hi Matthias
Thank you for your help. So, per my understanding, maybe if I materialize
the first join and then join it with the other topic, this should
change?
What I don't understand is why the old value is sent AFTER the new value.
That still looks wrong to me, especially because in a leftJoin we can't know
whether this is an old or a new value, as we can in aggregations (where we
have an adder and a subtractor).

Anyway, I managed to overcome this with the help of this SO question:
https://stackoverflow.com/questions/51565727/kafka-stream-chained-leftjoin-processing-previous-old-message-again-after-the,
where there's a comment that says: "When joining, I need to initialize and
return a new object rather than assign value to the old object".

In my scenario, when I receive an update on the right table, I am UPDATING
the value from the left table and returning it. The joiner is then
called again with the old value from the right table, and the left table's
value is updated with the old, wrong value.
This can be fixed by cloning the value that will be modified, so that the
second call does not incorrectly modify it.
For reference, the Kafka Streams code that calls my joiner is this:


            newValue = joiner.apply(change.newValue, value2);
            if (sendOldValues) {
                oldValue = joiner.apply(change.oldValue, value2);
            }
            context().forward(key, new Change<>(newValue, oldValue));


https://github.com/axbaretto/kafka/blob/master/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java#L94

In my situation, value2 is modified inside "joiner" and returned (thus
assigned to newValue). Then, on the second call, value2 is changed again
based on change.oldValue, which also changes newValue, since both refer to
the same instance.
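
The aliasing described here can be reproduced without Kafka. The following is a minimal sketch in plain Java: `Author`, `Change`, `mutatingJoiner`, and `process` are simplified, hypothetical stand-ins, and `process` only mimics the call order of the processor code quoted above (new value first, then old value). Arguments are written in the order of the user's joiner (left value, right value).

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiFunction;

class JoinerAliasingDemo {
    // Hypothetical, simplified stand-in for the Author POJO.
    static class Author {
        Set<String> publishers = new HashSet<>();
    }

    // Simplified stand-in for the (newValue, oldValue) pair sent downstream.
    static class Change<T> {
        final T newValue;
        final T oldValue;

        Change(T newValue, T oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // Mutates and returns its first argument, like the joiner in the topology.
    static Author mutatingJoiner(Author a, Set<String> apSet) {
        a.publishers = (apSet == null) ? new HashSet<>() : new HashSet<>(apSet);
        return a;
    }

    // Same call order as the quoted processor: new result first, then old.
    static Change<Author> process(BiFunction<Author, Set<String>, Author> joiner,
                                  Author stored,
                                  Set<String> newRight,
                                  Set<String> oldRight) {
        Author newValue = joiner.apply(stored, newRight);
        Author oldValue = joiner.apply(stored, oldRight); // overwrites the first result
        return new Change<>(newValue, oldValue);
    }

    public static void main(String[] args) {
        Author stored = new Author();
        Change<Author> change = process(JoinerAliasingDemo::mutatingJoiner,
                stored, Collections.singleton("p1"), null);
        System.out.println(change.newValue == change.oldValue); // true
        System.out.println(change.newValue.publishers);          // []
    }
}
```

Because both fields of the pair point at the same `Author`, the "new" result ends up carrying the empty publisher set written by the second call.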

Although cloning value2 fixes my problem, I still think it's an issue,
mainly because this behaviour doesn't affect single (non-cascading) left
joins, which sounds like an inconsistency to me, and also because the order
of the operations seems wrong to me. Calling a joiner with the old value
AFTER the new value does not allow this function to know what the last
known value for that message is.
Still, even if you are not convinced this is an issue, I believe that at
least the Javadoc of "leftJoin" should mention the side effects of modifying
and returning the same instance received as an argument.

Thanks
Murilo




On Fri, 1 Feb 2019 at 14:38, Matthias J. Sax <ma...@confluent.io> wrote:

> Sounds like expected behavior to me.
>
> Note, that by default, the result KTable for a KTable-KTable join is not
> materialized. To be able to compute the correct result for this case, we
> need to send the old and new join result downstream to allow the
> downstream join to compute the correct result. It's storage/computation
> trade-off.
>
> Does this answer your question?
>
>
> -Matthias

Re: Issues with KTable to KTable leftJoin

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Sounds like expected behavior to me.

Note that by default, the result KTable for a KTable-KTable join is not
materialized. To be able to compute the correct result for this case, we
need to send the old and new join result downstream to allow the
downstream join to compute the correct result. It's a storage/computation
trade-off.
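
To make the trade-off concrete, here is a toy model in plain Java. It is not how Kafka Streams is implemented; `recompute`, `lookup`, the `HashMap` used as a store, and the string-concatenating joiner are all assumptions chosen for illustration. It only contrasts the two ways of obtaining the old join result: recomputing it with a second joiner call, or reading it back from a stored copy of the previous result.

```java
import java.util.HashMap;
import java.util.Map;

class OldValueTradeOff {
    static int joinerCalls = 0;

    // Toy joiner that counts how often it runs.
    static String join(String left, String right) {
        joinerCalls++;
        return left + "+" + right;
    }

    // No store for the join result: the old result is recomputed from the
    // old right-side value, which costs a second joiner call.
    static String[] recompute(String left, String newRight, String oldRight) {
        String newResult = join(left, newRight);
        String oldResult = join(left, oldRight);
        return new String[] {newResult, oldResult};
    }

    // Join result kept in a store: the old result is read back instead of
    // recomputed, which costs storage but only one joiner call.
    static String[] lookup(Map<String, String> store, String key,
                           String left, String newRight) {
        String oldResult = store.get(key);
        String newResult = join(left, newRight);
        store.put(key, newResult);
        return new String[] {newResult, oldResult};
    }

    public static void main(String[] args) {
        String[] a = recompute("author", "pubs-v2", "pubs-v1");
        System.out.println(a[0] + " / " + a[1] + " calls=" + joinerCalls);

        joinerCalls = 0;
        Map<String, String> store = new HashMap<>();
        store.put("k", "author+pubs-v1");
        String[] b = lookup(store, "k", "author", "pubs-v2");
        System.out.println(b[0] + " / " + b[1] + " calls=" + joinerCalls);
    }
}
```

Both paths yield the same (new, old) pair; the first pays with an extra joiner invocation, the second with the memory or disk needed to keep previous results.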

Does this answer your question?


-Matthias

On 1/29/19 1:51 PM, Murilo Tavares wrote:
> Adding a bit more to this, it looks like an issue with cascading
> leftJoins...
> I can see that internally, the ValueJoiner is being called by a
> KTableKTableRightJoin$KTableKTableRightJoinProcessor, with sendOldValues =
> true.
> Not quite sure, but it looks to me that when we create "tmp
> = A.leftJoin(B)", internally KafkaStreams also creates B.rightJoin(A), and
> sets sendOldValues on A. Later, by calling tmp.leftJoin(C), it could be
> setting sendOldValues on B, or something like that...
> Is there any known issues on cascading leftJoins?
> Thanks
> Murilo


Re: Issues with KTable to KTable leftJoin

Posted by Murilo Tavares <mu...@gmail.com>.
Adding a bit more to this, it looks like an issue with cascading
leftJoins...
I can see that internally the ValueJoiner is being called by a
KTableKTableRightJoin$KTableKTableRightJoinProcessor, with sendOldValues =
true.
I'm not quite sure, but it looks to me that when we create
"tmp = A.leftJoin(B)", Kafka Streams internally also creates B.rightJoin(A)
and sets sendOldValues on A. Later, calling tmp.leftJoin(C) could be
setting sendOldValues on B, or something like that...
Are there any known issues with cascading leftJoins?
Thanks
Murilo



On Tue, 29 Jan 2019 at 14:10, Murilo Tavares <mu...@gmail.com> wrote:

> Hi
> I am trying to understand why a KTable to KTable left join is being called
> twice when I receive a message on the right table.