You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Flávio Stutz <fl...@gmail.com> on 2018/06/26 18:22:27 UTC

[DISCUSS] KIP-326: Schedulable KTable as Graph source

Hey, guys, I've just created a new KIP about creating a new DSL graph
source for realtime partitioned consolidations.

We have faced the following scenario/problem in a lot of situations with
KStreams:
   - Huge incoming data being processed by numerous application instances
   - Need to aggregate different fields whose records span all topic
partitions (something like “total amount spent by people aged > 30 yrs”
when processing a topic partitioned by userid).

The challenge here is to manage this kind of situation without any
bottlenecks. We don't need the “global aggregation” to be processed at each
incoming message. On a scenario of 500 instances, each handling 1k
messages/s, any single point of aggregation (single partitioned topics,
global tables or external databases) would create a bottleneck of 500k
messages/s for single threaded/CPU elements.

For this scenario, it is possible to store the partial aggregations on
local stores and, from time to time, query those states and aggregate them
as a single value, avoiding bottlenecks. This is a way to create a "timed
aggregation barrier”.

If we leverage this kind of built-in feature we could greatly enhance the
ability of KStreams to better handle the CAP Theorem characteristics, so
that one could choose to have Consistency over Availability when needed.

We started this discussion with Matthias J. Sax here:
https://issues.apache.org/jira/browse/KAFKA-6953

If you want to see more, go to KIP-326 at:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source

-Flávio Stutz

Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by ti...@gmail.com, ti...@gmail.com.
Actually this was intended to be registered under the number 323, but someone else catch this number while the proposal was being edited. The correct number and URL of the KIP is:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source

On 2018/06/26 20:22:09, Ted Yu <yu...@gmail.com> wrote: 
> What's the relationship between this KIP and KIP-323 ?
> 
> Thanks
> 
> On Tue, Jun 26, 2018 at 11:22 AM, Flávio Stutz <fl...@gmail.com>
> wrote:
> 
> > Hey, guys, I've just created a new KIP about creating a new DSL graph
> > source for realtime partitioned consolidations.
> >
> > We have faced the following scenario/problem in a lot of situations with
> > KStreams:
> >    - Huge incoming data being processed by numerous application instances
> >    - Need to aggregate different fields whose records span all topic
> > partitions (something like “total amount spent by people aged > 30 yrs”
> > when processing a topic partitioned by userid).
> >
> > The challenge here is to manage this kind of situation without any
> > bottlenecks. We don't need the “global aggregation” to be processed at each
> > incoming message. On a scenario of 500 instances, each handling 1k
> > messages/s, any single point of aggregation (single partitioned topics,
> > global tables or external databases) would create a bottleneck of 500k
> > messages/s for single threaded/CPU elements.
> >
> > For this scenario, it is possible to store the partial aggregations on
> > local stores and, from time to time, query those states and aggregate them
> > as a single value, avoiding bottlenecks. This is a way to create a "timed
> > aggregation barrier”.
> >
> > If we leverage this kind of built-in feature we could greatly enhance the
> > ability of KStreams to better handle the CAP Theorem characteristics, so
> > that one could choose to have Consistency over Availability when needed.
> >
> > We started this discussion with Matthias J. Sax here:
> > https://issues.apache.org/jira/browse/KAFKA-6953
> >
> > If you want to see more, go to KIP-326 at:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 326%3A+Schedulable+KTable+as+Graph+source
> >
> > -Flávio Stutz
> >
> 

Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by Ted Yu <yu...@gmail.com>.
What's the relationship between this KIP and KIP-323 ?

Thanks

On Tue, Jun 26, 2018 at 11:22 AM, Flávio Stutz <fl...@gmail.com>
wrote:

> Hey, guys, I've just created a new KIP about creating a new DSL graph
> source for realtime partitioned consolidations.
>
> We have faced the following scenario/problem in a lot of situations with
> KStreams:
>    - Huge incoming data being processed by numerous application instances
>    - Need to aggregate different fields whose records span all topic
> partitions (something like “total amount spent by people aged > 30 yrs”
> when processing a topic partitioned by userid).
>
> The challenge here is to manage this kind of situation without any
> bottlenecks. We don't need the “global aggregation” to be processed at each
> incoming message. On a scenario of 500 instances, each handling 1k
> messages/s, any single point of aggregation (single partitioned topics,
> global tables or external databases) would create a bottleneck of 500k
> messages/s for single threaded/CPU elements.
>
> For this scenario, it is possible to store the partial aggregations on
> local stores and, from time to time, query those states and aggregate them
> as a single value, avoiding bottlenecks. This is a way to create a "timed
> aggregation barrier”.
>
> If we leverage this kind of built-in feature we could greatly enhance the
> ability of KStreams to better handle the CAP Theorem characteristics, so
> that one could choose to have Consistency over Availability when needed.
>
> We started this discussion with Matthias J. Sax here:
> https://issues.apache.org/jira/browse/KAFKA-6953
>
> If you want to see more, go to KIP-326 at:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 326%3A+Schedulable+KTable+as+Graph+source
>
> -Flávio Stutz
>

Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by fl...@gmail.com, fl...@gmail.com.
Cons:
We tried the "single partition" strategy, but the problem is that for each incoming message to the Graph, we have another output message with the aggregated (cummulative or not) result, so that if we have a million messages/s (among all parallel tasks) being processed, we'll have another million message/s of aggregated values on the single partitioned topic, so that it will not be possible for a single consumer to handle this load. Did I miss something?

Pros:
Using a Schedulable Stream Source from state store's values, we can control the pressure of global aggregation that - despite of the strategy applied - have to be performed by a single instance. In this way, the pressure will be determined by the desired schedule time (that can be lowered as needed) and the number of tuples in the state store instances (the fewer different keys on state store instances the higher pratical frequency of global aggregation task). Using this strategy, the load of incoming messages won't affect the global aggregation mechanism, because we are spliting them apart temporarly (distributed partial aggregation among partition tasks happens during the incoming messages workload; and global final aggregation is performed by a single task from time to time, assynchronously).

[SOURCE PARTITION 1]  >-----100k messages/s---->  [TASK GRAPH 1]  >----100 different aggregated values---->  [INSTANCE 1 STATE STORE] >---> END
[SOURCE PARTITION 2]  >-----100k messages/s---->  [TASK GRAPH 2]  >----200 different aggregated values---->  [INSTANCE 2 STATE STORE] >---> END
[SOURCE PARTITION 3]  >-----100k messages/s---->  [TASK GRAPH 3]  >----100 different aggregated values---->  [INSTANCE 3 STATE STORE] >---> END

