You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by uce <gi...@git.apache.org> on 2016/02/12 22:16:22 UTC

[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/1633

    [FLINK-3396] [runtime] Fail job submission after state restore failure

    If state restore fails during job graph submission, the submitting client never gets notified about the submission failure. For detached submissions, this results in a time out rather than the actual failure cause. The actual failure can only be seen in the logs.
    
    This PR changes the behaviour to fail the submission.
    
    The failure cause is wrapped in a `SuppressRestartsException` in order to prevent the restart strategy from kicking in. We call `ExecutionGraph#fail(Throwable)` in the first place in order to have proper clean up.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 3396-state_restore

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1633.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1633
    
----
commit 982857bc0f5248a694d908141df0330de2befa3c
Author: Ufuk Celebi <uc...@apache.org>
Date:   2016-02-12T20:59:58Z

    [FLINK-3396] [runtime] Fail job submission after state restore failure
    
    Problem: If state restore fails during job graph submission, the submitting
    client never gets notified about the submission failure. For detached
    submissions, this results in a time out rather than the actual failure cause.
    The actual failure can only be seen in the logs.
    
    Solution: Fail the submission if state restore fails.

commit 9d2f4d88e99c3e50d8c001c8aa2cdb2848c80478
Author: Ufuk Celebi <uc...@apache.org>
Date:   2016-02-12T21:09:29Z

    [hotfix] Rename UnrecoverableException to SuppressRestartsException

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1633#discussion_r52999704
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -1079,6 +1079,9 @@ class JobManager(
               executionGraph.registerExecutionListener(gateway)
               executionGraph.registerJobStatusListener(gateway)
             }
    +
    +        // All good. Submission succeeded!
    +        jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
    --- End diff --
    
    Moved this one up to have correct ACKing behaviour.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1633#issuecomment-184599724
  
    Thanks for review Till! I've [opened an issue](https://issues.apache.org/jira/browse/FLINK-3411) for the failure behaviour in case of checkpoint state recovery.
    
    I'll merge this today.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1633#discussion_r52998947
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -1073,57 +1073,73 @@ class JobManager(
           // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
           // because it is a blocking operation
           future {
    -        try {
    -          if (isRecovery) {
    -            executionGraph.restoreLatestCheckpointedState()
    -          }
    -          else {
    -            val snapshotSettings = jobGraph.getSnapshotSettings
    -            if (snapshotSettings != null) {
    -              val savepointPath = snapshotSettings.getSavepointPath()
    +        val restoreStateSuccess =
    +          try {
    +            if (isRecovery) {
    +              executionGraph.restoreLatestCheckpointedState()
    --- End diff --
    
    Had an offline discussion with Stephan. He agrees with you that the failure in this case is too hard. I'll undo that change by ACK'ing the submission earlier.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1633#discussion_r52992451
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -1073,57 +1073,73 @@ class JobManager(
           // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
           // because it is a blocking operation
           future {
    -        try {
    -          if (isRecovery) {
    -            executionGraph.restoreLatestCheckpointedState()
    -          }
    -          else {
    -            val snapshotSettings = jobGraph.getSnapshotSettings
    -            if (snapshotSettings != null) {
    -              val savepointPath = snapshotSettings.getSavepointPath()
    +        val restoreStateSuccess =
    +          try {
    +            if (isRecovery) {
    +              executionGraph.restoreLatestCheckpointedState()
    --- End diff --
    
    I'm not so sure about that, to be honest. What if the `restoreLatestCheckpointedState` fails because of some HDFS/ZooKeeper problems. Then you would like to try restarting the job, wouldn't you? The client should then be notified once all restarting attempts have been exhausted.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1633#issuecomment-184694713
  
    Travis has passed. Did you have another look at this @tillrohrmann?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1633#issuecomment-184719993
  
    Closing this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

Posted by uce <gi...@git.apache.org>.
Github user uce closed the pull request at:

    https://github.com/apache/flink/pull/1633


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1633#issuecomment-184313039
  
    Changes look good to me @uce. I had only one inline question concerning a semantic change.
    
    Apart from that +1 for merging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1633#discussion_r53019879
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -1079,6 +1079,9 @@ class JobManager(
               executionGraph.registerExecutionListener(gateway)
               executionGraph.registerJobStatusListener(gateway)
             }
    +
    +        // All good. Submission succeeded!
    +        jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
    --- End diff --
    
    Oh boy... not my day today. Thanks for catching that. This was not expected.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1633#discussion_r52924511
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -1073,57 +1073,73 @@ class JobManager(
           // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
           // because it is a blocking operation
           future {
    -        try {
    -          if (isRecovery) {
    -            executionGraph.restoreLatestCheckpointedState()
    -          }
    -          else {
    -            val snapshotSettings = jobGraph.getSnapshotSettings
    -            if (snapshotSettings != null) {
    -              val savepointPath = snapshotSettings.getSavepointPath()
    +        val restoreStateSuccess =
    +          try {
    +            if (isRecovery) {
    +              executionGraph.restoreLatestCheckpointedState()
    --- End diff --
    
    Is it intended that now failures in the `restoreLatestCheckpointedState` are non recoverable as well? This seems to be different from the former implementation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1633#discussion_r53000913
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -1073,57 +1073,73 @@ class JobManager(
           // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
           // because it is a blocking operation
           future {
    -        try {
    -          if (isRecovery) {
    -            executionGraph.restoreLatestCheckpointedState()
    -          }
    -          else {
    -            val snapshotSettings = jobGraph.getSnapshotSettings
    -            if (snapshotSettings != null) {
    -              val savepointPath = snapshotSettings.getSavepointPath()
    +        val restoreStateSuccess =
    +          try {
    +            if (isRecovery) {
    +              executionGraph.restoreLatestCheckpointedState()
    --- End diff --
    
    Regarding the `JobSubmitSuccess`: we had it as a follow up to have more fine-grained integration with the the client and left it as a duplicate submit message for the time being (instead of something like `JobRecovered`).
    
    The other behaviour is back to the previous state now. I hear you that it makes sense to integrate the state restore behaviour with the execution graph restart.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1633#discussion_r52998567
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -1073,57 +1073,73 @@ class JobManager(
           // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
           // because it is a blocking operation
           future {
    -        try {
    -          if (isRecovery) {
    -            executionGraph.restoreLatestCheckpointedState()
    -          }
    -          else {
    -            val snapshotSettings = jobGraph.getSnapshotSettings
    -            if (snapshotSettings != null) {
    -              val savepointPath = snapshotSettings.getSavepointPath()
    +        val restoreStateSuccess =
    +          try {
    +            if (isRecovery) {
    +              executionGraph.restoreLatestCheckpointedState()
    --- End diff --
    
    The behaviour right now for a failure while doing a job recovery would simply fail the `ExecutionGraph` triggering a restart. A successful job recovery would send a `JobSubmitSuccess` to the client. I'm not sure whether this is actually correct, since the client already received a `JobSubmitMessage` from the `JobManager` while initially submitting the job. But I think this will simply be ignored.
    
    Thus, suppressing the restart behaviour in case of a job recovery would actually change the behaviour.
    
    If it makes sense and if it is possible to recover from failures while recovering a job or restoring a savepoint, it would make sense to not directly fail the job without restarting. Maybe one should distinguish that based on the actually occurring exception.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1633#discussion_r52986635
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -1073,57 +1073,73 @@ class JobManager(
           // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
           // because it is a blocking operation
           future {
    -        try {
    -          if (isRecovery) {
    -            executionGraph.restoreLatestCheckpointedState()
    -          }
    -          else {
    -            val snapshotSettings = jobGraph.getSnapshotSettings
    -            if (snapshotSettings != null) {
    -              val savepointPath = snapshotSettings.getSavepointPath()
    +        val restoreStateSuccess =
    +          try {
    +            if (isRecovery) {
    +              executionGraph.restoreLatestCheckpointedState()
    --- End diff --
    
    Yes, it had the same issue as the savepoint restore. If restoring the checkpoint failed, job submission was never ACK'd, but execution was restarted etc. I'm not sure that this is the behaviour we want in the long term, but I think failing the submission is clearer behaviour right now.
    
    Another issue I've noticed is the following: when HA checkpoint recovery fails and the job cannot be scheduled for execution, the job will eventually be removed from ZooKeeper and hence never be recovered again. Let me open an issue for this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1633#issuecomment-184250973
  
    @tillrohrmann, could you have a look at this change?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1633#discussion_r53019468
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -1079,6 +1079,9 @@ class JobManager(
               executionGraph.registerExecutionListener(gateway)
               executionGraph.registerJobStatusListener(gateway)
             }
    +
    +        // All good. Submission succeeded!
    +        jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
    --- End diff --
    
    Hmm but now we have the problem that the user might see a `JobSubmitSuccess` without the job being stored in the `SubmittedJobGraphStore`, right? This means that if the `JobManager` dies before the job is persisted, it will never be recovered. I think this violates the `JobSubmitSuccess` contract.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1633#discussion_r52997375
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -1073,57 +1073,73 @@ class JobManager(
           // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
           // because it is a blocking operation
           future {
    -        try {
    -          if (isRecovery) {
    -            executionGraph.restoreLatestCheckpointedState()
    -          }
    -          else {
    -            val snapshotSettings = jobGraph.getSnapshotSettings
    -            if (snapshotSettings != null) {
    -              val savepointPath = snapshotSettings.getSavepointPath()
    +        val restoreStateSuccess =
    +          try {
    +            if (isRecovery) {
    +              executionGraph.restoreLatestCheckpointedState()
    --- End diff --
    
    But then I would not keep the behaviour as it is right now. Instead, we should then consider the job submitted before trying to recover any checkpoint state and keep the restart behaviour. What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---