You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Adam Bellemare <ad...@gmail.com> on 2020/08/06 16:57:12 UTC

Status of dynamic worker scaling with Kafka consumers

Hi Folks

When processing events from Kafka, it seems that, from my reading, the
distribution of partitions maps directly to the worker via the concept of
'splits' :

https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54

From the code:

> The partitions are evenly distributed among the splits. The number of
splits returned is {@code
> min(desiredNumSplits, totalNumPartitions)}, though better not to depend
on the exact count.

> <p>It is important to assign the partitions deterministically so that we
can support resuming a
> split from last checkpoint. The Kafka partitions are sorted by {@code
<topic, partition>} and then
> assigned to splits in round-robin order.

I'm not intimately familiar with Beam's execution model, but my reading of
this code suggests that:
1) Beam allocates partitions to workers once, at creation time
2) This implies that once started, the worker count cannot be changed as
the partitions are not redistributed
3) Any state is tied to the split, which is in turn tied to the worker.
This means outside of, say, a global window
<https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/windowing/GlobalWindow.html>,
materialized kafka state is "localized" to a worker.

Follow up Q:
4) Is this independent of the runner? I am much more familiar with Apache
Spark as a runner than say, Dataflow.

If any could confirm or refute my 3 statements and 1 question, it would go
a long way towards validating my understanding of Beam's current
relationship to scaling and partitioned data locality with Kafka.

Thanks

Re: Status of dynamic worker scaling with Kafka consumers

Posted by Adam Bellemare <ad...@gmail.com>.
Thank you Alexey, I appreciate your responses.

On Tue, Aug 11, 2020 at 10:57 AM Alexey Romanenko <ar...@gmail.com>
wrote:

> Hi Adam,
>
> 1) Correct. Current KafkaIO.Read implementation is based on Beam
> “UnboundedSource” which requires to have fixed number of splits at DAG
> construction time.
> 2) Correct.
>
> Dynamic topics and partitions discovering is a long story in Beam. Since
> you are interested in this, it would be worth to take a look on these
> discussions [1][2]. One of the way to have it in Beam is to use
> SplittableDoFn [3] instead of UnboundedSource API. As I mentioned before,
> there is ongoing work on this to make KafkaIO read with SDF [4] and that
> should allow in the future to discover new partitions/topics in runtime.
>
> [1] https://issues.apache.org/jira/browse/BEAM-5786
> [2] https://issues.apache.org/jira/browse/BEAM-727
> [3] https://beam.apache.org/blog/splittable-do-fn/
> [4] https://issues.apache.org/jira/browse/BEAM-9977
>
> On 11 Aug 2020, at 15:01, Adam Bellemare <ad...@gmail.com> wrote:
>
> Hello Alexey
>
> Thank you for replying to my questions. A number of my colleagues have
> been musing about the idea of dynamically changing the partition count of
> Apache Kafka's input topics for Beam jobs during runtime (We intend to use
> the Google Dataflow runner for our jobs). I have been hesitant to endorse
> such an operation because my understanding of Beam at this point in time is
> that dynamically scaling the topic partition count up will not be
> automatically detected by the Beam job, such that these partitions will go
> unassigned until the job is restarted.
>
> This, of course, ignores the impact to the state stores, particularly
> data-locality issues. My understanding here (again) is that Beam stores
> keyed state in alignment with the kafka partitions, and so changing the
> partition count would affect the distribution of state significantly (which
> is my primary reason to oppose this operation).
>
> In sum, if you (or anyone else reading this email!) could refute or
> support these statements I would be very grateful:
> 1) Beam doesn't support dynamic upscaling of Kafka partition counts. The
> job needs to be restarted to pick new partitions up (which is in line with
> many other stream processors, and not something I would consider a defect)
> 2) A job's state pertaining to a Kafka source (such as materializing a
> stream) is divided along the Kafka partition boundaries.
>
> Thanks!
>
> On Mon, Aug 10, 2020 at 1:08 PM Alexey Romanenko <ar...@gmail.com>
> wrote:
>
>> Hi Adam,
>>
>> 1) Yes, correct. Though, there is ongoing work to do it in runtime and
>> support topics/partitions discovering.
>>
>> 2) Yes but in case of worker fails, its task (read from specific
>> partition in case of KafkaIO) will be assigned to different one. How? It
>> depends on underlying data processing engine.
>>
>> 3) In general - yes, but some specific things, like storing the
>> checkpoints for unbounded sources, could be different in terms of
>> implementation. Though, Beam model should be applied in the same way for
>> different runners, however, the implementation can vary. This is actually
>> why Beam runners exist - they apply Beam model on different data processing
>> engine and make it unified for Beam users.
>>
>> 4) Please, see 3)
>>
>> I hope it will shed some light =) Please, let us know if you have more
>> questions.
>>
>> Regards,
>> Alexey
>>
>> On 6 Aug 2020, at 18:57, Adam Bellemare <ad...@gmail.com> wrote:
>>
>> Hi Folks
>>
>> When processing events from Kafka, it seems that, from my reading, the
>> distribution of partitions maps directly to the worker via the concept of
>> 'splits' :
>>
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>>
>> From the code:
>>
>> > The partitions are evenly distributed among the splits. The number of
>> splits returned is {@code
>> > min(desiredNumSplits, totalNumPartitions)}, though better not to depend
>> on the exact count.
>>
>> > <p>It is important to assign the partitions deterministically so that
>> we can support resuming a
>> > split from last checkpoint. The Kafka partitions are sorted by {@code
>> <topic, partition>} and then
>> > assigned to splits in round-robin order.
>>
>> I'm not intimately familiar with Beam's execution model, but my reading
>> of this code suggests that:
>> 1) Beam allocates partitions to workers once, at creation time
>> 2) This implies that once started, the worker count cannot be changed as
>> the partitions are not redistributed
>> 3) Any state is tied to the split, which is in turn tied to the worker.
>> This means outside of, say, a global window
>> <https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/windowing/GlobalWindow.html>,
>> materialized kafka state is "localized" to a worker.
>>
>> Follow up Q:
>> 4) Is this independent of the runner? I am much more familiar with Apache
>> Spark as a runner than say, Dataflow.
>>
>> If any could confirm or refute my 3 statements and 1 question, it would
>> go a long way towards validating my understanding of Beam's current
>> relationship to scaling and partitioned data locality with Kafka.
>>
>> Thanks
>>
>>
>>
>