[ONE SEC SCHEDULED SOURCE] >-----400 messages/s------> [TASK GRAPH X]  >----global aggregated value----> [OUTPUT TOPIC /global-total]
(one second scheduled KTable with all state stores instances's tuples)

What do you think?

-Flávio Stutz





On 2018/06/29 17:23:29, flaviostutz@gmail.com <fl...@gmail.com> wrote: 
> Just copying a follow up from another thread to here (sorry about the mess):
> 
> From: Guozhang Wang <wa...@gmail.com>
> Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> Date: 2018/06/25 22:24:17
> List: dev@kafka.apache.org
> 
> Flávio, thanks for creating this KIP.
> 
> I think this "single-aggregation" use case is common enough that we should
> consider how to efficiently supports it: for example, for KSQL that's built
> on top of Streams, we've seen lots of query statements whose return is
> expected a single row indicating the "total aggregate" etc. See
> https://github.com/confluentinc/ksql/issues/430 for details.
> 
> I've not read through https://issues.apache.org/jira/browse/KAFKA-6953, but
> I'm wondering if we have discussed the option of supporting it in a
> "pre-aggregate" manner: that is we do partial aggregates on parallel tasks,
> and then sends the partial aggregated value via a single topic partition
> for the final aggregate, to reduce the traffic on that single partition and
> hence the final aggregate workload.
> Of course, for non-commutative aggregates we'd probably need to provide
> another API in addition to aggregate, like the `merge` function for
> session-based aggregates, to let users customize the operations of merging
> two partial aggregates into a single partial aggregate. What's its pros and
> cons compared with the current proposal?
> 
> 
> Guozhang
> On 2018/06/26 18:22:27, Flávio Stutz <fl...@gmail.com> wrote: 
> > Hey, guys, I've just created a new KIP about creating a new DSL graph
> > source for realtime partitioned consolidations.
> > 
> > We have faced the following scenario/problem in a lot of situations with
> > KStreams:
> >    - Huge incoming data being processed by numerous application instances
> >    - Need to aggregate different fields whose records span all topic
> > partitions (something like “total amount spent by people aged > 30 yrs”
> > when processing a topic partitioned by userid).
> > 
> > The challenge here is to manage this kind of situation without any
> > bottlenecks. We don't need the “global aggregation” to be processed at each
> > incoming message. On a scenario of 500 instances, each handling 1k
> > messages/s, any single point of aggregation (single partitioned topics,
> > global tables or external databases) would create a bottleneck of 500k
> > messages/s for single threaded/CPU elements.
> > 
> > For this scenario, it is possible to store the partial aggregations on
> > local stores and, from time to time, query those states and aggregate them
> > as a single value, avoiding bottlenecks. This is a way to create a "timed
> > aggregation barrier”.
> > 
> > If we leverage this kind of built-in feature we could greatly enhance the
> > ability of KStreams to better handle the CAP Theorem characteristics, so
> > that one could choose to have Consistency over Availability when needed.
> > 
> > We started this discussion with Matthias J. Sax here:
> > https://issues.apache.org/jira/browse/KAFKA-6953
> > 
> > If you want to see more, go to KIP-326 at:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > 
> > -Flávio Stutz
> > 
> 

Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by fl...@gmail.com, fl...@gmail.com.
> I agree with Guozhang on comparing the pros and cons of the approach he
> outlined vs the one in the proposed KIP.
I've just replied him. Please take a look.

> Will the triggering mechanism always be time, or would it make sense to
> expand to use other mechanisms such as the number of records, or some value
> present in one of the records?
Until now, elapsed time solved all the uses cases that confronted us. Maybe there are others which would turn those trigering mechanisms useful. Do you have any example?

> When setting "all instances" to true, how is the leader chosen?
In our implementation based on punctuation and API exposition through REST, we did the following:
   - during punctuate, each instance would query the KStreams API to discover its own instance id and the ids from all active instances
   - the instance with the lowest id will be the leader
   - this way, if the leader gets down, another instance would take place

> If "all instances" is set to false are all the partial aggregates forwarded
> to single output topic?
No. If it is false, it is expected that the sourced KTable will have only partial aggregates from the current instance's state store. 

This would be useful when there are tens of thousands of Keys on the partial tables (making it slow for a single instance to aggregate all instances states) so that one could implement a hierarchical partial aggregation, mainly in cases where there are many instances to handle the load. Something like:
   - each instance, from time to time, gets its own partial aggregated value (through the proposed mechanism) and publishes its value to a GlobalKTable (say, "aggregation-level0"), appending its instance id to the Key (something like "[instance-id]-[ktable-key]")
   - another graph, from time to time, gets a shard of K,V (say, a third of those instance-id keys) and aggregates them in another GlobalKTable ("aggregation-level1") with keys like "[shard-id]-[ktable-key]"
   - depending on the quantity of keys, this would take a couple of levels for distributing the "reduce" job in parallel.

The described "hierarchical reduce" mechanism may be another KIP in the future, with automatic sharding and even auto leveling based on load.

Thanks for the reply!

-Flávio Stutz



On 2018/06/29 19:53:41, Bill Bejeck <bb...@gmail.com> wrote: 
> Hi Flávio,
> 
> Thanks for creating the KIP.
> 
> I agree with Guozhang on comparing the pros and cons of the approach he
> outlined vs the one in the proposed KIP.
> 
> I also have a few clarification questions on the current KIP
> 
> Will the triggering mechanism always be time, or would it make sense to
> expand to use other mechanisms such as the number of records, or some value
> present in one of the records?
> When setting "all instances" to true, how is the leader chosen?
> If "all instances" is set to false are all the partial aggregates forwarded
> to single output topic?
> 
> Thanks again,
> Bill
> 
> 
> On Fri, Jun 29, 2018 at 2:15 PM Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
> > Flavio,
> >
> > thanks for cleaning up the KIP number collision.
> >
> > With regard to KIP-328
> > (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
> > )
> > I am wondering how both relate to each other?
> >
> > Any thoughts?
> >
> >
> > -Matthias
> >
> > On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
> > > Just copying a follow up from another thread to here (sorry about the
> > mess):
> > >
> > > From: Guozhang Wang <wa...@gmail.com>
> > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > > Date: 2018/06/25 22:24:17
> > > List: dev@kafka.apache.org
> > >
> > > Flávio, thanks for creating this KIP.
> > >
> > > I think this "single-aggregation" use case is common enough that we
> > should
> > > consider how to efficiently supports it: for example, for KSQL that's
> > built
> > > on top of Streams, we've seen lots of query statements whose return is
> > > expected a single row indicating the "total aggregate" etc. See
> > > https://github.com/confluentinc/ksql/issues/430 for details.
> > >
> > > I've not read through https://issues.apache.org/jira/browse/KAFKA-6953,
> > but
> > > I'm wondering if we have discussed the option of supporting it in a
> > > "pre-aggregate" manner: that is we do partial aggregates on parallel
> > tasks,
> > > and then sends the partial aggregated value via a single topic partition
> > > for the final aggregate, to reduce the traffic on that single partition
> > and
> > > hence the final aggregate workload.
> > > Of course, for non-commutative aggregates we'd probably need to provide
> > > another API in addition to aggregate, like the `merge` function for
> > > session-based aggregates, to let users customize the operations of
> > merging
> > > two partial aggregates into a single partial aggregate. What's its pros
> > and
> > > cons compared with the current proposal?
> > >
> > >
> > > Guozhang
> > > On 2018/06/26 18:22:27, Flávio Stutz <fl...@gmail.com> wrote:
> > >> Hey, guys, I've just created a new KIP about creating a new DSL graph
> > >> source for realtime partitioned consolidations.
> > >>
> > >> We have faced the following scenario/problem in a lot of situations with
> > >> KStreams:
> > >>    - Huge incoming data being processed by numerous application
> > instances
> > >>    - Need to aggregate different fields whose records span all topic
> > >> partitions (something like “total amount spent by people aged > 30 yrs”
> > >> when processing a topic partitioned by userid).
> > >>
> > >> The challenge here is to manage this kind of situation without any
> > >> bottlenecks. We don't need the “global aggregation” to be processed at
> > each
> > >> incoming message. On a scenario of 500 instances, each handling 1k
> > >> messages/s, any single point of aggregation (single partitioned topics,
> > >> global tables or external databases) would create a bottleneck of 500k
> > >> messages/s for single threaded/CPU elements.
> > >>
> > >> For this scenario, it is possible to store the partial aggregations on
> > >> local stores and, from time to time, query those states and aggregate
> > them
> > >> as a single value, avoiding bottlenecks. This is a way to create a
> > "timed
> > >> aggregation barrier”.
> > >>
> > >> If we leverage this kind of built-in feature we could greatly enhance
> > the
> > >> ability of KStreams to better handle the CAP Theorem characteristics, so
> > >> that one could choose to have Consistency over Availability when needed.
> > >>
> > >> We started this discussion with Matthias J. Sax here:
> > >> https://issues.apache.org/jira/browse/KAFKA-6953
> > >>
> > >> If you want to see more, go to KIP-326 at:
> > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > >>
> > >> -Flávio Stutz
> > >>
> >
> >
> 

Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by Guozhang Wang <wa...@gmail.com>.
Repasting my comment from the other email thread:

--------------------------

Flávio, thanks for creating this KIP.

I think this "single-aggregation" use case is common enough that we should
consider how to efficiently supports it: for example, for KSQL that's built
on top of Streams, we've seen lots of query statements whose return is
expected a single row indicating the "total aggregate" etc. See
https://github.com/confluentinc/ksql/issues/430 for details.

I've not read through https://issues.apache.org/jira/browse/KAFKA-6953, but
I'm wondering if we have discussed the option of supporting it in a
"pre-aggregate" manner: that is we do partial aggregates on parallel tasks,
and then sends the partial aggregated value via a single topic partition
for the final aggregate, to reduce the traffic on that single partition and
hence the final aggregate workload.
Of course, for non-commutative aggregates we'd probably need to provide
another API in addition to aggregate, like the `merge` function for
session-based aggregates, to let users customize the operations of merging
two partial aggregates into a single partial aggregate. What's its pros and
cons compared with the current proposal?


Guozhang




On Fri, Jun 29, 2018 at 12:53 PM, Bill Bejeck <bb...@gmail.com> wrote:

> Hi Flávio,
>
> Thanks for creating the KIP.
>
> I agree with Guozhang on comparing the pros and cons of the approach he
> outlined vs the one in the proposed KIP.
>
> I also have a few clarification questions on the current KIP
>
> Will the triggering mechanism always be time, or would it make sense to
> expand to use other mechanisms such as the number of records, or some value
> present in one of the records?
> When setting "all instances" to true, how is the leader chosen?
> If "all instances" is set to false are all the partial aggregates forwarded
> to single output topic?
>
> Thanks again,
> Bill
>
>
> On Fri, Jun 29, 2018 at 2:15 PM Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > Flavio,
> >
> > thanks for cleaning up the KIP number collision.
> >
> > With regard to KIP-328
> > (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 328%3A+Ability+to+suppress+updates+for+KTables
> > )
> > I am wondering how both relate to each other?
> >
> > Any thoughts?
> >
> >
> > -Matthias
> >
> > On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
> > > Just copying a follow up from another thread to here (sorry about the
> > mess):
> > >
> > > From: Guozhang Wang <wa...@gmail.com>
> > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > > Date: 2018/06/25 22:24:17
> > > List: dev@kafka.apache.org
> > >
> > > Flávio, thanks for creating this KIP.
> > >
> > > I think this "single-aggregation" use case is common enough that we
> > should
> > > consider how to efficiently supports it: for example, for KSQL that's
> > built
> > > on top of Streams, we've seen lots of query statements whose return is
> > > expected a single row indicating the "total aggregate" etc. See
> > > https://github.com/confluentinc/ksql/issues/430 for details.
> > >
> > > I've not read through https://issues.apache.org/jira/browse/KAFKA-6953
> ,
> > but
> > > I'm wondering if we have discussed the option of supporting it in a
> > > "pre-aggregate" manner: that is we do partial aggregates on parallel
> > tasks,
> > > and then sends the partial aggregated value via a single topic
> partition
> > > for the final aggregate, to reduce the traffic on that single partition
> > and
> > > hence the final aggregate workload.
> > > Of course, for non-commutative aggregates we'd probably need to provide
> > > another API in addition to aggregate, like the `merge` function for
> > > session-based aggregates, to let users customize the operations of
> > merging
> > > two partial aggregates into a single partial aggregate. What's its pros
> > and
> > > cons compared with the current proposal?
> > >
> > >
> > > Guozhang
> > > On 2018/06/26 18:22:27, Flávio Stutz <fl...@gmail.com> wrote:
> > >> Hey, guys, I've just created a new KIP about creating a new DSL graph
> > >> source for realtime partitioned consolidations.
> > >>
> > >> We have faced the following scenario/problem in a lot of situations
> with
> > >> KStreams:
> > >>    - Huge incoming data being processed by numerous application
> > instances
> > >>    - Need to aggregate different fields whose records span all topic
> > >> partitions (something like “total amount spent by people aged > 30
> yrs”
> > >> when processing a topic partitioned by userid).
> > >>
> > >> The challenge here is to manage this kind of situation without any
> > >> bottlenecks. We don't need the “global aggregation” to be processed at
> > each
> > >> incoming message. On a scenario of 500 instances, each handling 1k
> > >> messages/s, any single point of aggregation (single partitioned
> topics,
> > >> global tables or external databases) would create a bottleneck of 500k
> > >> messages/s for single threaded/CPU elements.
> > >>
> > >> For this scenario, it is possible to store the partial aggregations on
> > >> local stores and, from time to time, query those states and aggregate
> > them
> > >> as a single value, avoiding bottlenecks. This is a way to create a
> > "timed
> > >> aggregation barrier”.
> > >>
> > >> If we leverage this kind of built-in feature we could greatly enhance
> > the
> > >> ability of KStreams to better handle the CAP Theorem characteristics,
> so
> > >> that one could choose to have Consistency over Availability when
> needed.
> > >>
> > >> We started this discussion with Matthias J. Sax here:
> > >> https://issues.apache.org/jira/browse/KAFKA-6953
> > >>
> > >> If you want to see more, go to KIP-326 at:
> > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 326%3A+Schedulable+KTable+as+Graph+source
> > >>
> > >> -Flávio Stutz
> > >>
> >
> >
>



-- 
-- Guozhang

Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by Bill Bejeck <bb...@gmail.com>.
Hi Flávio,

Thanks for creating the KIP.

I agree with Guozhang on comparing the pros and cons of the approach he
outlined vs the one in the proposed KIP.

I also have a few clarification questions on the current KIP

Will the triggering mechanism always be time, or would it make sense to
expand to use other mechanisms such as the number of records, or some value
present in one of the records?
When setting "all instances" to true, how is the leader chosen?
If "all instances" is set to false are all the partial aggregates forwarded
to single output topic?

Thanks again,
Bill


On Fri, Jun 29, 2018 at 2:15 PM Matthias J. Sax <ma...@confluent.io>
wrote:

> Flavio,
>
> thanks for cleaning up the KIP number collision.
>
> With regard to KIP-328
> (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
> )
> I am wondering how both relate to each other?
>
> Any thoughts?
>
>
> -Matthias
>
> On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
> > Just copying a follow up from another thread to here (sorry about the
> mess):
> >
> > From: Guozhang Wang <wa...@gmail.com>
> > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > Date: 2018/06/25 22:24:17
> > List: dev@kafka.apache.org
> >
> > Flávio, thanks for creating this KIP.
> >
> > I think this "single-aggregation" use case is common enough that we
> should
> > consider how to efficiently supports it: for example, for KSQL that's
> built
> > on top of Streams, we've seen lots of query statements whose return is
> > expected a single row indicating the "total aggregate" etc. See
> > https://github.com/confluentinc/ksql/issues/430 for details.
> >
> > I've not read through https://issues.apache.org/jira/browse/KAFKA-6953,
> but
> > I'm wondering if we have discussed the option of supporting it in a
> > "pre-aggregate" manner: that is we do partial aggregates on parallel
> tasks,
> > and then sends the partial aggregated value via a single topic partition
> > for the final aggregate, to reduce the traffic on that single partition
> and
> > hence the final aggregate workload.
> > Of course, for non-commutative aggregates we'd probably need to provide
> > another API in addition to aggregate, like the `merge` function for
> > session-based aggregates, to let users customize the operations of
> merging
> > two partial aggregates into a single partial aggregate. What's its pros
> and
> > cons compared with the current proposal?
> >
> >
> > Guozhang
> > On 2018/06/26 18:22:27, Flávio Stutz <fl...@gmail.com> wrote:
> >> Hey, guys, I've just created a new KIP about creating a new DSL graph
> >> source for realtime partitioned consolidations.
> >>
> >> We have faced the following scenario/problem in a lot of situations with
> >> KStreams:
> >>    - Huge incoming data being processed by numerous application
> instances
> >>    - Need to aggregate different fields whose records span all topic
> >> partitions (something like “total amount spent by people aged > 30 yrs”
> >> when processing a topic partitioned by userid).
> >>
> >> The challenge here is to manage this kind of situation without any
> >> bottlenecks. We don't need the “global aggregation” to be processed at
> each
> >> incoming message. On a scenario of 500 instances, each handling 1k
> >> messages/s, any single point of aggregation (single partitioned topics,
> >> global tables or external databases) would create a bottleneck of 500k
> >> messages/s for single threaded/CPU elements.
> >>
> >> For this scenario, it is possible to store the partial aggregations on
> >> local stores and, from time to time, query those states and aggregate
> them
> >> as a single value, avoiding bottlenecks. This is a way to create a
> "timed
> >> aggregation barrier”.
> >>
> >> If we leverage this kind of built-in feature we could greatly enhance
> the
> >> ability of KStreams to better handle the CAP Theorem characteristics, so
> >> that one could choose to have Consistency over Availability when needed.
> >>
> >> We started this discussion with Matthias J. Sax here:
> >> https://issues.apache.org/jira/browse/KAFKA-6953
> >>
> >> If you want to see more, go to KIP-326 at:
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> >>
> >> -Flávio Stutz
> >>
>
>

Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by John Roesler <jo...@confluent.io>.
Hi Flávio,

Sure thing. And apologies in advance if I missed the point.

Below is some more-or-less realistic Java code to demonstrate how, given a
high-volume (heavily partitioned) stream of purchases, we can "step down"
the update rate with rate-limited intermediate aggregations.
Please bear in mind that the suppression API itself is still under debate,
so this is just for illustration purposes.

Basically, the "suppress" operator creates a processor whose job is just to
store the latest value for each key and not emit it until the configured
time.

So if key "X" gets updated 1000x/sec, we can use suppress to make sure it
doesn't get emitted to the next processor more than once per second.

Does this make sense?

Thanks,
-John

public class KTableSuppressProcessorTest {
    private static class Purchase {
        final long customerId;
        final int value;

        private Purchase(final long customerId, final int value) {
            this.customerId = customerId;
            this.value = value;
        }
    }

    private static class PurchaseSerde implements Serde<Purchase> {...}

    public Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String purchases = "purchases";

        final KTable<Long, Purchase> input = builder.table(
            purchases,
            Consumed.with(Serdes.Long(), new PurchaseSerde())
        );

        // Fairly sloppy, but the idea is to "split" each customer id into
one id per partition.
        // This way, we can first total their purchases inside each
partition before aggregating them
        // across partitions
        final KTable<Long, Purchase> purchasesWithPartitionedCustomers =
input.transformValues(
            () -> new ValueTransformerWithKey<Long, Purchase, Purchase>() {
                private ProcessorContext context;

                @Override
                public void init(final ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public Purchase transform(final Long readOnlyKey, final
Purchase purchase) {
                    final int partition = context.partition();
                    return new Purchase(
                        purchase.customerId * 1000 + partition, // Assuming
we have < 1k partitions...
                        purchase.value
                    );
                }
            });

        final KGroupedTable<Long, Integer>
purchaseValueByPartitionedCustomer =
            purchasesWithPartitionedCustomers.groupBy(
                (id, purchase) -> new KeyValue<>(purchase.customerId,
purchase.value)
            );

        final Suppression<Long, Integer> oncePerKeyPerSecond =
Suppression.suppressIntermediateEvents(
            IntermediateSuppression
                .emitAfter(Duration.ofSeconds(1))
                .bufferKeys(5000)
                .bufferFullStrategy(EMIT)
        );

        // First level of aggregation. Each customer gets their purchases
aggregated *just within each partition*.
        // The result of this aggregation is emitted at most once per
second per customer per purchase-partition
        final KTable<Long, Integer> totalValueByPartitionedCustomer =
            purchaseValueByPartitionedCustomer
                .reduce((l, r) -> l + r, (l, r) -> l - r)
                .suppress(oncePerKeyPerSecond);

        // This is where we reverse the partitioning of each customer and
then aggregate
        // each customer's purchases across partitions
        // The result of this aggregation is emitted at most once per
second per customer
        final KTable<Long, Integer>
aggregatedTotalValueByPartitionedCustomer =
            totalValueByPartitionedCustomer
                .groupBy((key, value) -> new KeyValue<>(key / 1000, value))
                .reduce((l, r) -> l + r, (l, r) -> l - r)
                .suppress(oncePerKeyPerSecond);

        // Sending all the intermediate totals to a single key to get the
final aggregation
        // The result of this aggregation is emitted at most once per second
        final KTable<String, Integer> total =
aggregatedTotalValueByPartitionedCustomer
            .groupBy((key, value) -> new KeyValue<>("ALL", value))
            .reduce((l, r) -> l + r, (l, r) -> l - r)
            .suppress(Suppression.suppressIntermediateEvents(
                IntermediateSuppression.emitAfter(Duration.ofSeconds(1))
            ));

        // This topic will contain just one key ("ALL"), and the value will
be
        // the ever-updating all-time purchase value
        // Note that it'll be updated once per second.
        total.toStream().to("total-purchases-value");

        return builder.build();
    }
}

On Mon, Jul 2, 2018 at 3:38 PM flaviostutz@gmail.com <fl...@gmail.com>
wrote:

> Thanks for clarifying the real usage of KIP-328. Now I understood a bit
> better.
> I didn't see how that feature would be used to minimize the number of
> publications to the single partitioned output topic. When it is falls into
> supression, the graph stops going down? Could you explain better? If that
> is possible I think it would be great.
>
> Thanks for the intervention!
>
> -Flávio Stutz
>
>
>
>
> On 2018/07/02 20:03:57, John Roesler <jo...@confluent.io> wrote:
> > Hi Flávio,
> >
> > Thanks for the KIP. I'll apologize that I'm arriving late to the
> > discussion. I've tried to catch up, but I might have missed some nuances.
> >
> > Regarding KIP-328, the idea is to add the ability to suppress
> intermediate
> > results from all KTables, not just windowed ones. I think this could
> > support your use case in combination with the strategy that Guozhang
> > proposed of having one or more pre-aggregation steps that ultimately push
> > into a single-partition topic for final aggregation. Suppressing
> > intermediate results would solve the problem you noted that today
> > pre-aggregating doesn't do much to staunch the flow up updates.
> >
> > I'm not sure if this would be good enough for you overall; I just wanted
> to
> > clarify the role of KIP-328.
> > In particular, the solution you mentioned is to have the downstream
> KTables
> > actually query the upstream ones to compute their results. I'm not sure
> > whether it's more efficient to do these queries on the schedule, or to
> have
> > the upstream tables emit their results, on the same schedule.
> >
> > What do you think?
> >
> > Thanks,
> > -John
> >
> > On Sun, Jul 1, 2018 at 10:03 PM flaviostutz@gmail.com <
> flaviostutz@gmail.com>
> > wrote:
> >
> > > For what I understood, that KIP is related to how KStreams will handle
> > > KTable updates in Windowed scenarios to optimize resource usage.
> > > I couldn't see any specific relation to this KIP. Had you?
> > >
> > > -Flávio Stutz
> > >
> > >
> > > On 2018/06/29 18:14:46, "Matthias J. Sax" <ma...@confluent.io>
> wrote:
> > > > Flavio,
> > > >
> > > > thanks for cleaning up the KIP number collision.
> > > >
> > > > With regard to KIP-328
> > > > (
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
> > > )
> > > > I am wondering how both relate to each other?
> > > >
> > > > Any thoughts?
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
> > > > > Just copying a follow up from another thread to here (sorry about
> the
> > > mess):
> > > > >
> > > > > From: Guozhang Wang <wa...@gmail.com>
> > > > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > > > > Date: 2018/06/25 22:24:17
> > > > > List: dev@kafka.apache.org
> > > > >
> > > > > Flávio, thanks for creating this KIP.
> > > > >
> > > > > I think this "single-aggregation" use case is common enough that we
> > > should
> > > > > consider how to efficiently supports it: for example, for KSQL
> that's
> > > built
> > > > > on top of Streams, we've seen lots of query statements whose
> return is
> > > > > expected a single row indicating the "total aggregate" etc. See
> > > > > https://github.com/confluentinc/ksql/issues/430 for details.
> > > > >
> > > > > I've not read through
> https://issues.apache.org/jira/browse/KAFKA-6953,
> > > but
> > > > > I'm wondering if we have discussed the option of supporting it in a
> > > > > "pre-aggregate" manner: that is we do partial aggregates on
> parallel
> > > tasks,
> > > > > and then sends the partial aggregated value via a single topic
> > > partition
> > > > > for the final aggregate, to reduce the traffic on that single
> > > partition and
> > > > > hence the final aggregate workload.
> > > > > Of course, for non-commutative aggregates we'd probably need to
> provide
> > > > > another API in addition to aggregate, like the `merge` function for
> > > > > session-based aggregates, to let users customize the operations of
> > > merging
> > > > > two partial aggregates into a single partial aggregate. What's its
> > > pros and
> > > > > cons compared with the current proposal?
> > > > >
> > > > >
> > > > > Guozhang
> > > > > On 2018/06/26 18:22:27, Flávio Stutz <fl...@gmail.com>
> wrote:
> > > > >> Hey, guys, I've just created a new KIP about creating a new DSL
> graph
> > > > >> source for realtime partitioned consolidations.
> > > > >>
> > > > >> We have faced the following scenario/problem in a lot of
> situations
> > > with
> > > > >> KStreams:
> > > > >>    - Huge incoming data being processed by numerous application
> > > instances
> > > > >>    - Need to aggregate different fields whose records span all
> topic
> > > > >> partitions (something like “total amount spent by people aged > 30
> > > yrs”
> > > > >> when processing a topic partitioned by userid).
> > > > >>
> > > > >> The challenge here is to manage this kind of situation without any
> > > > >> bottlenecks. We don't need the “global aggregation” to be
> processed
> > > at each
> > > > >> incoming message. On a scenario of 500 instances, each handling 1k
> > > > >> messages/s, any single point of aggregation (single partitioned
> > > topics,
> > > > >> global tables or external databases) would create a bottleneck of
> 500k
> > > > >> messages/s for single threaded/CPU elements.
> > > > >>
> > > > >> For this scenario, it is possible to store the partial
> aggregations on
> > > > >> local stores and, from time to time, query those states and
> aggregate
> > > them
> > > > >> as a single value, avoiding bottlenecks. This is a way to create a
> > > "timed
> > > > >> aggregation barrier”.
> > > > >>
> > > > >> If we leverage this kind of built-in feature we could greatly
> enhance
> > > the
> > > > >> ability of KStreams to better handle the CAP Theorem
> characteristics,
> > > so
> > > > >> that one could choose to have Consistency over Availability when
> > > needed.
> > > > >>
> > > > >> We started this discussion with Matthias J. Sax here:
> > > > >> https://issues.apache.org/jira/browse/KAFKA-6953
> > > > >>
> > > > >> If you want to see more, go to KIP-326 at:
> > > > >>
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > > > >>
> > > > >> -Flávio Stutz
> > > > >>
> > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by fl...@gmail.com, fl...@gmail.com.
Thanks for clarifying the real usage of KIP-328. Now I understood a bit better.
I didn't see how that feature would be used to minimize the number of publications to the single partitioned output topic. When it is falls into supression, the graph stops going down? Could you explain better? If that is possible I think it would be great.

Thanks for the intervention!

-Flávio Stutz




On 2018/07/02 20:03:57, John Roesler <jo...@confluent.io> wrote: 
> Hi Flávio,
> 
> Thanks for the KIP. I'll apologize that I'm arriving late to the
> discussion. I've tried to catch up, but I might have missed some nuances.
> 
> Regarding KIP-328, the idea is to add the ability to suppress intermediate
> results from all KTables, not just windowed ones. I think this could
> support your use case in combination with the strategy that Guozhang
> proposed of having one or more pre-aggregation steps that ultimately push
> into a single-partition topic for final aggregation. Suppressing
> intermediate results would solve the problem you noted that today
> pre-aggregating doesn't do much to staunch the flow up updates.
> 
> I'm not sure if this would be good enough for you overall; I just wanted to
> clarify the role of KIP-328.
> In particular, the solution you mentioned is to have the downstream KTables
> actually query the upstream ones to compute their results. I'm not sure
> whether it's more efficient to do these queries on the schedule, or to have
> the upstream tables emit their results, on the same schedule.
> 
> What do you think?
> 
> Thanks,
> -John
> 
> On Sun, Jul 1, 2018 at 10:03 PM flaviostutz@gmail.com <fl...@gmail.com>
> wrote:
> 
> > For what I understood, that KIP is related to how KStreams will handle
> > KTable updates in Windowed scenarios to optimize resource usage.
> > I couldn't see any specific relation to this KIP. Had you?
> >
> > -Flávio Stutz
> >
> >
> > On 2018/06/29 18:14:46, "Matthias J. Sax" <ma...@confluent.io> wrote:
> > > Flavio,
> > >
> > > thanks for cleaning up the KIP number collision.
> > >
> > > With regard to KIP-328
> > > (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
> > )
> > > I am wondering how both relate to each other?
> > >
> > > Any thoughts?
> > >
> > >
> > > -Matthias
> > >
> > > On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
> > > > Just copying a follow up from another thread to here (sorry about the
> > mess):
> > > >
> > > > From: Guozhang Wang <wa...@gmail.com>
> > > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > > > Date: 2018/06/25 22:24:17
> > > > List: dev@kafka.apache.org
> > > >
> > > > Flávio, thanks for creating this KIP.
> > > >
> > > > I think this "single-aggregation" use case is common enough that we
> > should
> > > > consider how to efficiently supports it: for example, for KSQL that's
> > built
> > > > on top of Streams, we've seen lots of query statements whose return is
> > > > expected a single row indicating the "total aggregate" etc. See
> > > > https://github.com/confluentinc/ksql/issues/430 for details.
> > > >
> > > > I've not read through https://issues.apache.org/jira/browse/KAFKA-6953,
> > but
> > > > I'm wondering if we have discussed the option of supporting it in a
> > > > "pre-aggregate" manner: that is we do partial aggregates on parallel
> > tasks,
> > > > and then sends the partial aggregated value via a single topic
> > partition
> > > > for the final aggregate, to reduce the traffic on that single
> > partition and
> > > > hence the final aggregate workload.
> > > > Of course, for non-commutative aggregates we'd probably need to provide
> > > > another API in addition to aggregate, like the `merge` function for
> > > > session-based aggregates, to let users customize the operations of
> > merging
> > > > two partial aggregates into a single partial aggregate. What's its
> > pros and
> > > > cons compared with the current proposal?
> > > >
> > > >
> > > > Guozhang
> > > > On 2018/06/26 18:22:27, Flávio Stutz <fl...@gmail.com> wrote:
> > > >> Hey, guys, I've just created a new KIP about creating a new DSL graph
> > > >> source for realtime partitioned consolidations.
> > > >>
> > > >> We have faced the following scenario/problem in a lot of situations
> > with
> > > >> KStreams:
> > > >>    - Huge incoming data being processed by numerous application
> > instances
> > > >>    - Need to aggregate different fields whose records span all topic
> > > >> partitions (something like “total amount spent by people aged > 30
> > yrs”
> > > >> when processing a topic partitioned by userid).
> > > >>
> > > >> The challenge here is to manage this kind of situation without any
> > > >> bottlenecks. We don't need the “global aggregation” to be processed
> > at each
> > > >> incoming message. On a scenario of 500 instances, each handling 1k
> > > >> messages/s, any single point of aggregation (single partitioned
> > topics,
> > > >> global tables or external databases) would create a bottleneck of 500k
> > > >> messages/s for single threaded/CPU elements.
> > > >>
> > > >> For this scenario, it is possible to store the partial aggregations on
> > > >> local stores and, from time to time, query those states and aggregate
> > them
> > > >> as a single value, avoiding bottlenecks. This is a way to create a
> > "timed
> > > >> aggregation barrier”.
> > > >>
> > > >> If we leverage this kind of built-in feature we could greatly enhance
> > the
> > > >> ability of KStreams to better handle the CAP Theorem characteristics,
> > so
> > > >> that one could choose to have Consistency over Availability when
> > needed.
> > > >>
> > > >> We started this discussion with Matthias J. Sax here:
> > > >> https://issues.apache.org/jira/browse/KAFKA-6953
> > > >>
> > > >> If you want to see more, go to KIP-326 at:
> > > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > > >>
> > > >> -Flávio Stutz
> > > >>
> > >
> > >
> >
> 

Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Moved this KIP into status "inactive". Feel free to resume and any time.

-Matthias

On 7/15/18 6:55 PM, Matthias J. Sax wrote:
> I think it would make a lot of sense to provide a simple DSL abstraction.
> 
> Something like:
> 
> KStream stream = ...
> KTable count = stream.count();
> 
> The missing groupBy() or grouByKey() class indicates a global counting
> operation. The JavaDocs should highlight the impact.
> 
> One open question is, what key we want to use for the result KTable?
> 
> Also, the details about optional parameters like `Materialized` need to
> be discussed in details.
> 
> 
> 
> -Matthias
> 
> 
> On 7/6/18 2:43 PM, Guozhang Wang wrote:
>> That's a lot of email exchanges for me to catch up :)
>>
>> My original proposed alternative solution is indeed relying on
>> pre-aggregate before sending to the single-partition topic, so that the
>> traffic on that single-partition topic would not be huge (I called it
>> partial-aggregate but the intent was the same).
>>
>> What I was thinking is that, given such a scenario could be common, if
>> we've decided to go down this route should we provide a new API that wrap's
>> John's proposed topology (right now with KIP-328 users still need to
>> leverage this trick manually):
>>
>>
>> ----------
>>
>> final KStream<String, String> siteEvents = builder.stream("/site-events");
>>
>> final KStream<Integer, Integer> keyedByPartition = siteEvents.transform(/*
>> generate KeyValue(key, 1) for the pre-aggregate*/);
>>
>> final KTable<Integer, Long> countsByPartition =
>> keyedByPartition.groupByKey().count();   /* pre-aggregate */
>>
>> final KGroupedTable<String, Long> singlePartition =
>> countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value));
>>  /* sent the suppressed pre-aggregate values to the single partition topic
>> */
>>
>> final KTable<String, Long> totalCount = singlePartition.reduce((l, r) -> l +
>> r, (l, r) -> l - r);   /* read from the single partition topic, do reduce
>> on the data*/
>>
>> ----------
>>
>> Note that if we wrap them all into a new operator, users would need to
>> provide two functions, for the aggregate and for the final "reduce" (in my
>> previous email I called it merger function, but for the same intent).
>>
>>
>>
>> Guozhang
>>
>>
>>
>> On Thu, Jul 5, 2018 at 3:38 PM, John Roesler <jo...@confluent.io> wrote:
>>
>>> Ok, I didn't get quite as far as I hoped, and several things are far from
>>> ready, but here's what I have so far:
>>> https://github.com/apache/kafka/pull/5337
>>>
>>> The "unit" test works, and is a good example of how you should expect it to
>>> behave:
>>> https://github.com/apache/kafka/pull/5337/files#diff-
>>> 2fdec52b9cc3d0e564f0c12a199bed77
>>>
>>> I have one working integration test, but it's slow going getting the timing
>>> right, so no promises of any kind ;)
>>>
>>> Let me know what you think!
>>>
>>> Thanks,
>>> -John
>>>
>>> On Thu, Jul 5, 2018 at 8:39 AM John Roesler <jo...@confluent.io> wrote:
>>>
>>>> Hey Flávio,
>>>>
>>>> Thanks! I haven't got anything usable yet, but I'm working on it now. I'm
>>>> hoping to push up my branch by the end of the day.
>>>>
>>>> I don't know if you've seen it but Streams actually already has something
>>>> like this, in the form of caching on materialized stores. If you pass in
>>> a
>>>> "Materialized.withCachingEnabled()", you should be able to get a POC
>>>> working by setting the max cache size pretty high and setting the commit
>>>> interval for your desired rate:
>>>> https://docs.confluent.io/current/streams/developer-
>>> guide/memory-mgmt.html#streams-developer-guide-memory-management
>>>> .
>>>>
>>>> There are a couple of cases in joins and whatnot where it doesn't work,
>>>> but for the aggregations we discussed, it should. The reason for KIP-328
>>> is
>>>> to provide finer control and hopefully a more straightforward API.
>>>>
>>>> Let me know if that works, and I'll drop a message in here when I create
>>>> the draft PR for KIP-328. I'd really appreciate your feedback.
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>> On Wed, Jul 4, 2018 at 10:17 PM flaviostutz@gmail.com <
>>>> flaviostutz@gmail.com> wrote:
>>>>
>>>>> John, that was fantastic, man!
>>>>> Have you built any custom implementation of your KIP in your machine so
>>>>> that I could test it out here? I wish I could test it out.
>>>>> If you need any help implementing this feature, please tell me.
>>>>>
>>>>> Thanks.
>>>>>
>>>>> -Flávio Stutz
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 2018/07/03 18:04:52, John Roesler <jo...@confluent.io> wrote:
>>>>>> Hi Flávio,
>>>>>> Thanks! I think that we can actually do this, but the API could be
>>>>> better.
>>>>>> I've included Java code below, but I'll copy and modify your example
>>> so
>>>>>> we're on the same page.
>>>>>>
>>>>>> EXERCISE 1:
>>>>>>   - The case is "total counting of events for a huge website"
>>>>>>   - Tasks from Application A will have something like:
>>>>>>          .stream(/site-events)
>>>>>>          .transform( re-key s.t. the new key is the partition id)
>>>>>>          .groupByKey() // you have to do this before count
>>>>>>          .count()
>>>>>>           // you explicitly published to a one-partition topic here,
>>> but
>>>>>> it's actually sufficient just
>>>>>>           // to re-group onto one key. You could name and pre-create
>>> the
>>>>>> intermediate topic here,
>>>>>>           // but you don't need a separate application for the final
>>>>>> aggregation.
>>>>>>          .groupBy((partitionId, partialCount) -> new KeyValue("ALL",
>>>>>> partialCount))
>>>>>>          .aggregate(sum up the partialCounts)
>>>>>>          .publish(/counter-total)
>>>>>>
>>>>>> I've left out the suppressions, but they would go right after the
>>>>> count()
>>>>>> and the aggregate().
>>>>>>
>>>>>> With this program, you don't have to worry about the
>>> double-aggregation
>>>>> you
>>>>>> mentioned in the last email. The KTable produced by the first count()
>>>>> will
>>>>>> maintain the correct count per partition. If the value changes for any
>>>>>> partition, it'll emit a retraction of the old value and then the new
>>>>> value
>>>>>> downstream, so that the final aggregation can update itself properly.
>>>>>>
>>>>>> I think we can optimize both the execution and the programability by
>>>>> adding
>>>>>> a "global aggregation" concept. But In principle, it seems like this
>>>>> usage
>>>>>> of the current API will support your use case.
>>>>>>
>>>>>> Once again, though, this is just to present an alternative. I haven't
>>>>> done
>>>>>> the math on whether your proposal would be more efficient.
>>>>>>
>>>>>> Thanks,
>>>>>> -John
>>>>>>
>>>>>> Here's the same algorithm written in Java:
>>>>>>
>>>>>> final KStream<String, String> siteEvents =
>>>>> builder.stream("/site-events");
>>>>>>
>>>>>> // here we re-key the events so that the key is actually the partition
>>>>> id.
>>>>>> // we don't need the value to do a count, so I just set it to "1".
>>>>>> final KStream<Integer, Integer> keyedByPartition =
>>>>> siteEvents.transform(()
>>>>>> -> new Transformer<String, String, KeyValue<Integer, Integer>>() {
>>>>>>     private ProcessorContext context;
>>>>>>
>>>>>>     @Override
>>>>>>     public void init(final ProcessorContext context) {
>>>>>>         this.context = context;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public KeyValue<Integer, Integer> transform(final String key,
>>> final
>>>>>> String value) {
>>>>>>         return new KeyValue<>(context.partition(), 1);
>>>>>>     }
>>>>>> });
>>>>>>
>>>>>> // Note that we can't do "count()" on a KStream, we have to group it
>>>>> first.
>>>>>> I'm grouping by the key, so it will produce the count for each key.
>>>>>> // Since the key is actually the partition id, it will produce the
>>>>>> pre-aggregated count per partition.
>>>>>> // Note that the result is a KTable<PartitionId,Count>. It'll always
>>>>>> contain the most recent count for each partition.
>>>>>> final KTable<Integer, Long> countsByPartition =
>>>>>> keyedByPartition.groupByKey().count();
>>>>>>
>>>>>> // Now we get ready for the final roll-up. We re-group all the
>>>>> constituent
>>>>>> counts
>>>>>> final KGroupedTable<String, Long> singlePartition =
>>>>>> countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL",
>>> value));
>>>>>>
>>>>>> final KTable<String, Long> totalCount = singlePartition.reduce((l, r)
>>>>> -> l
>>>>>> + r, (l, r) -> l - r);
>>>>>>
>>>>>> totalCount.toStream().foreach((k, v) -> {
>>>>>>     // k is always "ALL"
>>>>>>     // v is always the most recent total value
>>>>>>     System.out.println("The total event count is: " + v);
>>>>>> });
>>>>>>
>>>>>>
>>>>>> On Tue, Jul 3, 2018 at 9:21 AM flaviostutz@gmail.com <
>>>>> flaviostutz@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Great feature you have there!
>>>>>>>
>>>>>>> I'll try to exercise here how we would achieve the same functional
>>>>>>> objectives using your KIP:
>>>>>>>
>>>>>>> EXERCISE 1:
>>>>>>>   - The case is "total counting of events for a huge website"
>>>>>>>   - Tasks from Application A will have something like:
>>>>>>>          .stream(/site-events)
>>>>>>>          .count()
>>>>>>>          .publish(/single-partitioned-topic-with-count-partials)
>>>>>>>   - The published messages will be, for example:
>>>>>>>           ["counter-task1", 2345]
>>>>>>>           ["counter-task2", 8495]
>>>>>>>           ["counter-task3", 4839]
>>>>>>>   - Single Task from Application B will have something like:
>>>>>>>          .stream(/single-partitioned-topic-with-count-partials)
>>>>>>>          .aggregate(by messages whose key starts with "counter")
>>>>>>>          .publish(/counter-total)
>>>>>>>   - FAIL HERE. How would I know what is the overall partitions?
>>> Maybe
>>>>> two
>>>>>>> partials for the same task will arrive before other tasks and it
>>> maybe
>>>>>>> aggregated twice.
>>>>>>>
>>>>>>> I tried to think about using GlobalKTables, but I didn't get an easy
>>>>> way
>>>>>>> to aggregate the keys from that table. Do you have any clue?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> -Flávio Stutz
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> /partial-counters-to-single-partitioned-topic
>>>>>>>
>>>>>>> On 2018/07/02 20:03:57, John Roesler <jo...@confluent.io> wrote:
>>>>>>>> Hi Flávio,
>>>>>>>>
>>>>>>>> Thanks for the KIP. I'll apologize that I'm arriving late to the
>>>>>>>> discussion. I've tried to catch up, but I might have missed some
>>>>> nuances.
>>>>>>>>
>>>>>>>> Regarding KIP-328, the idea is to add the ability to suppress
>>>>>>> intermediate
>>>>>>>> results from all KTables, not just windowed ones. I think this
>>> could
>>>>>>>> support your use case in combination with the strategy that
>>> Guozhang
>>>>>>>> proposed of having one or more pre-aggregation steps that
>>>>> ultimately push
>>>>>>>> into a single-partition topic for final aggregation. Suppressing
>>>>>>>> intermediate results would solve the problem you noted that today
>>>>>>>> pre-aggregating doesn't do much to staunch the flow up updates.
>>>>>>>>
>>>>>>>> I'm not sure if this would be good enough for you overall; I just
>>>>> wanted
>>>>>>> to
>>>>>>>> clarify the role of KIP-328.
>>>>>>>> In particular, the solution you mentioned is to have the
>>> downstream
>>>>>>> KTables
>>>>>>>> actually query the upstream ones to compute their results. I'm not
>>>>> sure
>>>>>>>> whether it's more efficient to do these queries on the schedule,
>>> or
>>>>> to
>>>>>>> have
>>>>>>>> the upstream tables emit their results, on the same schedule.
>>>>>>>>
>>>>>>>> What do you think?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Sun, Jul 1, 2018 at 10:03 PM flaviostutz@gmail.com <
>>>>>>> flaviostutz@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> For what I understood, that KIP is related to how KStreams will
>>>>> handle
>>>>>>>>> KTable updates in Windowed scenarios to optimize resource usage.
>>>>>>>>> I couldn't see any specific relation to this KIP. Had you?
>>>>>>>>>
>>>>>>>>> -Flávio Stutz
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 2018/06/29 18:14:46, "Matthias J. Sax" <
>>> matthias@confluent.io>
>>>>>>> wrote:
>>>>>>>>>> Flavio,
>>>>>>>>>>
>>>>>>>>>> thanks for cleaning up the KIP number collision.
>>>>>>>>>>
>>>>>>>>>> With regard to KIP-328
>>>>>>>>>> (
>>>>>>>>>
>>>>>>>
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>> 328%3A+Ability+to+suppress+updates+for+KTables
>>>>>>>>> )
>>>>>>>>>> I am wondering how both relate to each other?
>>>>>>>>>>
>>>>>>>>>> Any thoughts?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
>>>>>>>>>>> Just copying a follow up from another thread to here (sorry
>>>>> about
>>>>>>> the
>>>>>>>>> mess):
>>>>>>>>>>>
>>>>>>>>>>> From: Guozhang Wang <wa...@gmail.com>
>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph
>>>>> source
>>>>>>>>>>> Date: 2018/06/25 22:24:17
>>>>>>>>>>> List: dev@kafka.apache.org
>>>>>>>>>>>
>>>>>>>>>>> Flávio, thanks for creating this KIP.
>>>>>>>>>>>
>>>>>>>>>>> I think this "single-aggregation" use case is common enough
>>>>> that we
>>>>>>>>> should
>>>>>>>>>>> consider how to efficiently supports it: for example, for
>>> KSQL
>>>>>>> that's
>>>>>>>>> built
>>>>>>>>>>> on top of Streams, we've seen lots of query statements whose
>>>>>>> return is
>>>>>>>>>>> expected a single row indicating the "total aggregate" etc.
>>>>> See
>>>>>>>>>>> https://github.com/confluentinc/ksql/issues/430 for
>>> details.
>>>>>>>>>>>
>>>>>>>>>>> I've not read through
>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6953,
>>>>>>>>> but
>>>>>>>>>>> I'm wondering if we have discussed the option of supporting
>>>>> it in a
>>>>>>>>>>> "pre-aggregate" manner: that is we do partial aggregates on
>>>>>>> parallel
>>>>>>>>> tasks,
>>>>>>>>>>> and then sends the partial aggregated value via a single
>>> topic
>>>>>>>>> partition
>>>>>>>>>>> for the final aggregate, to reduce the traffic on that
>>> single
>>>>>>>>> partition and
>>>>>>>>>>> hence the final aggregate workload.
>>>>>>>>>>> Of course, for non-commutative aggregates we'd probably need
>>>>> to
>>>>>>> provide
>>>>>>>>>>> another API in addition to aggregate, like the `merge`
>>>>> function for
>>>>>>>>>>> session-based aggregates, to let users customize the
>>>>> operations of
>>>>>>>>> merging
>>>>>>>>>>> two partial aggregates into a single partial aggregate.
>>>>> What's its
>>>>>>>>> pros and
>>>>>>>>>>> cons compared with the current proposal?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>> On 2018/06/26 18:22:27, Flávio Stutz <flaviostutz@gmail.com
>>>>
>>>>>>> wrote:
>>>>>>>>>>>> Hey, guys, I've just created a new KIP about creating a new
>>>>> DSL
>>>>>>> graph
>>>>>>>>>>>> source for realtime partitioned consolidations.
>>>>>>>>>>>>
>>>>>>>>>>>> We have faced the following scenario/problem in a lot of
>>>>>>> situations
>>>>>>>>> with
>>>>>>>>>>>> KStreams:
>>>>>>>>>>>>    - Huge incoming data being processed by numerous
>>>>> application
>>>>>>>>> instances
>>>>>>>>>>>>    - Need to aggregate different fields whose records span
>>>>> all
>>>>>>> topic
>>>>>>>>>>>> partitions (something like “total amount spent by people
>>>>> aged > 30
>>>>>>>>> yrs”
>>>>>>>>>>>> when processing a topic partitioned by userid).
>>>>>>>>>>>>
>>>>>>>>>>>> The challenge here is to manage this kind of situation
>>>>> without any
>>>>>>>>>>>> bottlenecks. We don't need the “global aggregation” to be
>>>>>>> processed
>>>>>>>>> at each
>>>>>>>>>>>> incoming message. On a scenario of 500 instances, each
>>>>> handling 1k
>>>>>>>>>>>> messages/s, any single point of aggregation (single
>>>>> partitioned
>>>>>>>>> topics,
>>>>>>>>>>>> global tables or external databases) would create a
>>>>> bottleneck of
>>>>>>> 500k
>>>>>>>>>>>> messages/s for single threaded/CPU elements.
>>>>>>>>>>>>
>>>>>>>>>>>> For this scenario, it is possible to store the partial
>>>>>>> aggregations on
>>>>>>>>>>>> local stores and, from time to time, query those states and
>>>>>>> aggregate
>>>>>>>>> them
>>>>>>>>>>>> as a single value, avoiding bottlenecks. This is a way to
>>>>> create a
>>>>>>>>> "timed
>>>>>>>>>>>> aggregation barrier”.
>>>>>>>>>>>>
>>>>>>>>>>>> If we leverage this kind of built-in feature we could
>>> greatly
>>>>>>> enhance
>>>>>>>>> the
>>>>>>>>>>>> ability of KStreams to better handle the CAP Theorem
>>>>>>> characteristics,
>>>>>>>>> so
>>>>>>>>>>>> that one could choose to have Consistency over Availability
>>>>> when
>>>>>>>>> needed.
>>>>>>>>>>>>
>>>>>>>>>>>> We started this discussion with Matthias J. Sax here:
>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6953
>>>>>>>>>>>>
>>>>>>>>>>>> If you want to see more, go to KIP-326 at:
>>>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>> 326%3A+Schedulable+KTable+as+Graph+source
>>>>>>>>>>>>
>>>>>>>>>>>> -Flávio Stutz
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>>
> 


Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I think it would make a lot of sense to provide a simple DSL abstraction.

