You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Jozef Vilcek <jo...@gmail.com> on 2018/11/16 09:57:51 UTC

Flink operator max parallelism and rescalable jobs

Hi,

I want to collect some feedback on rescaling streaming Beam pipeline on
Flink runner. Flink seems to be able to re-scale jobs, which in Beam terms
means changing the parallelism in Beam. However, one have to make sure that
state can rescale as well to the predefined MAX parallelism. Max
parallelism must be set for job on FlinkRunner.

Flink supports fiddling with max parallelism on global, environment and
operator level. Changes in operator level are not possible with beam. I
found this JIRA which seems to be inconclusive if changes in operator
parallelism make sense to adopt somehow in Beam
https://issues.apache.org/jira/browse/BEAM-68

I did try to set max parallelism to environment via my local patch. My job
did launch and not crash like before when I bumped parallelism += 1. But
there was one drawback as far as I know. My test job reads from kafka and
after launching job from savepoint point, one partition does not continue
from offset in savepoint but according to what is defined by
auto.offset.reset (my case 'latest') which is not great.

My questions:

1. Should re-scale work for beam if runner does support it or there can be
some incompatibilities in general depending on how particular runner works

2. Did anyone have a success with Flink and rescale? Honestly, not sure how
well it behaves in native Flink. Never tried it

3. Why does kafka not redistribute stored partition offsets after chenging
parallelism?

4. Is BEAM-68 still relevant?

Many thanks,
Jozef

Re: Flink operator max parallelism and rescalable jobs

Posted by Jozef Vilcek <jo...@gmail.com>.
Hey Max, thanks for the pointer to UnboundedSourceWrapper.
I have created BEAM-6077 and will try to come up with the patch

On Fri, Nov 16, 2018 at 12:41 PM Maximilian Michels <mx...@apache.org> wrote:

> Hi Jozef,
>
> The main blocker for rescaling Beam pipelines on Flink was the use of
> Key Group state. This splits each operator state additionally into N
> partitions, such that N * P = MAX_PARALLELISM, where P is the
> parallelism of the operator.
>
> This has largely been done. However, it is not complete. If you look at
> the way the UnboundedSourceWrapper snapshots its state, you will see
> that it does not support Key Groups. Thus, if you increase the
> parallelism, one of the new parallel instances of the operator will
> _not_ receive state and thus behave differently.
>
> I think we could migrate UnboundedSourceWrapper to KeyGroups and then
> also leverage spread of the Kafka partitions.
>
> Thanks,
> Max
>
> On 16.11.18 10:57, Jozef Vilcek wrote:
> > Hi,
> >
> > I want to collect some feedback on rescaling streaming Beam pipeline on
> > Flink runner. Flink seems to be able to re-scale jobs, which in Beam
> > terms means changing the parallelism in Beam. However, one have to make
> > sure that state can rescale as well to the predefined MAX parallelism.
> > Max parallelism must be set for job on FlinkRunner.
> >
> > Flink supports fiddling with max parallelism on global, environment and
> > operator level. Changes in operator level are not possible with beam. I
> > found this JIRA which seems to be inconclusive if changes in operator
> > parallelism make sense to adopt somehow in Beam
> > https://issues.apache.org/jira/browse/BEAM-68
> >
> > I did try to set max parallelism to environment via my local patch. My
> > job did launch and not crash like before when I bumped parallelism += 1.
> > But there was one drawback as far as I know. My test job reads from
> > kafka and after launching job from savepoint point, one partition does
> > not continue from offset in savepoint but according to what is defined
> > by auto.offset.reset (my case 'latest') which is not great.
> >
> > My questions:
> >
> > 1. Should re-scale work for beam if runner does support it or there can
> > be some incompatibilities in general depending on how particular runner
> > works
> >
> > 2. Did anyone have a success with Flink and rescale? Honestly, not sure
> > how well it behaves in native Flink. Never tried it
> >
> > 3. Why does kafka not redistribute stored partition offsets after
> > chenging parallelism?
> >
> > 4. Is BEAM-68 still relevant?
> >
> > Many thanks,
> > Jozef
>

Re: Flink operator max parallelism and rescalable jobs

Posted by Maximilian Michels <mx...@apache.org>.
Hi Jozef,

The main blocker for rescaling Beam pipelines on Flink was the use of 
Key Group state. This splits each operator state additionally into N 
partitions, such that N * P = MAX_PARALLELISM, where P is the 
parallelism of the operator.

This has largely been done. However, it is not complete. If you look at 
the way the UnboundedSourceWrapper snapshots its state, you will see 
that it does not support Key Groups. Thus, if you increase the 
parallelism, one of the new parallel instances of the operator will 
_not_ receive state and thus behave differently.

I think we could migrate UnboundedSourceWrapper to KeyGroups and then 
also leverage spread of the Kafka partitions.

Thanks,
Max

On 16.11.18 10:57, Jozef Vilcek wrote:
> Hi,
> 
> I want to collect some feedback on rescaling streaming Beam pipeline on 
> Flink runner. Flink seems to be able to re-scale jobs, which in Beam 
> terms means changing the parallelism in Beam. However, one have to make 
> sure that state can rescale as well to the predefined MAX parallelism. 
> Max parallelism must be set for job on FlinkRunner.
> 
> Flink supports fiddling with max parallelism on global, environment and 
> operator level. Changes in operator level are not possible with beam. I 
> found this JIRA which seems to be inconclusive if changes in operator 
> parallelism make sense to adopt somehow in Beam
> https://issues.apache.org/jira/browse/BEAM-68
> 
> I did try to set max parallelism to environment via my local patch. My 
> job did launch and not crash like before when I bumped parallelism += 1. 
> But there was one drawback as far as I know. My test job reads from 
> kafka and after launching job from savepoint point, one partition does 
> not continue from offset in savepoint but according to what is defined 
> by auto.offset.reset (my case 'latest') which is not great.
> 
> My questions:
> 
> 1. Should re-scale work for beam if runner does support it or there can 
> be some incompatibilities in general depending on how particular runner 
> works
> 
> 2. Did anyone have a success with Flink and rescale? Honestly, not sure 
> how well it behaves in native Flink. Never tried it
> 
> 3. Why does kafka not redistribute stored partition offsets after 
> chenging parallelism?
> 
> 4. Is BEAM-68 still relevant?
> 
> Many thanks,
> Jozef