Posted to commits@samza.apache.org by "Shanthoosh Venkataraman (JIRA)" <ji...@apache.org> on 2018/11/12 07:53:00 UTC

[jira] [Updated] (SAMZA-1989) SystemStreamGrouper interface change for SEP-5

     [ https://issues.apache.org/jira/browse/SAMZA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shanthoosh Venkataraman updated SAMZA-1989:
-------------------------------------------
    Description: 
Samza users may need to increase the partition count of the input streams of their stateful Samza jobs. For example, Kafka has to bound the maximum size of each partition to keep performing well as it scales. Thus, if the average byte-in rate or the retention time of a Kafka topic doubles, the number of partitions of that topic has to be expanded to keep the partition size down.
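As a rough back-of-the-envelope model (an assumption for illustration, not a formula from Kafka), treat a partition's size S as the topic's byte-in rate R times its retention time T, spread evenly over N partitions. Doubling either R or T then requires doubling N to keep the per-partition size unchanged:

```latex
S_{\text{partition}} \approx \frac{R \cdot T}{N},
\qquad
\frac{(2R) \cdot T}{2N} = \frac{R \cdot T}{N}
```

The even-spread assumption ignores key skew, so real partitions can deviate from this estimate.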

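To perform a join between streams, stateful jobs generally route the matching partitions of the different input streams (for example, partition 0 of each stream) to the same task of a container. However, when an input stream is repartitioned, the key space of each partition is redistributed across the new partitions, so messages with a given key may no longer reach the task that holds the state for that key. This causes stateful jobs to produce erroneous results.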
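To make the redistribution concrete, the following minimal Java sketch assumes a simple hash-mod partitioner. `RepartitionDemo`, `partitionFor`, and `movedKeys` are hypothetical names, and `String.hashCode()` stands in for Kafka's actual murmur2-based default partitioner. The sketch counts how many keys change partition when a stream grows from 4 to 8 partitions; any task whose local state was built from the old key-to-partition mapping would see those keys arrive on a different partition.

```java
import java.util.ArrayList;
import java.util.List;

public class RepartitionDemo {

    // Hypothetical hash-mod partitioner used only for illustration. Kafka's default
    // partitioner hashes the serialized key with murmur2, not String.hashCode().
    public static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Returns the keys whose partition changes when the count grows from oldCount to newCount.
    public static List<String> movedKeys(List<String> keys, int oldCount, int newCount) {
        List<String> moved = new ArrayList<>();
        for (String key : keys) {
            if (partitionFor(key, oldCount) != partitionFor(key, newCount)) {
                moved.add(key);
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            keys.add("member-" + i);
        }
        List<String> moved = movedKeys(keys, 4, 8);
        System.out.println(moved.size() + " of " + keys.size() + " keys changed partition (4 -> 8)");

        // Because 8 is a multiple of 4, every key's new partition p satisfies p % 4 == old partition.
        for (String key : keys) {
            if (partitionFor(key, 8) % 4 != partitionFor(key, 4)) {
                throw new IllegalStateException("unexpected mapping for " + key);
            }
        }
    }
}
```

The check at the end of `main` only holds when the new partition count is a multiple of the old one. Under that assumption, a key found in new partition `p` was previously in partition `p % oldCount`, which is the kind of property an expansion-aware grouper could rely on.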

Consequently, if the partition count of an input stream is increased, users have to manually purge the changelog topics and the local RocksDB state of their stateful jobs. This increases operational complexity and results in data loss.




  was:
Samza users may need to increase the partition count of the input streams of their stateful Samza jobs. For example, Kafka has to bound the maximum size of each partition to keep performing well as it scales. Thus, if the average byte-in rate or the retention time of a Kafka topic doubles, the number of partitions of that topic has to be expanded to keep the partition size down.

To perform a join between streams, stateful jobs generally route the matching partitions of the different input streams (for example, partition 0 of each stream) to the same task of a container. However, when an input stream is repartitioned, the key space of each partition is redistributed across the new partitions, so messages with a given key may no longer reach the task that holds the state for that key. This causes stateful jobs to produce erroneous results.

Consequently, if the partition count of an input stream is increased, users have to manually purge the changelog topics and the local RocksDB state of their stateful jobs. This increases operational complexity and results in data loss.

This patch takes a first stab at solving the above problem and comprises the following changes:

1. Introduced a new `group` method in the `SystemStreamPartitionGrouper` interface that generates the task assignment while factoring in the partition expansion of input streams.
2. Introduced a `StreamPartitionMapper` abstraction that allows users to plug in the partitioning function of their input streams.
3. Fixed the existing unit tests and added new unit tests to validate the grouper changes.
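The following minimal, self-contained Java sketch shows one way an expansion-aware grouper and a pluggable partition mapper might fit together. It does not use Samza's real classes: `ExpansionAwareGrouperSketch`, `PartitionMapper`, `HASH_MOD`, and `group` are hypothetical, and the mapping assumes hash-mod partitioning where the new partition count is a multiple of the old one. Each input partition is assigned to the task named after the partition it mapped to before expansion, so keys keep reaching the task that holds their state.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ExpansionAwareGrouperSketch {

    /** Hypothetical pluggable mapper; not Samza's actual StreamPartitionMapper API. */
    public interface PartitionMapper {
        int previousPartition(int currentPartition, int previousCount, int currentCount);
    }

    /**
     * Hypothetical mapper for hash-mod partitioning. Only valid when the new count is a
     * multiple of the old count: a key in new partition p was in old partition p % previousCount.
     */
    public static final PartitionMapper HASH_MOD = (currentPartition, previousCount, currentCount) -> {
        if (currentCount % previousCount != 0) {
            throw new IllegalArgumentException(
                "new partition count " + currentCount + " is not a multiple of " + previousCount);
        }
        return currentPartition % previousCount;
    };

    /**
     * Assigns every current input partition to a task keyed by its pre-expansion partition id,
     * so co-partitioned keys from different streams keep landing on the same task.
     */
    public static Map<Integer, List<String>> group(Map<String, Integer> previousCounts,
                                                   Map<String, Integer> currentCounts,
                                                   PartitionMapper mapper) {
        Map<Integer, List<String>> tasks = new TreeMap<>();
        for (Map.Entry<String, Integer> entry : currentCounts.entrySet()) {
            String stream = entry.getKey();
            int currentCount = entry.getValue();
            int previousCount = previousCounts.get(stream);
            for (int p = 0; p < currentCount; p++) {
                int task = mapper.previousPartition(p, previousCount, currentCount);
                tasks.computeIfAbsent(task, k -> new ArrayList<>()).add(stream + "-" + p);
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        Map<String, Integer> before = new LinkedHashMap<>();
        before.put("page-views", 4);
        before.put("profiles", 4);
        Map<String, Integer> after = new LinkedHashMap<>(before);
        after.put("page-views", 8); // only page-views was expanded
        System.out.println(group(before, after, HASH_MOD));
    }
}
```

With `page-views` expanded from 4 to 8 partitions and `profiles` left at 4, the sketch still produces four tasks; task 0, for example, receives `page-views-0`, `page-views-4`, and `profiles-0`. Keeping the mapping behind an interface lets a stream with a different partitioning function supply its own mapping instead of the hash-mod assumption.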





> SystemStreamGrouper interface change for SEP-5
> ----------------------------------------------
>
>                 Key: SAMZA-1989
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1989
>             Project: Samza
>          Issue Type: New Feature
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Shanthoosh Venkataraman
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)