Something like:

KStream stream = ...
KTable count = stream.count();

The missing groupBy() or grouByKey() class indicates a global counting
operation. The JavaDocs should highlight the impact.

One open question is, what key we want to use for the result KTable?

Also, the details about optional parameters like `Materialized` need to
be discussed in details.



-Matthias


On 7/6/18 2:43 PM, Guozhang Wang wrote:
> That's a lot of email exchanges for me to catch up :)
> 
> My original proposed alternative solution is indeed relying on
> pre-aggregate before sending to the single-partition topic, so that the
> traffic on that single-partition topic would not be huge (I called it
> partial-aggregate but the intent was the same).
> 
> What I was thinking is that, given such a scenario could be common, if
> we've decided to go down this route should we provide a new API that wrap's
> John's proposed topology (right now with KIP-328 users still need to
> leverage this trick manually):
> 
> 
> ----------
> 
> final KStream<String, String> siteEvents = builder.stream("/site-events");
> 
> final KStream<Integer, Integer> keyedByPartition = siteEvents.transform(/*
> generate KeyValue(key, 1) for the pre-aggregate*/);
> 
> final KTable<Integer, Long> countsByPartition =
> keyedByPartition.groupByKey().count();   /* pre-aggregate */
> 
> final KGroupedTable<String, Long> singlePartition =
> countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value));
>  /* sent the suppressed pre-aggregate values to the single partition topic
> */
> 
> final KTable<String, Long> totalCount = singlePartition.reduce((l, r) -> l +
> r, (l, r) -> l - r);   /* read from the single partition topic, do reduce
> on the data*/
> 
> ----------
> 
> Note that if we wrap them all into a new operator, users would need to
> provide two functions, for the aggregate and for the final "reduce" (in my
> previous email I called it merger function, but for the same intent).
> 
> 
> 
> Guozhang
> 
> 
> 
> On Thu, Jul 5, 2018 at 3:38 PM, John Roesler <jo...@confluent.io> wrote:
> 
>> Ok, I didn't get quite as far as I hoped, and several things are far from
>> ready, but here's what I have so far:
>> https://github.com/apache/kafka/pull/5337
>>
>> The "unit" test works, and is a good example of how you should expect it to
>> behave:
>> https://github.com/apache/kafka/pull/5337/files#diff-
>> 2fdec52b9cc3d0e564f0c12a199bed77
>>
>> I have one working integration test, but it's slow going getting the timing
>> right, so no promises of any kind ;)
>>
>> Let me know what you think!
>>
>> Thanks,
>> -John
>>
>> On Thu, Jul 5, 2018 at 8:39 AM John Roesler <jo...@confluent.io> wrote:
>>
>>> Hey Flávio,
>>>
>>> Thanks! I haven't got anything usable yet, but I'm working on it now. I'm
>>> hoping to push up my branch by the end of the day.
>>>
>>> I don't know if you've seen it but Streams actually already has something
>>> like this, in the form of caching on materialized stores. If you pass in
>> a
>>> "Materialized.withCachingEnabled()", you should be able to get a POC
>>> working by setting the max cache size pretty high and setting the commit
>>> interval for your desired rate:
>>> https://docs.confluent.io/current/streams/developer-
>> guide/memory-mgmt.html#streams-developer-guide-memory-management
>>> .
>>>
>>> There are a couple of cases in joins and whatnot where it doesn't work,
>>> but for the aggregations we discussed, it should. The reason for KIP-328
>> is
>>> to provide finer control and hopefully a more straightforward API.
>>>
>>> Let me know if that works, and I'll drop a message in here when I create
>>> the draft PR for KIP-328. I'd really appreciate your feedback.
>>>
>>> Thanks,
>>> -John
>>>
>>> On Wed, Jul 4, 2018 at 10:17 PM flaviostutz@gmail.com <
>>> flaviostutz@gmail.com> wrote:
>>>
>>>> John, that was fantastic, man!
>>>> Have you built any custom implementation of your KIP in your machine so
>>>> that I could test it out here? I wish I could test it out.
>>>> If you need any help implementing this feature, please tell me.
>>>>
>>>> Thanks.
>>>>
>>>> -Flávio Stutz
>>>>
>>>>
>>>>
>>>>
>>>> On 2018/07/03 18:04:52, John Roesler <jo...@confluent.io> wrote:
>>>>> Hi Flávio,
>>>>> Thanks! I think that we can actually do this, but the API could be
>>>> better.
>>>>> I've included Java code below, but I'll copy and modify your example
>> so
>>>>> we're on the same page.
>>>>>
>>>>> EXERCISE 1:
>>>>>   - The case is "total counting of events for a huge website"
>>>>>   - Tasks from Application A will have something like:
>>>>>          .stream(/site-events)
>>>>>          .transform( re-key s.t. the new key is the partition id)
>>>>>          .groupByKey() // you have to do this before count
>>>>>          .count()
>>>>>           // you explicitly published to a one-partition topic here,
>> but
>>>>> it's actually sufficient just
>>>>>           // to re-group onto one key. You could name and pre-create
>> the
>>>>> intermediate topic here,
>>>>>           // but you don't need a separate application for the final
>>>>> aggregation.
>>>>>          .groupBy((partitionId, partialCount) -> new KeyValue("ALL",
>>>>> partialCount))
>>>>>          .aggregate(sum up the partialCounts)
>>>>>          .publish(/counter-total)
>>>>>
>>>>> I've left out the suppressions, but they would go right after the
>>>> count()
>>>>> and the aggregate().
>>>>>
>>>>> With this program, you don't have to worry about the
>> double-aggregation
>>>> you
>>>>> mentioned in the last email. The KTable produced by the first count()
>>>> will
>>>>> maintain the correct count per partition. If the value changes for any
>>>>> partition, it'll emit a retraction of the old value and then the new
>>>> value
>>>>> downstream, so that the final aggregation can update itself properly.
>>>>>
>>>>> I think we can optimize both the execution and the programability by
>>>> adding
>>>>> a "global aggregation" concept. But In principle, it seems like this
>>>> usage
>>>>> of the current API will support your use case.
>>>>>
>>>>> Once again, though, this is just to present an alternative. I haven't
>>>> done
>>>>> the math on whether your proposal would be more efficient.
>>>>>
>>>>> Thanks,
>>>>> -John
>>>>>
>>>>> Here's the same algorithm written in Java:
>>>>>
>>>>> final KStream<String, String> siteEvents =
>>>> builder.stream("/site-events");
>>>>>
>>>>> // here we re-key the events so that the key is actually the partition
>>>> id.
>>>>> // we don't need the value to do a count, so I just set it to "1".
>>>>> final KStream<Integer, Integer> keyedByPartition =
>>>> siteEvents.transform(()
>>>>> -> new Transformer<String, String, KeyValue<Integer, Integer>>() {
>>>>>     private ProcessorContext context;
>>>>>
>>>>>     @Override
>>>>>     public void init(final ProcessorContext context) {
>>>>>         this.context = context;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public KeyValue<Integer, Integer> transform(final String key,
>> final
>>>>> String value) {
>>>>>         return new KeyValue<>(context.partition(), 1);
>>>>>     }
>>>>> });
>>>>>
>>>>> // Note that we can't do "count()" on a KStream, we have to group it
>>>> first.
>>>>> I'm grouping by the key, so it will produce the count for each key.
>>>>> // Since the key is actually the partition id, it will produce the
>>>>> pre-aggregated count per partition.
>>>>> // Note that the result is a KTable<PartitionId,Count>. It'll always
>>>>> contain the most recent count for each partition.
>>>>> final KTable<Integer, Long> countsByPartition =
>>>>> keyedByPartition.groupByKey().count();
>>>>>
>>>>> // Now we get ready for the final roll-up. We re-group all the
>>>> constituent
>>>>> counts
>>>>> final KGroupedTable<String, Long> singlePartition =
>>>>> countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL",
>> value));
>>>>>
>>>>> final KTable<String, Long> totalCount = singlePartition.reduce((l, r)
>>>> -> l
>>>>> + r, (l, r) -> l - r);
>>>>>
>>>>> totalCount.toStream().foreach((k, v) -> {
>>>>>     // k is always "ALL"
>>>>>     // v is always the most recent total value
>>>>>     System.out.println("The total event count is: " + v);
>>>>> });
>>>>>
>>>>>
>>>>> On Tue, Jul 3, 2018 at 9:21 AM flaviostutz@gmail.com <
>>>> flaviostutz@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Great feature you have there!
>>>>>>
>>>>>> I'll try to exercise here how we would achieve the same functional
>>>>>> objectives using your KIP:
>>>>>>
>>>>>> EXERCISE 1:
>>>>>>   - The case is "total counting of events for a huge website"
>>>>>>   - Tasks from Application A will have something like:
>>>>>>          .stream(/site-events)
>>>>>>          .count()
>>>>>>          .publish(/single-partitioned-topic-with-count-partials)
>>>>>>   - The published messages will be, for example:
>>>>>>           ["counter-task1", 2345]
>>>>>>           ["counter-task2", 8495]
>>>>>>           ["counter-task3", 4839]
>>>>>>   - Single Task from Application B will have something like:
>>>>>>          .stream(/single-partitioned-topic-with-count-partials)
>>>>>>          .aggregate(by messages whose key starts with "counter")
>>>>>>          .publish(/counter-total)
>>>>>>   - FAIL HERE. How would I know what is the overall partitions?
>> Maybe
>>>> two
>>>>>> partials for the same task will arrive before other tasks and it
>> maybe
>>>>>> aggregated twice.
>>>>>>
>>>>>> I tried to think about using GlobalKTables, but I didn't get an easy
>>>> way
>>>>>> to aggregate the keys from that table. Do you have any clue?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> -Flávio Stutz
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> /partial-counters-to-single-partitioned-topic
>>>>>>
>>>>>> On 2018/07/02 20:03:57, John Roesler <jo...@confluent.io> wrote:
>>>>>>> Hi Flávio,
>>>>>>>
>>>>>>> Thanks for the KIP. I'll apologize that I'm arriving late to the
>>>>>>> discussion. I've tried to catch up, but I might have missed some
>>>> nuances.
>>>>>>>
>>>>>>> Regarding KIP-328, the idea is to add the ability to suppress
>>>>>> intermediate
>>>>>>> results from all KTables, not just windowed ones. I think this
>> could
>>>>>>> support your use case in combination with the strategy that
>> Guozhang
>>>>>>> proposed of having one or more pre-aggregation steps that
>>>> ultimately push
>>>>>>> into a single-partition topic for final aggregation. Suppressing
>>>>>>> intermediate results would solve the problem you noted that today
>>>>>>> pre-aggregating doesn't do much to staunch the flow up updates.
>>>>>>>
>>>>>>> I'm not sure if this would be good enough for you overall; I just
>>>> wanted
>>>>>> to
>>>>>>> clarify the role of KIP-328.
>>>>>>> In particular, the solution you mentioned is to have the
>> downstream
>>>>>> KTables
>>>>>>> actually query the upstream ones to compute their results. I'm not
>>>> sure
>>>>>>> whether it's more efficient to do these queries on the schedule,
>> or
>>>> to
>>>>>> have
>>>>>>> the upstream tables emit their results, on the same schedule.
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> -John
>>>>>>>
>>>>>>> On Sun, Jul 1, 2018 at 10:03 PM flaviostutz@gmail.com <
>>>>>> flaviostutz@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> For what I understood, that KIP is related to how KStreams will
>>>> handle
>>>>>>>> KTable updates in Windowed scenarios to optimize resource usage.
>>>>>>>> I couldn't see any specific relation to this KIP. Had you?
>>>>>>>>
>>>>>>>> -Flávio Stutz
>>>>>>>>
>>>>>>>>
>>>>>>>> On 2018/06/29 18:14:46, "Matthias J. Sax" <
>> matthias@confluent.io>
>>>>>> wrote:
>>>>>>>>> Flavio,
>>>>>>>>>
>>>>>>>>> thanks for cleaning up the KIP number collision.
>>>>>>>>>
>>>>>>>>> With regard to KIP-328
>>>>>>>>> (
>>>>>>>>
>>>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 328%3A+Ability+to+suppress+updates+for+KTables
>>>>>>>> )
>>>>>>>>> I am wondering how both relate to each other?
>>>>>>>>>
>>>>>>>>> Any thoughts?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
>>>>>>>>>> Just copying a follow up from another thread to here (sorry
>>>> about
>>>>>> the
>>>>>>>> mess):
>>>>>>>>>>
>>>>>>>>>> From: Guozhang Wang <wa...@gmail.com>
>>>>>>>>>> Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph
>>>> source
>>>>>>>>>> Date: 2018/06/25 22:24:17
>>>>>>>>>> List: dev@kafka.apache.org
>>>>>>>>>>
>>>>>>>>>> Flávio, thanks for creating this KIP.
>>>>>>>>>>
>>>>>>>>>> I think this "single-aggregation" use case is common enough
>>>> that we
>>>>>>>> should
>>>>>>>>>> consider how to efficiently supports it: for example, for
>> KSQL
>>>>>> that's
>>>>>>>> built
>>>>>>>>>> on top of Streams, we've seen lots of query statements whose
>>>>>> return is
>>>>>>>>>> expected a single row indicating the "total aggregate" etc.
>>>> See
>>>>>>>>>> https://github.com/confluentinc/ksql/issues/430 for
>> details.
>>>>>>>>>>
>>>>>>>>>> I've not read through
>>>>>> https://issues.apache.org/jira/browse/KAFKA-6953,
>>>>>>>> but
>>>>>>>>>> I'm wondering if we have discussed the option of supporting
>>>> it in a
>>>>>>>>>> "pre-aggregate" manner: that is we do partial aggregates on
>>>>>> parallel
>>>>>>>> tasks,
>>>>>>>>>> and then sends the partial aggregated value via a single
>> topic
>>>>>>>> partition
>>>>>>>>>> for the final aggregate, to reduce the traffic on that
>> single
>>>>>>>> partition and
>>>>>>>>>> hence the final aggregate workload.
>>>>>>>>>> Of course, for non-commutative aggregates we'd probably need
>>>> to
>>>>>> provide
>>>>>>>>>> another API in addition to aggregate, like the `merge`
>>>> function for
>>>>>>>>>> session-based aggregates, to let users customize the
>>>> operations of
>>>>>>>> merging
>>>>>>>>>> two partial aggregates into a single partial aggregate.
>>>> What's its
>>>>>>>> pros and
>>>>>>>>>> cons compared with the current proposal?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>> On 2018/06/26 18:22:27, Flávio Stutz <flaviostutz@gmail.com
>>>
>>>>>> wrote:
>>>>>>>>>>> Hey, guys, I've just created a new KIP about creating a new
>>>> DSL
>>>>>> graph
>>>>>>>>>>> source for realtime partitioned consolidations.
>>>>>>>>>>>
>>>>>>>>>>> We have faced the following scenario/problem in a lot of
>>>>>> situations
>>>>>>>> with
>>>>>>>>>>> KStreams:
>>>>>>>>>>>    - Huge incoming data being processed by numerous
>>>> application
>>>>>>>> instances
>>>>>>>>>>>    - Need to aggregate different fields whose records span
>>>> all
>>>>>> topic
>>>>>>>>>>> partitions (something like “total amount spent by people
>>>> aged > 30
>>>>>>>> yrs”
>>>>>>>>>>> when processing a topic partitioned by userid).
>>>>>>>>>>>
>>>>>>>>>>> The challenge here is to manage this kind of situation
>>>> without any
>>>>>>>>>>> bottlenecks. We don't need the “global aggregation” to be
>>>>>> processed
>>>>>>>> at each
>>>>>>>>>>> incoming message. On a scenario of 500 instances, each
>>>> handling 1k
>>>>>>>>>>> messages/s, any single point of aggregation (single
>>>> partitioned
>>>>>>>> topics,
>>>>>>>>>>> global tables or external databases) would create a
>>>> bottleneck of
>>>>>> 500k
>>>>>>>>>>> messages/s for single threaded/CPU elements.
>>>>>>>>>>>
>>>>>>>>>>> For this scenario, it is possible to store the partial
>>>>>> aggregations on
>>>>>>>>>>> local stores and, from time to time, query those states and
>>>>>> aggregate
>>>>>>>> them
>>>>>>>>>>> as a single value, avoiding bottlenecks. This is a way to
>>>> create a
>>>>>>>> "timed
>>>>>>>>>>> aggregation barrier”.
>>>>>>>>>>>
>>>>>>>>>>> If we leverage this kind of built-in feature we could
>> greatly
>>>>>> enhance
>>>>>>>> the
>>>>>>>>>>> ability of KStreams to better handle the CAP Theorem
>>>>>> characteristics,
>>>>>>>> so
>>>>>>>>>>> that one could choose to have Consistency over Availability
>>>> when
>>>>>>>> needed.
>>>>>>>>>>>
>>>>>>>>>>> We started this discussion with Matthias J. Sax here:
>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6953
>>>>>>>>>>>
>>>>>>>>>>> If you want to see more, go to KIP-326 at:
>>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 326%3A+Schedulable+KTable+as+Graph+source
>>>>>>>>>>>
>>>>>>>>>>> -Flávio Stutz
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
> 
> 
> 


Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by Guozhang Wang <wa...@gmail.com>.
That's a lot of email exchanges for me to catch up :)

