Posted to dev@flink.apache.org by shimin yang <ys...@gmail.com> on 2018/11/08 02:13:22 UTC

[DISCUSS]Rethink the rescale operation, can we do it async

Currently, the rescale operation stops the whole job and restarts it
with a different parallelism. But this operation is costly and takes a
long time to recover from if the state size is large.

A long-running rescale can also cause other problems, such as increased
latency and back pressure. In some settings, like a streaming-compute cloud
service, users may be very sensitive to latency and resource usage. So it
would be better to make rescaling a cheaper operation.

I wonder if we could make it an asynchronous operation, just like
checkpointing. Dealing with the keyed state is the painful part, though.
For now I want to make a simplifying assumption: the async rescale
operation can only double the parallelism or halve it.
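As a toy illustration of why the power-of-two restriction helps (a sketch with plain modulo partitioning, not Flink's actual key routing): doubling from p to 2p partitions splits each old partition i into exactly the pair {i, i + p}, so the state of one old instance only ever needs to be shared with one new instance.

```java
// Sketch, not Flink code: with modulo partitioning over p partitions,
// doubling to 2p splits each old partition i into exactly {i, i + p}.
public class DoublingSketch {
    static int partition(int hash, int parallelism) {
        return Math.floorMod(hash, parallelism); // handles negative hashes
    }

    public static void main(String[] args) {
        int p = 4;
        for (int h = -100; h <= 100; h++) {
            int oldIdx = partition(h, p);
            int newIdx = partition(h, 2 * p);
            // every key either stays in partition i or moves to i + p
            assert newIdx == oldIdx || newIdx == oldIdx + p;
        }
    }
}
```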

In the scale-up case, we can copy the state to the newly created
worker and change the partitioner of the upstream. The best timing might be
on notification that a checkpoint has completed. Since we still need to
change the upstream partitioner, the upstream should buffer its output or
block the computation until the state copy is finished. Then the new
partitioner can send different elements with the same key to the same
downstream operator.
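A minimal sketch of the buffering idea described above (all names here are hypothetical, not a Flink API): the upstream holds back records headed for the splitting partition until the state copy completes, then drains the buffer in order before switching over.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch (not a Flink API): while the new worker copies keyed
// state, the upstream buffers records headed for the splitting partition,
// then flushes them in order once the copy is done.
public class BufferingPartitioner {
    private final Queue<String> buffer = new ArrayDeque<>();
    private volatile boolean copyDone = false;
    int forwarded = 0; // records actually sent downstream (for illustration)

    // Returns true if the record was forwarded immediately, false if buffered.
    boolean route(String record, boolean targetIsSplitting) {
        if (!copyDone && targetIsSplitting) {
            buffer.add(record); // hold back until the state copy finishes
            return false;
        }
        forward(record);
        return true;
    }

    // Called on notification that the state copy has finished (e.g. on
    // checkpoint completion): switch over and drain the buffer in order.
    void onStateCopyComplete() {
        copyDone = true;
        while (!buffer.isEmpty()) forward(buffer.poll());
    }

    void forward(String record) { forwarded++; } // stands in for a network send
}
```

Blocking instead of buffering would mean `route` simply waits on `copyDone`; buffering trades memory for keeping the upstream unblocked.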

In the scale-down scenario, we can merge the keyed state of two operators
and likewise change the upstream partitioner.
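Merging keyed state on scale-down is conceptually simple because the key sets of two instances are disjoint (every key lives on exactly one instance), so the merge is a plain union with no conflicts. A sketch, with maps standing in for the state backend:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: merging the keyed state of two operator instances on scale-down.
// Keyed state is partitioned by key, so the two instances hold disjoint
// key sets and the merge is a simple union.
public class StateMerge {
    static Map<String, Long> merge(Map<String, Long> a, Map<String, Long> b) {
        Map<String, Long> merged = new HashMap<>(a);
        merged.putAll(b); // disjoint key sets: no entries are overwritten
        return merged;
    }
}
```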

Re: [DISCUSS]Rethink the rescale operation, can we do it async

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

in general, I think it is clearly a good idea to make rescaling as cheap and as dynamic as possible and this is a goal that is surely on the long term roadmap of Flink.
However, from a technical point of view, things are not so simple once you go into the details, and details are what the proposal lacks so far. For example, right now it is not yet possible to modify the shape of an execution dynamically while the job is running (changing parallelism without a restart), and the scheduling is not aware of the position of keyed state partitions. The state repartitioning itself also has some tricky details: after repartitioning state in a consistent way, the job is still making progress, so how does the state of the new operators catch up with those changes, all in a consistent way that does not violate exactly-once?

We have a bunch of ideas for tackling those problems in stages, towards a goal that might be similar to what you describe. For example, an intermediate step could be that you still need to briefly stop and restart the job, but we leverage local recovery to speed up the redeployment, and each operator is scheduled to an instance that is preloaded with the repartitioned state, to minimise downtime. I think we would also solve it in a general way that does not have limitations like only being able to rescale up and down by a factor of 2. So you can expect to see many steps towards this in the future, but I doubt that there is a quick fix by “just making it async”.

Best,
Stefan
