You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2015/10/06 10:31:26 UTC

[jira] [Commented] (FLINK-2804) Support blocking job submission with Job Manager recovery

    [ https://issues.apache.org/jira/browse/FLINK-2804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14944694#comment-14944694 ] 

ASF GitHub Bot commented on FLINK-2804:
---------------------------------------

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/1230

    [FLINK-2804] [client, runtime] Add blocking job submission support in… … … case of JobManager recovery

    This is based on some other PRs. Only review the last commit.
    
    The client submit and wait method gets a configuration parameter, which it uses to determine how to react to a lost job manager. In normal operation, nothing changes.
    
    This is the log output of a test for this. This is tailored to the current state of the job graph recovery. I think there is some room for improvement in the future in the messages the client gets on recovery. Currently the job manager simply assumes that the same job client actor is waiting for updates (that's why you see multiple submission messages.
    
    The most important part to review is the retry loop in `submitJobAndWait#submitJobAndWait`. @tillrohrmann, can you review this please?
    
    ```
    1 > 10:21:48,754 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - ================================================================================
    1 > 10:21:48,756 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Test testJobManagerConnectionLoss(org.apache.flink.runtime.client.JobClientRecoveryITCase) is running.
    1 > 10:21:48,756 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - --------------------------------------------------------------------------------
    1 > 10:21:48,970 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Started JobManagerProcess(id=0, port=52432).
    1 > 10:21:49,037 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Started JobManagerProcess(id=1, port=52433).
    1 > 10:21:49,104 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Started JobManagerProcess(id=2, port=52434).
    1 > 10:21:49,863 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Started taskmanager.
    1 > 10:21:49,899 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Waiting for leader retrieval.
    1 > 10:21:50,549 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Leader determined as akka.tcp://flink@127.0.0.1:52432/user/jobmanager. Trying to determine, which job manager process this is.
    1 > 10:21:50,555 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Leader determined as process 0.
    1 > 10:21:50,716 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Waiting for task manager to connect.
    1 > 10:21:50,724 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Task manager connected.
    1 > 10:21:50,726 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Waiting for first execution of job graph.
    1 > 10:21:50,726 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Submitting blocking twice job.
    1 > 10:21:50,729 INFO  org.apache.flink.runtime.client.JobClient                     - Submitting job f56fb890afbddc1f25ab5d0e56771cad in recovery mode ZOOKEEPER
    1 > 10:21:50,732 INFO  org.apache.flink.runtime.client.JobClient                     - Sending message to JobManager akka.tcp://flink@127.0.0.1:52432/user/jobmanager to submit job blocking job graph (f56fb890afbddc1f25ab5d0e56771cad) and wait for progress
    1 > 10:21:50,846 INFO  org.apache.flink.runtime.client.JobClient                     - Job was successfully submitted to the JobManager
    1 > 10:21:50,849 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:21:50	Job execution switched to status RUNNING.
    1 > 10:21:50,854 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:21:50	blocking vertex(1/1) switched to SCHEDULED 
    1 > 10:21:50,855 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:21:50	blocking vertex(1/1) switched to DEPLOYING 
    1 > 10:21:50,876 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:21:50	blocking vertex(1/1) switched to RUNNING 
    1 > 10:21:50,934 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Killing the leader.
    1 > 10:21:50,935 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Waiting for leader retrieval
    1 > 10:21:57,022 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Leader determined as akka.tcp://flink@127.0.0.1:52433/user/jobmanager. Trying to determine, which job manager process this is.
    1 > 10:21:57,022 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Leader determined as process 1
    1 > 10:21:57,022 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Waiting for second execution of job graph.
    1 > 10:21:59,907 INFO  org.apache.flink.runtime.client.JobClient                     - Lost connection to JobManager akka.tcp://flink@127.0.0.1:52432/user/jobmanager
    1 > 10:21:59,908 INFO  org.apache.flink.runtime.client.JobClient                     - Lost connection to job manager. Retrieving new one.
    1 > 10:21:59,930 INFO  org.apache.flink.runtime.client.JobClient                     - New job manager address akka.tcp://flink@127.0.0.1:52433/user/jobmanager and id 43147f59-30bc-4a9c-a5db-43136d14b409
    1 > 10:21:59,930 INFO  org.apache.flink.runtime.client.JobClient                     - Updating job client actor with new job manager.
    1 > 10:22:07,137 INFO  org.apache.flink.runtime.client.JobClient                     - Job was successfully submitted to the JobManager
    1 > 10:22:07,141 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:07	Job execution switched to status RUNNING.
    1 > 10:22:07,144 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:07	blocking vertex(1/1) switched to SCHEDULED 
    1 > 10:22:07,145 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:07	blocking vertex(1/1) switched to DEPLOYING 
    1 > 10:22:07,157 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:07	blocking vertex(1/1) switched to RUNNING 
    1 > 10:22:07,241 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Killing the leader.
    1 > 10:22:07,241 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Waiting for job submission to return.
    1 > 10:22:19,906 INFO  org.apache.flink.runtime.client.JobClient                     - Lost connection to JobManager akka.tcp://flink@127.0.0.1:52433/user/jobmanager
    1 > 10:22:19,907 INFO  org.apache.flink.runtime.client.JobClient                     - Lost connection to job manager. Retrieving new one.
    1 > 10:22:19,926 INFO  org.apache.flink.runtime.client.JobClient                     - New job manager address akka.tcp://flink@127.0.0.1:52434/user/jobmanager and id d195f698-9cbd-4695-8809-8c93bfe96fac
    1 > 10:22:19,926 INFO  org.apache.flink.runtime.client.JobClient                     - Updating job client actor with new job manager.
    1 > 10:22:25,102 INFO  org.apache.flink.runtime.client.JobClient                     - Job was successfully submitted to the JobManager
    1 > 10:22:25,105 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:25	Job execution switched to status RUNNING.
    1 > 10:22:25,108 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:25	blocking vertex(1/1) switched to SCHEDULED 
    1 > 10:22:25,108 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:25	blocking vertex(1/1) switched to DEPLOYING 
    1 > 10:22:25,119 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:25	blocking vertex(1/1) switched to RUNNING 
    1 > 10:22:25,121 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:25	blocking vertex(1/1) switched to FINISHED 
    1 > 10:22:25,122 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:25	Job execution switched to status FINISHED.
    1 > 10:22:25,135 INFO  org.apache.flink.runtime.client.JobClient                     - Job execution complete
    1 > 10:22:25,135 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Job submission returned.
    1 > 10:22:25,137 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - --------------------------------------------------------------------------------
    1 > 10:22:25,137 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Test testJobManagerConnectionLoss(org.apache.flink.runtime.client.JobClientRecoveryITCase) successfully run.
    1 > 10:22:25,137 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - ================================================================================
    ```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink client-recovery

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1230.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1230
    
----
commit 690822a3a8d933f3e316e577cbd632dc4c30dd6b
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-09-03T13:13:28Z

    [runtime] Add type parameter to ByteStreamStateHandle

commit 38e3f303362c6032162b47feaed145dcab2e247b
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-09-19T17:53:18Z

    [clients] Submit job detached if recovery enabled

commit 9a5e119a8ab82c2dc63bee64691e38911413b49c
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-09-20T11:08:24Z

    [FLINK-2652] [tests] Temporary ignore flakey PartitionRequestClientFactoryTest

commit 2ef10b77cbb12b79414809cff2f0c1e162823ed8
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-09-01T15:25:46Z

    [FLINK-2354] [runtime] Add job graph and checkpoint recovery
    
    Sync shutdown of CheckpointCoordinator in ExecutionGraph
    
    - The CheckpointCoordinator was shutdown asynchronously
    - This could result in a NPE when the ExecutionGraph is archived (sets the exec
      context to null)
    - Instead of synchronizing this, just do the blocking ZooKeeper operations
      async (one of which was already async)
    
    Fix resource leak during concurrent removal

commit a50899b5fea4289c51ff09395b6300e5ff410f05
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-09-30T14:38:37Z

    [FLINK-2792] [jobmanager, logging] Set actor message log level to TRACE

commit e85c179998971b10e8af1b235919ae04819e6c05
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-09-30T14:38:57Z

    [tests] Add ChaosMonkeyTest

commit c96c11bb673229881ff4167fd1260c7baca4db42
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-10-05T08:05:05Z

    [FLINK-2805] [blobmanager] Write JARs to file state backend for recovery

commit 52e5ec33a317a8c24c45a837797409c0749bb5c7
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-10-05T22:38:38Z

    [FLINK-2804] [client, runtime] Add blocking job submission support in case of JobManager recovery

----


> Support blocking job submission with Job Manager recovery
> ---------------------------------------------------------
>
>                 Key: FLINK-2804
>                 URL: https://issues.apache.org/jira/browse/FLINK-2804
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: master
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>            Priority: Minor
>
> Submitting a job in a blocking fashion with JobManager recovery and a failing JobManager fails on the client side (the one submitting the job). The job still continues to be recovered.
> I propose to add simple support to re-retrieve the leading job manager and update the client actor with it and then wait for the result as before.
> As of the current standing in PR #1153 (https://github.com/apache/flink/pull/1153) the job manager assumes that the same actor is running and just keeps on sending execution state updates etc. (if the listening behaviour is not detached).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)