Posted to users@kafka.apache.org by Mathieu Fenniak <ma...@replicon.com> on 2016/08/17 14:59:31 UTC

KTable aggregations send intermediate results downstream?

Hello again, kafka-users,

When I aggregate a KTable, a future input that updates a KTable's
value for a specific key causes the aggregate's subtractor to be
invoked, and then its adder.  This part is great, completely
as-expected.

But what I didn't expect is that the intermediate result of the
subtractor would be sent downstream.  This value doesn't reflect the
reality of the inputs to the aggregator, so sending it downstream is
effectively sending "corrupt" data to the next processing node.  Is
this the expected behavior, or is this a bug?

Take for example, a table of blog articles and an aggregator that
counts the number of words in each category of the blog:

topic: articles
  K1, {"category": "kafka", "text": "word1, word2, word3"}
  K2, {"category": "kafka", "text": "word1, word2"}

articles.groupBy((k,v) -> v.category)
  .aggregate(() -> 0,
    (k,v,t) -> t + v.text.split(" ").length,
    (k,v,t) -> t - v.text.split(" ").length
  )

This aggregator will produce {key: "kafka", value: 3}, then {key:
"kafka", value: 5}.  If I update one of the blog articles and send a
new message to the articles topic:

  K1, {"category": "kafka", "text": "word1, word2, word3, word4"}

The aggregator will first produce {key: "kafka", value: 2} when the
subtractor is called, then will produce {key: "kafka", value: 6} when
the adder is called.  The subtractor's calculation does not actually
match the reality; K1 was never deleted, it was just updated.
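
The sequence described above can be reproduced with a small plain-Java
simulation. This is only a sketch of the subtract-then-add behaviour, not
Kafka Streams code: the class TableAggregateSimulation, the Article type and
the upsert method are hypothetical names invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the behaviour described above; not Kafka Streams code.
class TableAggregateSimulation {
    // Hypothetical value type standing in for a deserialized article.
    static final class Article {
        final String category;
        final String text;

        Article(String category, String text) {
            this.category = category;
            this.text = text;
        }
    }

    private final Map<String, Article> table = new HashMap<>();     // source table contents
    private final Map<String, Integer> aggregate = new HashMap<>(); // word count per category
    final List<String> downstream = new ArrayList<>();              // every emitted result

    private static int words(Article a) {
        return a.text.split(" ").length;
    }

    // Apply one insert or update and emit each intermediate aggregate value.
    void upsert(String key, Article updated) {
        Article old = table.put(key, updated);
        if (old != null) {
            // Subtractor: remove the old record's contribution, then emit.
            emit(old.category, aggregate.get(old.category) - words(old));
        }
        // Adder: add the new record's contribution, then emit.
        emit(updated.category, aggregate.getOrDefault(updated.category, 0) + words(updated));
    }

    private void emit(String category, int value) {
        aggregate.put(category, value);
        downstream.add(category + "=" + value);
    }

    public static void main(String[] args) {
        TableAggregateSimulation sim = new TableAggregateSimulation();
        sim.upsert("K1", new Article("kafka", "word1, word2, word3"));
        sim.upsert("K2", new Article("kafka", "word1, word2"));
        sim.upsert("K1", new Article("kafka", "word1, word2, word3, word4"));
        // prints [kafka=3, kafka=5, kafka=2, kafka=6]
        System.out.println(sim.downstream);
    }
}
```

The third entry, kafka=2, is the intermediate value in question: it is
emitted although no input state ever had only two words in the category.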

Mathieu

Re: KTable aggregations send intermediate results downstream?

Posted by Guozhang Wang <wa...@gmail.com>.
The comment from Kasier Chen on the patch is correct:

1. Objects.equals() delegates to the equals() method of the value's own
class, which may not be safe: for example, users can override equals()
to compare only a subset of the fields, whereas Kafka Streams needs the
two objects to be exactly the same.

2. ChangedSer / Deser is used to serialize the Changed<> type, which has only
one of the old / new values non-null. You need to extend this encoder /
decoder in a backward-compatible way.
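
To illustrate point 1, here is a minimal sketch of a value class whose
equals() looks at only one field. The Article class and its made-up
serialize() format are assumptions for this example; comparing serialized
bytes is shown only as one possible stricter check, not as what the patch
or Kafka Streams actually does.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

class EqualsVersusBytes {
    // Hypothetical value class whose equals() compares only a subset of its fields.
    static final class Article {
        final String category;
        final String text;

        Article(String category, String text) {
            this.category = category;
            this.text = text;
        }

        @Override
        public boolean equals(Object o) {
            // Deliberately ignores the text field.
            return o instanceof Article && category.equals(((Article) o).category);
        }

        @Override
        public int hashCode() {
            return category.hashCode();
        }

        // Stand-in for a real serializer; the format is made up for this sketch.
        byte[] serialize() {
            return (category + "|" + text).getBytes(StandardCharsets.UTF_8);
        }
    }

    // Equality as the value class defines it.
    static boolean sameByEquals(Article a, Article b) {
        return Objects.equals(a, b);
    }

    // Equality of the full serialized form.
    static boolean sameByBytes(Article a, Article b) {
        return Arrays.equals(a.serialize(), b.serialize());
    }

    public static void main(String[] args) {
        Article before = new Article("kafka", "word1, word2, word3");
        Article after = new Article("kafka", "word1, word2, word3, word4");
        System.out.println(sameByEquals(before, after)); // prints true
        System.out.println(sameByBytes(before, after));  // prints false
    }
}
```

A check based on equals() would treat this update as "no change" and
suppress it, even though the word count differs.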


Guozhang



Re: KTable aggregations send intermediate results downstream?

Posted by Mathieu Fenniak <ma...@replicon.com>.
Hi Guozhang,

I've been working around this issue by dropping down to the Processor
API, but I was hoping you might be able to point out whether there is a
flaw in this proposed change:

    https://github.com/apache/kafka/compare/trunk...mfenniak:suppress-duplicate-repartition-output

This adjusts KTableRepartitionMap so that if there's no change in the
group-by key, the repartition processor just forwards the changed
value onwards.  (This breaks a couple of tests that anticipate the
exact existing output, so don't consider this a complete patch...)
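
The idea can be sketched in plain Java as follows. This is not the code
from the branch (which is not reproduced in this thread) and does not use
Kafka Streams classes; RepartitionSketch, Forwarded and repartition are
hypothetical names, and values are simplified to integers.

```java
import java.util.ArrayList;
import java.util.List;

class RepartitionSketch {
    // Hypothetical stand-in for a keyed old/new pair; not the Kafka Streams class.
    static final class Forwarded {
        final String key;
        final Integer newValue;
        final Integer oldValue;

        Forwarded(String key, Integer newValue, Integer oldValue) {
            this.key = key;
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // Decide what to forward for one table update, given the group-by key
    // computed from the old value and from the new value.
    static List<Forwarded> repartition(String oldKey, Integer oldValue,
                                       String newKey, Integer newValue) {
        List<Forwarded> out = new ArrayList<>();
        if (oldKey != null && oldKey.equals(newKey)) {
            // Group-by key unchanged: forward one record carrying both values.
            out.add(new Forwarded(newKey, newValue, oldValue));
            return out;
        }
        // Group-by key changed (or insert/delete): forward old and new separately.
        if (oldKey != null) {
            out.add(new Forwarded(oldKey, null, oldValue));
        }
        if (newKey != null) {
            out.add(new Forwarded(newKey, newValue, null));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(repartition("kafka", 3, "kafka", 4).size());  // prints 1
        System.out.println(repartition("kafka", 3, "kafka2", 4).size()); // prints 2
    }
}
```

With a single record carrying both values, a downstream aggregate could
apply the subtractor and adder together and emit only the final result.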

Mathieu



Re: KTable aggregations send intermediate results downstream?

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Mathieu,

If you are only interested in the aggregate result "snapshot" but not its
change stream (note that a KTable is not actually a "table" as in an
RDBMS, but still a stream), you can try the queryable state feature
that is available in trunk and will be included in the 0.10.1.0 release.

In sum, it allows you to query, in real time, the "snapshot" of any state
used by aggregation operators, through state-store-provided APIs such as
get-by-key, range queries on windows, etc. Details can be found in this KIP
(we are working on more docs / blog posts at the moment):
https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams

Guozhang



Re: KTable aggregations send intermediate results downstream?

Posted by Mathieu Fenniak <ma...@replicon.com>.
Hi Guozhang,

Hm... I hadn't thought of the repartitioning involvement.

I'm not confident I'm understanding completely, but I believe you're
saying the decision to process data in this way is made before the
data being processed is available, because the partition *may* change,
because the groupBy key *may* change.

I'm still feeling that I'm stuck getting corrupted output in the
middle of an aggregation.

It's especially problematic for me if the updates to the source KTable
don't actually affect the results of the aggregation.  In the
word-count example in my original e-mail, this might be similar to
editing an unrelated field "author" in any article; doesn't actually
affect the groupBy, doesn't affect the aggregation, but still results
in the wrong output occurring temporarily.  (and inefficient
processing)

Are there any tools in Kafka Streams that might help me prevent
downstream calculations if the relevant inputs haven't changed?  I was
thinking I'd be able to use mapValues to pluck only relevant fields
out of a KTable, materialize a new KTable (.through) from that, and
then there'd be some state from which KS would be able to only invoke
downstream nodes if data has changed... but it doesn't seem to work
like that.
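
The kind of filter being asked for could look roughly like the sketch
below: remember the last projected value per key and forward only when it
differs. This is plain Java with an in-memory map, not a Kafka Streams
feature; ChangeFilter and shouldForward are hypothetical names, and a real
Processor API implementation would presumably keep this state in a state
store instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Forwards a record only when its relevant (projected) fields have changed.
class ChangeFilter<K, V, P> {
    private final Map<K, P> lastSeen = new HashMap<>(); // in-memory only, for illustration
    private final Function<V, P> projection;

    ChangeFilter(Function<V, P> projection) {
        this.projection = projection;
    }

    // Returns true when the projection differs from the last one seen for this key.
    boolean shouldForward(K key, V value) {
        P projected = projection.apply(value);
        if (lastSeen.containsKey(key) && Objects.equals(lastSeen.get(key), projected)) {
            return false;
        }
        lastSeen.put(key, projected);
        return true;
    }

    public static void main(String[] args) {
        // Value layout assumed for this sketch: {category, text, author}.
        ChangeFilter<String, String[], String> filter =
                new ChangeFilter<>(v -> v[0] + "|" + v[1]);
        System.out.println(filter.shouldForward("K1", new String[] {"kafka", "a b", "alice"}));  // prints true
        System.out.println(filter.shouldForward("K1", new String[] {"kafka", "a b", "bob"}));    // prints false
        System.out.println(filter.shouldForward("K1", new String[] {"kafka", "a b c", "bob"}));  // prints true
    }
}
```

Here the second update changes only the author, so it is dropped before it
can trigger any downstream subtract / add.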

Thanks so much for your responses Guozhang, I really appreciate your
time to help me out.

Mathieu


On Wed, Aug 17, 2016 at 5:51 PM, Guozhang Wang <wa...@gmail.com> wrote:
> The problem is that Kafka Streams need to repartition the streams based on
> the groupBy keys when doing aggregations. For your case, the original
> stream may be read from a topic that is partitioned on "K", and you need to
> first repartition on "category" on an intermediate topic before the
> aggregation can be executed.
>
> Hence the old and new value may be sent to two different partitions of the
> intermediate topic, and hence be processed by two different process (it
> won't be the case in your application, since you mentioned the "category"
> will never change). Since the library cannot tell if the groupBy key will
> never change, it has to be conservative and do this subtract / add process
> while receiving the old / new value.
>
>
> Guozhang
>
>
> On Wed, Aug 17, 2016 at 1:45 PM, Mathieu Fenniak <
> mathieu.fenniak@replicon.com> wrote:
>
>> Hi Guozhang,
>>
>> Thanks for responding.  Ah, I see what you're saying... in the case of
>> an update to the KTable, the aggregator's subtractor result would be
>> necessary if the group-by key changes in the update.
>>
>> It makes sense, but unfortunately the behavior leaves me feeling a
>> little sketchy... when the group-by key doesn't change (which is
>> guaranteed in my case), I'm outputting results that don't correspond
>> at all to the inputs, temporarily.  It's immediately followed by a
>> corrected result.
>>
>> Would it be a feasible optimization to not send the subtractor's
>> result out of the aggregate, only in the case where the groupBy key
>> does not change between the old record and the new record?
>>
>> Mathieu
>>
>> On Wed, Aug 17, 2016 at 2:12 PM, Guozhang Wang <wa...@gmail.com> wrote:
>> > Hello Mathieu,
>> >
>> > Note that the semantics of KTable aggregations (i.e. "KTable.groupBy.aggregate"
>> > as in 0.10.0) and KStream aggregations (i.e. "KStream.aggregateByKey" as in
>> > 0.10.0) are different, in the sense that when the table is updated (i.e. a
>> > new record with the same key "K1" is received), the old record's effect on
>> > the aggregation needs to first be subtracted before the new record's effect
>> > on the aggregation can be added; whereas in the latter case there are no
>> > "old values" to override, hence only the "adder" aggregator is
>> > needed.
>> >
>> > So suppose your updated record on K1 is on a different "category", say:
>> >
>> > K1, {"category": "kafka2", "text": "word1, word2, word3, word4"}
>> >
>> >

Re: KTable aggregations send intermediate results downstream?

Posted by Guozhang Wang <wa...@gmail.com>.
The problem is that Kafka Streams needs to repartition the streams based on
the groupBy keys when doing aggregations. In your case, the original
stream may be read from a topic that is partitioned on "K", and it must
first be repartitioned on "category" through an intermediate topic before
the aggregation can be executed.

Hence the old and new values may be sent to two different partitions of the
intermediate topic, and hence be processed by two different processes (this
won't happen in your application, since you mentioned the "category"
never changes). Since the library cannot tell whether the groupBy key will
ever change, it has to be conservative and perform this subtract / add
sequence when it receives the old / new values.
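
A minimal sketch of the routing described above, in plain Java with no
Kafka dependency. The class name, the method names, and the use of
String.hashCode() as the partitioner are all assumptions made for
illustration; the real producer hashes the serialized key bytes. It only
shows that the old and new values are keyed by the old and new groupBy
keys, so they land on the same partition when the key is unchanged and may
land on different ones when it changes.

```java
import java.util.ArrayList;
import java.util.List;

public class RepartitionSketch {
    // Stand-in partitioner (assumption): real Kafka hashes the serialized key bytes.
    static int partitionFor(String groupKey, int numPartitions) {
        return Math.floorMod(groupKey.hashCode(), numPartitions);
    }

    // One update to a table row yields two records on the intermediate topic:
    // the old value keyed by the old groupBy key, the new value keyed by the new one.
    static List<String> route(String oldGroupKey, String newGroupKey, int numPartitions) {
        List<String> records = new ArrayList<>();
        records.add("subtract old value -> partition " + partitionFor(oldGroupKey, numPartitions));
        records.add("add new value -> partition " + partitionFor(newGroupKey, numPartitions));
        return records;
    }

    public static void main(String[] args) {
        // Category unchanged: both records go to the same partition.
        route("kafka", "kafka", 4).forEach(System.out::println);
        // Category changed: the two records may go to different partitions.
        route("kafka", "kafka2", 4).forEach(System.out::println);
    }
}
```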


Guozhang





-- 
-- Guozhang

Re: KTable aggregations send intermediate results downstream?

Posted by Mathieu Fenniak <ma...@replicon.com>.
Hi Guozhang,

Thanks for responding.  Ah, I see what you're saying... in the case of
an update to the KTable, the aggregator's subtractor result would be
necessary if the group-by key changes in the update.

It makes sense, but unfortunately the behavior leaves me feeling a
little sketchy... when the group-by key doesn't change (which is
guaranteed in my case), I'm outputting results that don't correspond
at all to the inputs, temporarily.  It's immediately followed by a
corrected result.

Would it be a feasible optimization to not send the subtractor's
result out of the aggregate, only in the case where the groupBy key
does not change between the old record and the new record?
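
To make the question concrete, here is a rough single-process sketch in
plain Java of the behavior I'm seeing and the behavior I'm proposing. It is
not how Kafka Streams is implemented; the class, the "suppress" flag, and
the upsert method are hypothetical names of mine, and it assumes the old
and new values are visible in one place at the same time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SuppressIntermediateSketch {
    private final Map<String, Integer> totals = new HashMap<>();
    final List<String> emitted = new ArrayList<>();
    private final boolean suppressWhenKeyUnchanged; // hypothetical switch

    SuppressIntermediateSketch(boolean suppressWhenKeyUnchanged) {
        this.suppressWhenKeyUnchanged = suppressWhenKeyUnchanged;
    }

    private static int words(String text) {
        return text.split(" ").length;
    }

    private void apply(String category, int delta, boolean emit) {
        int total = totals.merge(category, delta, Integer::sum);
        if (emit) {
            emitted.add(category + "=" + total);
        }
    }

    // oldCategory and oldText are null when the row is inserted for the first time.
    void upsert(String oldCategory, String oldText, String newCategory, String newText) {
        if (oldCategory != null) {
            boolean sameKey = oldCategory.equals(newCategory);
            // Subtract always updates the total; it is only emitted when not suppressed.
            apply(oldCategory, -words(oldText), !(suppressWhenKeyUnchanged && sameKey));
        }
        apply(newCategory, words(newText), true);
    }

    static List<String> run(boolean suppress) {
        SuppressIntermediateSketch agg = new SuppressIntermediateSketch(suppress);
        agg.upsert(null, null, "kafka", "word1, word2, word3");                          // K1
        agg.upsert(null, null, "kafka", "word1, word2");                                 // K2
        agg.upsert("kafka", "word1, word2, word3", "kafka", "word1, word2, word3, word4"); // K1 updated
        return agg.emitted;
    }

    public static void main(String[] args) {
        System.out.println("current:  " + run(false));
        System.out.println("proposed: " + run(true));
    }
}
```

With the flag off, the sequence includes the intermediate kafka=2 before
kafka=6; with it on, the update to K1 emits only kafka=6.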

Mathieu


Re: KTable aggregations send intermediate results downstream?

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Mathieu,

Note that the semantics of KTable aggregations (i.e. "KTable.groupBy.aggregate"
as in 0.10.0) and KStream aggregations (i.e. "KStream.aggregateByKey" as in
0.10.0) are different, in the sense that when the table is updated (i.e. a
new record with the same key "K1" is received), the old record's effect on
the aggregation needs to be subtracted first, before the new record's effect
on the aggregation can be added; whereas in the latter case no "old values"
are overridden, hence only the "adder" aggregator is needed.

So suppose your updated record on K1 has a different "category", say:

K1, {"category": "kafka2", "text": "word1, word2, word3, word4"}


Then the aggregated result should be:

{key: "kafka", value: 2}
{key: "kafka2", value: 4}
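
As a rough illustration only, the same sequence can be replayed in plain
Java without Kafka. The Article class, the Aggregator interface, and the
helper methods below are stand-ins invented for this sketch, not Kafka
Streams types; the adder and subtractor bodies are the ones from your
example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CategoryChangeSketch {
    static final class Article {
        final String category;
        final String text;
        Article(String category, String text) {
            this.category = category;
            this.text = text;
        }
    }

    // Stand-in for the (key, value, aggregate) -> aggregate lambdas in the example.
    interface Aggregator {
        int apply(String key, Article value, int aggregate);
    }

    static final Aggregator ADDER = (k, v, t) -> t + v.text.split(" ").length;
    static final Aggregator SUBTRACTOR = (k, v, t) -> t - v.text.split(" ").length;

    static void add(Map<String, Integer> totals, Article a) {
        totals.put(a.category, ADDER.apply(a.category, a, totals.getOrDefault(a.category, 0)));
    }

    static void subtract(Map<String, Integer> totals, Article a) {
        totals.put(a.category, SUBTRACTOR.apply(a.category, a, totals.getOrDefault(a.category, 0)));
    }

    static Map<String, Integer> run() {
        Map<String, Integer> totals = new LinkedHashMap<>();
        Article k1 = new Article("kafka", "word1, word2, word3");
        Article k2 = new Article("kafka", "word1, word2");
        Article k1Updated = new Article("kafka2", "word1, word2, word3, word4");
        add(totals, k1);
        add(totals, k2);
        // Update of K1: remove its old contribution from "kafka",
        // then add its new contribution to "kafka2".
        subtract(totals, k1);
        add(totals, k1Updated);
        return totals;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Here the subtractor's result for "kafka" is the final, correct value for
that key, which is why it has to be sent downstream.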


Does this make sense now?

Guozhang





-- 
-- Guozhang