You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Matthias J. Sax" <ma...@confluent.io> on 2019/11/25 00:53:22 UTC

Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

Moved this KIP into status "inactive". Feel free to resume and any time.

-Matthias

On 7/15/18 6:55 PM, Matthias J. Sax wrote:
> I think it would make a lot of sense to provide a simple DSL abstraction.
> 
> Something like:
> 
> KStream stream = ...
> KTable count = stream.count();
> 
> The missing groupBy() or grouByKey() class indicates a global counting
> operation. The JavaDocs should highlight the impact.
> 
> One open question is, what key we want to use for the result KTable?
> 
> Also, the details about optional parameters like `Materialized` need to
> be discussed in details.
> 
> 
> 
> -Matthias
> 
> 
> On 7/6/18 2:43 PM, Guozhang Wang wrote:
>> That's a lot of email exchanges for me to catch up :)
>>
>> My original proposed alternative solution is indeed relying on
>> pre-aggregate before sending to the single-partition topic, so that the
>> traffic on that single-partition topic would not be huge (I called it
>> partial-aggregate but the intent was the same).
>>
>> What I was thinking is that, given such a scenario could be common, if
>> we've decided to go down this route should we provide a new API that wrap's
>> John's proposed topology (right now with KIP-328 users still need to
>> leverage this trick manually):
>>
>>
>> ----------
>>
>> final KStream<String, String> siteEvents = builder.stream("/site-events");
>>
>> final KStream<Integer, Integer> keyedByPartition = siteEvents.transform(/*
>> generate KeyValue(key, 1) for the pre-aggregate*/);
>>
>> final KTable<Integer, Long> countsByPartition =
>> keyedByPartition.groupByKey().count();   /* pre-aggregate */
>>
>> final KGroupedTable<String, Long> singlePartition =
>> countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value));
>>  /* sent the suppressed pre-aggregate values to the single partition topic
>> */
>>
>> final KTable<String, Long> totalCount = singlePartition.reduce((l, r) -> l +
>> r, (l, r) -> l - r);   /* read from the single partition topic, do reduce
>> on the data*/
>>
>> ----------
>>
>> Note that if we wrap them all into a new operator, users would need to
>> provide two functions, for the aggregate and for the final "reduce" (in my
>> previous email I called it merger function, but for the same intent).
>>
>>
>>
>> Guozhang
>>
>>
>>
>> On Thu, Jul 5, 2018 at 3:38 PM, John Roesler <jo...@confluent.io> wrote:
>>
>>> Ok, I didn't get quite as far as I hoped, and several things are far from
>>> ready, but here's what I have so far:
>>> https://github.com/apache/kafka/pull/5337
>>>
>>> The "unit" test works, and is a good example of how you should expect it to
>>> behave:
>>> https://github.com/apache/kafka/pull/5337/files#diff-
>>> 2fdec52b9cc3d0e564f0c12a199bed77
>>>
>>> I have one working integration test, but it's slow going getting the timing
>>> right, so no promises of any kind ;)
>>>
>>> Let me know what you think!
>>>
>>> Thanks,
>>> -John
>>>
>>> On Thu, Jul 5, 2018 at 8:39 AM John Roesler <jo...@confluent.io> wrote:
>>>
>>>> Hey Flávio,
>>>>
>>>> Thanks! I haven't got anything usable yet, but I'm working on it now. I'm
>>>> hoping to push up my branch by the end of the day.
>>>>
>>>> I don't know if you've seen it but Streams actually already has something
>>>> like this, in the form of caching on materialized stores. If you pass in
>>> a
>>>> "Materialized.withCachingEnabled()", you should be able to get a POC
>>>> working by setting the max cache size pretty high and setting the commit
>>>> interval for your desired rate:
>>>> https://docs.confluent.io/current/streams/developer-
>>> guide/memory-mgmt.html#streams-developer-guide-memory-management
>>>> .
>>>>
>>>> There are a couple of cases in joins and whatnot where it doesn't work,
>>>> but for the aggregations we discussed, it should. The reason for KIP-328
>>> is
>>>> to provide finer control and hopefully a more straightforward API.
>>>>
>>>> Let me know if that works, and I'll drop a message in here when I create
>>>> the draft PR for KIP-328. I'd really appreciate your feedback.
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>> On Wed, Jul 4, 2018 at 10:17 PM flaviostutz@gmail.com <
>>>> flaviostutz@gmail.com> wrote:
>>>>
>>>>> John, that was fantastic, man!
>>>>> Have you built any custom implementation of your KIP in your machine so
>>>>> that I could test it out here? I wish I could test it out.
>>>>> If you need any help implementing this feature, please tell me.
>>>>>
>>>>> Thanks.
>>>>>
>>>>> -Flávio Stutz
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 2018/07/03 18:04:52, John Roesler <jo...@confluent.io> wrote:
>>>>>> Hi Flávio,
>>>>>> Thanks! I think that we can actually do this, but the API could be
>>>>> better.
>>>>>> I've included Java code below, but I'll copy and modify your example
>>> so
>>>>>> we're on the same page.
>>>>>>
>>>>>> EXERCISE 1:
>>>>>>   - The case is "total counting of events for a huge website"
>>>>>>   - Tasks from Application A will have something like:
>>>>>>          .stream(/site-events)
>>>>>>          .transform( re-key s.t. the new key is the partition id)
>>>>>>          .groupByKey() // you have to do this before count
>>>>>>          .count()
>>>>>>           // you explicitly published to a one-partition topic here,
>>> but
>>>>>> it's actually sufficient just
>>>>>>           // to re-group onto one key. You could name and pre-create
>>> the
>>>>>> intermediate topic here,
>>>>>>           // but you don't need a separate application for the final
>>>>>> aggregation.
>>>>>>          .groupBy((partitionId, partialCount) -> new KeyValue("ALL",
>>>>>> partialCount))
>>>>>>          .aggregate(sum up the partialCounts)
>>>>>>          .publish(/counter-total)
>>>>>>
>>>>>> I've left out the suppressions, but they would go right after the
>>>>> count()
>>>>>> and the aggregate().
>>>>>>
>>>>>> With this program, you don't have to worry about the
>>> double-aggregation
>>>>> you
>>>>>> mentioned in the last email. The KTable produced by the first count()
>>>>> will
>>>>>> maintain the correct count per partition. If the value changes for any
>>>>>> partition, it'll emit a retraction of the old value and then the new
>>>>> value
>>>>>> downstream, so that the final aggregation can update itself properly.
>>>>>>
>>>>>> I think we can optimize both the execution and the programability by
>>>>> adding
>>>>>> a "global aggregation" concept. But In principle, it seems like this
>>>>> usage
>>>>>> of the current API will support your use case.
>>>>>>
>>>>>> Once again, though, this is just to present an alternative. I haven't
>>>>> done
>>>>>> the math on whether your proposal would be more efficient.
>>>>>>
>>>>>> Thanks,
>>>>>> -John
>>>>>>
>>>>>> Here's the same algorithm written in Java:
>>>>>>
>>>>>> final KStream<String, String> siteEvents =
>>>>> builder.stream("/site-events");
>>>>>>
>>>>>> // here we re-key the events so that the key is actually the partition
>>>>> id.
>>>>>> // we don't need the value to do a count, so I just set it to "1".
>>>>>> final KStream<Integer, Integer> keyedByPartition =
>>>>> siteEvents.transform(()
>>>>>> -> new Transformer<String, String, KeyValue<Integer, Integer>>() {
>>>>>>     private ProcessorContext context;
>>>>>>
>>>>>>     @Override
>>>>>>     public void init(final ProcessorContext context) {
>>>>>>         this.context = context;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public KeyValue<Integer, Integer> transform(final String key,
>>> final
>>>>>> String value) {
>>>>>>         return new KeyValue<>(context.partition(), 1);
>>>>>>     }
>>>>>> });
>>>>>>
>>>>>> // Note that we can't do "count()" on a KStream, we have to group it
>>>>> first.
>>>>>> I'm grouping by the key, so it will produce the count for each key.
>>>>>> // Since the key is actually the partition id, it will produce the
>>>>>> pre-aggregated count per partition.
>>>>>> // Note that the result is a KTable<PartitionId,Count>. It'll always
>>>>>> contain the most recent count for each partition.
>>>>>> final KTable<Integer, Long> countsByPartition =
>>>>>> keyedByPartition.groupByKey().count();
>>>>>>
>>>>>> // Now we get ready for the final roll-up. We re-group all the
>>>>> constituent
>>>>>> counts
>>>>>> final KGroupedTable<String, Long> singlePartition =
>>>>>> countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL",
>>> value));
>>>>>>
>>>>>> final KTable<String, Long> totalCount = singlePartition.reduce((l, r)
>>>>> -> l
>>>>>> + r, (l, r) -> l - r);
>>>>>>
>>>>>> totalCount.toStream().foreach((k, v) -> {
>>>>>>     // k is always "ALL"
>>>>>>     // v is always the most recent total value
>>>>>>     System.out.println("The total event count is: " + v);
>>>>>> });
>>>>>>
>>>>>>
>>>>>> On Tue, Jul 3, 2018 at 9:21 AM flaviostutz@gmail.com <
>>>>> flaviostutz@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Great feature you have there!
>>>>>>>
>>>>>>> I'll try to exercise here how we would achieve the same functional
>>>>>>> objectives using your KIP:
>>>>>>>
>>>>>>> EXERCISE 1:
>>>>>>>   - The case is "total counting of events for a huge website"
>>>>>>>   - Tasks from Application A will have something like:
>>>>>>>          .stream(/site-events)
>>>>>>>          .count()
>>>>>>>          .publish(/single-partitioned-topic-with-count-partials)
>>>>>>>   - The published messages will be, for example:
>>>>>>>           ["counter-task1", 2345]
>>>>>>>           ["counter-task2", 8495]
>>>>>>>           ["counter-task3", 4839]
>>>>>>>   - Single Task from Application B will have something like:
>>>>>>>          .stream(/single-partitioned-topic-with-count-partials)
>>>>>>>          .aggregate(by messages whose key starts with "counter")
>>>>>>>          .publish(/counter-total)
>>>>>>>   - FAIL HERE. How would I know what is the overall partitions?
>>> Maybe
>>>>> two
>>>>>>> partials for the same task will arrive before other tasks and it
>>> maybe
>>>>>>> aggregated twice.
>>>>>>>
>>>>>>> I tried to think about using GlobalKTables, but I didn't get an easy
>>>>> way
>>>>>>> to aggregate the keys from that table. Do you have any clue?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> -Flávio Stutz
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> /partial-counters-to-single-partitioned-topic
>>>>>>>
>>>>>>> On 2018/07/02 20:03:57, John Roesler <jo...@confluent.io> wrote:
>>>>>>>> Hi Flávio,
>>>>>>>>
>>>>>>>> Thanks for the KIP. I'll apologize that I'm arriving late to the
>>>>>>>> discussion. I've tried to catch up, but I might have missed some
>>>>> nuances.
>>>>>>>>
>>>>>>>> Regarding KIP-328, the idea is to add the ability to suppress
>>>>>>> intermediate
>>>>>>>> results from all KTables, not just windowed ones. I think this
>>> could
>>>>>>>> support your use case in combination with the strategy that
>>> Guozhang
>>>>>>>> proposed of having one or more pre-aggregation steps that
>>>>> ultimately push
>>>>>>>> into a single-partition topic for final aggregation. Suppressing
>>>>>>>> intermediate results would solve the problem you noted that today
>>>>>>>> pre-aggregating doesn't do much to staunch the flow up updates.
>>>>>>>>
>>>>>>>> I'm not sure if this would be good enough for you overall; I just
>>>>> wanted
>>>>>>> to
>>>>>>>> clarify the role of KIP-328.
>>>>>>>> In particular, the solution you mentioned is to have the
>>> downstream
>>>>>>> KTables
>>>>>>>> actually query the upstream ones to compute their results. I'm not
>>>>> sure
>>>>>>>> whether it's more efficient to do these queries on the schedule,
>>> or
>>>>> to
>>>>>>> have
>>>>>>>> the upstream tables emit their results, on the same schedule.
>>>>>>>>
>>>>>>>> What do you think?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Sun, Jul 1, 2018 at 10:03 PM flaviostutz@gmail.com <
>>>>>>> flaviostutz@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> For what I understood, that KIP is related to how KStreams will
>>>>> handle
>>>>>>>>> KTable updates in Windowed scenarios to optimize resource usage.
>>>>>>>>> I couldn't see any specific relation to this KIP. Had you?
>>>>>>>>>
>>>>>>>>> -Flávio Stutz
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 2018/06/29 18:14:46, "Matthias J. Sax" <
>>> matthias@confluent.io>
>>>>>>> wrote:
>>>>>>>>>> Flavio,
>>>>>>>>>>
>>>>>>>>>> thanks for cleaning up the KIP number collision.
>>>>>>>>>>
>>>>>>>>>> With regard to KIP-328
>>>>>>>>>> (
>>>>>>>>>
>>>>>>>
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>> 328%3A+Ability+to+suppress+updates+for+KTables
>>>>>>>>> )
>>>>>>>>>> I am wondering how both relate to each other?
>>>>>>>>>>
>>>>>>>>>> Any thoughts?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
>>>>>>>>>>> Just copying a follow up from another thread to here (sorry
>>>>> about
>>>>>>> the
>>>>>>>>> mess):
>>>>>>>>>>>
>>>>>>>>>>> From: Guozhang Wang <wa...@gmail.com>
>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph
>>>>> source
>>>>>>>>>>> Date: 2018/06/25 22:24:17
>>>>>>>>>>> List: dev@kafka.apache.org
>>>>>>>>>>>
>>>>>>>>>>> Flávio, thanks for creating this KIP.
>>>>>>>>>>>
>>>>>>>>>>> I think this "single-aggregation" use case is common enough
>>>>> that we
>>>>>>>>> should
>>>>>>>>>>> consider how to efficiently supports it: for example, for
>>> KSQL
>>>>>>> that's
>>>>>>>>> built
>>>>>>>>>>> on top of Streams, we've seen lots of query statements whose
>>>>>>> return is
>>>>>>>>>>> expected a single row indicating the "total aggregate" etc.
>>>>> See
>>>>>>>>>>> https://github.com/confluentinc/ksql/issues/430 for
>>> details.
>>>>>>>>>>>
>>>>>>>>>>> I've not read through
>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6953,
>>>>>>>>> but
>>>>>>>>>>> I'm wondering if we have discussed the option of supporting
>>>>> it in a
>>>>>>>>>>> "pre-aggregate" manner: that is we do partial aggregates on
>>>>>>> parallel
>>>>>>>>> tasks,
>>>>>>>>>>> and then sends the partial aggregated value via a single
>>> topic
>>>>>>>>> partition
>>>>>>>>>>> for the final aggregate, to reduce the traffic on that
>>> single
>>>>>>>>> partition and
>>>>>>>>>>> hence the final aggregate workload.
>>>>>>>>>>> Of course, for non-commutative aggregates we'd probably need
>>>>> to
>>>>>>> provide
>>>>>>>>>>> another API in addition to aggregate, like the `merge`
>>>>> function for
>>>>>>>>>>> session-based aggregates, to let users customize the
>>>>> operations of
>>>>>>>>> merging
>>>>>>>>>>> two partial aggregates into a single partial aggregate.
>>>>> What's its
>>>>>>>>> pros and
>>>>>>>>>>> cons compared with the current proposal?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>> On 2018/06/26 18:22:27, Flávio Stutz <flaviostutz@gmail.com
>>>>
>>>>>>> wrote:
>>>>>>>>>>>> Hey, guys, I've just created a new KIP about creating a new
>>>>> DSL
>>>>>>> graph
>>>>>>>>>>>> source for realtime partitioned consolidations.
>>>>>>>>>>>>
>>>>>>>>>>>> We have faced the following scenario/problem in a lot of
>>>>>>> situations
>>>>>>>>> with
>>>>>>>>>>>> KStreams:
>>>>>>>>>>>>    - Huge incoming data being processed by numerous
>>>>> application
>>>>>>>>> instances
>>>>>>>>>>>>    - Need to aggregate different fields whose records span
>>>>> all
>>>>>>> topic
>>>>>>>>>>>> partitions (something like “total amount spent by people
>>>>> aged > 30
>>>>>>>>> yrs”
>>>>>>>>>>>> when processing a topic partitioned by userid).
>>>>>>>>>>>>
>>>>>>>>>>>> The challenge here is to manage this kind of situation
>>>>> without any
>>>>>>>>>>>> bottlenecks. We don't need the “global aggregation” to be
>>>>>>> processed
>>>>>>>>> at each
>>>>>>>>>>>> incoming message. On a scenario of 500 instances, each
>>>>> handling 1k
>>>>>>>>>>>> messages/s, any single point of aggregation (single
>>>>> partitioned
>>>>>>>>> topics,
>>>>>>>>>>>> global tables or external databases) would create a
>>>>> bottleneck of
>>>>>>> 500k
>>>>>>>>>>>> messages/s for single threaded/CPU elements.
>>>>>>>>>>>>
>>>>>>>>>>>> For this scenario, it is possible to store the partial
>>>>>>> aggregations on
>>>>>>>>>>>> local stores and, from time to time, query those states and
>>>>>>> aggregate
>>>>>>>>> them
>>>>>>>>>>>> as a single value, avoiding bottlenecks. This is a way to
>>>>> create a
>>>>>>>>> "timed
>>>>>>>>>>>> aggregation barrier”.
>>>>>>>>>>>>
>>>>>>>>>>>> If we leverage this kind of built-in feature we could
>>> greatly
>>>>>>> enhance
>>>>>>>>> the
>>>>>>>>>>>> ability of KStreams to better handle the CAP Theorem
>>>>>>> characteristics,
>>>>>>>>> so
>>>>>>>>>>>> that one could choose to have Consistency over Availability
>>>>> when
>>>>>>>>> needed.
>>>>>>>>>>>>
>>>>>>>>>>>> We started this discussion with Matthias J. Sax here:
>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6953
>>>>>>>>>>>>
>>>>>>>>>>>> If you want to see more, go to KIP-326 at:
>>>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>> 326%3A+Schedulable+KTable+as+Graph+source
>>>>>>>>>>>>
>>>>>>>>>>>> -Flávio Stutz
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>>
>