Posted to dev@kafka.apache.org by "Henry Cai (JIRA)" <ji...@apache.org> on 2016/04/21 01:56:25 UTC

[jira] [Commented] (KAFKA-3101) Optimize Aggregation Outputs

    [ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15250977#comment-15250977 ] 

Henry Cai commented on KAFKA-3101:
----------------------------------

We really need this batching/buffering feature in order to adopt Kafka Streams; otherwise the output rate from the aggregation store is too high.  Any idea when this will be implemented?

We have another, similar use case: a left outer join between two streams:

INSERT INTO C
SELECT a, b
FROM A
LEFT OUTER JOIN B
ON a.id = b.id

On the output stream, we might see (a, null) followed by (a, b), which cancels the (a, null).  To reduce this kind of churn, we could apply a 15-minute buffering policy: we emit (a, null) only if we don't see (a, b) within 15 minutes.

Hopefully your solution for buffering aggregation output can solve the left outer join case as well.  The other workaround would be to delay the B stream by 15 minutes, which also needs a buffering mechanism.
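
To make the 15-minute policy concrete, here is a minimal sketch in plain Java. It is not Kafka Streams API: the class BufferedLeftJoin and its methods onLeft, onRight and expire are hypothetical names invented for illustration. It also assumes at most one pending left record per key, and that a left record is dropped from the buffer once (a, null) has been emitted for it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not a Kafka Streams class): hold an unmatched left
// record for holdMs before emitting (a, null), so that a right record
// arriving in time yields only (a, b).
class BufferedLeftJoin<K, A, B> {
    static final class Joined<A, B> {
        final A left;
        final B right;
        Joined(A left, B right) { this.left = left; this.right = right; }
        @Override public String toString() { return "(" + left + ", " + right + ")"; }
    }

    private static final class Pending<A> {
        final A left;
        final long deadlineMs;
        Pending(A left, long deadlineMs) { this.left = left; this.deadlineMs = deadlineMs; }
    }

    private final long holdMs;
    // Assumption: one pending left record per key; a newer one replaces it.
    private final Map<K, Pending<A>> pending = new LinkedHashMap<>();
    private final Map<K, B> rightByKey = new HashMap<>();

    BufferedLeftJoin(long holdMs) { this.holdMs = holdMs; }

    // Left record arrives: join immediately if B is known, else buffer it.
    Joined<A, B> onLeft(K key, A a, long nowMs) {
        B b = rightByKey.get(key);
        if (b != null) {
            return new Joined<A, B>(a, b);
        }
        pending.put(key, new Pending<A>(a, nowMs + holdMs));
        return null; // nothing emitted yet
    }

    // Right record arrives: if a left record is still waiting, emit (a, b).
    Joined<A, B> onRight(K key, B b, long nowMs) {
        rightByKey.put(key, b);
        Pending<A> p = pending.get(key);
        if (p != null && nowMs < p.deadlineMs) {
            pending.remove(key);
            return new Joined<A, B>(p.left, b);
        }
        return null;
    }

    // Called periodically: emit (a, null) for every left record whose
    // deadline has passed without a match.
    List<Joined<A, B>> expire(long nowMs) {
        List<Joined<A, B>> out = new ArrayList<>();
        Iterator<Map.Entry<K, Pending<A>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Pending<A> p = it.next().getValue();
            if (p.deadlineMs <= nowMs) {
                out.add(new Joined<A, B>(p.left, null));
                it.remove();
            }
        }
        return out;
    }
}
```

With holdMs set to 15 minutes, a left record matched within the window produces only (a, b), and an unmatched one produces (a, null) on the first expire call at or after its deadline.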


> Optimize Aggregation Outputs
> ----------------------------
>
>                 Key: KAFKA-3101
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3101
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Bill Bejeck
>              Labels: architecture
>             Fix For: 0.10.1.0
>
>
> Today we emit one output record for each incoming message for Table / Windowed Stream Aggregations. For example, say we have a sequence of aggregate outputs computed from the input stream (assuming there is no agg value for this key before):
> V1, V2, V3, V4, V5
> Then the aggregator will output the following sequence of Change<newValue, oldValue>:
> <V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4>
> which could cost a lot of CPU overhead computing the intermediate results.
> Instead, if we let the underlying state store "remember" the last emitted old value, we can reduce the number of emits based on some configs. More specifically, we can add one more field in the KV store engine storing the last emitted old value, which only gets updated when we emit to the downstream processor. For example:
> At Beginning:                 
> Store: key => empty (no agg values yet)
> V1 computed:         
> Update Both in Store: key => (V1, V1),     Emit <V1, null>
> V2 computed:         
> Update NewValue in Store: key => (V2, V1),     No Emit
> V3 computed:         
> Update NewValue in Store: key => (V3, V1),     No Emit
> V4 computed:         
> Update Both in Store: key => (V4, V4),     Emit <V4, V1>
> V5 computed:         
> Update NewValue in Store: key => (V5, V4),     No Emit
> One more thing to consider is that we need a "closing" time control on the not-yet-emitted keys; when some time has elapsed (or the window is about to be closed), we need to check each key for a current materialized pair that has not been emitted (for example <V5, V4> in the above example).
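
The store layout in the quoted description can be sketched roughly as follows. This is plain Java, not the eventual Kafka Streams implementation: SuppressingAggStore, update, flush and the emitEvery setting are hypothetical names, and "emit after every N-th suppressed update" is just one assumed policy standing in for "some configs".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: each key stores (current value, last emitted value),
// and the last emitted value is only updated when a Change is forwarded.
class SuppressingAggStore<K, V> {
    // The Change<newValue, oldValue> pair from the issue description.
    static final class Change<V> {
        final V newValue;
        final V oldValue;
        Change(V newValue, V oldValue) { this.newValue = newValue; this.oldValue = oldValue; }
        @Override public String toString() { return "<" + newValue + ", " + oldValue + ">"; }
    }

    private static final class Entry<V> {
        V current;            // latest aggregate
        V lastEmitted;        // the extra field: value last sent downstream
        int updatesSinceEmit; // suppressed updates since the last emit
    }

    private final Map<K, Entry<V>> store = new HashMap<>();
    private final int emitEvery; // assumed policy knob, not a real config

    SuppressingAggStore(int emitEvery) { this.emitEvery = emitEvery; }

    // Records a new aggregate; returns the Change to forward, or null if
    // the update is suppressed.
    Change<V> update(K key, V newAgg) {
        Entry<V> e = store.get(key);
        if (e == null) {
            // First value for this key: update both fields and emit.
            e = new Entry<V>();
            e.current = newAgg;
            e.lastEmitted = newAgg;
            store.put(key, e);
            return new Change<V>(newAgg, null);
        }
        e.current = newAgg;
        e.updatesSinceEmit++;
        return e.updatesSinceEmit >= emitEvery ? emit(e) : null;
    }

    // "Closing" control: forward every pair that has not been emitted yet.
    List<Change<V>> flush() {
        List<Change<V>> out = new ArrayList<>();
        for (Entry<V> e : store.values()) {
            if (e.updatesSinceEmit > 0) {
                out.add(emit(e));
            }
        }
        return out;
    }

    private Change<V> emit(Entry<V> e) {
        Change<V> c = new Change<V>(e.current, e.lastEmitted);
        e.lastEmitted = e.current;
        e.updatesSinceEmit = 0;
        return c;
    }
}
```

With emitEvery = 3, feeding V1 through V5 for one key emits <V1, null> and <V4, V1>, matching the walkthrough above, and a later flush() emits the remaining <V5, V4>.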