My original proposed alternative solution is indeed relying on
pre-aggregate before sending to the single-partition topic, so that the
traffic on that single-partition topic would not be huge (I called it
partial-aggregate but the intent was the same).

What I was thinking is that, given such a scenario could be common, if
we've decided to go down this route should we provide a new API that wrap's
John's proposed topology (right now with KIP-328 users still need to
leverage this trick manually):


----------

final KStream<String, String> siteEvents = builder.stream("/site-events");

final KStream<Integer, Integer> keyedByPartition = siteEvents.transform(/*
generate KeyValue(key, 1) for the pre-aggregate*/);

final KTable<Integer, Long> countsByPartition =
keyedByPartition.groupByKey().count();   /* pre-aggregate */

final KGroupedTable<String, Long> singlePartition =
countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value));
 /* sent the suppressed pre-aggregate values to the single partition topic
*/

final KTable<String, Long> totalCount = singlePartition.reduce((l, r) -> l +
r, (l, r) -> l - r);   /* read from the single partition topic, do reduce
on the data*/

----------

Note that if we wrap them all into a new operator, users would need to
provide two functions, for the aggregate and for the final "reduce" (in my
previous email I called it merger function, but for the same intent).



Guozhang



On Thu, Jul 5, 2018 at 3:38 PM, John Roesler <jo...@confluent.io> wrote:

