Posted to commits@samza.apache.org by "Xinyu Liu (JIRA)" <ji...@apache.org> on 2018/03/22 17:00:00 UTC

[jira] [Created] (SAMZA-1627) Watermark broadcast enhancements

Xinyu Liu created SAMZA-1627:
--------------------------------

             Summary: Watermark broadcast enhancements
                 Key: SAMZA-1627
                 URL: https://issues.apache.org/jira/browse/SAMZA-1627
             Project: Samza
          Issue Type: Bug
            Reporter: Xinyu Liu
            Assignee: Xinyu Liu


Currently, each upstream task must broadcast its watermark to every partition of the intermediate streams so that the consumers can aggregate watermarks. This costs O(n^2) messages. With 256 tasks and a 256-partition intermediate stream, sending a watermark every second can easily result in about 64k msg/s (256 x 256 = 65,536). To illustrate:

T1 --> P1, P2, P3
T2 --> P1, P2, P3
T3 --> P1, P2, P3
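As a quick check of the arithmetic above, here is a minimal sketch of the all-to-all message rate. The function name and parameters are made up for illustration and are not part of Samza.

```python
def all_to_all_rate(num_tasks: int, num_partitions: int, watermarks_per_sec: int = 1) -> int:
    """Watermark messages per second when every task writes to every partition."""
    # Each task sends one watermark message to each partition, per interval.
    return num_tasks * num_partitions * watermarks_per_sec


print(all_to_all_rate(256, 256))  # 65536, i.e. roughly 64k msg/s
print(all_to_all_rate(3, 3))      # 9, the three-task picture above
```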

 

A better approach is to have a single downstream consumer do the aggregation and then broadcast the result to all the other partitions. A simple argument shows this is safe: if P1 has received watermark t from all of T1, T2, and T3, then all messages before t have already been published to (P1, P2, P3), although they might not have been consumed yet. So P1 can safely broadcast watermark t to P2 and P3. To illustrate:

T1     T2     T3
  \    |     /
       P1
      /  \
    P2    P3
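The scheme pictured above could be sketched roughly as follows. This is only an illustration, not Samza code: the class and method names are hypothetical, and it assumes the aggregate watermark is the minimum of the latest watermark seen from each upstream task, which the issue does not spell out.

```python
from typing import Dict, List, Optional, Set, Tuple


class WatermarkAggregator:
    """Hypothetical aggregator running in the consumer of one partition (P1)."""

    def __init__(self, upstream_tasks: Set[str], other_partitions: List[str]):
        self.upstream_tasks = set(upstream_tasks)
        self.other_partitions = list(other_partitions)
        self.latest: Dict[str, int] = {}     # task name -> latest watermark seen
        self.emitted: Optional[int] = None   # last watermark broadcast downstream

    def on_watermark(self, task: str, watermark: int) -> List[Tuple[str, int]]:
        """Record one task's watermark; return the (partition, watermark) sends."""
        if task not in self.upstream_tasks:
            return []  # ignore tasks we were not told about
        # Assumption: a task's watermark never moves backwards.
        self.latest[task] = max(watermark, self.latest.get(task, watermark))
        if set(self.latest) != self.upstream_tasks:
            return []  # still waiting for at least one upstream task
        aggregate = min(self.latest.values())
        if self.emitted is not None and aggregate <= self.emitted:
            return []  # aggregate has not advanced, nothing to broadcast
        self.emitted = aggregate
        return [(p, aggregate) for p in self.other_partitions]


agg = WatermarkAggregator({"T1", "T2", "T3"}, ["P2", "P3"])
first = agg.on_watermark("T1", 10)   # [] because T2 and T3 are missing
second = agg.on_watermark("T2", 12)  # [] because T3 is missing
third = agg.on_watermark("T3", 11)   # [("P2", 10), ("P3", 10)]
print(first, second, third)
```

Nothing is sent to P2 and P3 until all three tasks have reported, which mirrors the safety argument: by then every message before the broadcast watermark has already been published.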

This reduces the total message count from O(n^2) to O(n). In practice, the aggregating consumer can be chosen as (topic.hash() % total partitions), which spreads the aggregation work across partitions when there are multiple intermediate streams.
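A rough sketch of both points, with hypothetical function names. Two assumptions are baked in: `zlib.crc32` stands in for topic.hash(), because Python's built-in `hash()` for strings is randomized per process and every task must agree on the same partition; and the message count is taken as one message per task to the aggregator plus one rebroadcast to each remaining partition.

```python
import zlib


def aggregator_partition(topic: str, total_partitions: int) -> int:
    """Pick the partition whose consumer aggregates watermarks for this topic."""
    # Assumption: crc32 is a deterministic stand-in for topic.hash().
    return zlib.crc32(topic.encode("utf-8")) % total_partitions


def single_aggregator_messages(num_tasks: int, num_partitions: int) -> int:
    """Messages per watermark round: tasks -> aggregator, then aggregator -> others."""
    return num_tasks + (num_partitions - 1)


# Example topic names are invented; different topics may map to different partitions.
for topic in ("intermediate-stream-a", "intermediate-stream-b"):
    print(topic, aggregator_partition(topic, 256))

print(single_aggregator_messages(256, 256))  # 511, versus 65536 for all-to-all
```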


