Posted to users@kafka.apache.org by "Matthias J. Sax" <ma...@confluent.io> on 2017/05/09 17:31:49 UTC

Kafka Streams Usage Patterns

Hi,

I started a new Wiki page to collect some common usage patterns for
Kafka Streams.

Right now, it contains a quick example on "how to compute average". Hope
we can collect more examples like this!

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns
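
The gist of that example: aggregate a running (count, sum) pair per key,
then map it to sum/count. A minimal sketch with illustrative names, not
the exact wiki code - CountAndSum and countAndSumSerde are placeholders:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();

KTable<String, Double> averages = builder
    .stream(Serdes.String(), Serdes.Long(), "values-topic")
    .groupByKey(Serdes.String(), Serdes.Long())
    .aggregate(
        () -> new CountAndSum(0L, 0L),  // illustrative POJO with count()/sum()
        (key, value, agg) -> new CountAndSum(agg.count() + 1, agg.sum() + value),
        countAndSumSerde,               // a serde for CountAndSum (assumed)
        "avg-store")                    // queryable state store name
    .mapValues(agg -> (double) agg.sum() / agg.count());

averages.toStream().to(Serdes.String(), Serdes.Double(), "averages-topic");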


-Matthias


Re: Kafka Streams Usage Patterns

Posted by Michal Borowiecki <mi...@openbet.com>.
Hi all,

Another pattern I think is worth adding is a sliding-windowed message 
reordering and de-duplicating processor.

The outline I have in mind is based on this answer (there the timestamp
was in the body of the message; here it would come from the record
context):

https://stackoverflow.com/a/44345374/7897191
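
To make that concrete, here's roughly the shape I have in mind - a
sketch along the lines of that answer, not tested; the store names are
illustrative and purging of the seen-keys store is left out:

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Buffers records for WINDOW_MS of stream time, then emits them in
// timestamp order, dropping any key that has already been emitted once.
public class ReorderDedupTransformer
        implements Transformer<String, String, KeyValue<String, String>> {

    static final long WINDOW_MS = 10_000L; // illustrative window size

    private ProcessorContext context;
    private KeyValueStore<String, String> buffer; // "<padded-ts>-<key>" -> value
    private KeyValueStore<String, Long> seen;     // key -> timestamp when first emitted

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
        this.buffer = (KeyValueStore<String, String>) context.getStateStore("reorder-buffer");
        this.seen = (KeyValueStore<String, Long>) context.getStateStore("seen-keys");
        context.schedule(WINDOW_MS); // stream-time punctuation (pre-KIP-138 API)
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        // zero-padding the timestamp makes the store's lexicographic key order a time order
        buffer.put(String.format("%020d-%s", context.timestamp(), key), value);
        return null; // records are only ever emitted from punctuate()
    }

    @Override
    public KeyValue<String, String> punctuate(final long streamTime) {
        final List<String> processed = new ArrayList<>();
        try (KeyValueIterator<String, String> it = buffer.all()) {
            while (it.hasNext()) {
                final KeyValue<String, String> entry = it.next();
                final long ts = Long.parseLong(entry.key.substring(0, 20));
                if (ts > streamTime - WINDOW_MS) {
                    break; // the rest of the buffer is still inside the window
                }
                final String key = entry.key.substring(21);
                if (seen.get(key) == null) { // first occurrence of this key: emit it
                    seen.put(key, ts);
                    context.forward(key, entry.value);
                }
                processed.add(entry.key);
            }
        }
        processed.forEach(buffer::delete);
        return null;
    }

    @Override
    public void close() { }
}

It would be wired in with builder.addStateStore(...) for the two stores
and stream.transform(ReorderDedupTransformer::new, "reorder-buffer",
"seen-keys").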

Please let me know if you have a better design for this.

Cheers,

Michal


On 27/05/17 21:16, Jay Kreps wrote:
> This is great!
>
> -Jay



Re: Kafka Streams Usage Patterns

Posted by Jay Kreps <ja...@confluent.io>.
This is great!

-Jay


Re: Kafka Streams Usage Patterns

Posted by Michal Borowiecki <mi...@openbet.com>.
Hi all,

I've updated the wiki page with a draft pattern for consecutively
growing time-windowed aggregations, which was discussed some time ago on
this mailing list.

I have yet to add the part that cleans up the stores using punctuation.
Stay tuned.


On a somewhat similar subject, I've been working to implement the 
following requirements:

* transaction sums per customer session (simple: just extract
non-expired session-windowed aggregates from a SessionStore using
interactive queries; see the sketch after this list)

* global transaction sums for all *currently active* customer sessions
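
For the first of those, the query side looks roughly like this - a
sketch only: CustomerId and TxSum stand in for my actual types, the
store name is whatever txPerCustomerSumStore() registers below, and I'm
assuming QueryableStoreTypes.sessionStore() for the lookup:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;

// returns the customer's latest session aggregate, or null if no session is active
TxSum activeSessionSum(final KafkaStreams streams, final CustomerId customerId, final long nowMs) {
    final ReadOnlySessionStore<CustomerId, TxSum> store =
        streams.store("tx-per-customer-sums", QueryableStoreTypes.sessionStore());

    TxSum latest = null;
    long latestEnd = Long.MIN_VALUE;
    try (KeyValueIterator<Windowed<CustomerId>, TxSum> it = store.fetch(customerId)) {
        while (it.hasNext()) { // pick the session window with the latest end time
            final KeyValue<Windowed<CustomerId>, TxSum> entry = it.next();
            if (entry.key.window().end() > latestEnd) {
                latestEnd = entry.key.window().end();
                latest = entry.value;
            }
        }
    }
    // the latest session only counts as active if it hasn't timed out yet
    return latestEnd + SESSION_TIMEOUT_MS > nowMs ? latest : null;
}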

The second bit proved non-trivial, because session-windowed KTables (or
any windowed KTables, for that matter) don't notify downstream when a
window expires. And I can't use punctuate until KIP-138 is implemented,
because stream-time punctuation is no good in this case (records can
stop coming); reliable system-time punctuation would be needed.

Below is how I implemented this; I have yet to test it thoroughly.

I wonder if anyone knows of an easier way of achieving the same.

If so, I'm looking forward to suggestions. If not, I'll add this to the
patterns wiki page too, in case someone else finds it useful.


builder
   .stream(/* key serde */, /* transaction serde */, "transaction-topic")

   .groupByKey(/* key serde */, /* transaction serde */)

   .aggregate(
     () -> /* empty aggregate */,
     aggregator(),
     merger(),
     SessionWindows.with(SESSION_TIMEOUT_MS).until(SESSION_TIMEOUT_MS * 2),
     /* aggregate serde */,
     txPerCustomerSumStore() // this store can be queried for per-customer session data
   )

   .toStream()

   // tombstones only come when a session is merged into a bigger session, so ignore them
   .filter((key, value) -> value != null)

   // the map/groupByKey/reduce operations below only propagate updates for the
   // _latest_ session per customer downstream

   .map((windowedCustomerId, agg) ->
     // move the timestamp from the windowed key into the value, so that we can
     // group by customerId alone and reduce to the later value
     new KeyValue<>(
       windowedCustomerId.key(), // just the customerId
       new WindowedAggsImpl(     // like a Tuple2, but with named accessors: timestamp() and aggs()
         windowedCustomerId.window().end(),
         agg
       )
     )
   )
   .groupByKey(/* key serde */, /* windowed aggs serde */) // key is just the customerId
   .reduce(
     // take the later session value and ignore any older one - downstream only
     // cares about _current_ sessions
     (val, agg) -> val.timestamp() > agg.timestamp() ? val : agg,
     TimeWindows.of(SESSION_TIMEOUT_MS).advanceBy(SESSION_TIMEOUT_DELAY_TOLERANCE_MS),
     "latest-session-windowed"
   )

   // calculate the totals with maximum granularity, which is per partition
   .groupBy((windowedCustomerId, timeAndAggs) ->
     new KeyValue<>(
       new Windowed<>(
         // KIP-159 would come in handy here, to access the partition number instead
         windowedCustomerId.key().hashCode() % PARTITION_COUNT_FOR_TOTALS,
         // the window is used by the interactive queries to pick the oldest
         // not-yet-expired window
         windowedCustomerId.window()
       ),
       timeAndAggs.aggs()
     ),
     new SessionKeySerde<>(Serdes.Integer()),
     /* aggregate serde */
   )

   .reduce(
     (val, agg) -> agg.add(val),
     (val, agg) -> agg.subtract(val),
     txTotalsStore() // this store can be queried for per-partition totals over all active sessions
   );

builder.globalTable(
   new SessionKeySerde<>(Serdes.Integer()),
   /* aggregate serde */,
   changelogTopicForStore(TRANSACTION_TOTALS), "totals");
// this global table puts the per-partition totals on every node, so they can
// be easily summed for global totals, picking the oldest not-yet-expired window

TODO: put in StreamPartitioners (with the KTable.through variants added in
KAFKA-5045) to avoid re-partitioning where I know it's unnecessary.

The idea behind the % PARTITION_COUNT_FOR_TOTALS bit is that I want to
do the summation with maximum parallelism first and minimize the work
needed downstream. So I calculate a per-partition sum first, to limit
both the updates that the totals topic receives and the summing work
done by the interactive queries on the global store. Is this a good way
of going about it?
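
For reference, the summing I'd do on the query side looks roughly like
this - again a sketch: TxSum with its zero()/add() is illustrative, and
the window-expiry check is simplified:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// sums, across partitions, the oldest not-yet-expired per-partition window
TxSum globalTotal(final KafkaStreams streams, final long nowMs) {
    final ReadOnlyKeyValueStore<Windowed<Integer>, TxSum> totals =
        streams.store("totals", QueryableStoreTypes.keyValueStore());

    final Map<Integer, KeyValue<Windowed<Integer>, TxSum>> oldest = new HashMap<>();
    try (KeyValueIterator<Windowed<Integer>, TxSum> it = totals.all()) {
        while (it.hasNext()) {
            final KeyValue<Windowed<Integer>, TxSum> entry = it.next();
            if (entry.key.window().end() <= nowMs) {
                continue; // this window has already expired
            }
            // per partition, keep only the oldest still-open window
            oldest.merge(entry.key.key(), entry,
                (a, b) -> a.key.window().start() <= b.key.window().start() ? a : b);
        }
    }

    TxSum sum = TxSum.zero();
    for (final KeyValue<Windowed<Integer>, TxSum> kv : oldest.values()) {
        sum = sum.add(kv.value);
    }
    return sum;
}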

Thanks,

Michał


