Posted to jira@kafka.apache.org by "Matthias J. Sax (JIRA)" <ji...@apache.org> on 2018/05/26 18:50:00 UTC

[jira] [Commented] (KAFKA-6953) [Streams] Schedulable KTable as Graph source (for minimizing aggregation pressure)

    [ https://issues.apache.org/jira/browse/KAFKA-6953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491781#comment-16491781 ] 

Matthias J. Sax commented on KAFKA-6953:
----------------------------------------

Couple of comments:
 - With sufficiently large caches, the first approach of piping all data through a single-partition topic should actually be feasible (cf. [https://kafka.apache.org/11/documentation/streams/developer-guide/memory-mgmt.html]); see the cache-config sketch after this list.
 - The idea about a REST proxy is also feasible; "Interactive Queries" was built for exactly this pattern (cf. [https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html]); too bad if you "don't like it". A query sketch also follows this list.
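
A minimal sketch of the cache settings, assuming a hypothetical application id and broker address (the sizes are placeholders to tune per workload):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "global-count-app");   // hypothetical
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");     // placeholder
// A larger record cache compacts updates per key before they are forwarded downstream,
// so the single-partition topic sees far fewer records than the raw input rate.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 100 * 1024 * 1024L);
// Cached records are flushed/forwarded at the latest on commit, so a longer
// commit interval also reduces the downstream update rate.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);
{code}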
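
And a minimal Interactive Queries sketch, assuming `streams` is a started KafkaStreams instance; the store name "local-counts" and the key "some-key" are hypothetical, and the API names are those of the 1.x/2.x line:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StreamsMetadata;

// Query the locally hosted portion of the store by name.
final ReadOnlyKeyValueStore<String, Long> localStore =
    streams.store("local-counts", QueryableStoreTypes.keyValueStore());
final Long localCount = localStore.get("some-key");

// Discover which instance hosts a given key, e.g. to fan out REST calls to the others.
final StreamsMetadata owner =
    streams.metadataForKey("local-counts", "some-key", Serdes.String().serializer());
System.out.println("some-key is hosted at " + owner.hostInfo());
{code}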

Additionally, Kafka Streams exposes JMX metrics and you can register your own metrics there: [https://docs.confluent.io/current/streams/monitoring.html] (this might actually be the recommended way to get stats on how many records got processed).
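
For example (a sketch; `streams` is a started KafkaStreams instance, and the exact metric names and groups vary by version), the built-in metrics can also be read programmatically in addition to JMX:

{code:java}
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

// Print throughput-related built-in metrics, e.g. the per-thread process-rate.
for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
    final MetricName name = entry.getKey();
    if (name.name().contains("process")) {
        System.out.println(name.group() + " " + name.tags() + " " + name.name()
                + " = " + entry.getValue().metricValue());
    }
}
{code}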

Overall, I am not convinced atm that adding a feature as proposed is necessary. (But I am happy to get convinced :))

> [Streams] Schedulable KTable as Graph source (for minimizing aggregation pressure)
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-6953
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6953
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Flavio Stutz
>            Priority: Major
>
> === PROBLEM ===
> We have faced the following scenario/problem in a lot of situations with KStreams:
>    - Huge incoming data being processed by numerous application instances
>    - Need to aggregate or count the overall data as a single value (something like "count the total number of messages that have been processed across all distributed instances")
>    - The challenge here is to manage this kind of situation without any bottlenecks. We don't need the overall aggregation of all instances' states at each processed message, so it is possible to store the partial aggregations in local stores and, from time to time, query those states and aggregate them, avoiding bottlenecks.
> Some ways we KNOW wouldn't work because of bottlenecks:
>     - Sink all instances' local counter/aggregation results to a Topic with a single partition so that we could have another Graph with a single instance that could aggregate all results
>          - In this case, if I had 500 instances processing 1000/s each (with no bottlenecks), I would have a single-partition topic with 500k messages/s for my single aggregating instance to process that many messages (IMPOSSIBLE bottleneck; sketched below)
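> (A minimal sketch of this rejected approach, with hypothetical topic and store names, just to make the data flow concrete: every instance counts locally and all updates are funneled into one partition.)
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.utils.Bytes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.Materialized;
> import org.apache.kafka.streams.kstream.Produced;
> import org.apache.kafka.streams.state.KeyValueStore;
>
> final StreamsBuilder builder = new StreamsBuilder();
> builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
>        .groupByKey()
>        .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("local-counts"))
>        .toStream()
>        // "global-counts" has a single partition; one downstream instance reads it
>        // and sums the partial counts, which is exactly the bottleneck described above.
>        .to("global-counts", Produced.with(Serdes.String(), Serdes.Long()));
> {code}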
> === TRIALS ===
> These are some ways we managed to do this:
>    - Expose a REST endpoint so that Prometheus could extract local metrics of each application instance's state stores and then calculate the total count on Prometheus using queries
>          - we don't like this much because we believe KStreams was meant to INPUT and OUTPUT data using Kafka Topics for simplicity and power
>    - Create a scheduled Punctuate at the end of the Graph so that we can query (using getAllMetadata) all other instances' state store counters, sum them all, and then publish the total to another Kafka Topic from time to time (sketched below).
>           - For this to work we created a way so that only one application instance's Punctuate algorithm would perform the calculations (something like a master election through instance ids and metadata)
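> (A rough sketch of the Punctuate part, assuming Kafka Streams 2.1+ APIs; the store name is hypothetical, and the instance discovery / leader election via metadata is left out.)
> {code:java}
> import java.time.Duration;
> import org.apache.kafka.streams.KeyValue;
> import org.apache.kafka.streams.processor.AbstractProcessor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.processor.PunctuationType;
> import org.apache.kafka.streams.state.KeyValueIterator;
> import org.apache.kafka.streams.state.KeyValueStore;
>
> public class PeriodicCountPublisher extends AbstractProcessor<String, Long> {
>     private KeyValueStore<String, Long> store;
>
>     @Override
>     @SuppressWarnings("unchecked")
>     public void init(final ProcessorContext context) {
>         super.init(context);
>         store = (KeyValueStore<String, Long>) context.getStateStore("local-counts");
>         // Once per minute of wall-clock time, forward the local partial counts
>         // downstream, where a sink publishes them to another Kafka topic.
>         context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
>             try (KeyValueIterator<String, Long> it = store.all()) {
>                 while (it.hasNext()) {
>                     final KeyValue<String, Long> kv = it.next();
>                     context.forward(kv.key, kv.value);
>                 }
>             }
>         });
>     }
>
>     @Override
>     public void process(final String key, final Long value) {
>         // the counting itself happens upstream; this processor only emits on schedule
>     }
> }
> {code}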
> === PROPOSAL ===
> Create a new DSL Source with the following characteristics (a hypothetical sketch follows this list):
>    - Source parameters: "scheduled time" (using a cron-like config), "state store name", bool "from all application instances"
>    - Behavior: At the desired time, query all K,V tuples from the state store and source those messages to the Graph
>           - If "from all application instances" is true, query the tuples from all application instances' state stores and source them all, concatenated
>    - This is a way to create a "timed aggregation barrier" to avoid bottlenecks. With this we could enhance the ability of KStreams to better handle the CAP Theorem characteristics, so that one could choose to have Consistency over Availability.
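> (Purely hypothetical illustration of the proposal, not an existing API; the name and signature are made up just to make the idea concrete.)
> {code:java}
> // Hypothetical: a source that, on a cron-like schedule, reads all K,V tuples from
> // the named state store (optionally from ALL application instances) and streams
> // them into the topology. Nothing below exists in Kafka Streams today.
> KTable<String, Long> periodicTotals = builder.scheduledTable(
>     "0 */5 * * * *",     // "scheduled time": cron-like expression, e.g. every 5 minutes
>     "local-counts",      // "state store name" whose tuples are sourced
>     true);               // "from all application instances": concatenate every instance's store
> {code}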



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)