You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/07/12 09:32:00 UTC

[jira] [Commented] (FLINK-9575) Potential race condition when removing JobGraph in HA

    [ https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541378#comment-16541378 ] 

ASF GitHub Bot commented on FLINK-9575:
---------------------------------------

GitHub user Wosin opened a pull request:

    https://github.com/apache/flink/pull/6322

    [FLINK-9575]: Remove job-related BLOBS only if the job was removed suce…

    ## What is the purpose of the change
    
    Currently flink removes all blobs connected with the job, no matter if the job itself was removed successfully. This is not the desired behavior.
    
    ## Brief change log
    - Blobs and data will be removed only if the job itself will be removed sucessfully
    
    ## Verifying this change
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes
      - The S3 file system connector: no
    ## Documentation
    
      - Does this pull request introduce a new feature? no
      - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Wosin/flink FLINK-9575

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6322.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6322
    
----
commit 1c5febebd7045e61862118649a11a85a1917a54a
Author: Wosin <bl...@...>
Date:   2018-07-04T08:27:54Z

    FLINK-9575: Remove job-related BLOBS only if the job was removed sucessfully

----


> Potential race condition when removing JobGraph in HA
> -----------------------------------------------------
>
>                 Key: FLINK-9575
>                 URL: https://issues.apache.org/jira/browse/FLINK-9575
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.5.0
>            Reporter: Dominik Wosiński
>            Assignee: Dominik Wosiński
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.2, 1.6.0
>
>
> When we are removing the _JobGraph_ from _JobManager_ for example after invoking _cancel()_, the following code is executed : 
> {noformat}
>  
> val futureOption = currentJobs.get(jobID) match { case Some((eg, _)) => val result = if (removeJobFromStateBackend) { val futureOption = Some(future { try { // ...otherwise, we can have lingering resources when there is a concurrent shutdown // and the ZooKeeper client is closed. Not removing the job immediately allow the // shutdown to release all resources. submittedJobGraphs.removeJobGraph(jobID) } catch { case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t) } }(context.dispatcher)) try { archive ! decorateMessage( ArchiveExecutionGraph( jobID, ArchivedExecutionGraph.createFrom(eg))) } catch { case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t) } futureOption } else { None } currentJobs.remove(jobID) result case None => None } // remove all job-related BLOBs from local and HA store libraryCacheManager.unregisterJob(jobID) blobServer.cleanupJob(jobID, removeJobFromStateBackend) jobManagerMetricGroup.removeJob(jobID) futureOption }
> val futureOption = currentJobs.get(jobID) match {
> case Some((eg, _)) =>
> val result = if (removeJobFromStateBackend) {
> val futureOption = Some(future {
> try {
> // ...otherwise, we can have lingering resources when there is a concurrent shutdown
> // and the ZooKeeper client is closed. Not removing the job immediately allow the
> // shutdown to release all resources.
> submittedJobGraphs.removeJobGraph(jobID)
> } catch {
> case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
> }
> }(context.dispatcher))
> try {
> archive ! decorateMessage(
> ArchiveExecutionGraph(
> jobID,
> ArchivedExecutionGraph.createFrom(eg)))
> } catch {
> case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
> }
> futureOption
> } else {
> None
> }
> currentJobs.remove(jobID)
> result
> case None => None
> }
> // remove all job-related BLOBs from local and HA store
> libraryCacheManager.unregisterJob(jobID)
> blobServer.cleanupJob(jobID, removeJobFromStateBackend)
> jobManagerMetricGroup.removeJob(jobID)
> futureOption
> }{noformat}
> This causes the asynchronous removal of the job and synchronous removal of blob files connected with this jar. This means as far as I understand that there is a potential problem that we can fail to remove job graph from _submittedJobGraphs._ If the JobManager fails and we elect the new leader it can try to recover such job, but it will fail with an exception since the assigned blob was already removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)