Re: Status of dynamic worker scaling with Kafka consumers

Posted by Alexey Romanenko <ar...@gmail.com>.
Hi Adam,

1) Correct. Current KafkaIO.Read implementation is based on Beam “UnboundedSource” which requires to have fixed number of splits at DAG construction time.
2) Correct.

Dynamic topics and partitions discovering is a long story in Beam. Since you are interested in this, it would be worth to take a look on these discussions [1][2]. One of the way to have it in Beam is to use SplittableDoFn [3] instead of UnboundedSource API. As I mentioned before, there is ongoing work on this to make KafkaIO read with SDF [4] and that should allow in the future to discover new partitions/topics in runtime.

[1] https://issues.apache.org/jira/browse/BEAM-5786
[2] https://issues.apache.org/jira/browse/BEAM-727 
[3] https://beam.apache.org/blog/splittable-do-fn/
[4] https://issues.apache.org/jira/browse/BEAM-9977

> On 11 Aug 2020, at 15:01, Adam Bellemare <ad...@gmail.com> wrote:
> 
> Hello Alexey
> 
> Thank you for replying to my questions. A number of my colleagues have been musing about the idea of dynamically changing the partition count of Apache Kafka's input topics for Beam jobs during runtime (We intend to use the Google Dataflow runner for our jobs). I have been hesitant to endorse such an operation because my understanding of Beam at this point in time is that dynamically scaling the topic partition count up will not be automatically detected by the Beam job, such that these partitions will go unassigned until the job is restarted. 
> 
> This, of course, ignores the impact to the state stores, particularly data-locality issues. My understanding here (again) is that Beam stores keyed state in alignment with the kafka partitions, and so changing the partition count would affect the distribution of state significantly (which is my primary reason to oppose this operation).
> 
> In sum, if you (or anyone else reading this email!) could refute or support these statements I would be very grateful:
> 1) Beam doesn't support dynamic upscaling of Kafka partition counts. The job needs to be restarted to pick new partitions up (which is in line with many other stream processors, and not something I would consider a defect)
> 2) A job's state pertaining to a Kafka source (such as materializing a stream) is divided along the Kafka partition boundaries.
> 
> Thanks!
> 
> On Mon, Aug 10, 2020 at 1:08 PM Alexey Romanenko <aromanenko.dev@gmail.com <ma...@gmail.com>> wrote:
> Hi Adam,
> 
> 1) Yes, correct. Though, there is ongoing work to do it in runtime and support topics/partitions discovering. 
> 
> 2) Yes but in case of worker fails, its task (read from specific partition in case of KafkaIO) will be assigned to different one. How? It depends on underlying data processing engine.
> 
> 3) In general - yes, but some specific things, like storing the checkpoints for unbounded sources, could be different in terms of implementation. Though, Beam model should be applied in the same way for different runners, however, the implementation can vary. This is actually why Beam runners exist - they apply Beam model on different data processing engine and make it unified for Beam users.
> 
> 4) Please, see 3)
> 
> I hope it will shed some light =) Please, let us know if you have more questions.
> 
> Regards,
> Alexey
> 
>> On 6 Aug 2020, at 18:57, Adam Bellemare <adam.bellemare@gmail.com <ma...@gmail.com>> wrote:
>> 
>> Hi Folks
>> 
>> When processing events from Kafka, it seems that, from my reading, the distribution of partitions maps directly to the worker via the concept of 'splits' :
>> 
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54 <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54>
>> 
>> From the code:
>> 
>> > The partitions are evenly distributed among the splits. The number of splits returned is {@code 
>> > min(desiredNumSplits, totalNumPartitions)}, though better not to depend on the exact count.
>> 
>> > <p>It is important to assign the partitions deterministically so that we can support resuming a 
>> > split from last checkpoint. The Kafka partitions are sorted by {@code <topic, partition>} and then
>> > assigned to splits in round-robin order.
>> 
>> I'm not intimately familiar with Beam's execution model, but my reading of this code suggests that:
>> 1) Beam allocates partitions to workers once, at creation time
>> 2) This implies that once started, the worker count cannot be changed as the partitions are not redistributed
>> 3) Any state is tied to the split, which is in turn tied to the worker. This means outside of, say, a global window <https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/windowing/GlobalWindow.html>, materialized kafka state is "localized" to a worker.
>> 
>> Follow up Q:
>> 4) Is this independent of the runner? I am much more familiar with Apache Spark as a runner than say, Dataflow. 
>> 
>> If any could confirm or refute my 3 statements and 1 question, it would go a long way towards validating my understanding of Beam's current relationship to scaling and partitioned data locality with Kafka. 
>> 
>> Thanks
>> 
> 