> Ok, I didn't get quite as far as I hoped, and several things are far from
> ready, but here's what I have so far:
> https://github.com/apache/kafka/pull/5337
>
> The "unit" test works, and is a good example of how you should expect it to
> behave:
> https://github.com/apache/kafka/pull/5337/files#diff-
> 2fdec52b9cc3d0e564f0c12a199bed77
>
> I have one working integration test, but it's slow going getting the timing
> right, so no promises of any kind ;)
>
> Let me know what you think!
>
> Thanks,
> -John
>
> On Thu, Jul 5, 2018 at 8:39 AM John Roesler <jo...@confluent.io> wrote:
>
> > Hey Flávio,
> >
> > Thanks! I haven't got anything usable yet, but I'm working on it now. I'm
> > hoping to push up my branch by the end of the day.
> >
> > I don't know if you've seen it but Streams actually already has something
> > like this, in the form of caching on materialized stores. If you pass in
> a
> > "Materialized.withCachingEnabled()", you should be able to get a POC
> > working by setting the max cache size pretty high and setting the commit
> > interval for your desired rate:
> > https://docs.confluent.io/current/streams/developer-
> guide/memory-mgmt.html#streams-developer-guide-memory-management
> > .
> >
> > There are a couple of cases in joins and whatnot where it doesn't work,
> > but for the aggregations we discussed, it should. The reason for KIP-328
> is
> > to provide finer control and hopefully a more straightforward API.
> >
> > Let me know if that works, and I'll drop a message in here when I create
> > the draft PR for KIP-328. I'd really appreciate your feedback.
> >
> > Thanks,
> > -John
> >
> > On Wed, Jul 4, 2018 at 10:17 PM flaviostutz@gmail.com <
> > flaviostutz@gmail.com> wrote:
> >
> >> John, that was fantastic, man!
> >> Have you built any custom implementation of your KIP in your machine so
> >> that I could test it out here? I wish I could test it out.
> >> If you need any help implementing this feature, please tell me.
> >>
> >> Thanks.
> >>
> >> -Flávio Stutz
> >>
> >>
> >>
> >>
> >> On 2018/07/03 18:04:52, John Roesler <jo...@confluent.io> wrote:
> >> > Hi Flávio,
> >> > Thanks! I think that we can actually do this, but the API could be
> >> better.
> >> > I've included Java code below, but I'll copy and modify your example
> so
> >> > we're on the same page.
> >> >
> >> > EXERCISE 1:
> >> >   - The case is "total counting of events for a huge website"
> >> >   - Tasks from Application A will have something like:
> >> >          .stream(/site-events)
> >> >          .transform( re-key s.t. the new key is the partition id)
> >> >          .groupByKey() // you have to do this before count
> >> >          .count()
> >> >           // you explicitly published to a one-partition topic here,
> but
> >> > it's actually sufficient just
> >> >           // to re-group onto one key. You could name and pre-create
> the
> >> > intermediate topic here,
> >> >           // but you don't need a separate application for the final
> >> > aggregation.
> >> >          .groupBy((partitionId, partialCount) -> new KeyValue("ALL",
> >> > partialCount))
> >> >          .aggregate(sum up the partialCounts)
> >> >          .publish(/counter-total)
> >> >
> >> > I've left out the suppressions, but they would go right after the
> >> count()
> >> > and the aggregate().
> >> >
> >> > With this program, you don't have to worry about the
> double-aggregation
> >> you
> >> > mentioned in the last email. The KTable produced by the first count()
> >> will
> >> > maintain the correct count per partition. If the value changes for any
> >> > partition, it'll emit a retraction of the old value and then the new
> >> value
> >> > downstream, so that the final aggregation can update itself properly.
> >> >
> >> > I think we can optimize both the execution and the programability by
> >> adding
> >> > a "global aggregation" concept. But In principle, it seems like this
> >> usage
> >> > of the current API will support your use case.
> >> >
> >> > Once again, though, this is just to present an alternative. I haven't
> >> done
> >> > the math on whether your proposal would be more efficient.
> >> >
> >> > Thanks,
> >> > -John
> >> >
> >> > Here's the same algorithm written in Java:
> >> >
> >> > final KStream<String, String> siteEvents =
> >> builder.stream("/site-events");
> >> >
> >> > // here we re-key the events so that the key is actually the partition
> >> id.
> >> > // we don't need the value to do a count, so I just set it to "1".
> >> > final KStream<Integer, Integer> keyedByPartition =
> >> siteEvents.transform(()
> >> > -> new Transformer<String, String, KeyValue<Integer, Integer>>() {
> >> >     private ProcessorContext context;
> >> >
> >> >     @Override
> >> >     public void init(final ProcessorContext context) {
> >> >         this.context = context;
> >> >     }
> >> >
> >> >     @Override
> >> >     public KeyValue<Integer, Integer> transform(final String key,
> final
> >> > String value) {
> >> >         return new KeyValue<>(context.partition(), 1);
> >> >     }
> >> > });
> >> >
> >> > // Note that we can't do "count()" on a KStream, we have to group it
> >> first.
> >> > I'm grouping by the key, so it will produce the count for each key.
> >> > // Since the key is actually the partition id, it will produce the
> >> > pre-aggregated count per partition.
> >> > // Note that the result is a KTable<PartitionId,Count>. It'll always
> >> > contain the most recent count for each partition.
> >> > final KTable<Integer, Long> countsByPartition =
> >> > keyedByPartition.groupByKey().count();
> >> >
> >> > // Now we get ready for the final roll-up. We re-group all the
> >> constituent
> >> > counts
> >> > final KGroupedTable<String, Long> singlePartition =
> >> > countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL",
> value));
> >> >
> >> > final KTable<String, Long> totalCount = singlePartition.reduce((l, r)
> >> -> l
> >> > + r, (l, r) -> l - r);
> >> >
> >> > totalCount.toStream().foreach((k, v) -> {
> >> >     // k is always "ALL"
> >> >     // v is always the most recent total value
> >> >     System.out.println("The total event count is: " + v);
> >> > });
> >> >
> >> >
> >> > On Tue, Jul 3, 2018 at 9:21 AM flaviostutz@gmail.com <
> >> flaviostutz@gmail.com>
> >> > wrote:
> >> >
> >> > > Great feature you have there!
> >> > >
> >> > > I'll try to exercise here how we would achieve the same functional
> >> > > objectives using your KIP:
> >> > >
> >> > > EXERCISE 1:
> >> > >   - The case is "total counting of events for a huge website"
> >> > >   - Tasks from Application A will have something like:
> >> > >          .stream(/site-events)
> >> > >          .count()
> >> > >          .publish(/single-partitioned-topic-with-count-partials)
> >> > >   - The published messages will be, for example:
> >> > >           ["counter-task1", 2345]
> >> > >           ["counter-task2", 8495]
> >> > >           ["counter-task3", 4839]
> >> > >   - Single Task from Application B will have something like:
> >> > >          .stream(/single-partitioned-topic-with-count-partials)
> >> > >          .aggregate(by messages whose key starts with "counter")
> >> > >          .publish(/counter-total)
> >> > >   - FAIL HERE. How would I know what is the overall partitions?
> Maybe
> >> two
> >> > > partials for the same task will arrive before other tasks and it
> maybe
> >> > > aggregated twice.
> >> > >
> >> > > I tried to think about using GlobalKTables, but I didn't get an easy
> >> way
> >> > > to aggregate the keys from that table. Do you have any clue?
> >> > >
> >> > > Thanks.
> >> > >
> >> > > -Flávio Stutz
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > /partial-counters-to-single-partitioned-topic
> >> > >
> >> > > On 2018/07/02 20:03:57, John Roesler <jo...@confluent.io> wrote:
> >> > > > Hi Flávio,
> >> > > >
> >> > > > Thanks for the KIP. I'll apologize that I'm arriving late to the
> >> > > > discussion. I've tried to catch up, but I might have missed some
> >> nuances.
> >> > > >
> >> > > > Regarding KIP-328, the idea is to add the ability to suppress
> >> > > intermediate
> >> > > > results from all KTables, not just windowed ones. I think this
> could
> >> > > > support your use case in combination with the strategy that
> Guozhang
> >> > > > proposed of having one or more pre-aggregation steps that
> >> ultimately push
> >> > > > into a single-partition topic for final aggregation. Suppressing
> >> > > > intermediate results would solve the problem you noted that today
> >> > > > pre-aggregating doesn't do much to staunch the flow up updates.
> >> > > >
> >> > > > I'm not sure if this would be good enough for you overall; I just
> >> wanted
> >> > > to
> >> > > > clarify the role of KIP-328.
> >> > > > In particular, the solution you mentioned is to have the
> downstream
> >> > > KTables
> >> > > > actually query the upstream ones to compute their results. I'm not
> >> sure
> >> > > > whether it's more efficient to do these queries on the schedule,
> or
> >> to
> >> > > have
> >> > > > the upstream tables emit their results, on the same schedule.
> >> > > >
> >> > > > What do you think?
> >> > > >
> >> > > > Thanks,
> >> > > > -John
> >> > > >
> >> > > > On Sun, Jul 1, 2018 at 10:03 PM flaviostutz@gmail.com <
> >> > > flaviostutz@gmail.com>
> >> > > > wrote:
> >> > > >
> >> > > > > For what I understood, that KIP is related to how KStreams will
> >> handle
> >> > > > > KTable updates in Windowed scenarios to optimize resource usage.
> >> > > > > I couldn't see any specific relation to this KIP. Had you?
> >> > > > >
> >> > > > > -Flávio Stutz
> >> > > > >
> >> > > > >
> >> > > > > On 2018/06/29 18:14:46, "Matthias J. Sax" <
> matthias@confluent.io>
> >> > > wrote:
> >> > > > > > Flavio,
> >> > > > > >
> >> > > > > > thanks for cleaning up the KIP number collision.
> >> > > > > >
> >> > > > > > With regard to KIP-328
> >> > > > > > (
> >> > > > >
> >> > >
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 328%3A+Ability+to+suppress+updates+for+KTables
> >> > > > > )
> >> > > > > > I am wondering how both relate to each other?
> >> > > > > >
> >> > > > > > Any thoughts?
> >> > > > > >
> >> > > > > >
> >> > > > > > -Matthias
> >> > > > > >
> >> > > > > > On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
> >> > > > > > > Just copying a follow up from another thread to here (sorry
> >> about
> >> > > the
> >> > > > > mess):
> >> > > > > > >
> >> > > > > > > From: Guozhang Wang <wa...@gmail.com>
> >> > > > > > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph
> >> source
> >> > > > > > > Date: 2018/06/25 22:24:17
> >> > > > > > > List: dev@kafka.apache.org
> >> > > > > > >
> >> > > > > > > Flávio, thanks for creating this KIP.
> >> > > > > > >
> >> > > > > > > I think this "single-aggregation" use case is common enough
> >> that we
> >> > > > > should
> >> > > > > > > consider how to efficiently supports it: for example, for
> KSQL
> >> > > that's
> >> > > > > built
> >> > > > > > > on top of Streams, we've seen lots of query statements whose
> >> > > return is
> >> > > > > > > expected a single row indicating the "total aggregate" etc.
> >> See
> >> > > > > > > https://github.com/confluentinc/ksql/issues/430 for
> details.
> >> > > > > > >
> >> > > > > > > I've not read through
> >> > > https://issues.apache.org/jira/browse/KAFKA-6953,
> >> > > > > but
> >> > > > > > > I'm wondering if we have discussed the option of supporting
> >> it in a
> >> > > > > > > "pre-aggregate" manner: that is we do partial aggregates on
> >> > > parallel
> >> > > > > tasks,
> >> > > > > > > and then sends the partial aggregated value via a single
> topic
> >> > > > > partition
> >> > > > > > > for the final aggregate, to reduce the traffic on that
> single
> >> > > > > partition and
> >> > > > > > > hence the final aggregate workload.
> >> > > > > > > Of course, for non-commutative aggregates we'd probably need
> >> to
> >> > > provide
> >> > > > > > > another API in addition to aggregate, like the `merge`
> >> function for
> >> > > > > > > session-based aggregates, to let users customize the
> >> operations of
> >> > > > > merging
> >> > > > > > > two partial aggregates into a single partial aggregate.
> >> What's its
> >> > > > > pros and
> >> > > > > > > cons compared with the current proposal?
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > Guozhang
> >> > > > > > > On 2018/06/26 18:22:27, Flávio Stutz <flaviostutz@gmail.com
> >
> >> > > wrote:
> >> > > > > > >> Hey, guys, I've just created a new KIP about creating a new
> >> DSL
> >> > > graph
> >> > > > > > >> source for realtime partitioned consolidations.
> >> > > > > > >>
> >> > > > > > >> We have faced the following scenario/problem in a lot of
> >> > > situations
> >> > > > > with
> >> > > > > > >> KStreams:
> >> > > > > > >>    - Huge incoming data being processed by numerous
> >> application
> >> > > > > instances
> >> > > > > > >>    - Need to aggregate different fields whose records span
> >> all
> >> > > topic
> >> > > > > > >> partitions (something like “total amount spent by people
> >> aged > 30
> >> > > > > yrs”
> >> > > > > > >> when processing a topic partitioned by userid).
> >> > > > > > >>
> >> > > > > > >> The challenge here is to manage this kind of situation
> >> without any
> >> > > > > > >> bottlenecks. We don't need the “global aggregation” to be
> >> > > processed
> >> > > > > at each
> >> > > > > > >> incoming message. On a scenario of 500 instances, each
> >> handling 1k
> >> > > > > > >> messages/s, any single point of aggregation (single
> >> partitioned
> >> > > > > topics,
> >> > > > > > >> global tables or external databases) would create a
> >> bottleneck of
> >> > > 500k
> >> > > > > > >> messages/s for single threaded/CPU elements.
> >> > > > > > >>
> >> > > > > > >> For this scenario, it is possible to store the partial
> >> > > aggregations on
> >> > > > > > >> local stores and, from time to time, query those states and
> >> > > aggregate
> >> > > > > them
> >> > > > > > >> as a single value, avoiding bottlenecks. This is a way to
> >> create a
> >> > > > > "timed
> >> > > > > > >> aggregation barrier”.
> >> > > > > > >>
> >> > > > > > >> If we leverage this kind of built-in feature we could
> greatly
> >> > > enhance
> >> > > > > the
> >> > > > > > >> ability of KStreams to better handle the CAP Theorem
> >> > > characteristics,
> >> > > > > so
> >> > > > > > >> that one could choose to have Consistency over Availability
> >> when
> >> > > > > needed.
> >> > > > > > >>
> >> > > > > > >> We started this discussion with Matthias J. Sax here:
> >> > > > > > >> https://issues.apache.org/jira/browse/KAFKA-6953
> >> > > > > > >>
> >> > > > > > >> If you want to see more, go to KIP-326 at:
> >> > > > > > >>
> >> > > > >
> >> > >
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 326%3A+Schedulable+KTable+as+Graph+source
> >> > > > > > >>
> >> > > > > > >> -Flávio Stutz
> >> > > > > > >>
> >> > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>



-- 
-- Guozhang

Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by John Roesler <jo...@confluent.io>.
Ok, I didn't get quite as far as I hoped, and several things are far from
ready, but here's what I have so far:
https://github.com/apache/kafka/pull/5337

The "unit" test works, and is a good example of how you should expect it to
behave:
https://github.com/apache/kafka/pull/5337/files#diff-2fdec52b9cc3d0e564f0c12a199bed77

I have one working integration test, but it's slow going getting the timing
right, so no promises of any kind ;)

Let me know what you think!

Thanks,
-John

On Thu, Jul 5, 2018 at 8:39 AM John Roesler <jo...@confluent.io> wrote:

