Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/18 11:20:57 UTC

[GitHub] [flink] rkhachatryan opened a new pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

rkhachatryan opened a new pull request #14683:
URL: https://github.com/apache/flink/pull/14683


   ## What is the purpose of the change
   
   Cleaning up checkpoints may try to schedule new checkpoint trigger requests.
   This fails (throws `RejectedExecutionException`) if the thread pool is shutting down,
   which in turn fails the JM.
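
The failure mode described above can be reproduced in isolation. The following is a minimal sketch, assuming a plain JDK single-thread executor stands in for the coordinator's timer; the class and method names are hypothetical and not part of Flink.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical stand-alone demo; not Flink code.
final class RejectedAfterShutdownSketch {

    // Returns true if submitting to an already shut-down executor is rejected.
    static boolean submitAfterShutdownIsRejected() {
        // Assumption: a JDK executor with the default rejection policy
        // stands in for the timer used to schedule trigger requests.
        ExecutorService timer = Executors.newSingleThreadExecutor();
        timer.shutdownNow();
        try {
            timer.execute(() -> {});
            return false;
        } catch (RejectedExecutionException e) {
            // The default policy rejects tasks submitted after shutdown.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + submitAfterShutdownIsRejected());
    }
}
```

If such an exception escapes from a cleanup callback, whatever runs that callback has to decide whether to tolerate it or treat it as fatal.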
   
   ## Brief change log
   
   - Tolerate checkpoint cleanup failures
   - Skip scheduling checkpoint triggering if checkpoint coordinator is shutting down
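
The first item in the change log can be pictured as follows. This is only a sketch under stated assumptions: `TolerantCleanupSketch` is a hypothetical class, the field and parameter names are borrowed from the diff for readability, and logging goes to `System.err` instead of a real logger.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of tolerating a failing post-clean action.
final class TolerantCleanupSketch {

    private final AtomicInteger numberOfCheckpointsToClean = new AtomicInteger();

    // Runs the discard step, then the post-clean action.
    // Returns true if the post-clean action failed and the failure was swallowed.
    boolean clean(Runnable discard, Runnable postCleanAction) {
        numberOfCheckpointsToClean.incrementAndGet();
        boolean postCleanFailed = false;
        try {
            discard.run();
        } finally {
            try {
                numberOfCheckpointsToClean.decrementAndGet();
                postCleanAction.run();
            } catch (Exception e) {
                // Log and continue instead of letting the failure propagate.
                postCleanFailed = true;
                System.err.println("Error while cleaning up checkpoint: " + e);
            }
        }
        return postCleanFailed;
    }

    // Number of cleanups that have started but not yet finished.
    int pending() {
        return numberOfCheckpointsToClean.get();
    }
}
```

The counter is decremented before the action runs, so a throwing action does not leave the pending count inflated.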
   
   ## Verifying this change
   
   **TBD**
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: yes
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? no
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r559745155



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCleanerTest.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
+
+public class CheckpointCleanerTest {
+
+    @Test
+    public void testTolerateFailureInPostCleanupSubmit() throws InterruptedException {

Review comment:
       It tests that a failure to submit a post-cleanup callback doesn't propagate.
   
   We probably need to adopt some spec-based test framework :) 







[GitHub] [flink] tillrohrmann commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r560057569



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
##########
@@ -62,8 +62,15 @@ public void cleanCheckpoint(
                             }
                         }
                     } finally {
-                        numberOfCheckpointsToClean.decrementAndGet();
-                        postCleanAction.run();
+                        try {
+                            numberOfCheckpointsToClean.decrementAndGet();
+                            postCleanAction.run();
+                        } catch (Exception e) {
+                            LOG.error(
+                                    "Error while cleaning up checkpoint {}",
+                                    checkpoint.getCheckpointID(),
+                                    e);

Review comment:
       Hmm, I guess this is related to the threading model of the `CheckpointCoordinator`.
   
   Could you elaborate on the deadlock you are fearing? Which other locks would be held by the `postCleanAction` which could cause a deadlock?







[GitHub] [flink] flinkbot edited a comment on pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14683:
URL: https://github.com/apache/flink/pull/14683#issuecomment-762201068


   ## CI report:
   
   * 4b622404a44f014885964d28f894bdd60cb20c7e UNKNOWN
   * e99d76cdfa380f5885d89e5e81da90b25840aa52 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12198) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] rkhachatryan commented on pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on pull request #14683:
URL: https://github.com/apache/flink/pull/14683#issuecomment-763473192


   Thanks!





[GitHub] [flink] rkhachatryan commented on pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on pull request #14683:
URL: https://github.com/apache/flink/pull/14683#issuecomment-762419210


   Thanks for the feedback, @tillrohrmann. I've updated the PR and replied to the comments; please take a look.





[GitHub] [flink] flinkbot edited a comment on pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14683:
URL: https://github.com/apache/flink/pull/14683#issuecomment-762201068


   ## CI report:
   
   * 9af5c7d08603f8cb3a76066a3013aabb69603416 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12194) 
   * 4b622404a44f014885964d28f894bdd60cb20c7e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14683:
URL: https://github.com/apache/flink/pull/14683#issuecomment-762201068


   ## CI report:
   
   * 4b622404a44f014885964d28f894bdd60cb20c7e UNKNOWN
   * d6ce0b45855c1318f26251bbf725bc90223a455c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12195) 
   * e99d76cdfa380f5885d89e5e81da90b25840aa52 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r559747444



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1278,7 +1278,12 @@ public void run() {
     }
 
     void scheduleTriggerRequest() {
-        timer.execute(this::executeQueuedRequest);
+        if (isShutdown()) {

Review comment:
       This is only an optimization because the `timer` thread can be shut down after the check.
   I'm concerned that adding `synchronized` might create deadlocks (if not now then in the future).
   See also thread [above](https://github.com/apache/flink/pull/14683#discussion_r559742642).
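
The check-then-act window mentioned in this comment can be sketched as below. This is an illustration, not the PR's code: `GuardedScheduleSketch`, `stopTimerOnly`, and the boolean return value are hypothetical, and catching the rejection is just one possible way to cover the window left open by an unsynchronized check.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical illustration of an unsynchronized shutdown check before scheduling.
final class GuardedScheduleSketch {

    private final ExecutorService timer = Executors.newSingleThreadExecutor();
    private volatile boolean shutdown;

    // Regular shutdown: set the flag, then stop the timer.
    void shutdown() {
        shutdown = true;
        timer.shutdownNow();
    }

    // Simulates the race: the timer is already stopped but the flag is not yet set.
    void stopTimerOnly() {
        timer.shutdownNow();
    }

    // Returns true if the request was handed to the timer.
    boolean scheduleTriggerRequest(Runnable request) {
        if (shutdown) {
            return false; // fast path only; the timer may still stop after this check
        }
        try {
            timer.execute(request);
            return true;
        } catch (RejectedExecutionException e) {
            // The timer was shut down between the check and the submit.
            return false;
        }
    }
}
```

No lock is taken here, which avoids the deadlock concern, at the cost of having to handle the rejection somewhere.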







[GitHub] [flink] flinkbot edited a comment on pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14683:
URL: https://github.com/apache/flink/pull/14683#issuecomment-762201068


   ## CI report:
   
   * 4b622404a44f014885964d28f894bdd60cb20c7e UNKNOWN
   * e99d76cdfa380f5885d89e5e81da90b25840aa52 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12198) 
   * 33866c10b72c60551b1149853af764b62353b047 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14683:
URL: https://github.com/apache/flink/pull/14683#issuecomment-762188053


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 4a55885d99666f660b89a2cbde2b15416dcc30fa (Mon Jan 18 11:28:47 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-20992).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r559745525



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCleanerTest.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
+
+public class CheckpointCleanerTest {
+
+    @Test
+    public void testTolerateFailureInPostCleanupSubmit() throws InterruptedException {
+        ExecutorService executor = java.util.concurrent.Executors.newSingleThreadExecutor();
+        CompletedCheckpoint checkpoint = createCheckpoint();
+        TestDiscardCallback discardCallback = new TestDiscardCallback();
+        checkpoint.setDiscardCallback(discardCallback);
+        new CheckpointsCleaner().cleanCheckpoint(checkpoint, true, executor::shutdownNow, executor);
+        checkState(executor.awaitTermination(10, SECONDS));
+        assertTrue(discardCallback.isDiscarded());
+    }
+
+    @Test
+    public void testTolerateFailureInPostCleanup() {
+        CompletedCheckpoint checkpoint = createCheckpoint();
+        TestDiscardCallback discardCallback = new TestDiscardCallback();
+        checkpoint.setDiscardCallback(discardCallback);
+        new CheckpointsCleaner()
+                .cleanCheckpoint(
+                    checkpoint,
+                        true,
+                        () -> {
+                            throw new RuntimeException();

Review comment:
       I guess it's the same discussion as [above](https://github.com/apache/flink/pull/14683#discussion_r559714220).







[GitHub] [flink] flinkbot edited a comment on pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14683:
URL: https://github.com/apache/flink/pull/14683#issuecomment-762201068


   ## CI report:
   
   * e96593cc1962fe5346515f12a9dd71ddfcae3297 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12178) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r560087455



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
##########
@@ -62,8 +62,15 @@ public void cleanCheckpoint(
                             }
                         }
                     } finally {
-                        numberOfCheckpointsToClean.decrementAndGet();
-                        postCleanAction.run();
+                        try {
+                            numberOfCheckpointsToClean.decrementAndGet();
+                            postCleanAction.run();
+                        } catch (Exception e) {
+                            LOG.error(
+                                    "Error while cleaning up checkpoint {}",
+                                    checkpoint.getCheckpointID(),
+                                    e);

Review comment:
       The calling thread already acquires `CheckpointCoordinator.lock` and then `checkpoint.lock`. So it will be enough to synchronize on `checkpoint.lock` in `postCleanAction` to have a deadlock.










[GitHub] [flink] tillrohrmann commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r560143586



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
##########
@@ -62,8 +62,15 @@ public void cleanCheckpoint(
                             }
                         }
                     } finally {
-                        numberOfCheckpointsToClean.decrementAndGet();
-                        postCleanAction.run();
+                        try {
+                            numberOfCheckpointsToClean.decrementAndGet();
+                            postCleanAction.run();
+                        } catch (Exception e) {
+                            LOG.error(
+                                    "Error while cleaning up checkpoint {}",
+                                    checkpoint.getCheckpointID(),
+                                    e);

Review comment:
       The calling thread is the thread calling `CheckpointsCleaner.cleanCheckpoint`, right? Then this call will enqueue the cleanup action and eventually release the `checkpoint.lock`. That's when the cleanup action can complete. 
   
   I think as long as we have made sure that the `CheckpointsCleaner` owns the `Checkpoint` there should be no other thread accessing the `checkpoint.lock`. At the end of the day it probably boils down to a proper lifecycle management of the involved objects and deciding who owns what.
   
   But maybe I am overlooking something here and you have a concrete thread interleaving in mind which I don't see at the moment.
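
The two readings discussed in this thread can be contrasted in a small experiment. This is a sketch under assumptions: a `ReentrantLock` stands in for `checkpoint.lock`, the class and method names are hypothetical, and `tryLock` with a timeout is used so that the blocked case returns `false` instead of hanging. Whether any caller in Flink actually waits for the action while holding the lock is exactly the open question here, and the sketch does not answer it.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration of lock ownership around an enqueued cleanup action.
final class CleanupLockSketch {

    // Returns whether the enqueued action managed to take the lock.
    static boolean actionAcquiresLock(boolean callerWaitsWhileHoldingLock) {
        ReentrantLock checkpointLock = new ReentrantLock(); // stand-in for checkpoint.lock
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> action;
            checkpointLock.lock();
            try {
                // Enqueue the action while the caller still holds the lock.
                action = executor.submit(() -> {
                    if (checkpointLock.tryLock(500, TimeUnit.MILLISECONDS)) {
                        checkpointLock.unlock();
                        return true;
                    }
                    return false; // a plain lock() would block here
                });
                if (callerWaitsWhileHoldingLock) {
                    // Waiting for the action before releasing the lock starves it.
                    return action.get();
                }
            } finally {
                checkpointLock.unlock();
            }
            // The caller released the lock first, so the action can proceed.
            return action.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

When the caller releases the lock right after enqueueing, the action completes; it only gets stuck if the lock holder also waits for the action to finish.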







[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r560190909



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCleanerTest.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
+
+public class CheckpointCleanerTest {
+
+    @Test
+    public void testTolerateFailureInPostCleanupSubmit() throws InterruptedException {

Review comment:
       I dropped this commit altogether as a result of the above discussion.







[GitHub] [flink] flinkbot edited a comment on pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14683:
URL: https://github.com/apache/flink/pull/14683#issuecomment-762201068


   ## CI report:
   
   * 4b622404a44f014885964d28f894bdd60cb20c7e UNKNOWN
   * d6ce0b45855c1318f26251bbf725bc90223a455c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12195) 
   * e99d76cdfa380f5885d89e5e81da90b25840aa52 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12198) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] tillrohrmann commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r560061568



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCleanerTest.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
+
+public class CheckpointCleanerTest {
+
+    @Test
+    public void testTolerateFailureInPostCleanupSubmit() throws InterruptedException {

Review comment:
       Ok, I think the problem is not the spec-based test framework but that I don't see where the failure from the submission of the post-cleanup callback is coming from. When I remove your changes of this PR, then the test also passes. Hence, this indicates to me that we are not testing the change of this PR here.
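
For a test to fail without the change, the post-cleanup action has to fail in the way the PR description names: a submission to an executor that is already shut down. The following is a minimal sketch of that failure mode using only the JDK; `RejectedSubmitSketch` is an illustrative name and not part of Flink, and it assumes the default rejection policy of `Executors.newSingleThreadExecutor()`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

final class RejectedSubmitSketch {

    /** Returns true if a submission after shutdownNow() is rejected. */
    static boolean submitAfterShutdownIsRejected() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // No tasks are queued, so this only flips the executor into the shut-down state.
        executor.shutdownNow();
        try {
            // With the default AbortPolicy this throws instead of silently dropping the task.
            executor.execute(() -> {});
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }
}
```

A post-clean action that only calls `executor::shutdownNow` never takes this path, which would be consistent with the test passing with and without the change.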







[GitHub] [flink] flinkbot edited a comment on pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14683:
URL: https://github.com/apache/flink/pull/14683#issuecomment-762201068


   ## CI report:
   
   * e96593cc1962fe5346515f12a9dd71ddfcae3297 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12178) 
   





[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r559745155



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCleanerTest.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
+
+public class CheckpointCleanerTest {
+
+    @Test
+    public void testTolerateFailureInPostCleanupSubmit() throws InterruptedException {

Review comment:
       It tests that a failure to submit a post-cleanup callback doesn't propagate.
   
   We probably need to adopt some spec-based test framework :) 







[GitHub] [flink] flinkbot edited a comment on pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14683:
URL: https://github.com/apache/flink/pull/14683#issuecomment-762201068


   ## CI report:
   
   * 9af5c7d08603f8cb3a76066a3013aabb69603416 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12194) 
   * 4b622404a44f014885964d28f894bdd60cb20c7e UNKNOWN
   * d6ce0b45855c1318f26251bbf725bc90223a455c UNKNOWN
   





[GitHub] [flink] tillrohrmann commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r559714441



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1278,7 +1278,11 @@ public void run() {
     }
 
     void scheduleTriggerRequest() {
-        timer.execute(this::executeQueuedRequest);
+        if (isShutdown()) {
+            LOG.debug("Skip scheduling trigger request because is shutting down");

Review comment:
       ```suggestion
               LOG.debug("Skip scheduling trigger request because the CheckpointCoordinator is shut down");
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
##########
@@ -62,8 +62,15 @@ public void cleanCheckpoint(
                             }
                         }
                     } finally {
-                        numberOfCheckpointsToClean.decrementAndGet();
-                        postCleanAction.run();
+                        try {
+                            numberOfCheckpointsToClean.decrementAndGet();
+                            postCleanAction.run();
+                        } catch (Exception e) {
+                            LOG.error(
+                                    "Error while cleaning up checkpoint {}",
+                                    checkpoint.getCheckpointID(),
+                                    e);

Review comment:
       I am not entirely sure whether I would swallow the exceptions here. I think the contract should be that no exceptions may occur here. If they do, then this warrants a hard exit of the process. With only a log statement, even at error level, we would probably not have found this bug very soon.
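
A minimal sketch of the fail-fast alternative to logging and continuing. Everything here is illustrative: `FailFastCleanupSketch`, `finishCleanup`, and the `fatalHandler` hook are hypothetical names, not Flink APIs, and a real handler might terminate the process rather than record the error.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class FailFastCleanupSketch {

    /**
     * Runs the post-clean step and escalates any failure instead of swallowing it.
     * fatalHandler is a hypothetical hook standing in for a hard-exit mechanism.
     */
    static void finishCleanup(
            AtomicInteger pendingCleanups,
            Runnable postCleanAction,
            Consumer<Throwable> fatalHandler) {
        try {
            pendingCleanups.decrementAndGet();
            postCleanAction.run();
        } catch (Throwable t) {
            // The contract is "no exception may occur here", so a violation is escalated.
            fatalHandler.accept(t);
        }
    }

    /** Returns the throwable the handler received when the action throws. */
    static Throwable demo() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        finishCleanup(
                new AtomicInteger(1),
                () -> {
                    throw new IllegalStateException("post-clean action failed");
                },
                seen::set);
        return seen.get();
    }
}
```

The design choice under discussion is only where the exception ends up: in a log line that is easy to miss, or in a handler that makes the violation impossible to overlook.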

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1278,7 +1278,11 @@ public void run() {
     }
 
     void scheduleTriggerRequest() {
-        timer.execute(this::executeQueuedRequest);
+        if (isShutdown()) {
+            LOG.debug("Skip scheduling trigger request because is shutting down");
+        } else {
+            timer.execute(this::executeQueuedRequest);
+        }
     }

Review comment:
       I think we are missing a test for this behaviour here.
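
One possible shape for such a test, sketched against a stand-in rather than the real `CheckpointCoordinator`: `ShutdownGuardSketch` is a hypothetical class that only mirrors the guard in the diff above, and a counting `Executor` takes the place of the timer.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

final class ShutdownGuardSketch {

    private final Executor timer;
    private volatile boolean shutdown;

    ShutdownGuardSketch(Executor timer) {
        this.timer = timer;
    }

    void shutdown() {
        shutdown = true;
    }

    /** Mirrors the guarded scheduling: nothing is submitted once shut down. */
    void scheduleTriggerRequest() {
        if (shutdown) {
            return;
        }
        timer.execute(() -> {});
    }

    /** Schedules once before and once after shutdown and returns the number of submissions. */
    static int submissionsAroundShutdown() {
        AtomicInteger submitted = new AtomicInteger();
        // The counting executor records submissions without running them.
        ShutdownGuardSketch coordinator =
                new ShutdownGuardSketch(task -> submitted.incrementAndGet());
        coordinator.scheduleTriggerRequest(); // submitted
        coordinator.shutdown();
        coordinator.scheduleTriggerRequest(); // skipped by the guard
        return submitted.get();
    }
}
```

Note that a check followed by a submit is not atomic in this sketch; whether the real code needs the check and the submission under one lock is not something this example settles.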

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCleanerTest.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
+
+public class CheckpointCleanerTest {
+
+    @Test
+    public void testTolerateFailureInPostCleanupSubmit() throws InterruptedException {
+        ExecutorService executor = java.util.concurrent.Executors.newSingleThreadExecutor();
+        CompletedCheckpoint checkpoint = createCheckpoint();
+        TestDiscardCallback discardCallback = new TestDiscardCallback();
+        checkpoint.setDiscardCallback(discardCallback);
+        new CheckpointsCleaner().cleanCheckpoint(checkpoint, true, executor::shutdownNow, executor);
+        checkState(executor.awaitTermination(10, SECONDS));
+        assertTrue(discardCallback.isDiscarded());
+    }
+
+    @Test
+    public void testTolerateFailureInPostCleanup() {
+        CompletedCheckpoint checkpoint = createCheckpoint();
+        TestDiscardCallback discardCallback = new TestDiscardCallback();
+        checkpoint.setDiscardCallback(discardCallback);
+        new CheckpointsCleaner()
+                .cleanCheckpoint(
+                    checkpoint,
+                        true,
+                        () -> {
+                            throw new RuntimeException();

Review comment:
       Do we really want to support this? I am not sure tbh. Afaik this is framework code we have under our control. Hence, we could say that there mustn't be an exception occurring.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCleanerTest.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
+
+public class CheckpointCleanerTest {

Review comment:
       ```suggestion
   public class CheckpointCleanerTest extends TestLogger {
   ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCleanerTest.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
+
+public class CheckpointCleanerTest {
+
+    @Test
+    public void testTolerateFailureInPostCleanupSubmit() throws InterruptedException {

Review comment:
       I don't fully understand the test name. What are we testing here?







[GitHub] [flink] flinkbot edited a comment on pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14683:
URL: https://github.com/apache/flink/pull/14683#issuecomment-762201068


   ## CI report:
   
   * e96593cc1962fe5346515f12a9dd71ddfcae3297 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12178) 
   * 9af5c7d08603f8cb3a76066a3013aabb69603416 UNKNOWN
   





[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r560170984



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
##########
@@ -62,8 +62,15 @@ public void cleanCheckpoint(
                             }
                         }
                     } finally {
-                        numberOfCheckpointsToClean.decrementAndGet();
-                        postCleanAction.run();
+                        try {
+                            numberOfCheckpointsToClean.decrementAndGet();
+                            postCleanAction.run();
+                        } catch (Exception e) {
+                            LOG.error(
+                                    "Error while cleaning up checkpoint {}",
+                                    checkpoint.getCheckpointID(),
+                                    e);

Review comment:
       No, I don't have a concrete scenario in mind. I'm just concerned that it could arise in the future.
   Okay, I'll add `synchronized` here and drop the other commit; it's not necessary if `postCleanAction` is safe.







[GitHub] [flink] tillrohrmann closed pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
tillrohrmann closed pull request #14683:
URL: https://github.com/apache/flink/pull/14683


   





[GitHub] [flink] flinkbot edited a comment on pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14683:
URL: https://github.com/apache/flink/pull/14683#issuecomment-762201068


   ## CI report:
   
   * 4b622404a44f014885964d28f894bdd60cb20c7e UNKNOWN
   * 33866c10b72c60551b1149853af764b62353b047 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12247) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14683:
URL: https://github.com/apache/flink/pull/14683#issuecomment-762201068


   ## CI report:
   
   * 4b622404a44f014885964d28f894bdd60cb20c7e UNKNOWN
   * e99d76cdfa380f5885d89e5e81da90b25840aa52 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12198) 
   * 33866c10b72c60551b1149853af764b62353b047 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12247) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r559745155



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCleanerTest.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
+
+public class CheckpointCleanerTest {
+
+    @Test
+    public void testTolerateFailureInPostCleanupSubmit() throws InterruptedException {

Review comment:
       It tests that a failure to submit a post-cleanup callback isn't propagated.
   
   We probably need to adopt some spec-based test framework :) 
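
   As an illustration of the behaviour under test, here is a minimal sketch rather than the actual `CheckpointsCleaner` or its test. `TolerantCleanup`, `pending`, and `swallowed` are hypothetical names, and the cleanup runs synchronously for brevity. It mirrors the `try`/`catch` inside `finally` from the diff: a failing post-cleanup action is recorded instead of being propagated.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the cleaner; not part of Flink.
class TolerantCleanup {
    // Number of cleanups started but not yet finished.
    final AtomicInteger pending = new AtomicInteger();
    // Number of post-cleanup failures that were tolerated (a real cleaner would log them).
    final AtomicInteger swallowed = new AtomicInteger();

    void clean(Runnable discard, Runnable postCleanAction) {
        pending.incrementAndGet();
        try {
            discard.run();
        } finally {
            try {
                pending.decrementAndGet();
                // May throw, e.g. RejectedExecutionException if it submits to a shut-down executor.
                postCleanAction.run();
            } catch (Exception e) {
                // Tolerate the failure so it does not escape the cleanup.
                swallowed.incrementAndGet();
            }
        }
    }
}
```

   Calling `clean` with a post-cleanup action that throws `RejectedExecutionException` returns normally under this sketch, and the pending counter is still decremented.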







[GitHub] [flink] flinkbot commented on pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14683:
URL: https://github.com/apache/flink/pull/14683#issuecomment-762201068


   ## CI report:
   
   * e96593cc1962fe5346515f12a9dd71ddfcae3297 UNKNOWN
   





[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r559737569



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1278,7 +1278,11 @@ public void run() {
     }
 
     void scheduleTriggerRequest() {
-        timer.execute(this::executeQueuedRequest);
+        if (isShutdown()) {
+            LOG.debug("Skip scheduling trigger request because is shutting down");
+        } else {
+            timer.execute(this::executeQueuedRequest);
+        }
     }

Review comment:
       Added a test. I initially assumed this change would be only a hotfix.
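
   The guard in the diff above can be illustrated in isolation. The following is a minimal sketch, not Flink's code: `GuardedScheduler`, `scheduleUnguarded`, and the `shutdown` flag are hypothetical stand-ins for `CheckpointCoordinator` and `isShutdown()`, and a plain `java.util.concurrent` executor plays the role of `timer`. It shows that a shut-down executor rejects `execute`, and that checking the flag first skips the submission instead.

```java
import java.util.concurrent.ExecutorService;

// Hypothetical stand-in for the coordinator; not part of Flink.
class GuardedScheduler {
    private final ExecutorService timer;
    private volatile boolean shutdown;

    GuardedScheduler(ExecutorService timer) {
        this.timer = timer;
    }

    void shutdown() {
        shutdown = true;
        timer.shutdownNow();
    }

    // Unguarded variant: throws RejectedExecutionException once the timer is shut down.
    void scheduleUnguarded(Runnable request) {
        timer.execute(request);
    }

    // Guarded variant: returns false and submits nothing after shutdown.
    boolean scheduleTriggerRequest(Runnable request) {
        if (shutdown) {
            return false;
        }
        timer.execute(request);
        return true;
    }
}
```

   In this sketch the flag check and the `execute` call are not atomic, so a shutdown that lands between them could still cause a rejection.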







[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r560170984



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
##########
@@ -62,8 +62,15 @@ public void cleanCheckpoint(
                             }
                         }
                     } finally {
-                        numberOfCheckpointsToClean.decrementAndGet();
-                        postCleanAction.run();
+                        try {
+                            numberOfCheckpointsToClean.decrementAndGet();
+                            postCleanAction.run();
+                        } catch (Exception e) {
+                            LOG.error(
+                                    "Error while cleaning up checkpoint {}",
+                                    checkpoint.getCheckpointID(),
+                                    e);

Review comment:
       No, I don't have a concrete scenario in mind. I'm just concerned that one could arise in the future.
   Okay, I'll add `synchronized` in `scheduleTriggerRequest` and drop the `CheckpointsCleaner` commit; it's not necessary if `postCleanAction` is safe.
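
   A minimal sketch of the `synchronized` approach, under stated assumptions: `SynchronizedScheduler` and its `lock` field are hypothetical and do not reflect how `CheckpointCoordinator` is actually structured. The point is only that the shutdown check and the submission happen under the same lock that `shutdown()` takes, so no shutdown can slip in between them.

```java
import java.util.concurrent.ExecutorService;

// Hypothetical stand-in for the coordinator; not part of Flink.
class SynchronizedScheduler {
    private final Object lock = new Object();
    private final ExecutorService timer;
    private boolean shutdown; // guarded by lock

    SynchronizedScheduler(ExecutorService timer) {
        this.timer = timer;
    }

    void shutdown() {
        synchronized (lock) {
            shutdown = true;
            timer.shutdownNow();
        }
    }

    // Check and submit are atomic with respect to shutdown().
    boolean scheduleTriggerRequest(Runnable request) {
        synchronized (lock) {
            if (shutdown) {
                return false;
            }
            timer.execute(request);
            return true;
        }
    }
}
```

   This only holds if nothing else shuts the executor down outside the lock; in that case `execute` could still throw.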







[GitHub] [flink] rkhachatryan commented on a change in pull request #14683: [FLINK-20992][checkpointing] Tolerate checkpoint cleanup failures

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14683:
URL: https://github.com/apache/flink/pull/14683#discussion_r559742642



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
##########
@@ -62,8 +62,15 @@ public void cleanCheckpoint(
                             }
                         }
                     } finally {
-                        numberOfCheckpointsToClean.decrementAndGet();
-                        postCleanAction.run();
+                        try {
+                            numberOfCheckpointsToClean.decrementAndGet();
+                            postCleanAction.run();
+                        } catch (Exception e) {
+                            LOG.error(
+                                    "Error while cleaning up checkpoint {}",
+                                    checkpoint.getCheckpointID(),
+                                    e);

Review comment:
       > the contract should be that here mustn't occur any exceptions
   
   To guarantee such a contract I see only two options:
   1. Wrap `timer.execute` in `CheckpointCoordinator.scheduleTriggerRequest` in a `synchronized` block.
   2. Tolerate the `RejectedExecutionException && isShutdown` case, as I proposed in the Jira ticket.
   
   The 1st option is deadlock-prone IMO; the 2nd requires some discussion and rework (so it is not a quick solution).
   Logging is also not perfect, but to me it's the lesser evil.
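
   For reference, the second option could look roughly like the sketch below. It is an assumption-laden illustration, not the proposal from the ticket verbatim: `TolerantScheduler` and its `shutdown` flag are hypothetical names. A rejection is swallowed only when the component itself is shutting down; any other rejection is rethrown.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical stand-in for the coordinator; not part of Flink.
class TolerantScheduler {
    private final ExecutorService timer;
    private volatile boolean shutdown;

    TolerantScheduler(ExecutorService timer) {
        this.timer = timer;
    }

    void shutdown() {
        // Set the flag first so a rejection caused by this shutdown is recognised as expected.
        shutdown = true;
        timer.shutdownNow();
    }

    boolean scheduleTriggerRequest(Runnable request) {
        try {
            timer.execute(request);
            return true;
        } catch (RejectedExecutionException e) {
            if (shutdown) {
                return false; // expected while shutting down
            }
            throw e; // unexpected rejection, keep it visible
        }
    }
}
```

   Unlike the `synchronized` variant, this takes no lock around the submission, at the cost of treating an exception as part of normal control flow.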



