Posted to dev@kafka.apache.org by "Michael Noll (JIRA)" <ji...@apache.org> on 2016/04/11 11:06:25 UTC

[jira] [Comment Edited] (KAFKA-3511) Provide built-in aggregators sum() and avg() in Kafka Streams DSL

    [ https://issues.apache.org/jira/browse/KAFKA-3511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15234770#comment-15234770 ] 

Michael Noll edited comment on KAFKA-3511 at 4/11/16 9:05 AM:
--------------------------------------------------------------

Hmm.  I'd prefer an API that allows me to write:

* {{stream.max()}} rather than
* {{stream.aggregate(new Max())}}

That said, I think we are talking about two related but different questions:

1. Which aggregations do we provide out of the box?
2. How do we expose these built-in aggregations?

Regarding point 1, we could consider the following built-in aggregations (cf. Scala's collection API, for example), each listed with an example use case.  In general, it might be a good idea to think in terms of common use cases when deciding which aggregations, if any, we provide out of the box -- taking into account which would make sense on windowed streams, which on non-windowed streams, which on KStream, which on KTable, and which on both.  Again, I am listing concrete function names here, but the purpose is to list their *functionality* (point 2 addresses how this functionality is exposed).

* {{count}} / {{countBy}} -- "how many pageviews between 4 AM and 5 AM?"  ({{countBy}} and {{filter}} are related: counting by a predicate amounts to filtering on it and then counting)
* {{min}} / {{minBy}} -- "the page with the least views"
* {{max}} / {{maxBy}} -- "the page with the most views", "the IP address with the most hits (e.g. to implement throttling, blocklists)"
* {{sum}} -- "total number of pageviews per user"
* {{distinct}} -- "distinct elements in the stream" (this is related to changelog stream aka KTable and the stream-table duality; perhaps distinct would make sense only for KStream, as "distinct" is essentially a native feature of KTable)
* Somewhat related: we may also want to consider {{sortBy}} and {{take}} operators to help implement "give me the top N things" use cases, since {{min}} and {{max}} only return a single element.  Idea: {{windowedStream.sortBy(...).take(5)}}.
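
To make the "top N" idea concrete, here is a minimal sketch in plain Java.  It uses ordinary collections only, not the Kafka Streams DSL, and the names {{TopNSketch}} and {{topN}} are made up for illustration.  It only shows the semantics one might expect from a "sortBy(...).take(n)" combination on an already-computed table of counts.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustration only: plain Java collections, not the Kafka Streams DSL.
// TopNSketch and topN are hypothetical names.
final class TopNSketch {

    // "sortBy(...).take(n)": order pages by view count, highest first, and keep the first n.
    static List<String> topN(Map<String, Long> viewsPerPage, int n) {
        return viewsPerPage.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()))
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Long> views = new HashMap<>();
        views.put("/home", 120L);
        views.put("/docs", 75L);
        views.put("/blog", 300L);
        views.put("/about", 5L);

        // max is the special case n = 1; top-N generalizes it.
        System.out.println(topN(views, 1)); // prints [/blog]
        System.out.println(topN(views, 2)); // prints [/blog, /home]
    }
}
```

In a streaming setting the input would be a continuously updated (and possibly windowed) table rather than a fixed map, so this only pins down the result one would expect at a single point in time.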

Note: Unfortunately, because we use Java, we can't benefit from implicits / typeclasses as we could e.g. in Scala.  For example, here's the function signature of {{sum}} in Scala:

{code}
def sum[B >: Int](implicit num: Numeric[B]): B
{code}

The benefit is that {{sum}} works automagically as long as type {{B}} "looks like a number", which is a great help when you need to implement an out-of-the-box version of {{sum}} that does the right thing, always.
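
For comparison, here is a rough sketch of what the same idea tends to look like in Java, where the "evidence" that a type behaves like a number has to be passed explicitly.  The {{Num}} interface and the {{SumSketch}} class are hypothetical; nothing like them ships with the JDK or with Kafka, and this is not a proposal for the actual API.

```java
import java.util.Arrays;

// Illustration only; all names here are hypothetical.
final class SumSketch {

    // Stand-in for Scala's Numeric[B]: just enough to fold values into a sum.
    interface Num<T> {
        T zero();
        T plus(T a, T b);
    }

    static final Num<Integer> INT = new Num<Integer>() {
        public Integer zero() { return 0; }
        public Integer plus(Integer a, Integer b) { return a + b; }
    };

    static final Num<Double> DOUBLE = new Num<Double>() {
        public Double zero() { return 0.0; }
        public Double plus(Double a, Double b) { return a + b; }
    };

    // Unlike the Scala version, the caller must supply the Num instance by hand.
    static <T> T sum(Iterable<T> values, Num<T> num) {
        T acc = num.zero();
        for (T v : values) {
            acc = num.plus(acc, v);
        }
        return acc;
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(1, 2, 3), INT));    // prints 6
        System.out.println(sum(Arrays.asList(1.5, 2.5), DOUBLE)); // prints 4.0
    }
}
```

The extra {{num}} argument at every call site is exactly the noise that Scala's implicit parameter hides.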

Regarding point 2, my personal opinion is that *some* basic functionality should be provided via dedicated operators -- call it sugar if you will -- to achieve a more pleasant API for common use cases (e.g. "count()" rather than "aggregate(new Count())").  Of course, we could opt to implement these dedicated operators via "aggregate(new Func())" behind the scenes, which may help with composability when one wants to write a new API layer on top.
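
As a rough illustration of "sugar on top of aggregate", the sketch below uses a toy in-memory {{MiniStream}} class with a hypothetical {{Agg}} interface.  Neither is part of Kafka Streams; the point is only to show a dedicated {{count()}} operator that delegates to the general {{aggregate(...)}} entry point.

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: a toy in-memory "stream", not the Kafka Streams DSL.
// MiniStream and Agg are hypothetical names.
final class MiniStream<V> {

    // Hypothetical aggregator: an initial value plus a step function.
    interface Agg<T, R> {
        R initial();
        R apply(R acc, T value);
    }

    private final List<V> values;

    MiniStream(List<V> values) {
        this.values = values;
    }

    // General-purpose entry point; other API layers could build on this.
    <R> R aggregate(Agg<V, R> agg) {
        R acc = agg.initial();
        for (V v : values) {
            acc = agg.apply(acc, v);
        }
        return acc;
    }

    // Sugar: count() is aggregate(...) with a canned aggregator behind the scenes.
    long count() {
        return aggregate(new Agg<V, Long>() {
            public Long initial() { return 0L; }
            public Long apply(Long acc, V value) { return acc + 1; }
        });
    }

    public static void main(String[] args) {
        MiniStream<String> pageviews = new MiniStream<>(Arrays.asList("/home", "/docs", "/home"));
        System.out.println(pageviews.count()); // prints 3
    }
}
```

Callers who only need a count never see the aggregator, while the general {{aggregate(...)}} method remains available for anything the built-ins do not cover.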

See also:

* Scala API:
** http://docs.scala-lang.org/overviews/collections/trait-traversable
** http://docs.scala-lang.org/overviews/collections/seqs
* Spark Streaming API: http://spark.apache.org/docs/latest/streaming-programming-guide.html
* Flink Datastream API: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#datastream-transformations


> Provide built-in aggregators sum() and avg() in Kafka Streams DSL
> -----------------------------------------------------------------
>
>                 Key: KAFKA-3511
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3511
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: api, newbie
>             Fix For: 0.10.0.0
>
>
> Currently we only have one built-in aggregate function count() in the Kafka Streams DSL, but we want to add more aggregation functions like sum() and avg().