> Hey Flávio,
>
> Thanks! I haven't got anything usable yet, but I'm working on it now. I'm
> hoping to push up my branch by the end of the day.
>
> I don't know if you've seen it but Streams actually already has something
> like this, in the form of caching on materialized stores. If you pass in a
> "Materialized.withCachingEnabled()", you should be able to get a POC
> working by setting the max cache size pretty high and setting the commit
> interval for your desired rate:
> https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html#streams-developer-guide-memory-management
> .
>
> There are a couple of cases in joins and whatnot where it doesn't work,
> but for the aggregations we discussed, it should. The reason for KIP-328 is
> to provide finer control and hopefully a more straightforward API.
>
> Let me know if that works, and I'll drop a message in here when I create
> the draft PR for KIP-328. I'd really appreciate your feedback.
>
> Thanks,
> -John
>
> On Wed, Jul 4, 2018 at 10:17 PM flaviostutz@gmail.com <
> flaviostutz@gmail.com> wrote:
>
>> John, that was fantastic, man!
>> Have you built any custom implementation of your KIP in your machine so
>> that I could test it out here? I wish I could test it out.
>> If you need any help implementing this feature, please tell me.
>>
>> Thanks.
>>
>> -Flávio Stutz
>>
>>
>>
>>
>> On 2018/07/03 18:04:52, John Roesler <jo...@confluent.io> wrote:
>> > Hi Flávio,
>> > Thanks! I think that we can actually do this, but the API could be
>> better.
>> > I've included Java code below, but I'll copy and modify your example so
>> > we're on the same page.
>> >
>> > EXERCISE 1:
>> >   - The case is "total counting of events for a huge website"
>> >   - Tasks from Application A will have something like:
>> >          .stream(/site-events)
>> >          .transform( re-key s.t. the new key is the partition id)
>> >          .groupByKey() // you have to do this before count
>> >          .count()
>> >           // you explicitly published to a one-partition topic here, but
>> > it's actually sufficient just
>> >           // to re-group onto one key. You could name and pre-create the
>> > intermediate topic here,
>> >           // but you don't need a separate application for the final
>> > aggregation.
>> >          .groupBy((partitionId, partialCount) -> new KeyValue("ALL",
>> > partialCount))
>> >          .aggregate(sum up the partialCounts)
>> >          .publish(/counter-total)
>> >
>> > I've left out the suppressions, but they would go right after the
>> count()
>> > and the aggregate().
>> >
>> > With this program, you don't have to worry about the double-aggregation
>> you
>> > mentioned in the last email. The KTable produced by the first count()
>> will
>> > maintain the correct count per partition. If the value changes for any
>> > partition, it'll emit a retraction of the old value and then the new
>> value
>> > downstream, so that the final aggregation can update itself properly.
>> >
>> > I think we can optimize both the execution and the programability by
>> adding
>> > a "global aggregation" concept. But In principle, it seems like this
>> usage
>> > of the current API will support your use case.
>> >
>> > Once again, though, this is just to present an alternative. I haven't
>> done
>> > the math on whether your proposal would be more efficient.
>> >
>> > Thanks,
>> > -John
>> >
>> > Here's the same algorithm written in Java:
>> >
>> > final KStream<String, String> siteEvents =
>> builder.stream("/site-events");
>> >
>> > // here we re-key the events so that the key is actually the partition
>> id.
>> > // we don't need the value to do a count, so I just set it to "1".
>> > final KStream<Integer, Integer> keyedByPartition =
>> siteEvents.transform(()
>> > -> new Transformer<String, String, KeyValue<Integer, Integer>>() {
>> >     private ProcessorContext context;
>> >
>> >     @Override
>> >     public void init(final ProcessorContext context) {
>> >         this.context = context;
>> >     }
>> >
>> >     @Override
>> >     public KeyValue<Integer, Integer> transform(final String key, final
>> > String value) {
>> >         return new KeyValue<>(context.partition(), 1);
>> >     }
>> > });
>> >
>> > // Note that we can't do "count()" on a KStream, we have to group it
>> first.
>> > I'm grouping by the key, so it will produce the count for each key.
>> > // Since the key is actually the partition id, it will produce the
>> > pre-aggregated count per partition.
>> > // Note that the result is a KTable<PartitionId,Count>. It'll always
>> > contain the most recent count for each partition.
>> > final KTable<Integer, Long> countsByPartition =
>> > keyedByPartition.groupByKey().count();
>> >
>> > // Now we get ready for the final roll-up. We re-group all the
>> constituent
>> > counts
>> > final KGroupedTable<String, Long> singlePartition =
>> > countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value));
>> >
>> > final KTable<String, Long> totalCount = singlePartition.reduce((l, r)
>> -> l
>> > + r, (l, r) -> l - r);
>> >
>> > totalCount.toStream().foreach((k, v) -> {
>> >     // k is always "ALL"
>> >     // v is always the most recent total value
>> >     System.out.println("The total event count is: " + v);
>> > });
>> >
>> >
>> > On Tue, Jul 3, 2018 at 9:21 AM flaviostutz@gmail.com <
>> flaviostutz@gmail.com>
>> > wrote:
>> >
>> > > Great feature you have there!
>> > >
>> > > I'll try to exercise here how we would achieve the same functional
>> > > objectives using your KIP:
>> > >
>> > > EXERCISE 1:
>> > >   - The case is "total counting of events for a huge website"
>> > >   - Tasks from Application A will have something like:
>> > >          .stream(/site-events)
>> > >          .count()
>> > >          .publish(/single-partitioned-topic-with-count-partials)
>> > >   - The published messages will be, for example:
>> > >           ["counter-task1", 2345]
>> > >           ["counter-task2", 8495]
>> > >           ["counter-task3", 4839]
>> > >   - Single Task from Application B will have something like:
>> > >          .stream(/single-partitioned-topic-with-count-partials)
>> > >          .aggregate(by messages whose key starts with "counter")
>> > >          .publish(/counter-total)
>> > >   - FAIL HERE. How would I know what is the overall partitions? Maybe
>> two
>> > > partials for the same task will arrive before other tasks and it maybe
>> > > aggregated twice.
>> > >
>> > > I tried to think about using GlobalKTables, but I didn't get an easy
>> way
>> > > to aggregate the keys from that table. Do you have any clue?
>> > >
>> > > Thanks.
>> > >
>> > > -Flávio Stutz
>> > >
>> > >
>> > >
>> > >
>> > >
>> > >
>> > > /partial-counters-to-single-partitioned-topic
>> > >
>> > > On 2018/07/02 20:03:57, John Roesler <jo...@confluent.io> wrote:
>> > > > Hi Flávio,
>> > > >
>> > > > Thanks for the KIP. I'll apologize that I'm arriving late to the
>> > > > discussion. I've tried to catch up, but I might have missed some
>> nuances.
>> > > >
>> > > > Regarding KIP-328, the idea is to add the ability to suppress
>> > > intermediate
>> > > > results from all KTables, not just windowed ones. I think this could
>> > > > support your use case in combination with the strategy that Guozhang
>> > > > proposed of having one or more pre-aggregation steps that
>> ultimately push
>> > > > into a single-partition topic for final aggregation. Suppressing
>> > > > intermediate results would solve the problem you noted that today
>> > > > pre-aggregating doesn't do much to staunch the flow up updates.
>> > > >
>> > > > I'm not sure if this would be good enough for you overall; I just
>> wanted
>> > > to
>> > > > clarify the role of KIP-328.
>> > > > In particular, the solution you mentioned is to have the downstream
>> > > KTables
>> > > > actually query the upstream ones to compute their results. I'm not
>> sure
>> > > > whether it's more efficient to do these queries on the schedule, or
>> to
>> > > have
>> > > > the upstream tables emit their results, on the same schedule.
>> > > >
>> > > > What do you think?
>> > > >
>> > > > Thanks,
>> > > > -John
>> > > >
>> > > > On Sun, Jul 1, 2018 at 10:03 PM flaviostutz@gmail.com <
>> > > flaviostutz@gmail.com>
>> > > > wrote:
>> > > >
>> > > > > For what I understood, that KIP is related to how KStreams will
>> handle
>> > > > > KTable updates in Windowed scenarios to optimize resource usage.
>> > > > > I couldn't see any specific relation to this KIP. Had you?
>> > > > >
>> > > > > -Flávio Stutz
>> > > > >
>> > > > >
>> > > > > On 2018/06/29 18:14:46, "Matthias J. Sax" <ma...@confluent.io>
>> > > wrote:
>> > > > > > Flavio,
>> > > > > >
>> > > > > > thanks for cleaning up the KIP number collision.
>> > > > > >
>> > > > > > With regard to KIP-328
>> > > > > > (
>> > > > >
>> > >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
>> > > > > )
>> > > > > > I am wondering how both relate to each other?
>> > > > > >
>> > > > > > Any thoughts?
>> > > > > >
>> > > > > >
>> > > > > > -Matthias
>> > > > > >
>> > > > > > On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
>> > > > > > > Just copying a follow up from another thread to here (sorry
>> about
>> > > the
>> > > > > mess):
>> > > > > > >
>> > > > > > > From: Guozhang Wang <wa...@gmail.com>
>> > > > > > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph
>> source
>> > > > > > > Date: 2018/06/25 22:24:17
>> > > > > > > List: dev@kafka.apache.org
>> > > > > > >
>> > > > > > > Flávio, thanks for creating this KIP.
>> > > > > > >
>> > > > > > > I think this "single-aggregation" use case is common enough
>> that we
>> > > > > should
>> > > > > > > consider how to efficiently supports it: for example, for KSQL
>> > > that's
>> > > > > built
>> > > > > > > on top of Streams, we've seen lots of query statements whose
>> > > return is
>> > > > > > > expected a single row indicating the "total aggregate" etc.
>> See
>> > > > > > > https://github.com/confluentinc/ksql/issues/430 for details.
>> > > > > > >
>> > > > > > > I've not read through
>> > > https://issues.apache.org/jira/browse/KAFKA-6953,
>> > > > > but
>> > > > > > > I'm wondering if we have discussed the option of supporting
>> it in a
>> > > > > > > "pre-aggregate" manner: that is we do partial aggregates on
>> > > parallel
>> > > > > tasks,
>> > > > > > > and then sends the partial aggregated value via a single topic
>> > > > > partition
>> > > > > > > for the final aggregate, to reduce the traffic on that single
>> > > > > partition and
>> > > > > > > hence the final aggregate workload.
>> > > > > > > Of course, for non-commutative aggregates we'd probably need
>> to
>> > > provide
>> > > > > > > another API in addition to aggregate, like the `merge`
>> function for
>> > > > > > > session-based aggregates, to let users customize the
>> operations of
>> > > > > merging
>> > > > > > > two partial aggregates into a single partial aggregate.
>> What's its
>> > > > > pros and
>> > > > > > > cons compared with the current proposal?
>> > > > > > >
>> > > > > > >
>> > > > > > > Guozhang
>> > > > > > > On 2018/06/26 18:22:27, Flávio Stutz <fl...@gmail.com>
>> > > wrote:
>> > > > > > >> Hey, guys, I've just created a new KIP about creating a new
>> DSL
>> > > graph
>> > > > > > >> source for realtime partitioned consolidations.
>> > > > > > >>
>> > > > > > >> We have faced the following scenario/problem in a lot of
>> > > situations
>> > > > > with
>> > > > > > >> KStreams:
>> > > > > > >>    - Huge incoming data being processed by numerous
>> application
>> > > > > instances
>> > > > > > >>    - Need to aggregate different fields whose records span
>> all
>> > > topic
>> > > > > > >> partitions (something like “total amount spent by people
>> aged > 30
>> > > > > yrs”
>> > > > > > >> when processing a topic partitioned by userid).
>> > > > > > >>
>> > > > > > >> The challenge here is to manage this kind of situation
>> without any
>> > > > > > >> bottlenecks. We don't need the “global aggregation” to be
>> > > processed
>> > > > > at each
>> > > > > > >> incoming message. On a scenario of 500 instances, each
>> handling 1k
>> > > > > > >> messages/s, any single point of aggregation (single
>> partitioned
>> > > > > topics,
>> > > > > > >> global tables or external databases) would create a
>> bottleneck of
>> > > 500k
>> > > > > > >> messages/s for single threaded/CPU elements.
>> > > > > > >>
>> > > > > > >> For this scenario, it is possible to store the partial
>> > > aggregations on
>> > > > > > >> local stores and, from time to time, query those states and
>> > > aggregate
>> > > > > them
>> > > > > > >> as a single value, avoiding bottlenecks. This is a way to
>> create a
>> > > > > "timed
>> > > > > > >> aggregation barrier”.
>> > > > > > >>
>> > > > > > >> If we leverage this kind of built-in feature we could greatly
>> > > enhance
>> > > > > the
>> > > > > > >> ability of KStreams to better handle the CAP Theorem
>> > > characteristics,
>> > > > > so
>> > > > > > >> that one could choose to have Consistency over Availability
>> when
>> > > > > needed.
>> > > > > > >>
>> > > > > > >> We started this discussion with Matthias J. Sax here:
>> > > > > > >> https://issues.apache.org/jira/browse/KAFKA-6953
>> > > > > > >>
>> > > > > > >> If you want to see more, go to KIP-326 at:
>> > > > > > >>
>> > > > >
>> > >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
>> > > > > > >>
>> > > > > > >> -Flávio Stutz
>> > > > > > >>
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>

Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by John Roesler <jo...@confluent.io>.
Hey Flávio,

Thanks! I haven't got anything usable yet, but I'm working on it now. I'm
hoping to push up my branch by the end of the day.

I don't know if you've seen it but Streams actually already has something
like this, in the form of caching on materialized stores. If you pass in a
"Materialized.withCachingEnabled()", you should be able to get a POC
working by setting the max cache size pretty high and setting the commit
interval for your desired rate:
https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html#streams-developer-guide-memory-management
.

There are a couple of cases in joins and whatnot where it doesn't work, but
for the aggregations we discussed, it should. The reason for KIP-328 is to
provide finer control and hopefully a more straightforward API.

Let me know if that works, and I'll drop a message in here when I create
the draft PR for KIP-328. I'd really appreciate your feedback.

Thanks,
-John

On Wed, Jul 4, 2018 at 10:17 PM flaviostutz@gmail.com <fl...@gmail.com>
wrote:

> John, that was fantastic, man!
> Have you built any custom implementation of your KIP in your machine so
> that I could test it out here? I wish I could test it out.
> If you need any help implementing this feature, please tell me.
>
> Thanks.
>
> -Flávio Stutz
>
>
>
>
> On 2018/07/03 18:04:52, John Roesler <jo...@confluent.io> wrote:
> > Hi Flávio,
> > Thanks! I think that we can actually do this, but the API could be
> better.
> > I've included Java code below, but I'll copy and modify your example so
> > we're on the same page.
> >
> > EXERCISE 1:
> >   - The case is "total counting of events for a huge website"
> >   - Tasks from Application A will have something like:
> >          .stream(/site-events)
> >          .transform( re-key s.t. the new key is the partition id)
> >          .groupByKey() // you have to do this before count
> >          .count()
> >           // you explicitly published to a one-partition topic here, but
> > it's actually sufficient just
> >           // to re-group onto one key. You could name and pre-create the
> > intermediate topic here,
> >           // but you don't need a separate application for the final
> > aggregation.
> >          .groupBy((partitionId, partialCount) -> new KeyValue("ALL",
> > partialCount))
> >          .aggregate(sum up the partialCounts)
> >          .publish(/counter-total)
> >
> > I've left out the suppressions, but they would go right after the count()
> > and the aggregate().
> >
> > With this program, you don't have to worry about the double-aggregation
> you
> > mentioned in the last email. The KTable produced by the first count()
> will
> > maintain the correct count per partition. If the value changes for any
> > partition, it'll emit a retraction of the old value and then the new
> value
> > downstream, so that the final aggregation can update itself properly.
> >
> > I think we can optimize both the execution and the programability by
> adding
> > a "global aggregation" concept. But In principle, it seems like this
> usage
> > of the current API will support your use case.
> >
> > Once again, though, this is just to present an alternative. I haven't
> done
> > the math on whether your proposal would be more efficient.
> >
> > Thanks,
> > -John
> >
> > Here's the same algorithm written in Java:
> >
> > final KStream<String, String> siteEvents =
> builder.stream("/site-events");
> >
> > // here we re-key the events so that the key is actually the partition
> id.
> > // we don't need the value to do a count, so I just set it to "1".
> > final KStream<Integer, Integer> keyedByPartition =
> siteEvents.transform(()
> > -> new Transformer<String, String, KeyValue<Integer, Integer>>() {
> >     private ProcessorContext context;
> >
> >     @Override
> >     public void init(final ProcessorContext context) {
> >         this.context = context;
> >     }
> >
> >     @Override
> >     public KeyValue<Integer, Integer> transform(final String key, final
> > String value) {
> >         return new KeyValue<>(context.partition(), 1);
> >     }
> > });
> >
> > // Note that we can't do "count()" on a KStream, we have to group it
> first.
> > I'm grouping by the key, so it will produce the count for each key.
> > // Since the key is actually the partition id, it will produce the
> > pre-aggregated count per partition.
> > // Note that the result is a KTable<PartitionId,Count>. It'll always
> > contain the most recent count for each partition.
> > final KTable<Integer, Long> countsByPartition =
> > keyedByPartition.groupByKey().count();
> >
> > // Now we get ready for the final roll-up. We re-group all the
> constituent
> > counts
> > final KGroupedTable<String, Long> singlePartition =
> > countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value));
> >
> > final KTable<String, Long> totalCount = singlePartition.reduce((l, r) ->
> l
> > + r, (l, r) -> l - r);
> >
> > totalCount.toStream().foreach((k, v) -> {
> >     // k is always "ALL"
> >     // v is always the most recent total value
> >     System.out.println("The total event count is: " + v);
> > });
> >
> >
> > On Tue, Jul 3, 2018 at 9:21 AM flaviostutz@gmail.com <
> flaviostutz@gmail.com>
> > wrote:
> >
> > > Great feature you have there!
> > >
> > > I'll try to exercise here how we would achieve the same functional
> > > objectives using your KIP:
> > >
> > > EXERCISE 1:
> > >   - The case is "total counting of events for a huge website"
> > >   - Tasks from Application A will have something like:
> > >          .stream(/site-events)
> > >          .count()
> > >          .publish(/single-partitioned-topic-with-count-partials)
> > >   - The published messages will be, for example:
> > >           ["counter-task1", 2345]
> > >           ["counter-task2", 8495]
> > >           ["counter-task3", 4839]
> > >   - Single Task from Application B will have something like:
> > >          .stream(/single-partitioned-topic-with-count-partials)
> > >          .aggregate(by messages whose key starts with "counter")
> > >          .publish(/counter-total)
> > >   - FAIL HERE. How would I know what is the overall partitions? Maybe
> two
> > > partials for the same task will arrive before other tasks and it maybe
> > > aggregated twice.
> > >
> > > I tried to think about using GlobalKTables, but I didn't get an easy
> way
> > > to aggregate the keys from that table. Do you have any clue?
> > >
> > > Thanks.
> > >
> > > -Flávio Stutz
> > >
> > >
> > >
> > >
> > >
> > >
> > > /partial-counters-to-single-partitioned-topic
> > >
> > > On 2018/07/02 20:03:57, John Roesler <jo...@confluent.io> wrote:
> > > > Hi Flávio,
> > > >
> > > > Thanks for the KIP. I'll apologize that I'm arriving late to the
> > > > discussion. I've tried to catch up, but I might have missed some
> nuances.
> > > >
> > > > Regarding KIP-328, the idea is to add the ability to suppress
> > > intermediate
> > > > results from all KTables, not just windowed ones. I think this could
> > > > support your use case in combination with the strategy that Guozhang
> > > > proposed of having one or more pre-aggregation steps that ultimately
> push
> > > > into a single-partition topic for final aggregation. Suppressing
> > > > intermediate results would solve the problem you noted that today
> > > > pre-aggregating doesn't do much to staunch the flow up updates.
> > > >
> > > > I'm not sure if this would be good enough for you overall; I just
> wanted
> > > to
> > > > clarify the role of KIP-328.
> > > > In particular, the solution you mentioned is to have the downstream
> > > KTables
> > > > actually query the upstream ones to compute their results. I'm not
> sure
> > > > whether it's more efficient to do these queries on the schedule, or
> to
> > > have
> > > > the upstream tables emit their results, on the same schedule.
> > > >
> > > > What do you think?
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Sun, Jul 1, 2018 at 10:03 PM flaviostutz@gmail.com <
> > > flaviostutz@gmail.com>
> > > > wrote:
> > > >
> > > > > For what I understood, that KIP is related to how KStreams will
> handle
> > > > > KTable updates in Windowed scenarios to optimize resource usage.
> > > > > I couldn't see any specific relation to this KIP. Had you?
> > > > >
> > > > > -Flávio Stutz
> > > > >
> > > > >
> > > > > On 2018/06/29 18:14:46, "Matthias J. Sax" <ma...@confluent.io>
> > > wrote:
> > > > > > Flavio,
> > > > > >
> > > > > > thanks for cleaning up the KIP number collision.
> > > > > >
> > > > > > With regard to KIP-328
> > > > > > (
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
> > > > > )
> > > > > > I am wondering how both relate to each other?
> > > > > >
> > > > > > Any thoughts?
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
> > > > > > > Just copying a follow up from another thread to here (sorry
> about
> > > the
> > > > > mess):
> > > > > > >
> > > > > > > From: Guozhang Wang <wa...@gmail.com>
> > > > > > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph
> source
> > > > > > > Date: 2018/06/25 22:24:17
> > > > > > > List: dev@kafka.apache.org
> > > > > > >
> > > > > > > Flávio, thanks for creating this KIP.
> > > > > > >
> > > > > > > I think this "single-aggregation" use case is common enough
> that we
> > > > > should
> > > > > > > consider how to efficiently supports it: for example, for KSQL
> > > that's
> > > > > built
> > > > > > > on top of Streams, we've seen lots of query statements whose
> > > return is
> > > > > > > expected a single row indicating the "total aggregate" etc. See
> > > > > > > https://github.com/confluentinc/ksql/issues/430 for details.
> > > > > > >
> > > > > > > I've not read through
> > > https://issues.apache.org/jira/browse/KAFKA-6953,
> > > > > but
> > > > > > > I'm wondering if we have discussed the option of supporting it
> in a
> > > > > > > "pre-aggregate" manner: that is we do partial aggregates on
> > > parallel
> > > > > tasks,
> > > > > > > and then sends the partial aggregated value via a single topic
> > > > > partition
> > > > > > > for the final aggregate, to reduce the traffic on that single
> > > > > partition and
> > > > > > > hence the final aggregate workload.
> > > > > > > Of course, for non-commutative aggregates we'd probably need to
> > > provide
> > > > > > > another API in addition to aggregate, like the `merge`
> function for
> > > > > > > session-based aggregates, to let users customize the
> operations of
> > > > > merging
> > > > > > > two partial aggregates into a single partial aggregate. What's
> its
> > > > > pros and
> > > > > > > cons compared with the current proposal?
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > > On 2018/06/26 18:22:27, Flávio Stutz <fl...@gmail.com>
> > > wrote:
> > > > > > >> Hey, guys, I've just created a new KIP about creating a new
> DSL
> > > graph
> > > > > > >> source for realtime partitioned consolidations.
> > > > > > >>
> > > > > > >> We have faced the following scenario/problem in a lot of
> > > situations
> > > > > with
> > > > > > >> KStreams:
> > > > > > >>    - Huge incoming data being processed by numerous
> application
> > > > > instances
> > > > > > >>    - Need to aggregate different fields whose records span all
> > > topic
> > > > > > >> partitions (something like “total amount spent by people aged
> > 30
> > > > > yrs”
> > > > > > >> when processing a topic partitioned by userid).
> > > > > > >>
> > > > > > >> The challenge here is to manage this kind of situation
> without any
> > > > > > >> bottlenecks. We don't need the “global aggregation” to be
> > > processed
> > > > > at each
> > > > > > >> incoming message. On a scenario of 500 instances, each
> handling 1k
> > > > > > >> messages/s, any single point of aggregation (single
> partitioned
> > > > > topics,
> > > > > > >> global tables or external databases) would create a
> bottleneck of
> > > 500k
> > > > > > >> messages/s for single threaded/CPU elements.
> > > > > > >>
> > > > > > >> For this scenario, it is possible to store the partial
> > > aggregations on
> > > > > > >> local stores and, from time to time, query those states and
> > > aggregate
> > > > > them
> > > > > > >> as a single value, avoiding bottlenecks. This is a way to
> create a
> > > > > "timed
> > > > > > >> aggregation barrier”.
> > > > > > >>
> > > > > > >> If we leverage this kind of built-in feature we could greatly
> > > enhance
> > > > > the
> > > > > > >> ability of KStreams to better handle the CAP Theorem
> > > characteristics,
> > > > > so
> > > > > > >> that one could choose to have Consistency over Availability
> when
> > > > > needed.
> > > > > > >>
> > > > > > >> We started this discussion with Matthias J. Sax here:
> > > > > > >> https://issues.apache.org/jira/browse/KAFKA-6953
> > > > > > >>
> > > > > > >> If you want to see more, go to KIP-326 at:
> > > > > > >>
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > > > > > >>
> > > > > > >> -Flávio Stutz
> > > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by fl...@gmail.com, fl...@gmail.com.
John, that was fantastic, man!
Have you built any custom implementation of your KIP in your machine so that I could test it out here? I wish I could test it out.
If you need any help implementing this feature, please tell me.

Thanks.

-Flávio Stutz