Re: Status of dynamic worker scaling with Kafka consumers

Posted by Adam Bellemare <ad...@gmail.com>.
Hello Alexey

Thank you for replying to my questions. A number of my colleagues have been
musing about the idea of dynamically changing the partition count of Apache
Kafka's input topics for Beam jobs during runtime (We intend to use the
Google Dataflow runner for our jobs). I have been hesitant to endorse such
an operation because my understanding of Beam at this point in time is that
dynamically scaling the topic partition count up will not be automatically
detected by the Beam job, such that these partitions will go unassigned
until the job is restarted.

This, of course, ignores the impact to the state stores, particularly
data-locality issues. My understanding here (again) is that Beam stores
keyed state in alignment with the kafka partitions, and so changing the
partition count would affect the distribution of state significantly (which
is my primary reason to oppose this operation).

In sum, if you (or anyone else reading this email!) could refute or support
these statements I would be very grateful:
1) Beam doesn't support dynamic upscaling of Kafka partition counts. The
job needs to be restarted to pick new partitions up (which is in line with
many other stream processors, and not something I would consider a defect)
2) A job's state pertaining to a Kafka source (such as materializing a
stream) is divided along the Kafka partition boundaries.

Thanks!

On Mon, Aug 10, 2020 at 1:08 PM Alexey Romanenko <ar...@gmail.com>
wrote:

