Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/28 11:32:55 UTC

[GitHub] [flink] JesseAtSZ opened a new pull request, #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

JesseAtSZ opened a new pull request, #20091:
URL: https://github.com/apache/flink/pull/20091

   ## What is the purpose of the change
   
   *When checkpoints are stored on the local file system, path initialization recursively creates the directories, and `mkdirsInternal` may fail. However, the result is not checked, so tasks may still start even if the checkpoint path is invalid. In addition, because `FINALIZE_CHECKPOINT_FAILURE` is ignored in `checkFailureCounter` and does not count towards `tolerableCpFailureNumber`, checkpoints may keep failing indefinitely.*
   
   ## Brief change log
     - *Check whether `sharedStateDirectory` and `taskOwnedStateDirectory` were created successfully; if either creation fails, throw an exception.*
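A minimal sketch of this check, assuming `java.io.File` as a stand-in for Flink's `FileSystem` so that it runs without Flink on the classpath. `BaseLocationInitializer` and its methods are hypothetical names, not Flink API, and the error messages use a "Failed to create directory for ..." wording instead of raw field names.

```java
import java.io.File;
import java.io.IOException;

// Hypothetical helper: java.io.File stands in for Flink's FileSystem so the sketch runs standalone.
final class BaseLocationInitializer {

    private BaseLocationInitializer() {}

    // Creates both base directories and fails fast if either one cannot be created.
    static void initializeBaseLocations(File sharedStateDir, File taskOwnedStateDir)
            throws IOException {
        ensureDirectory(sharedStateDir, "shared state");
        ensureDirectory(taskOwnedStateDir, "task-owned state");
    }

    private static void ensureDirectory(File dir, String purpose) throws IOException {
        // java.io.File.mkdirs() returns false if the directory already exists,
        // so check isDirectory() before treating the result as a failure.
        if (!dir.mkdirs() && !dir.isDirectory()) {
            throw new IOException(
                    "Failed to create directory for " + purpose + ": " + dir.getAbsolutePath());
        }
    }
}
```

The `isDirectory()` fallback matters because `java.io.File.mkdirs()` reports `false` for a directory that already exists, for example when a job restarts against the same checkpoint path.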
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] curcur commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
curcur commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1174569765

   > Got it, thanks. My concern is that probably we should fix the error handling in `CheckpointFailureManager`, instead of `mkdirs` call:
   > 
   > ```
   >     case FINALIZE_CHECKPOINT_FAILURE:
   >         // ignore
   >         break;
   > ```
   > 
   > The documentation for `execution.checkpointing.tolerable-failed-checkpoints` says that it applies to any `IOException on the Job Manager`. This is the case here (the `IOException` gets wrapped into a `CheckpointException` when I try it locally).
   > 
   > By adding a check of `mkdirs`, we do fix this particular problem; but any other `IOException` (e.g. intermittent failure when writing the `_metadata` file) will be ignored by `CheckpointFailureManager`.
   > 
   > So how about counting `FINALIZE_CHECKPOINT_FAILURE` as a real failure in `CheckpointFailureManager`? We could still fail fast if the directories could not be created, but that would be just an optimization then.
   
   @pnowojski Do you have an idea why `FINALIZE_CHECKPOINT_FAILURE` is not counted as a checkpoint failure? Looks strange to me as well. 
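To illustrate the proposal in the quoted comment, here is a toy model of tolerable-failure counting. It is not Flink's `CheckpointFailureManager`: the class name, the reduced `Reason` enum, and the choice of ignored reasons are assumptions made for the sketch. Moving `FINALIZE_CHECKPOINT_FAILURE` out of the ignored set is enough for repeated finalize failures to eventually exceed the threshold.

```java
import java.util.EnumSet;
import java.util.Set;

// Toy model of tolerable-failure counting; NOT Flink's CheckpointFailureManager.
// The class name and the reduced Reason enum are hypothetical.
final class ToyFailureCounter {

    enum Reason {
        IO_EXCEPTION,
        CHECKPOINT_EXPIRED,
        CHECKPOINT_SUBSUMED,
        FINALIZE_CHECKPOINT_FAILURE
    }

    private final int tolerableFailures;
    private final Set<Reason> ignoredReasons;
    private int continuousFailures;

    private ToyFailureCounter(int tolerableFailures, Set<Reason> ignoredReasons) {
        this.tolerableFailures = tolerableFailures;
        this.ignoredReasons = ignoredReasons;
    }

    // Behavior before the change: finalize failures are ignored like subsumed checkpoints.
    static ToyFailureCounter ignoringFinalizeFailures(int tolerableFailures) {
        return new ToyFailureCounter(
                tolerableFailures,
                EnumSet.of(Reason.CHECKPOINT_SUBSUMED, Reason.FINALIZE_CHECKPOINT_FAILURE));
    }

    // Proposed behavior: finalize failures count like IO exceptions.
    static ToyFailureCounter countingFinalizeFailures(int tolerableFailures) {
        return new ToyFailureCounter(tolerableFailures, EnumSet.of(Reason.CHECKPOINT_SUBSUMED));
    }

    // Returns true once consecutive counted failures exceed the tolerable number.
    boolean onFailure(Reason reason) {
        if (ignoredReasons.contains(reason)) {
            return false; // ignored reasons never move the counter
        }
        continuousFailures++;
        return continuousFailures > tolerableFailures;
    }

    // A successful checkpoint resets the streak of consecutive failures.
    void onSuccess() {
        continuousFailures = 0;
    }
}
```

With a threshold of 2, the ignoring counter never trips no matter how many finalize failures occur, while the counting one trips on the third consecutive failure; a successful checkpoint resets the streak.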


[GitHub] [flink] fredia commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
fredia commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1171930223

   @JesseAtSZ Thanks for your explanation, `fileSystem.mkdirs()` not only returns `false` when the directory exists, but also returns `false` when there is no permission to write.  This PR makes sense to me. 


[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1173501690

   @MartijnVisser Thanks for the review! Could you please take another look? In addition, I would like to ask about our code version management: is it allowed to add code to an already released version, such as 1.14.4? I found this problem on 1.14.4.


[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1174498642

   @rkhachatryan I had the same question when reading the code, and I'm curious why this exception is ignored. I don't know the original intention behind ignoring it (the comments don't explain it either), so I chose to change `initializeBaseLocationsForCheckpoint` instead. If you are sure it should be changed like this, I can update the PR.


[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1176999771

   @flinkbot run azure


[GitHub] [flink] rkhachatryan commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1184265463

   Thanks a lot for your analysis @gaoyunhaii!
   
   I think we can distinguish these cases in different ways:
   1. [`checkNoPartlyOperatorsFinishedVertexUsedUnionListState`](https://github.com/apache/flink/blob/74f90d722f7be5db5298b84626935a585391f0df/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java#L186) returns a result value (e.g. a `boolean`), which is then checked up the call hierarchy
   2. [`checkNoPartlyOperatorsFinishedVertexUsedUnionListState`](https://github.com/apache/flink/blob/74f90d722f7be5db5298b84626935a585391f0df/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java#L186) throws a `CheckpointException` with a new reason (which is then not wrapped again in [`CheckpointCoordinator.finalizeCheckpoint`](https://github.com/apache/flink/blob/74f90d722f7be5db5298b84626935a585391f0df/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1361))
   3. [`checkNoPartlyOperatorsFinishedVertexUsedUnionListState`](https://github.com/apache/flink/blob/74f90d722f7be5db5298b84626935a585391f0df/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java#L186) throws a new type of exception; then in [`CheckpointCoordinator.finalizeCheckpoint`](https://github.com/apache/flink/blob/74f90d722f7be5db5298b84626935a585391f0df/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1361) we use a different failure reason for this type of exception
   
   (1) seems the cleanest way but requires more changes (4 calls)
   (2) might have some unanticipated consequences because of changing the exception type
   (3) seems a good compromise between invasiveness and cleanliness 
   
   So I'd implement (3) or (1), WDYT? 
   @JesseAtSZ , @gaoyunhaii 
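Option (3) could look roughly like the following sketch. The exception class, the `UNION_LIST_STATE_PARTLY_FINISHED` reason, and both helper methods are hypothetical; the point is only that the exception type, rather than a generic wrapper, decides the failure reason and therefore whether the failure is counted.

```java
// Sketch of option (3): a dedicated exception type is mapped to its own failure
// reason during finalization. Every name here is hypothetical, not Flink API.
final class FinalizeFailureMapping {

    private FinalizeFailureMapping() {}

    enum Reason {
        FINALIZE_CHECKPOINT_FAILURE,
        UNION_LIST_STATE_PARTLY_FINISHED // hypothetical dedicated reason
    }

    // Hypothetical exception thrown by the union-list-state check.
    static final class PartlyFinishedUnionListStateException extends RuntimeException {
        PartlyFinishedUnionListStateException(String message) {
            super(message);
        }
    }

    // Picks a reason based on the exception type instead of wrapping every
    // exception as a generic finalize failure.
    static Reason reasonFor(Throwable failure) {
        return failure instanceof PartlyFinishedUnionListStateException
                ? Reason.UNION_LIST_STATE_PARTLY_FINISHED
                : Reason.FINALIZE_CHECKPOINT_FAILURE;
    }

    // Only genuine finalize failures count towards the tolerable-failure threshold.
    static boolean countsTowardsThreshold(Reason reason) {
        return reason == Reason.FINALIZE_CHECKPOINT_FAILURE;
    }
}
```

Compared with option (1), only the throwing site and the single mapping point change, which is the trade-off between invasiveness and cleanliness described above.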


[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1209180341

   @rkhachatryan Thank you for the changes. I'm still confused about the classloading issue in the test case I wrote earlier; could you please explain the reason?


[GitHub] [flink] rkhachatryan commented on pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1212809791

   Merged as 88b309b7dcad269ad084eab5e2944724daf6dee4.


[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1172286412

   @fredia I updated the PR; could you please take another look? And what else do I need to do to get this PR merged into master?


[GitHub] [flink] MartijnVisser commented on a diff in pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on code in PR #20091:
URL: https://github.com/apache/flink/pull/20091#discussion_r912713881


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java:
##########
@@ -113,8 +113,14 @@ public boolean supportsHighlyAvailableStorage() {
 
     @Override
     public void initializeBaseLocationsForCheckpoint() throws IOException {
-        fileSystem.mkdirs(sharedStateDirectory);
-        fileSystem.mkdirs(taskOwnedStateDirectory);
+        if (!fileSystem.mkdirs(sharedStateDirectory)) {
+            throw new IOException(
+                    "Failed to mkdir for sharedStateDirectory: " + sharedStateDirectory);

Review Comment:
   I don't think this is the most user-friendly error message; it should be something like "Failed to create directory for shared state".



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java:
##########
@@ -113,8 +113,14 @@ public boolean supportsHighlyAvailableStorage() {
 
     @Override
     public void initializeBaseLocationsForCheckpoint() throws IOException {
-        fileSystem.mkdirs(sharedStateDirectory);
-        fileSystem.mkdirs(taskOwnedStateDirectory);
+        if (!fileSystem.mkdirs(sharedStateDirectory)) {
+            throw new IOException(
+                    "Failed to mkdir for sharedStateDirectory: " + sharedStateDirectory);
+        }
+        if (!fileSystem.mkdirs(taskOwnedStateDirectory)) {
+            throw new IOException(
+                    "Failed to mkdir for taskOwnedStateDirectory: " + taskOwnedStateDirectory);

Review Comment:
   Same here, the error message can be improved



[GitHub] [flink] rkhachatryan commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1175451454

   Regarding `TRIGGER_CHECKPOINT_FAILURE`,
   
   I **had** the following concerns - but after checking the code they turned out to be wrong:
   If counted as failure:
   - "not all tasks are running" could cause it - wrong: there is a separate `NOT_ALL_REQUIRED_TASKS_RUNNING` constant
   - timing issues or existing concurrent checkpoints can produce it - wrong: they are separate constants as well
   
   If not counted as failure (left as is):
   - if we check `mkdirs` return value then it will cause `TRIGGER_CHECKPOINT_FAILURE`, again breaking the counter - wrong: `mkdirs` is called during the `CheckpointCoordinator` initialization
   
   Still, I'm leaning towards leaving it as is, because there are probably other edge cases in which it occurs and shouldn't be counted as a failure, e.g. shutdown.
   


[GitHub] [flink] fredia commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
fredia commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1173269329

   @rkhachatryan could you please take a review?


[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1211620815

   @MartijnVisser 


[GitHub] [flink] rkhachatryan commented on pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1192318307

   Thanks for updating the PR, @JesseAtSZ.
   Production code changes LGTM.
   
   Do you mind adding tests for the changed behavior?
   I think at least finalization failure counting should be covered, e.g. in `CheckpointFailureManagerITCase`.


[GitHub] [flink] rkhachatryan commented on pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1201477777

   How about using an approach similar to https://github.com/apache/flink/blob/c3e72be836e1d14871109ceb0ddb2e98392f4ff1/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java#L3471 ?
   That same code seems sufficient.


[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1177472210

   @rkhachatryan 
   
   > I'm wondering whether this check is sufficient. mkdirs javadoc says:
   > @return true if at least one new directory has been created, false otherwise
   
   The `mkdirs(final Path f)` comment in `LocalFileSystem` mentions that: 
   
   > Recursively creates the directory specified by the provided path.
   
   I think "at least one" refers to the recursive creation: depending on the path hierarchy, several directories may be created, so at least one new directory is created.
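For reference, the JDK's `java.io.File.mkdirs()` shows why the return value needs care. This is JDK behavior used as an illustration, not Flink's `LocalFileSystem`, whose implementation may differ; `MkdirsSemantics` is a hypothetical helper. The first call on a nested path creates every missing level and returns `true`; a second call returns `false` because nothing new was created, and a path below a regular file also yields `false`. A bare `if (!mkdirs(...))` check is therefore only safe for an implementation that also returns `true` for an already existing directory.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Demonstrates java.io.File.mkdirs() return values (JDK behavior, not Flink's
// LocalFileSystem). The class and method names are hypothetical.
final class MkdirsSemantics {

    private MkdirsSemantics() {}

    // Fresh, empty root directory for the demonstration.
    static File newTempRoot() throws IOException {
        return Files.createTempDirectory("mkdirs-demo").toFile();
    }

    // The first call creates every missing level of a nested path; the second
    // call has nothing left to create and therefore returns false.
    static boolean[] mkdirsTwice(File nestedDir) {
        boolean first = nestedDir.mkdirs();
        boolean second = nestedDir.mkdirs();
        return new boolean[] {first, second};
    }

    // A path below a regular file can never be created, which also yields false.
    static boolean mkdirsBelowRegularFile(File root) throws IOException {
        File regularFile = new File(root, "not-a-directory");
        if (!regularFile.exists() && !regularFile.createNewFile()) {
            throw new IOException("Could not create " + regularFile);
        }
        return new File(regularFile, "child").mkdirs();
    }
}
```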
   
   In addition, `Run e2e tests` and `Test - table` keep failing; what could be the reason?


[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1194920838

   @rkhachatryan Sorry, I've been a little busy recently; I'll update the PR this week.


[GitHub] [flink] rkhachatryan commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1177700779

   I've checked it locally and found that `WindowDistinctAggregateITCase` fails with `Exceeded checkpoint tolerable failure threshold`, which is preceded by
   ```
    Caused by: org.apache.flink.util.FlinkRuntimeException: The vertex GlobalWindowAggregate[28] -> Calc[29] -> LocalWindowAggregate[30] (id = 6853e3ee87fc960bbdfeb6fc8a232136) has used UnionListState, but part of its tasks has called operators' finish method.
            at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlan.checkNoPartlyOperatorsFinishedVertexUsedUnionListState(DefaultCheckpointPlan.java:187) ~[classes/:?]
            at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlan.fulfillFinishedTaskStatus(DefaultCheckpointPlan.java:138) ~[classes/:?]
            at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:323) ~[classes/:?]
            at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1356) ~[classes/:?]
   ```
   
   @gaoyunhaii 
   Do you know what may be the reason?
   
   It seems unrelated to this change, so I'd open a ticket for investigation and temporarily increase `TolerableCheckpointFailureNumber` in the affected tests, WDYT?


[GitHub] [flink] gaoyunhaii commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1178726292

   >  @gaoyunhaii
   >  Do you know what may be the reason?
   
   This might be expected behavior; I'll take a closer look at the test.


[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1174998991

   > I have similar doubts if maybe `TRIGGER_CHECKPOINT_FAILURE` shouldn't be ignored as well?
   
   I think this is necessary. This exception may occur when checkpoints are triggered periodically.
   
   Are there any other failure reasons that need to be changed? I can include them in the same change.


[GitHub] [flink] pnowojski commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
pnowojski commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1174888262

   The initial set of ignored failure reasons was not well thought through and has been adjusted over and over again, whenever we found a good example of a failure that shouldn't be ignored. So:
   > Do you have an idea why FINALIZE_CHECKPOINT_FAILURE is not counted as a checkpoint failure? Looks strange to me as well.
   
   There is probably no good reason why.
   
   > If no strong reason, I go with @rkhachatryan , we should count FINALIZE_CHECKPOINT_FAILURE as well.
   
   👍 seems like a good change. Looking at the other failures `FINALIZE_CHECKPOINT_FAILURE` fits much better with things like:
   ```
               case IO_EXCEPTION:
               case CHECKPOINT_ASYNC_EXCEPTION:
               case CHECKPOINT_DECLINED:
               case CHECKPOINT_EXPIRED:
   ```
   Rather than checkpoint declined/subsumed. 
   
   I have similar doubts if maybe `TRIGGER_CHECKPOINT_FAILURE` shouldn't be ignored as well?


[GitHub] [flink] gaoyunhaii commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1184180054

   Hi @rkhachatryan, very sorry for the delay. I checked the logic here; the cause of the failure is as follows:
   
   1. The final checkpoint mechanism treats operators using union list state specially: the checkpoint is aborted if some of their subtasks have already finished. This avoids possible state loss and data inconsistency for union list state.
   2. This check happens in the finalize step, which checks all operators that use union list state; if some of an operator's subtasks have finished, an exception is thrown and the checkpoint fails with FINALIZE_CHECKPOINT_FAILURE.
   3. Since this PR now counts FINALIZE_CHECKPOINT_FAILURE as an explicit failure, some tests are affected.
   
   Since these union list state failures are currently by design, I think that for now we should fail the checkpoint with a dedicated reason in this case and continue not counting them.
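The rule described in points 1 and 2 can be sketched as a toy model. This is not the code of `DefaultCheckpointPlan.checkNoPartlyOperatorsFinishedVertexUsedUnionListState`; `UnionListStateFinishCheck`, `Vertex`, and the per-subtask finished flags are hypothetical simplifications.

```java
import java.util.List;

// Toy version of the union-list-state rule; NOT DefaultCheckpointPlan's code.
// UnionListStateFinishCheck, Vertex and mustAbortCheckpoint are hypothetical names.
final class UnionListStateFinishCheck {

    private UnionListStateFinishCheck() {}

    // Minimal stand-in for an execution job vertex.
    static final class Vertex {
        final boolean usesUnionListState;
        final List<Boolean> subtaskFinished;

        Vertex(boolean usesUnionListState, List<Boolean> subtaskFinished) {
            this.usesUnionListState = usesUnionListState;
            this.subtaskFinished = subtaskFinished;
        }
    }

    // True if some, but not all, subtasks of a union-list-state vertex have finished.
    static boolean mustAbortCheckpoint(Vertex vertex) {
        if (!vertex.usesUnionListState) {
            return false;
        }
        long finished = vertex.subtaskFinished.stream().filter(f -> f).count();
        return finished > 0 && finished < vertex.subtaskFinished.size();
    }
}
```

A vertex whose subtasks have all finished, or none of them, passes; only the partly finished case aborts, which is the case the dedicated failure reason would keep out of the failure count.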


[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1200444798

   How can I trigger a `FINALIZE_CHECKPOINT_FAILURE`? I tried to implement a `CheckpointStorage`, but found that there are too many methods to implement.
   
       @Test(timeout = 20_000)
       public void testFinalizeCheckpointFailureTriggerJobFailed() throws Exception {
           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           env.enableCheckpointing(500);
           env.setRestartStrategy(RestartStrategies.noRestart());
           CheckpointConfig checkpointConfig = env.getCheckpointConfig();
           checkpointConfig.setCheckpointStorage(new MockCheckpointStorage());
           env.addSource(new StringGeneratingSourceFunction()).addSink(new DiscardingSink<>());
           JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
           try {
               // Assert that the job executes only one checkpoint and fails only once.
               TestUtils.submitJobAndWaitForResult(
                       cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
           }
           ......
       }


[GitHub] [flink] rkhachatryan commented on pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1209290466

   @MartijnVisser do you have any further feedback on this PR?


[GitHub] [flink] rkhachatryan commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1173803355

   Thanks @JesseAtSZ,
   Could you please confirm that the job is stateless?
   
   
   > the initialization on Coordinator will before the performCheckpoint on TM.
   
   [Not always](https://github.com/apache/flink/blob/16109a31468949f09c2a7bba9003761726e3d61c/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java#L145) - if a default location was used by JM (I'm not saying the check on TM should be added, just clarifying).


[GitHub] [flink] rkhachatryan commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1183236982

   @gaoyunhaii did you have a chance to look at the test?
   
   @JesseAtSZ right now it's only a single test failure, which is likely related to a particular case (finished sources). 
   So I think it makes sense to investigate it.


[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1176104206

   @rkhachatryan @MartijnVisser I updated the PR.


[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1205216052

   I have tried to write the test case in `CheckpointFailureManagerITCase` like this:
   
       @Test
       public void testFinalizeCheckpointFailureTriggerJobFailed() throws Exception {
           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           env.enableCheckpointing(500);
           env.setRestartStrategy(RestartStrategies.noRestart());
           CheckpointConfig checkpointConfig = env.getCheckpointConfig();
           checkpointConfig.setCheckpointStorage(
                   new JobManagerCheckpointStorage() {
                       private static final long serialVersionUID = 8134582566514272547L;
   
                       // Throw exception when finalizing the checkpoint.
                       @Override
                       public CheckpointStorageAccess createCheckpointStorage(JobID jobId)
                               throws IOException {
                           return new MemoryBackendCheckpointStorageAccess(jobId, null, null, 100) {
                               @Override
                               public CheckpointStorageLocation initializeLocationForCheckpoint(
                                       long checkpointId) throws IOException {
                                   return new NonPersistentMetadataCheckpointStorageLocation(1000) {
                                       @Override
                                       public CheckpointMetadataOutputStream
                                               createMetadataOutputStream() throws IOException {
                                           throw new IOException("Artificial Exception");
                                       }
                                   };
                               }
                           };
                       }
                   });
           env.addSource(new StringGeneratingSourceFunction()).addSink(new DiscardingSink<>());
           JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
           try {
               // assert that the job executes the checkpoint only once and fails only once.
               TestUtils.submitJobAndWaitForResult(
                       cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
           } catch (JobExecutionException jobException) {
               Optional<FlinkRuntimeException> throwable =
                       ExceptionUtils.findThrowable(jobException, FlinkRuntimeException.class);
               Assert.assertTrue(throwable.isPresent());
           }
           // assert that the job only failed once.
           Assert.assertEquals(1, StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
       }
   
   However, when the checkpoint is executed, the methods in `flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/JobManagerCheckpointStorage.java` are called instead, so my implementation of `JobManagerCheckpointStorage` has no effect. This seems to be related to `CheckpointStorageLoader.load` in `org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder#buildGraph`.




[GitHub] [flink] rkhachatryan commented on a diff in pull request #20091: [FLINK-27570][runtime]Count finalize failure in checkpoint manager

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #20091:
URL: https://github.com/apache/flink/pull/20091#discussion_r922878505


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -1368,6 +1369,15 @@ private CompletedCheckpoint finalizeCheckpoint(PendingCheckpoint pendingCheckpoi
                                 CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1));
             }
 
+            if (e1 instanceof FlinkExpectedException) {

Review Comment:
   I'm afraid `FlinkExpectedException` will never reach this line,
   because it will be caught in `PendingCheckpoint.finalizeCheckpoint` and wrapped into an `IOException` there.
   Replacing `FlinkExpectedException` with `FlinkRuntimeException` should be enough, though.
   
   This also suggests that some test is necessary.
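A minimal sketch of why a wrapped exception escapes an `instanceof` check: the check only inspects the outermost exception, while the original failure sits in the cause chain. It assumes, as described above, that the finalize step wraps failures into an `IOException`. `ExpectedFailure`, `finalizeStep`, and `findThrowable` are hypothetical names; the helper is a standalone stand-in that searches the cause chain the way the `ExceptionUtils.findThrowable` call in the test earlier in this thread is used.

```java
import java.io.IOException;
import java.util.Optional;

class CauseChainDemo {

    /** Hypothetical stand-in for the exception type the coordinator wants to detect. */
    static class ExpectedFailure extends RuntimeException {
        ExpectedFailure(String message) {
            super(message);
        }
    }

    /** Hypothetical finalize step that wraps any runtime failure into an IOException. */
    static void finalizeStep() throws IOException {
        try {
            throw new ExpectedFailure("artificial failure");
        } catch (RuntimeException e) {
            throw new IOException("Failed to finalize checkpoint", e);
        }
    }

    /** Runs the finalize step and returns whatever it threw (null if nothing). */
    static Throwable runFinalize() {
        try {
            finalizeStep();
            return null;
        } catch (IOException e) {
            return e;
        }
    }

    /** Walks the cause chain and returns the first throwable of the requested type. */
    static <T extends Throwable> Optional<T> findThrowable(Throwable throwable, Class<T> type) {
        Throwable current = throwable;
        while (current != null) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Throwable caught = runFinalize();
        // The outermost exception is the IOException wrapper, so this prints false.
        System.out.println(caught instanceof ExpectedFailure);
        // Searching the cause chain still finds the original failure, so this prints true.
        System.out.println(findThrowable(caught, ExpectedFailure.class).isPresent());
    }
}
```

This is why either the check has to search causes or the exception type has to survive the wrapping step unchanged.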





[GitHub] [flink] gaoyunhaii commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1184097939

   > @gaoyunhaii did you have a chance to look at the test?
   
   Hi @rkhachatryan, very sorry for the delay; I'll reach a conclusion on the issue today~




[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1169866390

   @flinkbot run azure




[GitHub] [flink] rkhachatryan commented on pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1189495948

   I have added a couple of fixup commits on top of this PR's changes:
   https://github.com/rkhachatryan/flink/tree/f27570-exception
   
   Would you mind cherry-picking them (unless you have any concerns), @JesseAtSZ?
   Could you please also rebase the PR onto the latest changes to avoid FLINK-28269 failures?




[GitHub] [flink] MartijnVisser commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1173543244

   > I found that some branches end in the string -rc. What does this mean?
   
   Release candidates; those are created before a release is published.




[GitHub] [flink] JesseAtSZ commented on a diff in pull request #20091: [FLINK-27570][runtime]Count finalize failure in checkpoint manager

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on code in PR #20091:
URL: https://github.com/apache/flink/pull/20091#discussion_r923029089


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -1368,6 +1369,15 @@ private CompletedCheckpoint finalizeCheckpoint(PendingCheckpoint pendingCheckpoi
                                 CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1));
             }
 
+            if (e1 instanceof FlinkExpectedException) {

Review Comment:
    Updated the PR. For some reason, I can't compile it successfully locally (for debugging).








[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1173520228

   @MartijnVisser So this means that this problem cannot be fixed in the old version unless you modify the code and compile it yourself, right?




[GitHub] [flink] MartijnVisser commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1173516269

   > Thanks for the review! Could you please take another look?
   
   Sure, but @rkhachatryan's comments are more important here :)
   
   > Is it allowed to merge code into a released version, such as 1.14.4? I found this problem on 1.14.4.
   
   Fixes are always shipped in a new release, so this would be Flink 1.14.5 if the PR is merged and that release is created later.




[GitHub] [flink] rkhachatryan commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1174397933

   Got it, thanks. My concern is that we should probably fix the error handling in `CheckpointFailureManager` instead of the `mkdirs` call:
   ```
       case FINALIZE_CHECKPOINT_FAILURE:
           // ignore
           break;
   ```
   
   The documentation for `execution.checkpointing.tolerable-failed-checkpoints` says that it applies to any "IOException on the Job Manager".
   This is the case here (`IOException` gets wrapped into `CheckpointException` when I try it locally). 
   
   By adding a check of `mkdirs`, we do fix this particular problem; but any other `IOException` (e.g. intermittent failure when writing the `_metadata` file) will be ignored by `CheckpointFailureManager`.
   
   So how about counting `FINALIZE_CHECKPOINT_FAILURE` as a real failure in `CheckpointFailureManager`?
   We could still fail fast if the directories could not be created, but that would be just an optimization then.
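A minimal, hypothetical model of the proposal, assuming the job fails once the number of continuous checkpoint failures exceeds the tolerable number. `FailureCounterSketch`, `Reason`, and `attemptsUntilJobFails` are illustrative names, not Flink's `CheckpointFailureManager` API; the only point is the difference between ignoring `FINALIZE_CHECKPOINT_FAILURE` and counting it.

```java
import java.util.EnumSet;
import java.util.Set;

class FailureCounterSketch {

    /** Illustrative subset of failure reasons; names follow the discussion above. */
    enum Reason {
        CHECKPOINT_DECLINED,
        FINALIZE_CHECKPOINT_FAILURE
    }

    private final int tolerableFailures;
    private final Set<Reason> ignoredReasons;
    private int continuousFailures;

    FailureCounterSketch(int tolerableFailures, Set<Reason> ignoredReasons) {
        this.tolerableFailures = tolerableFailures;
        this.ignoredReasons = ignoredReasons;
    }

    /** Records a failed checkpoint; returns true if the job should now fail. */
    boolean onCheckpointFailure(Reason reason) {
        if (ignoredReasons.contains(reason)) {
            return false; // ignored reasons never move the counter
        }
        continuousFailures++;
        return continuousFailures > tolerableFailures;
    }

    /** A successful checkpoint resets the continuous-failure counter. */
    void onCheckpointSuccess() {
        continuousFailures = 0;
    }

    /** Feeds repeated finalize failures; returns the 1-based failing attempt, or -1 if none. */
    static int attemptsUntilJobFails(Set<Reason> ignored, int tolerable, int attempts) {
        FailureCounterSketch counter = new FailureCounterSketch(tolerable, ignored);
        for (int i = 1; i <= attempts; i++) {
            if (counter.onCheckpointFailure(Reason.FINALIZE_CHECKPOINT_FAILURE)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Finalize failures ignored: the job never fails, prints -1.
        System.out.println(
                attemptsUntilJobFails(EnumSet.of(Reason.FINALIZE_CHECKPOINT_FAILURE), 2, 10));
        // Finalize failures counted: with 2 tolerable failures, the 3rd attempt fails the job.
        System.out.println(attemptsUntilJobFails(EnumSet.noneOf(Reason.class), 2, 10));
    }
}
```

Under this model, a permanently broken checkpoint directory surfaces as a job failure after the configured tolerance instead of failing silently forever.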




[GitHub] [flink] rkhachatryan commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1173500743

   Thanks for the PR @JesseAtSZ ,
   
   I'm trying to understand why checkpoints are failing with `FINALIZE_CHECKPOINT_FAILURE` (which is ignored by `CheckpointFailureManager`) and not something like `IOException`.
   From the code, it might happen only in `CheckpointCoordinator`, when all the tasks have already acknowledged the checkpoint.
   That probably means that the job is stateless.
   Could you confirm that @JesseAtSZ ?
   
   If that's NOT the case then we probably should fix failure counting first.
   
   Another question is related to the TM: do we need a symmetric check there (in `FsCheckpointStorageAccess.resolveCheckpointStorageLocation`)?




[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1171037520

   @curcur 




[GitHub] [flink] JesseAtSZ commented on a diff in pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on code in PR #20091:
URL: https://github.com/apache/flink/pull/20091#discussion_r911656784


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java:
##########
@@ -113,8 +113,12 @@ public boolean supportsHighlyAvailableStorage() {
 
     @Override
     public void initializeBaseLocationsForCheckpoint() throws IOException {
-        fileSystem.mkdirs(sharedStateDirectory);
-        fileSystem.mkdirs(taskOwnedStateDirectory);
+        if (!fileSystem.mkdirs(sharedStateDirectory)) {
+            throw new IOException("Failed to mkdir for sharedStateDirectory.");

Review Comment:
   Got it, thanks.





[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1211521274

   @flinkbot run azure




[GitHub] [flink] gaoyunhaii commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1185131519

   @rkhachatryan Thanks a lot for the proposals; both (3) and (1) look good to me. If we use (1), I think we might rename the method to something like `noPartlyOperatorsFinishedVertexUsedUnionListState`.




[GitHub] [flink] rkhachatryan commented on pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1209154102

   @JesseAtSZ you probably had issues with classloading.
   I've added a working test in a separate commit.
   I'd merge the PR if there are no further concerns (squashing the fixup commit I added).




[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1173532700

   @MartijnVisser I found that some branches end in the string -rc. What does this mean?




[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1212826620

   @rkhachatryan How can I find my name in the contributor list? This is my first contribution.




[GitHub] [flink] rkhachatryan closed pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure

Posted by GitBox <gi...@apache.org>.
rkhachatryan closed pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure
URL: https://github.com/apache/flink/pull/20091




[GitHub] [flink] flinkbot commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1168612133

   ## CI report:
   
   * 952703e12f5f8f2197c3139e2531b40c368684fe UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1171905414

   @fredia In my example, I set the checkpoint path on CentOS to /D/. `fileSystem.mkdirs()` returns false, the directory creation fails, and so the checkpoint initialization fails. In addition, every subsequent checkpoint reports the error:
   Failure to finalize checkpoint.
   Although I specified the tolerableCpFailureNumber parameter, FINALIZE_CHECKPOINT_FAILURE is ignored in checkFailureCounter() and does not count toward tolerableCpFailureNumber. So as the job progresses, every checkpoint fails, but the user cannot notice it.




[GitHub] [flink] fredia commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
fredia commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1171898497

   Thanks for creating this PR @JesseAtSZ. IIUC, you expect an IOException when the checkpoint directory exists (`fileSystem.mkdirs()` returns `false`)? That seems a little weird to me; could you please explain your motivation?
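The question hinges on what a `false` return from `mkdirs` means. The sketch below shows the JDK's `java.io.File#mkdirs()` semantics only: it returns `false` both on failure and when the directory already exists. Whether Flink's own `FileSystem.mkdirs` implementations behave the same way is not checked here and should not be assumed from this example. `MkdirsSemanticsDemo` and `createTwice` are hypothetical names.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

class MkdirsSemanticsDemo {

    /** Calls mkdirs() twice on a fresh nested path; returns {first, second, isDirectory}. */
    static boolean[] createTwice() {
        try {
            File base = Files.createTempDirectory("mkdirs-demo").toFile();
            File nested = new File(new File(base, "shared"), "taskowned");
            boolean first = nested.mkdirs(); // creates "shared" and "taskowned"
            boolean second = nested.mkdirs(); // the directory already exists
            boolean isDir = nested.isDirectory();
            // Clean up: delete the leaf, its parent, then the temp base.
            nested.delete();
            nested.getParentFile().delete();
            base.delete();
            return new boolean[] {first, second, isDir};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = createTwice();
        System.out.println("first mkdirs(): " + r[0]); // true
        System.out.println("second mkdirs(): " + r[1]); // false, although the directory exists
        System.out.println("isDirectory(): " + r[2]); // true
    }
}
```

So with JDK semantics, treating `false` as an error would reject an already-existing checkpoint directory; confirming the end state with `isDirectory()` avoids that.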




[GitHub] [flink] fredia commented on a diff in pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #20091:
URL: https://github.com/apache/flink/pull/20091#discussion_r911611532


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java:
##########
@@ -113,8 +113,12 @@ public boolean supportsHighlyAvailableStorage() {
 
     @Override
     public void initializeBaseLocationsForCheckpoint() throws IOException {
-        fileSystem.mkdirs(sharedStateDirectory);
-        fileSystem.mkdirs(taskOwnedStateDirectory);
+        if (!fileSystem.mkdirs(sharedStateDirectory)) {
+            throw new IOException("Failed to mkdir for sharedStateDirectory.");

Review Comment:
   nit: add `sharedStateDirectory` path in the IOException message.
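A minimal sketch of the nit, using `java.io.File` rather than Flink's `FileSystem`/`Path` types so it runs standalone: the check fails fast and puts the offending path in the `IOException` message. `BaseLocationSketch`, `ensureDirectory`, `describeFailure`, and `blockedPathMessage` are hypothetical names; the `isDirectory()` fallback is an added assumption so that an existing directory is not reported as a failure.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;

class BaseLocationSketch {

    /** Creates dir if needed; throws with the path in the message if it is not a directory. */
    static void ensureDirectory(File dir, String description) throws IOException {
        // java.io.File#mkdirs() also returns false when the directory already exists,
        // so the end state is confirmed with isDirectory() before failing.
        if (!dir.mkdirs() && !dir.isDirectory()) {
            throw new IOException(
                    "Failed to create " + description + " at " + dir.getAbsolutePath());
        }
    }

    /** Returns the error message, or null if the directory exists afterwards. */
    static String describeFailure(File dir) {
        try {
            ensureDirectory(dir, "sharedStateDirectory");
            return null;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    /** Tries to create a directory underneath a regular file, which cannot succeed. */
    static String blockedPathMessage() {
        try {
            File blocker = File.createTempFile("blocker", ".tmp");
            blocker.deleteOnExit();
            return describeFailure(new File(blocker, "shared"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Prints a "Failed to create sharedStateDirectory at ..." message naming the path.
        System.out.println(blockedPathMessage());
        // An existing directory is accepted, so this prints "null".
        System.out.println(describeFailure(new File(System.getProperty("java.io.tmpdir"))));
    }
}
```

Including the absolute path makes a misconfigured location (such as the `/D/` example above) visible directly in the job's failure message.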





[GitHub] [flink] MartijnVisser commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1173521113

   > So this means that this problem cannot be fixed in the old version unless you modify the code and compile it yourself, right?
   
   Correct yes




[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1173787366

   > Thanks for the PR @JesseAtSZ ,
   > 
   > I'm trying to understand why checkpoints are failing with `FINALIZE_CHECKPOINT_FAILURE` (which is ignored by `CheckpointFailureManager`) and not something like `IOException`. From the code, it might happen only in `CheckpointCoordinator`, when all the tasks have already acknowledged the checkpoint. That probably means that the job is stateless. Could you confirm that @JesseAtSZ ?
   > 
   > If that's NOT the case then we probably should fix failure counting first.
   > 
   > Another question is related to the TM - do we need a symmetric check there? (in `FsCheckpointStorageAccess.resolveCheckpointStorageLocation`).
   
   I read the code: `FINALIZE_CHECKPOINT_FAILURE` only occurs in the checkpoint coordinator. In addition, I don't think it is necessary to check the path on the TM, because the initialization in the coordinator happens before `performCheckpoint`.
   




[GitHub] [flink] pnowojski commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
pnowojski commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1176053268

   I'm fine with that




[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1173873330

   @rkhachatryan By "the job is stateless", do you mean whether my job uses state? [FLINK-27570](https://issues.apache.org/jira/browse/FLINK-27570) was first found in a job with state; later, I constructed a demo that doesn't use state, and it has the same problem. Or do you mean that although the checkpoint failed, the state was saved successfully in the StateBackend? The state backend I use is the default one, that is, MemoryStateBackend. If that is your question, I need to debug the code again to make sure.




[GitHub] [flink] rkhachatryan commented on pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1189475857

   AZP failure unrelated: FLINK-28269




[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1208875024

   @rkhachatryan 




[GitHub] [flink] rkhachatryan commented on pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1209195638

   Previously, I had a class-loading issue in this test. But while debugging your test, I see that the custom `CheckpointStorage` doesn't override `configure()`: `JobManagerCheckpointStorage.configure()` creates a new `JobManagerCheckpointStorage` object instead of the testing one, and that new object is what the test ends up using.
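A minimal model of the pitfall described above, with hypothetical classes (`Storage`, `FailingStorage`, `FailingStorageKeepsItself`, `loadAndFinalize`). It assumes only what the comment states: the loader calls `configure()` and then uses the returned object. The real `configure()` takes configuration arguments that are omitted here, and none of Flink's loader logic is reproduced.

```java
class ConfigurePitfallDemo {

    static class Storage {
        /** Returns a configured instance; the base implementation builds a brand-new Storage. */
        Storage configure() {
            return new Storage();
        }

        String finalizeCheckpoint() {
            return "ok";
        }
    }

    /** Test double that overrides behavior but not configure(). */
    static class FailingStorage extends Storage {
        @Override
        String finalizeCheckpoint() {
            return "artificial failure";
        }
    }

    /** Test double that also overrides configure(), so it survives configuration. */
    static class FailingStorageKeepsItself extends FailingStorage {
        @Override
        Storage configure() {
            return this;
        }
    }

    /** Mimics a loader that always configures the user-provided storage before use. */
    static String loadAndFinalize(Storage userStorage) {
        return userStorage.configure().finalizeCheckpoint();
    }

    public static void main(String[] args) {
        // The override is lost because configure() returned a plain Storage: prints "ok".
        System.out.println(loadAndFinalize(new FailingStorage()));
        // configure() returns the test double itself: prints "artificial failure".
        System.out.println(loadAndFinalize(new FailingStorageKeepsItself()));
    }
}
```

In this model the anonymous subclass from the earlier test corresponds to `FailingStorage`: its overridden method exists but is never reached.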




[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1178450932

   @rkhachatryan If changing how `FINALIZE_CHECKPOINT_FAILURE` is handled has a large impact, we can change `initializeBaseLocationsForCheckpoint` first.




[GitHub] [flink] rkhachatryan commented on a diff in pull request #20091: [FLINK-27570][runtime] Fix initialize base locations for checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #20091:
URL: https://github.com/apache/flink/pull/20091#discussion_r915560439


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java:
##########
@@ -113,8 +113,14 @@ public boolean supportsHighlyAvailableStorage() {
 
     @Override
     public void initializeBaseLocationsForCheckpoint() throws IOException {
-        fileSystem.mkdirs(sharedStateDirectory);
-        fileSystem.mkdirs(taskOwnedStateDirectory);
+        if (!fileSystem.mkdirs(sharedStateDirectory)) {

Review Comment:
   I'm wondering whether this check is sufficient. `mkdirs` javadoc says:
   `@return true if at least one new directory has been created, false otherwise`
   
   "at least one" confuses me, but an additional check here would probably be overkill because it's only an optimization.
   
   WDYT?





[GitHub] [flink] rkhachatryan commented on pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1212794453

   I'm going to assume there's no further feedback and, given that the previous feedback was addressed, will merge the PR.
   We can address any further issues separately.