On 2018/07/03 18:04:52, John Roesler <jo...@confluent.io> wrote: 
> Hi Flávio,
> Thanks! I think that we can actually do this, but the API could be better.
> I've included Java code below, but I'll copy and modify your example so
> we're on the same page.
> 
> EXERCISE 1:
>   - The case is "total counting of events for a huge website"
>   - Tasks from Application A will have something like:
>          .stream(/site-events)
>          .transform( re-key s.t. the new key is the partition id)
>          .groupByKey() // you have to do this before count
>          .count()
>           // you explicitly published to a one-partition topic here, but
> it's actually sufficient just
>           // to re-group onto one key. You could name and pre-create the
> intermediate topic here,
>           // but you don't need a separate application for the final
> aggregation.
>          .groupBy((partitionId, partialCount) -> new KeyValue("ALL",
> partialCount))
>          .aggregate(sum up the partialCounts)
>          .publish(/counter-total)
> 
> I've left out the suppressions, but they would go right after the count()
> and the aggregate().
> 
> With this program, you don't have to worry about the double-aggregation you
> mentioned in the last email. The KTable produced by the first count() will
> maintain the correct count per partition. If the value changes for any
> partition, it'll emit a retraction of the old value and then the new value
> downstream, so that the final aggregation can update itself properly.
> 
> I think we can optimize both the execution and the programability by adding
> a "global aggregation" concept. But In principle, it seems like this usage
> of the current API will support your use case.
> 
> Once again, though, this is just to present an alternative. I haven't done
> the math on whether your proposal would be more efficient.
> 
> Thanks,
> -John
> 
> Here's the same algorithm written in Java:
> 
> final KStream<String, String> siteEvents = builder.stream("/site-events");
> 
> // here we re-key the events so that the key is actually the partition id.
> // we don't need the value to do a count, so I just set it to "1".
> final KStream<Integer, Integer> keyedByPartition = siteEvents.transform(()
> -> new Transformer<String, String, KeyValue<Integer, Integer>>() {
>     private ProcessorContext context;
> 
>     @Override
>     public void init(final ProcessorContext context) {
>         this.context = context;
>     }
> 
>     @Override
>     public KeyValue<Integer, Integer> transform(final String key, final
> String value) {
>         return new KeyValue<>(context.partition(), 1);
>     }
> });
> 
> // Note that we can't do "count()" on a KStream, we have to group it first.
> I'm grouping by the key, so it will produce the count for each key.
> // Since the key is actually the partition id, it will produce the
> pre-aggregated count per partition.
> // Note that the result is a KTable<PartitionId,Count>. It'll always
> contain the most recent count for each partition.
> final KTable<Integer, Long> countsByPartition =
> keyedByPartition.groupByKey().count();
> 
> // Now we get ready for the final roll-up. We re-group all the constituent
> counts
> final KGroupedTable<String, Long> singlePartition =
> countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value));
> 
> final KTable<String, Long> totalCount = singlePartition.reduce((l, r) -> l
> + r, (l, r) -> l - r);
> 
> totalCount.toStream().foreach((k, v) -> {
>     // k is always "ALL"
>     // v is always the most recent total value
>     System.out.println("The total event count is: " + v);
> });
> 
> 
> On Tue, Jul 3, 2018 at 9:21 AM flaviostutz@gmail.com <fl...@gmail.com>
> wrote:
> 
> > Great feature you have there!
> >
> > I'll try to exercise here how we would achieve the same functional
> > objectives using your KIP:
> >
> > EXERCISE 1:
> >   - The case is "total counting of events for a huge website"
> >   - Tasks from Application A will have something like:
> >          .stream(/site-events)
> >          .count()
> >          .publish(/single-partitioned-topic-with-count-partials)
> >   - The published messages will be, for example:
> >           ["counter-task1", 2345]
> >           ["counter-task2", 8495]
> >           ["counter-task3", 4839]
> >   - Single Task from Application B will have something like:
> >          .stream(/single-partitioned-topic-with-count-partials)
> >          .aggregate(by messages whose key starts with "counter")
> >          .publish(/counter-total)
> >   - FAIL HERE. How would I know what is the overall partitions? Maybe two
> > partials for the same task will arrive before other tasks and it maybe
> > aggregated twice.
> >
> > I tried to think about using GlobalKTables, but I didn't get an easy way
> > to aggregate the keys from that table. Do you have any clue?
> >
> > Thanks.
> >
> > -Flávio Stutz
> >
> >
> >
> >
> >
> >
> > /partial-counters-to-single-partitioned-topic
> >
> > On 2018/07/02 20:03:57, John Roesler <jo...@confluent.io> wrote:
> > > Hi Flávio,
> > >
> > > Thanks for the KIP. I'll apologize that I'm arriving late to the
> > > discussion. I've tried to catch up, but I might have missed some nuances.
> > >
> > > Regarding KIP-328, the idea is to add the ability to suppress
> > intermediate
> > > results from all KTables, not just windowed ones. I think this could
> > > support your use case in combination with the strategy that Guozhang
> > > proposed of having one or more pre-aggregation steps that ultimately push
> > > into a single-partition topic for final aggregation. Suppressing
> > > intermediate results would solve the problem you noted that today
> > > pre-aggregating doesn't do much to staunch the flow up updates.
> > >
> > > I'm not sure if this would be good enough for you overall; I just wanted
> > to
> > > clarify the role of KIP-328.
> > > In particular, the solution you mentioned is to have the downstream
> > KTables
> > > actually query the upstream ones to compute their results. I'm not sure
> > > whether it's more efficient to do these queries on the schedule, or to
> > have
> > > the upstream tables emit their results, on the same schedule.
> > >
> > > What do you think?
> > >
> > > Thanks,
> > > -John
> > >
> > > On Sun, Jul 1, 2018 at 10:03 PM flaviostutz@gmail.com <
> > flaviostutz@gmail.com>
> > > wrote:
> > >
> > > > For what I understood, that KIP is related to how KStreams will handle
> > > > KTable updates in Windowed scenarios to optimize resource usage.
> > > > I couldn't see any specific relation to this KIP. Had you?
> > > >
> > > > -Flávio Stutz
> > > >
> > > >
> > > > On 2018/06/29 18:14:46, "Matthias J. Sax" <ma...@confluent.io>
> > wrote:
> > > > > Flavio,
> > > > >
> > > > > thanks for cleaning up the KIP number collision.
> > > > >
> > > > > With regard to KIP-328
> > > > > (
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
> > > > )
> > > > > I am wondering how both relate to each other?
> > > > >
> > > > > Any thoughts?
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
> > > > > > Just copying a follow up from another thread to here (sorry about
> > the
> > > > mess):
> > > > > >
> > > > > > From: Guozhang Wang <wa...@gmail.com>
> > > > > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > > > > > Date: 2018/06/25 22:24:17
> > > > > > List: dev@kafka.apache.org
> > > > > >
> > > > > > Flávio, thanks for creating this KIP.
> > > > > >
> > > > > > I think this "single-aggregation" use case is common enough that we
> > > > should
> > > > > > consider how to efficiently supports it: for example, for KSQL
> > that's
> > > > built
> > > > > > on top of Streams, we've seen lots of query statements whose
> > return is
> > > > > > expected a single row indicating the "total aggregate" etc. See
> > > > > > https://github.com/confluentinc/ksql/issues/430 for details.
> > > > > >
> > > > > > I've not read through
> > https://issues.apache.org/jira/browse/KAFKA-6953,
> > > > but
> > > > > > I'm wondering if we have discussed the option of supporting it in a
> > > > > > "pre-aggregate" manner: that is we do partial aggregates on
> > parallel
> > > > tasks,
> > > > > > and then sends the partial aggregated value via a single topic
> > > > partition
> > > > > > for the final aggregate, to reduce the traffic on that single
> > > > partition and
> > > > > > hence the final aggregate workload.
> > > > > > Of course, for non-commutative aggregates we'd probably need to
> > provide
> > > > > > another API in addition to aggregate, like the `merge` function for
> > > > > > session-based aggregates, to let users customize the operations of
> > > > merging
> > > > > > two partial aggregates into a single partial aggregate. What's its
> > > > pros and
> > > > > > cons compared with the current proposal?
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > > On 2018/06/26 18:22:27, Flávio Stutz <fl...@gmail.com>
> > wrote:
> > > > > >> Hey, guys, I've just created a new KIP about creating a new DSL
> > graph
> > > > > >> source for realtime partitioned consolidations.
> > > > > >>
> > > > > >> We have faced the following scenario/problem in a lot of
> > situations
> > > > with
> > > > > >> KStreams:
> > > > > >>    - Huge incoming data being processed by numerous application
> > > > instances
> > > > > >>    - Need to aggregate different fields whose records span all
> > topic
> > > > > >> partitions (something like “total amount spent by people aged > 30
> > > > yrs”
> > > > > >> when processing a topic partitioned by userid).
> > > > > >>
> > > > > >> The challenge here is to manage this kind of situation without any
> > > > > >> bottlenecks. We don't need the “global aggregation” to be
> > processed
> > > > at each
> > > > > >> incoming message. On a scenario of 500 instances, each handling 1k
> > > > > >> messages/s, any single point of aggregation (single partitioned
> > > > topics,
> > > > > >> global tables or external databases) would create a bottleneck of
> > 500k
> > > > > >> messages/s for single threaded/CPU elements.
> > > > > >>
> > > > > >> For this scenario, it is possible to store the partial
> > aggregations on
> > > > > >> local stores and, from time to time, query those states and
> > aggregate
> > > > them
> > > > > >> as a single value, avoiding bottlenecks. This is a way to create a
> > > > "timed
> > > > > >> aggregation barrier”.
> > > > > >>
> > > > > >> If we leverage this kind of built-in feature we could greatly
> > enhance
> > > > the
> > > > > >> ability of KStreams to better handle the CAP Theorem
> > characteristics,
> > > > so
> > > > > >> that one could choose to have Consistency over Availability when
> > > > needed.
> > > > > >>
> > > > > >> We started this discussion with Matthias J. Sax here:
> > > > > >> https://issues.apache.org/jira/browse/KAFKA-6953
> > > > > >>
> > > > > >> If you want to see more, go to KIP-326 at:
> > > > > >>
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > > > > >>
> > > > > >> -Flávio Stutz
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
> 

Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by John Roesler <jo...@confluent.io>.
Hi Flávio,
Thanks! I think that we can actually do this, but the API could be better.
I've included Java code below, but I'll copy and modify your example so
we're on the same page.

EXERCISE 1:
  - The case is "total counting of events for a huge website"
  - Tasks from Application A will have something like:
         .stream(/site-events)
         .transform( re-key s.t. the new key is the partition id)
         .groupByKey() // you have to do this before count
         .count()
          // you explicitly published to a one-partition topic here, but
it's actually sufficient just
          // to re-group onto one key. You could name and pre-create the
intermediate topic here,
          // but you don't need a separate application for the final
aggregation.
         .groupBy((partitionId, partialCount) -> new KeyValue("ALL",
partialCount))
         .aggregate(sum up the partialCounts)
         .publish(/counter-total)

I've left out the suppressions, but they would go right after the count()
and the aggregate().

With this program, you don't have to worry about the double-aggregation you
mentioned in the last email. The KTable produced by the first count() will
maintain the correct count per partition. If the value changes for any
partition, it'll emit a retraction of the old value and then the new value
downstream, so that the final aggregation can update itself properly.

I think we can optimize both the execution and the programability by adding
a "global aggregation" concept. But In principle, it seems like this usage
of the current API will support your use case.

Once again, though, this is just to present an alternative. I haven't done
the math on whether your proposal would be more efficient.

Thanks,
-John

Here's the same algorithm written in Java:

final KStream<String, String> siteEvents = builder.stream("/site-events");

// here we re-key the events so that the key is actually the partition id.
// we don't need the value to do a count, so I just set it to "1".
final KStream<Integer, Integer> keyedByPartition = siteEvents.transform(()
-> new Transformer<String, String, KeyValue<Integer, Integer>>() {
    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<Integer, Integer> transform(final String key, final
String value) {
        return new KeyValue<>(context.partition(), 1);
    }
});

// Note that we can't do "count()" on a KStream, we have to group it first.
I'm grouping by the key, so it will produce the count for each key.
// Since the key is actually the partition id, it will produce the
pre-aggregated count per partition.
// Note that the result is a KTable<PartitionId,Count>. It'll always
contain the most recent count for each partition.
final KTable<Integer, Long> countsByPartition =
keyedByPartition.groupByKey().count();

// Now we get ready for the final roll-up. We re-group all the constituent
counts
final KGroupedTable<String, Long> singlePartition =
countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value));

final KTable<String, Long> totalCount = singlePartition.reduce((l, r) -> l
+ r, (l, r) -> l - r);

totalCount.toStream().foreach((k, v) -> {
    // k is always "ALL"
    // v is always the most recent total value
    System.out.println("The total event count is: " + v);
});


On Tue, Jul 3, 2018 at 9:21 AM flaviostutz@gmail.com <fl...@gmail.com>
wrote:

> Great feature you have there!
>
> I'll try to exercise here how we would achieve the same functional
> objectives using your KIP:
>
> EXERCISE 1:
>   - The case is "total counting of events for a huge website"
>   - Tasks from Application A will have something like:
>          .stream(/site-events)
>          .count()
>          .publish(/single-partitioned-topic-with-count-partials)
>   - The published messages will be, for example:
>           ["counter-task1", 2345]
>           ["counter-task2", 8495]
>           ["counter-task3", 4839]
>   - Single Task from Application B will have something like:
>          .stream(/single-partitioned-topic-with-count-partials)
>          .aggregate(by messages whose key starts with "counter")
>          .publish(/counter-total)
>   - FAIL HERE. How would I know what is the overall partitions? Maybe two
> partials for the same task will arrive before other tasks and it maybe
> aggregated twice.
>
> I tried to think about using GlobalKTables, but I didn't get an easy way
> to aggregate the keys from that table. Do you have any clue?
>
> Thanks.
>
> -Flávio Stutz
>
>
>
>
>
>
> /partial-counters-to-single-partitioned-topic
>
> On 2018/07/02 20:03:57, John Roesler <jo...@confluent.io> wrote:
> > Hi Flávio,
> >
> > Thanks for the KIP. I'll apologize that I'm arriving late to the
> > discussion. I've tried to catch up, but I might have missed some nuances.
> >
> > Regarding KIP-328, the idea is to add the ability to suppress
> intermediate
> > results from all KTables, not just windowed ones. I think this could
> > support your use case in combination with the strategy that Guozhang
> > proposed of having one or more pre-aggregation steps that ultimately push
> > into a single-partition topic for final aggregation. Suppressing
> > intermediate results would solve the problem you noted that today
> > pre-aggregating doesn't do much to staunch the flow up updates.
> >
> > I'm not sure if this would be good enough for you overall; I just wanted
> to
> > clarify the role of KIP-328.
> > In particular, the solution you mentioned is to have the downstream
> KTables
> > actually query the upstream ones to compute their results. I'm not sure
> > whether it's more efficient to do these queries on the schedule, or to
> have
> > the upstream tables emit their results, on the same schedule.
> >
> > What do you think?
> >
> > Thanks,
> > -John
> >
> > On Sun, Jul 1, 2018 at 10:03 PM flaviostutz@gmail.com <
> flaviostutz@gmail.com>
> > wrote:
> >
> > > For what I understood, that KIP is related to how KStreams will handle
> > > KTable updates in Windowed scenarios to optimize resource usage.
> > > I couldn't see any specific relation to this KIP. Had you?
> > >
> > > -Flávio Stutz
> > >
> > >
> > > On 2018/06/29 18:14:46, "Matthias J. Sax" <ma...@confluent.io>
> wrote:
> > > > Flavio,
> > > >
> > > > thanks for cleaning up the KIP number collision.
> > > >
> > > > With regard to KIP-328
> > > > (
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
> > > )
> > > > I am wondering how both relate to each other?
> > > >
> > > > Any thoughts?
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
> > > > > Just copying a follow up from another thread to here (sorry about
> the
> > > mess):
> > > > >
> > > > > From: Guozhang Wang <wa...@gmail.com>
> > > > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > > > > Date: 2018/06/25 22:24:17
> > > > > List: dev@kafka.apache.org
> > > > >
> > > > > Flávio, thanks for creating this KIP.
> > > > >
> > > > > I think this "single-aggregation" use case is common enough that we
> > > should
> > > > > consider how to efficiently supports it: for example, for KSQL
> that's
> > > built
> > > > > on top of Streams, we've seen lots of query statements whose
> return is
> > > > > expected a single row indicating the "total aggregate" etc. See
> > > > > https://github.com/confluentinc/ksql/issues/430 for details.
> > > > >
> > > > > I've not read through
> https://issues.apache.org/jira/browse/KAFKA-6953,
> > > but
> > > > > I'm wondering if we have discussed the option of supporting it in a
> > > > > "pre-aggregate" manner: that is we do partial aggregates on
> parallel
> > > tasks,
> > > > > and then sends the partial aggregated value via a single topic
> > > partition
> > > > > for the final aggregate, to reduce the traffic on that single
> > > partition and
> > > > > hence the final aggregate workload.
> > > > > Of course, for non-commutative aggregates we'd probably need to
> provide
> > > > > another API in addition to aggregate, like the `merge` function for
> > > > > session-based aggregates, to let users customize the operations of
> > > merging
> > > > > two partial aggregates into a single partial aggregate. What's its
> > > pros and
> > > > > cons compared with the current proposal?
> > > > >
> > > > >
> > > > > Guozhang
> > > > > On 2018/06/26 18:22:27, Flávio Stutz <fl...@gmail.com>
> wrote:
> > > > >> Hey, guys, I've just created a new KIP about creating a new DSL
> graph
> > > > >> source for realtime partitioned consolidations.
> > > > >>
> > > > >> We have faced the following scenario/problem in a lot of
> situations
> > > with
> > > > >> KStreams:
> > > > >>    - Huge incoming data being processed by numerous application
> > > instances
> > > > >>    - Need to aggregate different fields whose records span all
> topic
> > > > >> partitions (something like “total amount spent by people aged > 30
> > > yrs”
> > > > >> when processing a topic partitioned by userid).
> > > > >>
> > > > >> The challenge here is to manage this kind of situation without any
> > > > >> bottlenecks. We don't need the “global aggregation” to be
> processed
> > > at each
> > > > >> incoming message. On a scenario of 500 instances, each handling 1k
> > > > >> messages/s, any single point of aggregation (single partitioned
> > > topics,
> > > > >> global tables or external databases) would create a bottleneck of
> 500k
> > > > >> messages/s for single threaded/CPU elements.
> > > > >>
> > > > >> For this scenario, it is possible to store the partial
> aggregations on
> > > > >> local stores and, from time to time, query those states and
> aggregate
> > > them
> > > > >> as a single value, avoiding bottlenecks. This is a way to create a
> > > "timed
> > > > >> aggregation barrier”.
> > > > >>
> > > > >> If we leverage this kind of built-in feature we could greatly
> enhance
> > > the
> > > > >> ability of KStreams to better handle the CAP Theorem
> characteristics,
> > > so
> > > > >> that one could choose to have Consistency over Availability when
> > > needed.
> > > > >>
> > > > >> We started this discussion with Matthias J. Sax here:
> > > > >> https://issues.apache.org/jira/browse/KAFKA-6953
> > > > >>
> > > > >> If you want to see more, go to KIP-326 at:
> > > > >>
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > > > >>
> > > > >> -Flávio Stutz
> > > > >>
> > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by fl...@gmail.com, fl...@gmail.com.
Great feature you have there!

I'll try to exercise here how we would achieve the same functional objectives using your KIP:

