You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Zach Cox <zc...@gmail.com> on 2016/02/17 15:16:19 UTC

Changing parallelism

Hi - we are building a stateful Flink streaming job that will run
indefinitely. One part of the job builds up state per key in a global
window that will need to exist for a very long time. We will definitely be
using the savepoints to restore job state after new code deploys.

We were planning to be able to increase the parallelism of the job
incrementally over time, as the volume of input data grows. We also have a
large amount of historical data loaded into Kafka we'd like to reprocess
initially with the streaming job to backfill Elasticsearch, and then
transition the job seamlessly to nearline processing. We were planning to
use a large parallelism during the historical reprocessing, and then
decrease it when the job has caught up to new events.

However, the savepoint docs state that the job parallelism cannot be
changed over time [1]. Does this mean we need to use the same, fixed
parallelism=n during reprocessing and going forward? Are there any tricks
or workarounds we could use to still make changes to parallelism and take
advantage of savepoints?

Thanks,
Zach

[1]
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html#current-limitations

Re: Changing parallelism

Posted by Zach Cox <zc...@gmail.com>.
Hi Ufuk - thanks for the 2016 roadmap - glad to see changing parallelism is
the first bullet :)  Mesos support also sounds great, we're currently
running job and task managers on Mesos statically via Marathon.

Hi Stephan - thanks, that trick sounds pretty clever, I will try wrapping
my head around using 2 different jobs and uids like that.

-Zach


On Thu, Feb 18, 2016 at 7:13 AM Stephan Ewen <se...@apache.org> wrote:

> Hi Zach!
>
> Yes, changing parallelism is pretty high up the priority list. The good
> news is that "scaling in" is the simpler part of changing the parallelism
> and we are pushing to get that in soon.
>
>
> Until then, there is only a pretty ugly trick that you can do right now to
> "rescale' the state:
>
>   1) savepoint with high parallelism
>
>   2) run an intermediate job that has the state twice in two operators,
> once with high parallelism, once with low. Emit the state from the first
> operator, write in the second. The first operator has the operator ID of
> the initial high-parallelism state.
>
>   3) Run the low parallelism job, and the stateful operator needs the ID
> of the second (low parallelism) operator in the intermediate job.
>
>
> Greetings,
> Stephan
>
>
> On Thu, Feb 18, 2016 at 9:24 AM, Ufuk Celebi <uc...@apache.org> wrote:
>
>> Hey Zach!
>>
>> Sounds like a great use case.
>>
>> On Wed, Feb 17, 2016 at 3:16 PM, Zach Cox <zc...@gmail.com> wrote:
>> > However, the savepoint docs state that the job parallelism cannot be
>> changed
>> > over time [1]. Does this mean we need to use the same, fixed
>> parallelism=n
>> > during reprocessing and going forward? Are there any tricks or
>> workarounds
>> > we could use to still make changes to parallelism and take advantage of
>> > savepoints?
>>
>> Yes, currently you have to keep the parallelism fixed. Dynamic scale
>> in and out of programs will have very high priority after the 1.0
>> release [1]. Unfortunately, I'm not aware of any work arounds to
>> overcome this at the moment.
>>
>> – Ufuk
>>
>> [1] https://flink.apache.org/news/2015/12/18/a-year-in-review.html (at
>> the end of the post there is a road map for 2016)
>>
>
>

Re: Changing parallelism

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Abhishek,

state can be emitted from funtions as regular records. There is no way to
share state the local state of a task with other tasks of the same operator
or with other operators.
Flink's key-partitioned state is always scoped to the key of the current
record. It is not possible to iterate over all local state.
If your function implements the Checkpointed interface, you have one state
object for the whole function. In this case, you can see all local state.
However, the Checkpointed functions have the limitation that they cannot be
rescaled.

Best, Fabian

2017-01-08 17:01 GMT+01:00 abhishekrs <ab...@tetrationanalytics.com>:

> Hi Stephan,
>
> I want to pursue your idea. How do I emit state from an operator. Operator
> for me is a rich function. Or will I need a different style operator? I am
> unable to find how to iterate over all state - in open or otherwise (from
> an
> operator).
>
> Are there APIs to inspect the savepoints - using offline programs?
>
> -Abhishek-
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Changing-
> parallelism-tp4967p10911.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Re: Changing parallelism

Posted by abhishekrs <ab...@tetrationanalytics.com>.
Hi Stephan,

I want to pursue your idea. How do I emit state from an operator. Operator
for me is a rich function. Or will I need a different style operator? I am
unable to find how to iterate over all state - in open or otherwise (from an
operator). 

Are there APIs to inspect the savepoints - using offline programs?

-Abhishek-



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-parallelism-tp4967p10911.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Changing parallelism

Posted by Stephan Ewen <se...@apache.org>.
Hi Zach!

Yes, changing parallelism is pretty high up the priority list. The good
news is that "scaling in" is the simpler part of changing the parallelism
and we are pushing to get that in soon.


Until then, there is only a pretty ugly trick that you can do right now to
"rescale' the state:

  1) savepoint with high parallelism

  2) run an intermediate job that has the state twice in two operators,
once with high parallelism, once with low. Emit the state from the first
operator, write in the second. The first operator has the operator ID of
the initial high-parallelism state.

  3) Run the low parallelism job, and the stateful operator needs the ID of
the second (low parallelism) operator in the intermediate job.


Greetings,
Stephan


On Thu, Feb 18, 2016 at 9:24 AM, Ufuk Celebi <uc...@apache.org> wrote:

> Hey Zach!
>
> Sounds like a great use case.
>
> On Wed, Feb 17, 2016 at 3:16 PM, Zach Cox <zc...@gmail.com> wrote:
> > However, the savepoint docs state that the job parallelism cannot be
> changed
> > over time [1]. Does this mean we need to use the same, fixed
> parallelism=n
> > during reprocessing and going forward? Are there any tricks or
> workarounds
> > we could use to still make changes to parallelism and take advantage of
> > savepoints?
>
> Yes, currently you have to keep the parallelism fixed. Dynamic scale
> in and out of programs will have very high priority after the 1.0
> release [1]. Unfortunately, I'm not aware of any work arounds to
> overcome this at the moment.
>
> – Ufuk
>
> [1] https://flink.apache.org/news/2015/12/18/a-year-in-review.html (at
> the end of the post there is a road map for 2016)
>

Re: Changing parallelism

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Zach!

Sounds like a great use case.

On Wed, Feb 17, 2016 at 3:16 PM, Zach Cox <zc...@gmail.com> wrote:
> However, the savepoint docs state that the job parallelism cannot be changed
> over time [1]. Does this mean we need to use the same, fixed parallelism=n
> during reprocessing and going forward? Are there any tricks or workarounds
> we could use to still make changes to parallelism and take advantage of
> savepoints?

Yes, currently you have to keep the parallelism fixed. Dynamic scale
in and out of programs will have very high priority after the 1.0
release [1]. Unfortunately, I'm not aware of any work arounds to
overcome this at the moment.

– Ufuk

[1] https://flink.apache.org/news/2015/12/18/a-year-in-review.html (at
the end of the post there is a road map for 2016)