Posted to user@flink.apache.org by Xiang Zhang <xi...@stripe.com> on 2021/09/08 18:09:52 UTC

Allocation-preserving scheduling and task-local recovery

Hello,

We have an app running on Flink 1.10.2 deployed in standalone mode. We
enabled task-local recovery by setting both *state.backend.local-recovery*
and *state.backend.rocksdb.localdir*. The app has over 100 task managers
and 2 job managers (active and passive).
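
For reference, the relevant flink-conf.yaml entries look roughly like this
(the local directory below is only a placeholder, not our actual path):

    # enable task-local recovery: keep a local copy of checkpointed state so a
    # task restored on the same TaskManager does not re-download it from remote storage
    state.backend.local-recovery: true
    # local disk directory for the RocksDB state backend's working files
    state.backend.rocksdb.localdir: /mnt/local-disk/flink/rocksdb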

This is what we have observed. When we restarted a task manager, all tasks
got canceled (due to the default failover configuration
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/#failover-strategies>).
Then these tasks were re-distributed among the task managers (e.g. some
task managers had more slots in use than before the restart). This caused
all task managers to download state from remote storage all over again.
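
For reference, the failover strategy from the linked page lives in
flink-conf.yaml; we have not changed it, so (assuming the 1.10 default
resolves to "region") written out explicitly it would be:

    # "region" restarts only the pipelined region that contains the failed
    # task, "full" restarts the whole job; with all-to-all connections a
    # single region can span the entire job, so all tasks restart either way
    jobmanager.execution.failover-strategy: region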

The same thing happened when we restarted a job manager. The job manager
failed over to the passive one successfully; however, all tasks were
canceled and reallocated among the task managers again.

My understanding is that if task-local recovery is enabled, Flink will try
to stick the assignment of tasks to the task managers they previously ran
on. This doesn't seem to be the case. My question is how we can enable
this allocation-preserving scheduling
<https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/large_state_tuning/#allocation-preserving-scheduling>
when Flink handles failures.

Thanks,
Xiang

Re: Allocation-preserving scheduling and task-local recovery

Posted by Xiang Zhang <xi...@stripe.com>.
Robert, thank you for your reply! 

I tried removing "cluster.evenly-spread-out-slots" and then tested two scenarios: 1) restarting the leader job manager; 2) restarting a single task manager. These tests were done in a testing environment with six task managers and only four tasks to schedule. Without the spread-out setting, task assignments are neither sticky nor evenly spread out, which is degraded behavior for our case.

For your other question, yes, I only restarted a single task manager (I just brought the service down and started it again; the host itself kept running). All other task manager services kept running.

I had a quick look at the code you pointed to, and it does appear that it should prefer the previous allocation if it is available. I wonder whether, for some reason, this previous-allocation information becomes unavailable, but I haven't dug deep enough yet.

Regarding the logs, apart from the normal output from the job manager, I think these entries might be a bit suspicious.

First of all, there are a bunch of entries like this: 
INFO  SlotPoolImpl:372 - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{5ba754cd04ed8474a5b9eb218d6b5ffc}]
However, right after these lines, it printed out that a resource manager was registered:
INFO  JobMaster:960 - JobManager successfully registered at ResourceManager, leader id: 81f1266070acb5599b96265e69d840ad.

In addition, I see a bunch of these when the job manager requests new slots:
INFO  SlotPoolImpl:322 - Requesting new slot [SlotRequestId{d7864d265e19bede1456d523aec5e6d6}] and profile ResourceProfile{UNKNOWN} from resource manager.
Here the ResourceProfile is "UNKNOWN". This could be a red herring, though.



Re: Allocation-preserving scheduling and task-local recovery

Posted by Robert Metzger <rm...@apache.org>.
Hi,
from my understanding of the code [1], the task scheduling first considers
the state location and then uses the evenly-spread-out scheduling strategy
as a fallback. So, as I read the code, local recovery should take
preference over the evenly-spread-out strategy.
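
To illustrate the decision order, here is a simplified sketch with made-up
types (not the actual classes from [1]):

    // Illustration only -- simplified stand-in types, not Flink's real API.
    import java.util.List;
    import java.util.Optional;
    import java.util.Set;

    class StickySlotSelection {

        record Slot(String allocationId, String taskManager) {}

        // Prefer a slot from an allocation the task occupied before the failure;
        // only if none of those is free, fall back to another strategy such as
        // the evenly-spread-out one.
        static Optional<Slot> select(Set<String> previousAllocationIds, List<Slot> freeSlots) {
            for (Slot slot : freeSlots) {
                if (previousAllocationIds.contains(slot.allocationId())) {
                    // same TaskManager as before -> task-local state can be reused
                    return Optional.of(slot);
                }
            }
            // no previous slot is free -> any other placement may land the task
            // on a different TaskManager, which forces a remote state download
            return freeSlots.stream().findFirst();
        }
    }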

If you can easily test it, I would still recommend removing the
"cluster.evenly-spread-out-slots" strategy, just to make sure my
understanding is really correct.

I don't think that's the case, but just to make sure: You are only
restarting a single task manager, right? The other task managers keep
running? (Afaik the state information is lost if a TaskManager restarts.)

Sorry that I don't have a real answer here (yet). Is there anything
suspicious in the JobManager or TaskManager logs?


[1]
https://github.com/apache/flink/blob/release-1.10/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java


Re: Allocation-preserving scheduling and task-local recovery

Posted by Xiang Zhang <xi...@stripe.com>.
We also have this configuration set, in case it makes any difference when allocating tasks: cluster.evenly-spread-out-slots.
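
In flink-conf.yaml that is simply:

    # spread slots evenly across all registered TaskManagers instead of
    # filling up one TaskManager before using the next
    cluster.evenly-spread-out-slots: true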
