Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/07/19 08:34:00 UTC

[jira] [Commented] (FLINK-7216) ExecutionGraph can perform concurrent global restarts to scheduling

    [ https://issues.apache.org/jira/browse/FLINK-7216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16092788#comment-16092788 ] 

ASF GitHub Bot commented on FLINK-7216:
---------------------------------------

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/4364

    [FLINK-7216] [distr. coordination] Guard against concurrent global failover

    **This is one of the blocker issues for the 1.3.2 release.**
    
    ## What is the purpose of the change
    
    This fixes the bug [FLINK-7216](https://issues.apache.org/jira/browse/FLINK-7216), where certain race conditions can trigger concurrent global failovers, leading to a restart storm.
    
    The heart of the bug is that we allow initiating another restart while already being in state `RESTARTING`. That behavior was introduced as a safety net to catch exceptions (implementation bugs) that are reported in that state and require a full recovery to ensure consistency.
    
    However, this also means that multiple restarts may accidentally be triggered or queued and then execute one after another. While one attempt is executing the failover, the next one interferes with it or aborts it (detecting a conflict) and schedules yet another recovery, leading to the above-mentioned restart storm. The storm only subsides once one restart attempt makes enough progress (before the next one interferes) to actually finish the scheduling phase.
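
    The following is a minimal, self-contained sketch of that problematic pattern (class and method names here are illustrative only, not Flink's actual API): a failure reported while the graph is already in `RESTARTING` still queues another delayed restart, so attempts pile up and later all execute.

    ```java
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class UnguardedRestarts {

        enum State { RUNNING, RESTARTING }

        private static final ScheduledExecutorService EXECUTOR =
                Executors.newSingleThreadScheduledExecutor();

        private static volatile State state = State.RUNNING;

        /** The "safety net": even in state RESTARTING, a reported failure queues another restart. */
        static void onFailure(Throwable cause) {
            state = State.RESTARTING;
            EXECUTOR.schedule(UnguardedRestarts::restart, 1, TimeUnit.SECONDS);
        }

        /** Resets the execution vertices and runs scheduling again. */
        static void restart() {
            System.out.println(state + ": resetting execution vertices and scheduling again");
        }

        public static void main(String[] args) {
            onFailure(new Exception("initial failure"));
            onFailure(new Exception("failure reported while RESTARTING")); // queues a second restart
            EXECUTOR.shutdown(); // both delayed restarts still execute, one after the other
        }
    }
    ```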
    
    ## Brief change log
    
    This pull request contains three issues, because the first two were needed to prepare the fix.
      - [FLINK-6665](https://issues.apache.org/jira/browse/FLINK-6665) and [FLINK-6667](https://issues.apache.org/jira/browse/FLINK-6667) introduce an indirection so that the `RestartStrategy` no longer calls `restart()` on the `ExecutionGraph` directly. Instead, it invokes a callback to initiate the restart.
      - The actual fix ensures that the `globalModVersion` (which tracks global changes such as full restarts in the `ExecutionGraph`) is unchanged between triggering the restart and executing it. When multiple restart requests are scheduled, only one actually takes effect, while the others detect that they have been subsumed (see the sketch below).
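
    As a minimal sketch (again with hypothetical names, not Flink's actual classes), the fencing idea looks roughly like this: the restart strategy is handed only a callback, the callback remembers the `globalModVersion` at the moment the restart was triggered, and the restart executes only if no other global recovery has bumped the version in the meantime.

    ```java
    import java.util.concurrent.atomic.AtomicLong;

    public class VersionFencedRestarts {

        /** Callback handed to the restart strategy instead of the ExecutionGraph itself. */
        interface RestartCallback {
            void triggerFullRecovery();
        }

        private final AtomicLong globalModVersion = new AtomicLong(0);

        /** Called when a global failure is handled; captures the current version. */
        RestartCallback createRestartCallback() {
            final long expectedVersion = globalModVersion.get();
            return () -> restartIfNotSubsumed(expectedVersion);
        }

        /** Restarts only if no other global recovery happened since the callback was created. */
        void restartIfNotSubsumed(long expectedVersion) {
            // Exactly one of several concurrently queued attempts wins this compare-and-set.
            if (globalModVersion.compareAndSet(expectedVersion, expectedVersion + 1)) {
                System.out.println("performing global restart at version " + (expectedVersion + 1));
            } else {
                System.out.println("restart attempt subsumed by a newer global recovery");
            }
        }

        public static void main(String[] args) {
            VersionFencedRestarts graph = new VersionFencedRestarts();
            RestartCallback first = graph.createRestartCallback();
            RestartCallback second = graph.createRestartCallback();
            first.triggerFullRecovery();   // executes the restart and bumps the version
            second.triggerFullRecovery();  // detects that it has been subsumed, does nothing
        }
    }
    ```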
    
    ## Verifying this change
    
    This change added the following tests:
      - `ExecutionGraphRestartTest#testConcurrentGlobalFailAndRestarts()` explicitly tests the behavior under concurrent global failures and restarts
      - `ExecutionGraphRestartTest#testConcurrentLocalFailAndRestart()` tests a similar setup with concurrent local failures and a restart
    
    The general workings of that mechanism are also covered by various existing tests in `org.apache.flink.runtime.executiongraph.restart`.
    
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): **no**
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
      - The serializers: **no**
      - The runtime per-record code paths (performance sensitive): **no**
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes**:
    
    The change affects the restart logic in the `JobManager`.
    
    ## Documentation
    
      - Does this pull request introduce a new feature? **no**
      - If yes, how is the feature documented? **not applicable**
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink concurrent_restarts_13

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4364.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4364
    
----
commit 1abb816d664bdac9d8b9af438769b9f685e768ce
Author: zjureel <zj...@gmail.com>
Date:   2017-07-18T17:27:56Z

    [FLINK-6665] [FLINK-6667] [distributed coordination] Use a callback and a ScheduledExecutor for ExecutionGraph restarts
    
    Initial work by zjureel@gmail.com , improved by sewen@apache.org.

commit ef88524c808766e08d990f3bb69c45b04807c7c2
Author: Stephan Ewen <se...@apache.org>
Date:   2017-07-18T17:49:56Z

    [FLINK-7216] [distr. coordination] Guard against concurrent global failover

----


> ExecutionGraph can perform concurrent global restarts to scheduling
> -------------------------------------------------------------------
>
>                 Key: FLINK-7216
>                 URL: https://issues.apache.org/jira/browse/FLINK-7216
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.2.1, 1.3.1
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.2
>
>
> Because ExecutionGraph restarts happen asynchronously and possibly with a delay, two restarts can, in rare corner cases, be attempted concurrently, in which case some structures on the ExecutionGraph undergo concurrent access:
> Sample stack trace:
> {code}
> WARN  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Failed to restart the job.
> java.lang.IllegalStateException: SlotSharingGroup cannot clear task assignment, group still has allocated resources.
>     at org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup.clearTaskAssignment(SlotSharingGroup.java:78)
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.resetForNewExecution(ExecutionJobVertex.java:535)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1151)
>     at org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestarter$1.call(ExecutionGraphRestarter.java:40)
>     at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:95)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> The solution is to strictly guard against "subsumed" restarts via the {{globalModVersion}} in a similar way as we fence local restarts against global restarts.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)