You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Mathieu DESPRIEE (Jira)" <ji...@apache.org> on 2020/12/11 11:21:00 UTC

[jira] [Created] (KAFKA-10844) groupBy without shuffling

Mathieu DESPRIEE created KAFKA-10844:
----------------------------------------

             Summary: groupBy without shuffling
                 Key: KAFKA-10844
                 URL: https://issues.apache.org/jira/browse/KAFKA-10844
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 2.6.0
            Reporter: Mathieu DESPRIEE


The idea is to give a way to keep the current partitioning while doing a groupBy.

Our use-case is the following:
 We process device data (stream is partitioned by device-id), each device produces several metrics. We want to aggregate by metric, so currently we do a
{code:java}
 selectKey( ... => (device, metric)).groupByKey.windowedBy(...).aggregate(...)  {code}
This shuffles the data around, but it's not necessary, each (device, metric) group could stay in the original partition.

This is not only an optimization question. We are experiencing invalid aggregations when reprocessing history. In these reprocessing, we frequently see some tasks moving faster on some partitions. This causes problems with event-time: Lets' say data for device d1 is in partition p1 and stream-time t1, and device d2 / partition p2 / time t2.
 Now, if I re-key by (device, metric), records from both devices could have the same hash-key and land in the same partition. And if t2 is far ahead of t1, then all time-windows for t1 get expired at once.

Maybe I miss some way of doing this with the existing API, please let me know. Currently, I manually repartition and specify a custom partitioner, but it's tedious.

If I were to rewrite the aggregations manually with Transformer API, I would use (device, key) for my state store key, without changing the record key.

 

_(poke_ [~vvcephei] _following our discussion on users ml)_



--
This message was sent by Atlassian Jira
(v8.3.4#803005)