> Hi Adam,
>
> 1) Yes, correct. Though, there is ongoing work to do it in runtime and
> support topics/partitions discovering.
>
> 2) Yes but in case of worker fails, its task (read from specific partition
> in case of KafkaIO) will be assigned to different one. How? It depends on
> underlying data processing engine.
>
> 3) In general - yes, but some specific things, like storing the
> checkpoints for unbounded sources, could be different in terms of
> implementation. Though, Beam model should be applied in the same way for
> different runners, however, the implementation can vary. This is actually
> why Beam runners exist - they apply Beam model on different data processing
> engine and make it unified for Beam users.
>
> 4) Please, see 3)
>
> I hope it will shed some light =) Please, let us know if you have more
> questions.
>
> Regards,
> Alexey
>
> On 6 Aug 2020, at 18:57, Adam Bellemare <ad...@gmail.com> wrote:
>
> Hi Folks
>
> When processing events from Kafka, it seems that, from my reading, the
> distribution of partitions maps directly to the worker via the concept of
> 'splits' :
>
>
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>
> From the code:
>
> > The partitions are evenly distributed among the splits. The number of
> splits returned is {@code
> > min(desiredNumSplits, totalNumPartitions)}, though better not to depend
> on the exact count.
>
> > <p>It is important to assign the partitions deterministically so that we
> can support resuming a
> > split from last checkpoint. The Kafka partitions are sorted by {@code
> <topic, partition>} and then
> > assigned to splits in round-robin order.
>
> I'm not intimately familiar with Beam's execution model, but my reading of
> this code suggests that:
> 1) Beam allocates partitions to workers once, at creation time
> 2) This implies that once started, the worker count cannot be changed as
> the partitions are not redistributed
> 3) Any state is tied to the split, which is in turn tied to the worker.
> This means outside of, say, a global window
> <https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/windowing/GlobalWindow.html>,
> materialized kafka state is "localized" to a worker.
>
> Follow up Q:
> 4) Is this independent of the runner? I am much more familiar with Apache
> Spark as a runner than say, Dataflow.
>
> If any could confirm or refute my 3 statements and 1 question, it would go
> a long way towards validating my understanding of Beam's current
> relationship to scaling and partitioned data locality with Kafka.
>
> Thanks
>
>
>

Re: Status of dynamic worker scaling with Kafka consumers

Posted by Alexey Romanenko <ar...@gmail.com>.
Hi Adam,

1) Yes, correct. Though, there is ongoing work to do it in runtime and support topics/partitions discovering. 

2) Yes but in case of worker fails, its task (read from specific partition in case of KafkaIO) will be assigned to different one. How? It depends on underlying data processing engine.

3) In general - yes, but some specific things, like storing the checkpoints for unbounded sources, could be different in terms of implementation. Though, Beam model should be applied in the same way for different runners, however, the implementation can vary. This is actually why Beam runners exist - they apply Beam model on different data processing engine and make it unified for Beam users.

4) Please, see 3)

I hope it will shed some light =) Please, let us know if you have more questions.

Regards,
Alexey

> On 6 Aug 2020, at 18:57, Adam Bellemare <ad...@gmail.com> wrote:
> 
> Hi Folks
> 
> When processing events from Kafka, it seems that, from my reading, the distribution of partitions maps directly to the worker via the concept of 'splits' :
> 
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54 <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54>
> 
> From the code:
> 
> > The partitions are evenly distributed among the splits. The number of splits returned is {@code 
> > min(desiredNumSplits, totalNumPartitions)}, though better not to depend on the exact count.
> 
> > <p>It is important to assign the partitions deterministically so that we can support resuming a 
> > split from last checkpoint. The Kafka partitions are sorted by {@code <topic, partition>} and then
> > assigned to splits in round-robin order.
> 
> I'm not intimately familiar with Beam's execution model, but my reading of this code suggests that:
> 1) Beam allocates partitions to workers once, at creation time
> 2) This implies that once started, the worker count cannot be changed as the partitions are not redistributed
> 3) Any state is tied to the split, which is in turn tied to the worker. This means outside of, say, a global window <https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/windowing/GlobalWindow.html>, materialized kafka state is "localized" to a worker.
> 
> Follow up Q:
> 4) Is this independent of the runner? I am much more familiar with Apache Spark as a runner than say, Dataflow. 
> 
> If any could confirm or refute my 3 statements and 1 question, it would go a long way towards validating my understanding of Beam's current relationship to scaling and partitioned data locality with Kafka. 
> 
> Thanks
>