EXERCISE 1:
  - The case is "total counting of events for a huge website"
  - Tasks from Application A will have something like:
         .stream(/site-events)
         .count()
         .publish(/single-partitioned-topic-with-count-partials)
  - The published messages will be, for example:
          ["counter-task1", 2345]
          ["counter-task2", 8495]
          ["counter-task3", 4839]
  - Single Task from Application B will have something like:
         .stream(/single-partitioned-topic-with-count-partials)
         .aggregate(by messages whose key starts with "counter")
         .publish(/counter-total)
  - FAIL HERE. How would I know what is the overall partitions? Maybe two partials for the same task will arrive before other tasks and it maybe aggregated twice.

I tried to think about using GlobalKTables, but I didn't get an easy way to aggregate the keys from that table. Do you have any clue?

Thanks.

-Flávio Stutz






/partial-counters-to-single-partitioned-topic

On 2018/07/02 20:03:57, John Roesler <jo...@confluent.io> wrote: 
> Hi Flávio,
> 
> Thanks for the KIP. I'll apologize that I'm arriving late to the
> discussion. I've tried to catch up, but I might have missed some nuances.
> 
> Regarding KIP-328, the idea is to add the ability to suppress intermediate
> results from all KTables, not just windowed ones. I think this could
> support your use case in combination with the strategy that Guozhang
> proposed of having one or more pre-aggregation steps that ultimately push
> into a single-partition topic for final aggregation. Suppressing
> intermediate results would solve the problem you noted that today
> pre-aggregating doesn't do much to staunch the flow up updates.
> 
> I'm not sure if this would be good enough for you overall; I just wanted to
> clarify the role of KIP-328.
> In particular, the solution you mentioned is to have the downstream KTables
> actually query the upstream ones to compute their results. I'm not sure
> whether it's more efficient to do these queries on the schedule, or to have
> the upstream tables emit their results, on the same schedule.
> 
> What do you think?
> 
> Thanks,
> -John
> 
> On Sun, Jul 1, 2018 at 10:03 PM flaviostutz@gmail.com <fl...@gmail.com>
> wrote:
> 
> > For what I understood, that KIP is related to how KStreams will handle
> > KTable updates in Windowed scenarios to optimize resource usage.
> > I couldn't see any specific relation to this KIP. Had you?
> >
> > -Flávio Stutz
> >
> >
> > On 2018/06/29 18:14:46, "Matthias J. Sax" <ma...@confluent.io> wrote:
> > > Flavio,
> > >
> > > thanks for cleaning up the KIP number collision.
> > >
> > > With regard to KIP-328
> > > (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
> > )
> > > I am wondering how both relate to each other?
> > >
> > > Any thoughts?
> > >
> > >
> > > -Matthias
> > >
> > > On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
> > > > Just copying a follow up from another thread to here (sorry about the
> > mess):
> > > >
> > > > From: Guozhang Wang <wa...@gmail.com>
> > > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > > > Date: 2018/06/25 22:24:17
> > > > List: dev@kafka.apache.org
> > > >
> > > > Flávio, thanks for creating this KIP.
> > > >
> > > > I think this "single-aggregation" use case is common enough that we
> > should
> > > > consider how to efficiently supports it: for example, for KSQL that's
> > built
> > > > on top of Streams, we've seen lots of query statements whose return is
> > > > expected a single row indicating the "total aggregate" etc. See
> > > > https://github.com/confluentinc/ksql/issues/430 for details.
> > > >
> > > > I've not read through https://issues.apache.org/jira/browse/KAFKA-6953,
> > but
> > > > I'm wondering if we have discussed the option of supporting it in a
> > > > "pre-aggregate" manner: that is we do partial aggregates on parallel
> > tasks,
> > > > and then sends the partial aggregated value via a single topic
> > partition
> > > > for the final aggregate, to reduce the traffic on that single
> > partition and
> > > > hence the final aggregate workload.
> > > > Of course, for non-commutative aggregates we'd probably need to provide
> > > > another API in addition to aggregate, like the `merge` function for
> > > > session-based aggregates, to let users customize the operations of
> > merging
> > > > two partial aggregates into a single partial aggregate. What's its
> > pros and
> > > > cons compared with the current proposal?
> > > >
> > > >
> > > > Guozhang
> > > > On 2018/06/26 18:22:27, Flávio Stutz <fl...@gmail.com> wrote:
> > > >> Hey, guys, I've just created a new KIP about creating a new DSL graph
> > > >> source for realtime partitioned consolidations.
> > > >>
> > > >> We have faced the following scenario/problem in a lot of situations
> > with
> > > >> KStreams:
> > > >>    - Huge incoming data being processed by numerous application
> > instances
> > > >>    - Need to aggregate different fields whose records span all topic
> > > >> partitions (something like “total amount spent by people aged > 30
> > yrs”
> > > >> when processing a topic partitioned by userid).
> > > >>
> > > >> The challenge here is to manage this kind of situation without any
> > > >> bottlenecks. We don't need the “global aggregation” to be processed
> > at each
> > > >> incoming message. On a scenario of 500 instances, each handling 1k
> > > >> messages/s, any single point of aggregation (single partitioned
> > topics,
> > > >> global tables or external databases) would create a bottleneck of 500k
> > > >> messages/s for single threaded/CPU elements.
> > > >>
> > > >> For this scenario, it is possible to store the partial aggregations on
> > > >> local stores and, from time to time, query those states and aggregate
> > them
> > > >> as a single value, avoiding bottlenecks. This is a way to create a
> > "timed
> > > >> aggregation barrier”.
> > > >>
> > > >> If we leverage this kind of built-in feature we could greatly enhance
> > the
> > > >> ability of KStreams to better handle the CAP Theorem characteristics,
> > so
> > > >> that one could choose to have Consistency over Availability when
> > needed.
> > > >>
> > > >> We started this discussion with Matthias J. Sax here:
> > > >> https://issues.apache.org/jira/browse/KAFKA-6953
> > > >>
> > > >> If you want to see more, go to KIP-326 at:
> > > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > > >>
> > > >> -Flávio Stutz
> > > >>
> > >
> > >
> >
> 

Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by John Roesler <jo...@confluent.io>.
Hi Flávio,

Thanks for the KIP. I'll apologize that I'm arriving late to the
discussion. I've tried to catch up, but I might have missed some nuances.

Regarding KIP-328, the idea is to add the ability to suppress intermediate
results from all KTables, not just windowed ones. I think this could
support your use case in combination with the strategy that Guozhang
proposed of having one or more pre-aggregation steps that ultimately push
into a single-partition topic for final aggregation. Suppressing
intermediate results would solve the problem you noted that today
pre-aggregating doesn't do much to staunch the flow up updates.

I'm not sure if this would be good enough for you overall; I just wanted to
clarify the role of KIP-328.
In particular, the solution you mentioned is to have the downstream KTables
actually query the upstream ones to compute their results. I'm not sure
whether it's more efficient to do these queries on the schedule, or to have
the upstream tables emit their results, on the same schedule.

What do you think?

Thanks,
-John

On Sun, Jul 1, 2018 at 10:03 PM flaviostutz@gmail.com <fl...@gmail.com>
wrote:

> For what I understood, that KIP is related to how KStreams will handle
> KTable updates in Windowed scenarios to optimize resource usage.
> I couldn't see any specific relation to this KIP. Had you?
>
> -Flávio Stutz
>
>
> On 2018/06/29 18:14:46, "Matthias J. Sax" <ma...@confluent.io> wrote:
> > Flavio,
> >
> > thanks for cleaning up the KIP number collision.
> >
> > With regard to KIP-328
> > (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
> )
> > I am wondering how both relate to each other?
> >
> > Any thoughts?
> >
> >
> > -Matthias
> >
> > On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
> > > Just copying a follow up from another thread to here (sorry about the
> mess):
> > >
> > > From: Guozhang Wang <wa...@gmail.com>
> > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > > Date: 2018/06/25 22:24:17
> > > List: dev@kafka.apache.org
> > >
> > > Flávio, thanks for creating this KIP.
> > >
> > > I think this "single-aggregation" use case is common enough that we
> should
> > > consider how to efficiently supports it: for example, for KSQL that's
> built
> > > on top of Streams, we've seen lots of query statements whose return is
> > > expected a single row indicating the "total aggregate" etc. See
> > > https://github.com/confluentinc/ksql/issues/430 for details.
> > >
> > > I've not read through https://issues.apache.org/jira/browse/KAFKA-6953,
> but
> > > I'm wondering if we have discussed the option of supporting it in a
> > > "pre-aggregate" manner: that is we do partial aggregates on parallel
> tasks,
> > > and then sends the partial aggregated value via a single topic
> partition
> > > for the final aggregate, to reduce the traffic on that single
> partition and
> > > hence the final aggregate workload.
> > > Of course, for non-commutative aggregates we'd probably need to provide
> > > another API in addition to aggregate, like the `merge` function for
> > > session-based aggregates, to let users customize the operations of
> merging
> > > two partial aggregates into a single partial aggregate. What's its
> pros and
> > > cons compared with the current proposal?
> > >
> > >
> > > Guozhang
> > > On 2018/06/26 18:22:27, Flávio Stutz <fl...@gmail.com> wrote:
> > >> Hey, guys, I've just created a new KIP about creating a new DSL graph
> > >> source for realtime partitioned consolidations.
> > >>
> > >> We have faced the following scenario/problem in a lot of situations
> with
> > >> KStreams:
> > >>    - Huge incoming data being processed by numerous application
> instances
> > >>    - Need to aggregate different fields whose records span all topic
> > >> partitions (something like “total amount spent by people aged > 30
> yrs”
> > >> when processing a topic partitioned by userid).
> > >>
> > >> The challenge here is to manage this kind of situation without any
> > >> bottlenecks. We don't need the “global aggregation” to be processed
> at each
> > >> incoming message. On a scenario of 500 instances, each handling 1k
> > >> messages/s, any single point of aggregation (single partitioned
> topics,
> > >> global tables or external databases) would create a bottleneck of 500k
> > >> messages/s for single threaded/CPU elements.
> > >>
> > >> For this scenario, it is possible to store the partial aggregations on
> > >> local stores and, from time to time, query those states and aggregate
> them
> > >> as a single value, avoiding bottlenecks. This is a way to create a
> "timed
> > >> aggregation barrier”.
> > >>
> > >> If we leverage this kind of built-in feature we could greatly enhance
> the
> > >> ability of KStreams to better handle the CAP Theorem characteristics,
> so
> > >> that one could choose to have Consistency over Availability when
> needed.
> > >>
> > >> We started this discussion with Matthias J. Sax here:
> > >> https://issues.apache.org/jira/browse/KAFKA-6953
> > >>
> > >> If you want to see more, go to KIP-326 at:
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > >>
> > >> -Flávio Stutz
> > >>
> >
> >
>

Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by fl...@gmail.com, fl...@gmail.com.
For what I understood, that KIP is related to how KStreams will handle KTable updates in Windowed scenarios to optimize resource usage.
I couldn't see any specific relation to this KIP. Had you?

-Flávio Stutz


On 2018/06/29 18:14:46, "Matthias J. Sax" <ma...@confluent.io> wrote: 
> Flavio,
> 
> thanks for cleaning up the KIP number collision.
> 
> With regard to KIP-328
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables)
> I am wondering how both relate to each other?
> 
> Any thoughts?
> 
> 
> -Matthias
> 
> On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
> > Just copying a follow up from another thread to here (sorry about the mess):
> > 
> > From: Guozhang Wang <wa...@gmail.com>
> > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > Date: 2018/06/25 22:24:17
> > List: dev@kafka.apache.org
> > 
> > Flávio, thanks for creating this KIP.
> > 
> > I think this "single-aggregation" use case is common enough that we should
> > consider how to efficiently supports it: for example, for KSQL that's built
> > on top of Streams, we've seen lots of query statements whose return is
> > expected a single row indicating the "total aggregate" etc. See
> > https://github.com/confluentinc/ksql/issues/430 for details.
> > 
> > I've not read through https://issues.apache.org/jira/browse/KAFKA-6953, but
> > I'm wondering if we have discussed the option of supporting it in a
> > "pre-aggregate" manner: that is we do partial aggregates on parallel tasks,
> > and then sends the partial aggregated value via a single topic partition
> > for the final aggregate, to reduce the traffic on that single partition and
> > hence the final aggregate workload.
> > Of course, for non-commutative aggregates we'd probably need to provide
> > another API in addition to aggregate, like the `merge` function for
> > session-based aggregates, to let users customize the operations of merging
> > two partial aggregates into a single partial aggregate. What's its pros and
> > cons compared with the current proposal?
> > 
> > 
> > Guozhang
> > On 2018/06/26 18:22:27, Flávio Stutz <fl...@gmail.com> wrote: 
> >> Hey, guys, I've just created a new KIP about creating a new DSL graph
> >> source for realtime partitioned consolidations.
> >>
> >> We have faced the following scenario/problem in a lot of situations with
> >> KStreams:
> >>    - Huge incoming data being processed by numerous application instances
> >>    - Need to aggregate different fields whose records span all topic
> >> partitions (something like “total amount spent by people aged > 30 yrs”
> >> when processing a topic partitioned by userid).
> >>
> >> The challenge here is to manage this kind of situation without any
> >> bottlenecks. We don't need the “global aggregation” to be processed at each
> >> incoming message. On a scenario of 500 instances, each handling 1k
> >> messages/s, any single point of aggregation (single partitioned topics,
> >> global tables or external databases) would create a bottleneck of 500k
> >> messages/s for single threaded/CPU elements.
> >>
> >> For this scenario, it is possible to store the partial aggregations on
> >> local stores and, from time to time, query those states and aggregate them
> >> as a single value, avoiding bottlenecks. This is a way to create a "timed
> >> aggregation barrier”.
> >>
> >> If we leverage this kind of built-in feature we could greatly enhance the
> >> ability of KStreams to better handle the CAP Theorem characteristics, so
> >> that one could choose to have Consistency over Availability when needed.
> >>
> >> We started this discussion with Matthias J. Sax here:
> >> https://issues.apache.org/jira/browse/KAFKA-6953
> >>
> >> If you want to see more, go to KIP-326 at:
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> >>
> >> -Flávio Stutz
> >>
> 
> 

Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Flavio,

thanks for cleaning up the KIP number collision.

With regard to KIP-328
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables)
I am wondering how both relate to each other?

Any thoughts?


-Matthias

On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
> Just copying a follow up from another thread to here (sorry about the mess):
> 
> From: Guozhang Wang <wa...@gmail.com>
> Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> Date: 2018/06/25 22:24:17
> List: dev@kafka.apache.org
> 
> Flávio, thanks for creating this KIP.
> 
> I think this "single-aggregation" use case is common enough that we should
> consider how to efficiently supports it: for example, for KSQL that's built
> on top of Streams, we've seen lots of query statements whose return is
> expected a single row indicating the "total aggregate" etc. See
> https://github.com/confluentinc/ksql/issues/430 for details.
> 
> I've not read through https://issues.apache.org/jira/browse/KAFKA-6953, but
> I'm wondering if we have discussed the option of supporting it in a
> "pre-aggregate" manner: that is we do partial aggregates on parallel tasks,
> and then sends the partial aggregated value via a single topic partition
> for the final aggregate, to reduce the traffic on that single partition and
> hence the final aggregate workload.
> Of course, for non-commutative aggregates we'd probably need to provide
> another API in addition to aggregate, like the `merge` function for
> session-based aggregates, to let users customize the operations of merging
> two partial aggregates into a single partial aggregate. What's its pros and
> cons compared with the current proposal?
> 
> 
> Guozhang
> On 2018/06/26 18:22:27, Flávio Stutz <fl...@gmail.com> wrote: 
>> Hey, guys, I've just created a new KIP about creating a new DSL graph
>> source for realtime partitioned consolidations.
>>
>> We have faced the following scenario/problem in a lot of situations with
>> KStreams:
>>    - Huge incoming data being processed by numerous application instances
>>    - Need to aggregate different fields whose records span all topic
>> partitions (something like “total amount spent by people aged > 30 yrs”
>> when processing a topic partitioned by userid).
>>
>> The challenge here is to manage this kind of situation without any
>> bottlenecks. We don't need the “global aggregation” to be processed at each
>> incoming message. On a scenario of 500 instances, each handling 1k
>> messages/s, any single point of aggregation (single partitioned topics,
>> global tables or external databases) would create a bottleneck of 500k
>> messages/s for single threaded/CPU elements.
>>
>> For this scenario, it is possible to store the partial aggregations on
>> local stores and, from time to time, query those states and aggregate them
>> as a single value, avoiding bottlenecks. This is a way to create a "timed
>> aggregation barrier”.
>>
>> If we leverage this kind of built-in feature we could greatly enhance the
>> ability of KStreams to better handle the CAP Theorem characteristics, so
>> that one could choose to have Consistency over Availability when needed.
>>
>> We started this discussion with Matthias J. Sax here:
>> https://issues.apache.org/jira/browse/KAFKA-6953
>>
>> If you want to see more, go to KIP-326 at:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
>>
>> -Flávio Stutz
>>


Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Posted by fl...@gmail.com, fl...@gmail.com.
Just copying a follow up from another thread to here (sorry about the mess):

From: Guozhang Wang <wa...@gmail.com>
Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
Date: 2018/06/25 22:24:17
List: dev@kafka.apache.org

Flávio, thanks for creating this KIP.

I think this "single-aggregation" use case is common enough that we should
consider how to efficiently supports it: for example, for KSQL that's built
on top of Streams, we've seen lots of query statements whose return is
expected a single row indicating the "total aggregate" etc. See
https://github.com/confluentinc/ksql/issues/430 for details.

I've not read through https://issues.apache.org/jira/browse/KAFKA-6953, but
I'm wondering if we have discussed the option of supporting it in a
"pre-aggregate" manner: that is we do partial aggregates on parallel tasks,
and then sends the partial aggregated value via a single topic partition
for the final aggregate, to reduce the traffic on that single partition and
hence the final aggregate workload.
Of course, for non-commutative aggregates we'd probably need to provide
another API in addition to aggregate, like the `merge` function for
session-based aggregates, to let users customize the operations of merging
two partial aggregates into a single partial aggregate. What's its pros and
cons compared with the current proposal?


Guozhang
On 2018/06/26 18:22:27, Flávio Stutz <fl...@gmail.com> wrote: 
> Hey, guys, I've just created a new KIP about creating a new DSL graph
> source for realtime partitioned consolidations.
> 
> We have faced the following scenario/problem in a lot of situations with
> KStreams:
>    - Huge incoming data being processed by numerous application instances
>    - Need to aggregate different fields whose records span all topic
> partitions (something like “total amount spent by people aged > 30 yrs”
> when processing a topic partitioned by userid).
> 
> The challenge here is to manage this kind of situation without any
> bottlenecks. We don't need the “global aggregation” to be processed at each
> incoming message. On a scenario of 500 instances, each handling 1k
> messages/s, any single point of aggregation (single partitioned topics,
> global tables or external databases) would create a bottleneck of 500k
> messages/s for single threaded/CPU elements.
> 
> For this scenario, it is possible to store the partial aggregations on
> local stores and, from time to time, query those states and aggregate them
> as a single value, avoiding bottlenecks. This is a way to create a "timed
> aggregation barrier”.
> 
> If we leverage this kind of built-in feature we could greatly enhance the
> ability of KStreams to better handle the CAP Theorem characteristics, so
> that one could choose to have Consistency over Availability when needed.
> 
> We started this discussion with Matthias J. Sax here:
> https://issues.apache.org/jira/browse/KAFKA-6953
> 
> If you want to see more, go to KIP-326 at:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> 
> -Flávio Stutz
>