Posted to commits@samza.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/03/26 17:00:03 UTC

[jira] [Commented] (SAMZA-1627) Watermark broadcast enhancements

    [ https://issues.apache.org/jira/browse/SAMZA-1627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414131#comment-16414131 ] 

ASF GitHub Bot commented on SAMZA-1627:
---------------------------------------

GitHub user xinyuiscool opened a pull request:

    https://github.com/apache/samza/pull/456

    SAMZA-1627: Watermark broadcast enhancements

    Currently each upstream task has to broadcast its watermark to every partition of the intermediate streams so that the consumers can aggregate watermarks. A better approach is to have a single downstream consumer do the aggregation and then broadcast the result to all partitions. This is safe because we can prove that the broadcast watermark message arrives after all upstream tasks have finished producing the events whose event time is before this watermark. This reduces the total message count from O(n^2) to O(n).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xinyuiscool/samza SAMZA-1627

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/samza/pull/456.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #456
    
----
commit 9c43008c2cc3c79d0659dec0e608d7e6f5a8f63a
Author: xinyuiscool <xi...@...>
Date:   2018-03-22T21:32:37Z

    SAMZA-1627: Watermark broadcast enhancements

----


> Watermark broadcast enhancements
> --------------------------------
>
>                 Key: SAMZA-1627
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1627
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Xinyu Liu
>            Assignee: Xinyu Liu
>            Priority: Major
>
> Currently each upstream task has to broadcast its watermark to every partition of the intermediate streams so that the consumers can aggregate watermarks. That is O(n^2) messages. For 256 tasks and a 256-partition intermediate stream, this easily results in 64k msg/s (256 x 256 = 65,536) if we send a watermark every second. To illustrate:
> T1 --> P1, P2, P3
> T2 --> P1, P2, P3
> T3 --> P1, P2, P3
>  
> A better approach is to have a single downstream consumer do the aggregation and then broadcast the result to all partitions. A simple proof shows this is safe: if P1 has received watermark t from all of T1, T2, and T3, then all messages before t have already been published to (P1, P2, P3), though they might not have been consumed yet. P1's broadcast is written to P2 and P3 after those messages, so their consumers see the watermark only after them. So P1 can safely broadcast watermark t to P2 and P3. To illustrate:
> T1     T2     T3
>   \     |     /
>         P1
>        /  \
>      P2    P3
> This reduces the total message count from O(n^2) to O(n). The cost is a possible delay of a few milliseconds, since the watermark now takes two hops. The benefit clearly outweighs that cost. In practice, the aggregating consumer can be chosen as (topic.hash() % total partitions), which spreads the aggregation across partitions when there are multiple intermediate streams.
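
The aggregation step described in the issue can be sketched roughly as follows. This is a minimal illustration in Python, not the code from the pull request: the class name `WatermarkAggregator`, the method `on_watermark`, and the rule that the aggregate is the minimum of the latest per-task watermarks are all assumptions made for the example.

```python
from typing import Dict, Iterable, Optional


class WatermarkAggregator:
    """Hypothetical aggregator run by the consumer of the aggregation partition (P1)."""

    def __init__(self, upstream_tasks: Iterable[str]) -> None:
        # Latest watermark seen from each upstream task; None until it reports.
        self._latest: Dict[str, Optional[int]] = {t: None for t in upstream_tasks}
        # Last aggregate that was handed out for broadcasting.
        self._emitted: Optional[int] = None

    def on_watermark(self, task: str, timestamp: int) -> Optional[int]:
        """Record a watermark; return a new aggregate to broadcast, or None."""
        current = self._latest[task]
        if current is None or timestamp > current:
            self._latest[task] = timestamp

        # Nothing can be broadcast until every upstream task has reported.
        if any(w is None for w in self._latest.values()):
            return None

        # Assumption: the aggregate is the minimum across all upstream tasks.
        aggregate = min(self._latest.values())
        if self._emitted is not None and aggregate <= self._emitted:
            return None  # the aggregate did not advance
        self._emitted = aggregate
        return aggregate
```

With three upstream tasks, the aggregator stays silent until T1, T2, and T3 have all reported, and then returns the smallest of their watermarks as the value P1 would broadcast to P2 and P3.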

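The message counts behind the O(n^2) and O(n) figures can be checked with a short calculation. The function names below are made up for illustration, and the second function assumes the aggregator forwards one aggregate per interval to every partition other than its own.

```python
def all_to_all_messages(tasks: int, partitions: int) -> int:
    """Every upstream task writes its watermark to every partition."""
    return tasks * partitions


def aggregated_messages(tasks: int, partitions: int) -> int:
    """Every task writes to one aggregation partition, which then forwards
    a single aggregate to each of the other partitions (assumption)."""
    return tasks + (partitions - 1)


# Figures for the 256-task, 256-partition case from the issue,
# with one watermark per task per second.
print(all_to_all_messages(256, 256))   # 65536
print(aggregated_messages(256, 256))   # 511
```

Under these assumptions the per-second watermark traffic in the 256 x 256 case drops from 65,536 messages to 511.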

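Choosing the aggregation partition from the topic name might look like the sketch below. The issue only gives the expression (topic.hash() % total partitions); the function name `aggregation_partition` and the use of CRC32 as the hash are assumptions, chosen because every producer has to compute the same value for the same topic.

```python
import zlib


def aggregation_partition(topic: str, total_partitions: int) -> int:
    """Pick the partition whose consumer aggregates watermarks for a topic."""
    if total_partitions <= 0:
        raise ValueError("total_partitions must be positive")
    # CRC32 is stable across processes, unlike Python's built-in str hash,
    # so every upstream task agrees on the same partition (assumed hash).
    return zlib.crc32(topic.encode("utf-8")) % total_partitions
```

Different intermediate streams generally hash to different partitions, so the aggregation work is spread out rather than always landing on partition 0.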

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)