You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/24 09:07:57 UTC

[flink] 01/02: [FLINK-26741][runtime] Changes method signature of CheckpointIDCounter.shutdown

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 34133b9d040d22f76710b8da987ad848f63063d3
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Mon Mar 21 14:15:27 2022 +0100

    [FLINK-26741][runtime] Changes method signature of CheckpointIDCounter.shutdown
    
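    The method now returns a CompletableFuture that completes once the
    shutdown, including any removal of the counter from ZooKeeper or the
    Kubernetes ConfigMap, has finished, so callers can wait for it and
    handle failures. This change also adds idempotency tests for the ZK
    and k8s implementations.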
---
 .../KubernetesCheckpointIDCounter.java             |  31 +++--
 .../KubernetesCheckpointIDCounterTest.java         | 138 +++++++++++++++++++++
 .../KubernetesHighAvailabilityTestBase.java        |   4 +
 .../runtime/checkpoint/CheckpointIDCounter.java    |   5 +-
 .../checkpoint/DeactivatedCheckpointIDCounter.java |   7 +-
 .../checkpoint/StandaloneCheckpointIDCounter.java  |   6 +-
 .../checkpoint/ZooKeeperCheckpointIDCounter.java   |  58 ++++++++-
 .../cleanup/CheckpointResourcesCleanupRunner.java  |   2 +-
 .../flink/runtime/scheduler/SchedulerBase.java     |   2 +-
 .../scheduler/adaptive/AdaptiveScheduler.java      |   2 +-
 .../CheckpointCoordinatorTriggeringTest.java       |   5 +-
 .../checkpoint/CheckpointIDCounterTestBase.java    |   8 +-
 .../checkpoint/TestingCheckpointIDCounter.java     |  27 ++--
 .../ZooKeeperCheckpointIDCounterITCase.java        |  97 +++++++++++++--
 .../runtime/scheduler/DefaultSchedulerTest.java    |  21 +++-
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  |   3 +-
 16 files changed, 362 insertions(+), 54 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounter.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounter.java
index a08146a..8eac11e 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounter.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounter.java
@@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.kubernetes.utils.Constants.CHECKPOINT_COUNTER_KEY;
@@ -71,25 +73,32 @@ public class KubernetesCheckpointIDCounter implements CheckpointIDCounter {
     }
 
     @Override
-    public void shutdown(JobStatus jobStatus) {
+    public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
         if (!running) {
-            return;
+            return FutureUtils.completedVoidFuture();
         }
         running = false;
 
         LOG.info("Shutting down.");
         if (jobStatus.isGloballyTerminalState()) {
             LOG.info("Removing counter from ConfigMap {}", configMapName);
-            kubeClient.checkAndUpdateConfigMap(
-                    configMapName,
-                    configMap -> {
-                        if (isValidOperation(configMap)) {
-                            configMap.getData().remove(CHECKPOINT_COUNTER_KEY);
-                            return Optional.of(configMap);
-                        }
-                        return Optional.empty();
-                    });
+            return kubeClient
+                    .checkAndUpdateConfigMap(
+                            configMapName,
+                            configMap -> {
+                                if (isValidOperation(configMap)) {
+                                    configMap.getData().remove(CHECKPOINT_COUNTER_KEY);
+                                    return Optional.of(configMap);
+                                }
+                                return Optional.empty();
+                            })
+                    // checkAndUpdateConfigMap only returns false if the callback returned an
+                    // empty Optional (isValidOperation failed). We don't want to continue the
+                    // cleanup in that case, i.e. we can ignore the return value
+                    .thenApply(valueChanged -> null);
         }
+
+        return FutureUtils.completedVoidFuture();
     }
 
     private boolean isValidOperation(KubernetesConfigMap configMap) {
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java
index d6d4313..30b8875 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java
@@ -18,13 +18,19 @@
 
 package org.apache.flink.kubernetes.highavailability;
 
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
 
 import org.junit.Test;
 
+import java.util.concurrent.CompletionException;
+
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
 
 /** Tests for {@link KubernetesCheckpointIDCounter} operations. */
@@ -51,6 +57,138 @@ public class KubernetesCheckpointIDCounterTest extends KubernetesHighAvailabilit
     }
 
     @Test
+    public void testShutdown() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesCheckpointIDCounter checkpointIDCounter =
+                                    new KubernetesCheckpointIDCounter(
+                                            flinkKubeClient, LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+                            checkpointIDCounter.start();
+
+                            checkpointIDCounter.setCount(100L);
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .get(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is("100"));
+
+                            checkpointIDCounter.shutdown(JobStatus.FINISHED).join();
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is(false));
+                        });
+            }
+        };
+    }
+
+    @Test
+    public void testShutdownForLocallyTerminatedJobStatus() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesCheckpointIDCounter checkpointIDCounter =
+                                    new KubernetesCheckpointIDCounter(
+                                            flinkKubeClient, LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+                            checkpointIDCounter.start();
+
+                            checkpointIDCounter.setCount(100L);
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .get(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is("100"));
+
+                            checkpointIDCounter.shutdown(JobStatus.SUSPENDED).join();
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is(true));
+                        });
+            }
+        };
+    }
+
+    @Test
+    public void testIdempotentShutdown() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesCheckpointIDCounter checkpointIDCounter =
+                                    new KubernetesCheckpointIDCounter(
+                                            flinkKubeClient, LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+                            checkpointIDCounter.start();
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is(false));
+
+                            checkpointIDCounter.shutdown(JobStatus.FINISHED).join();
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is(false));
+
+                            // a second shutdown should work without causing any errors
+                            checkpointIDCounter.shutdown(JobStatus.FINISHED).join();
+                        });
+            }
+        };
+    }
+
+    @Test
+    public void testShutdownFailureDueToMissingConfigMap() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesCheckpointIDCounter checkpointIDCounter =
+                                    new KubernetesCheckpointIDCounter(
+                                            flinkKubeClient, LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+                            checkpointIDCounter.start();
+
+                            // deleting the ConfigMap (which stores the counter) from the outside
+                            // simulates an unexpected failure. NOTE(review): get() reports async
+                            // failures as ExecutionException, so this only passes if shutdown()
+                            // throws synchronously instead of returning a failed future
+                            flinkKubeClient.deleteConfigMap(LEADER_CONFIGMAP_NAME);
+
+                            assertThrows(
+                                    CompletionException.class,
+                                    () -> checkpointIDCounter.shutdown(JobStatus.FINISHED).get());
+
+                            // NOTE(review): no-op, the first shutdown already set running=false
+                            KubernetesUtils.createConfigMapIfItDoesNotExist(
+                                    flinkKubeClient, LEADER_CONFIGMAP_NAME, getClusterId());
+                            checkpointIDCounter.shutdown(JobStatus.FINISHED).get();
+                        });
+            }
+        };
+    }
+
+    @Test
     public void testGetAndIncrementWithNoLeadership() throws Exception {
         new Context() {
             {
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
index 5310a8b..543a589 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
@@ -118,6 +118,10 @@ public class KubernetesHighAvailabilityTestBase extends TestLogger {
             return kubernetesTestFixture.createFlinkKubeClientBuilder();
         }
 
+        String getClusterId() {
+            return CLUSTER_ID;
+        }
+
         KubernetesConfigMap getLeaderConfigMap() {
             return kubernetesTestFixture.getLeaderConfigMap();
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
index 81e0471..bb4a395 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobStatus;
 
+import java.util.concurrent.CompletableFuture;
+
 /** A checkpoint ID counter. */
 public interface CheckpointIDCounter {
     int INITIAL_CHECKPOINT_ID = 1;
@@ -34,8 +36,9 @@ public interface CheckpointIDCounter {
      * or kept.
      *
      * @param jobStatus Job state on shut down
+     * @return The {@code CompletableFuture} holding the result of the shutdown operation.
      */
-    void shutdown(JobStatus jobStatus) throws Exception;
+    CompletableFuture<Void> shutdown(JobStatus jobStatus);
 
     /**
      * Atomically increments the current checkpoint ID.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointIDCounter.java
index ca86159..7d67010 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointIDCounter.java
@@ -19,6 +19,9 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.util.concurrent.CompletableFuture;
 
 /**
  * This class represents a {@link CheckpointIDCounter} if checkpointing is deactivated.
@@ -32,7 +35,9 @@ public enum DeactivatedCheckpointIDCounter implements CheckpointIDCounter {
     public void start() throws Exception {}
 
     @Override
-    public void shutdown(JobStatus jobStatus) throws Exception {}
+    public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
+        return FutureUtils.completedVoidFuture();
+    }
 
     @Override
     public long getAndIncrement() throws Exception {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
index 30b6539..8240035 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.concurrent.FutureUtils;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -37,7 +39,9 @@ public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {
     public void start() throws Exception {}
 
     @Override
-    public void shutdown(JobStatus jobStatus) throws Exception {}
+    public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
+        return FutureUtils.completedVoidFuture();
+    }
 
     @Override
     public long getAndIncrement() throws Exception {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
index 9c649e6..6576dde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
@@ -22,18 +22,27 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.FutureUtils;
 
+import org.apache.flink.shaded.curator5.com.google.common.collect.Sets;
 import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.shaded.curator5.org.apache.curator.framework.api.CuratorEvent;
+import org.apache.flink.shaded.curator5.org.apache.curator.framework.api.CuratorEventType;
 import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.shared.SharedCount;
 import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.shared.VersionedValue;
 import org.apache.flink.shaded.curator5.org.apache.curator.framework.state.ConnectionState;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.GuardedBy;
 
+import java.io.IOException;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -101,22 +110,65 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
     }
 
     @Override
-    public void shutdown(JobStatus jobStatus) throws Exception {
+    public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
         synchronized (startStopLock) {
             if (isStarted) {
                 LOG.info("Shutting down.");
-                sharedCount.close();
+                try {
+                    sharedCount.close();
+                } catch (IOException e) {
+                    return FutureUtils.completedExceptionally(e);
+                }
 
                 client.getConnectionStateListenable().removeListener(connectionStateListener);
 
                 if (jobStatus.isGloballyTerminalState()) {
                     LOG.info("Removing {} from ZooKeeper", counterPath);
-                    client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath);
+                    try {
+                        final CompletableFuture<Void> deletionFuture = new CompletableFuture<>();
+                        client.delete()
+                                .inBackground(
+                                        (curatorFramework, curatorEvent) ->
+                                                handleDeletionOfCounterPath(
+                                                        curatorEvent, deletionFuture))
+                                .forPath(counterPath);
+                        return deletionFuture;
+                    } catch (Exception e) {
+                        return FutureUtils.completedExceptionally(e);
+                    }
                 }
 
                 isStarted = false;
             }
         }
+
+        return FutureUtils.completedVoidFuture();
+    }
+
+    private void handleDeletionOfCounterPath(
+            CuratorEvent curatorEvent, CompletableFuture<Void> deletionFuture) {
+        Preconditions.checkArgument(
+                curatorEvent.getType() == CuratorEventType.DELETE,
+                "An unexpected CuratorEvent was monitored: " + curatorEvent.getType());
+        Preconditions.checkArgument(
+                counterPath.endsWith(curatorEvent.getPath()),
+                "An unexpected path was selected for deletion: " + curatorEvent.getPath());
+
+        final KeeperException.Code eventCode =
+                KeeperException.Code.get(curatorEvent.getResultCode());
+        if (Sets.immutableEnumSet(KeeperException.Code.OK, KeeperException.Code.NONODE)
+                .contains(eventCode)) {
+            deletionFuture.complete(null);
+        } else {
+            final String namespacedCounterPath =
+                    ZooKeeperUtils.generateZookeeperPath(client.getNamespace(), counterPath);
+            deletionFuture.completeExceptionally(
+                    new FlinkException(
+                            String.format(
+                                    "An error occurred while shutting down the CheckpointIDCounter in path '%s'.",
+                                    namespacedCounterPath),
+                            KeeperException.create(eventCode, namespacedCounterPath)));
+        }
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
index 4ae7678..c97f864 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
@@ -129,7 +129,7 @@ public class CheckpointResourcesCleanupRunner implements JobManagerRunner {
         }
 
         try {
-            checkpointIDCounter.shutdown(getJobStatus());
+            checkpointIDCounter.shutdown(getJobStatus()).get();
         } catch (Exception e) {
             exception = ExceptionUtils.firstOrSuppressed(e, exception);
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index c997cc2..0ca2233 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -245,7 +245,7 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
         }
 
         try {
-            checkpointIdCounter.shutdown(jobStatus);
+            checkpointIdCounter.shutdown(jobStatus).get();
         } catch (Exception e) {
             exception = ExceptionUtils.firstOrSuppressed(e, exception);
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 999b2af..562c17f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -463,7 +463,7 @@ public class AdaptiveScheduler
         }
 
         try {
-            checkpointIdCounter.shutdown(terminalState);
+            checkpointIdCounter.shutdown(terminalState).get();
         } catch (Exception e) {
             exception = ExceptionUtils.firstOrSuppressed(e, exception);
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
index ddf446b..643c09d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 
@@ -835,7 +836,9 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         public void start() {}
 
         @Override
-        public void shutdown(JobStatus jobStatus) throws Exception {}
+        public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
+            return FutureUtils.completedVoidFuture();
+        }
 
         @Override
         public long getAndIncrement() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java
index 1dcfd33..59933e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java
@@ -58,7 +58,7 @@ abstract class CheckpointIDCounterTestBase {
             counter.start();
             assertThat(counter.get()).isGreaterThanOrEqualTo(0L);
         } finally {
-            counter.shutdown(JobStatus.FINISHED);
+            counter.shutdown(JobStatus.FINISHED).join();
         }
     }
 
@@ -78,7 +78,7 @@ abstract class CheckpointIDCounterTestBase {
             assertThat(counter.get()).isEqualTo(4);
             assertThat(counter.getAndIncrement()).isEqualTo(4);
         } finally {
-            counter.shutdown(JobStatus.FINISHED);
+            counter.shutdown(JobStatus.FINISHED).join();
         }
     }
 
@@ -136,7 +136,7 @@ abstract class CheckpointIDCounterTestBase {
                 executor.shutdown();
             }
 
-            counter.shutdown(JobStatus.FINISHED);
+            counter.shutdown(JobStatus.FINISHED).join();
         }
     }
 
@@ -161,7 +161,7 @@ abstract class CheckpointIDCounterTestBase {
         assertThat(counter.get()).isEqualTo(1338);
         assertThat(counter.getAndIncrement()).isEqualTo(1338);
 
-        counter.shutdown(JobStatus.FINISHED);
+        counter.shutdown(JobStatus.FINISHED).join();
     }
 
     /** Task repeatedly incrementing the {@link CheckpointIDCounter}. */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java
index 2365ed5..466a69a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java
@@ -18,16 +18,18 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 /** Test {@link CheckpointIDCounter} implementation for testing the shutdown behavior. */
 public final class TestingCheckpointIDCounter implements CheckpointIDCounter {
 
     private final Runnable startRunnable;
-    private final Consumer<JobStatus> shutdownConsumer;
+    private final Function<JobStatus, CompletableFuture<Void>> shutdownFunction;
     private final Supplier<Integer> getAndIncrementSupplier;
     private final Supplier<Integer> getSupplier;
     private final Consumer<Long> setCountConsumer;
@@ -36,18 +38,22 @@ public final class TestingCheckpointIDCounter implements CheckpointIDCounter {
             CompletableFuture<JobStatus> shutdownFuture) {
         return TestingCheckpointIDCounter.builder()
                 .withStartRunnable(() -> {})
-                .withShutdownConsumer(shutdownFuture::complete)
+                .withShutdownConsumer(
+                        jobStatus -> {
+                            shutdownFuture.complete(jobStatus);
+                            return FutureUtils.completedVoidFuture();
+                        })
                 .build();
     }
 
     private TestingCheckpointIDCounter(
             Runnable startRunnable,
-            Consumer<JobStatus> shutdownConsumer,
+            Function<JobStatus, CompletableFuture<Void>> shutdownFunction,
             Supplier<Integer> getAndIncrementSupplier,
             Supplier<Integer> getSupplier,
             Consumer<Long> setCountConsumer) {
         this.startRunnable = startRunnable;
-        this.shutdownConsumer = shutdownConsumer;
+        this.shutdownFunction = shutdownFunction;
         this.getAndIncrementSupplier = getAndIncrementSupplier;
         this.getSupplier = getSupplier;
         this.setCountConsumer = setCountConsumer;
@@ -59,8 +65,8 @@ public final class TestingCheckpointIDCounter implements CheckpointIDCounter {
     }
 
     @Override
-    public void shutdown(JobStatus jobStatus) {
-        shutdownConsumer.accept(jobStatus);
+    public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
+        return shutdownFunction.apply(jobStatus);
     }
 
     @Override
@@ -86,7 +92,7 @@ public final class TestingCheckpointIDCounter implements CheckpointIDCounter {
     public static class Builder {
 
         private Runnable startRunnable;
-        private Consumer<JobStatus> shutdownConsumer;
+        private Function<JobStatus, CompletableFuture<Void>> shutdownFunction;
         private Supplier<Integer> getAndIncrementSupplier;
         private Supplier<Integer> getSupplier;
         private Consumer<Long> setCountConsumer;
@@ -96,8 +102,9 @@ public final class TestingCheckpointIDCounter implements CheckpointIDCounter {
             return this;
         }
 
-        public Builder withShutdownConsumer(Consumer<JobStatus> shutdownConsumer) {
-            this.shutdownConsumer = shutdownConsumer;
+        public Builder withShutdownConsumer(
+                Function<JobStatus, CompletableFuture<Void>> shutdownFunction) {
+            this.shutdownFunction = shutdownFunction;
             return this;
         }
 
@@ -119,7 +126,7 @@ public final class TestingCheckpointIDCounter implements CheckpointIDCounter {
         public TestingCheckpointIDCounter build() {
             return new TestingCheckpointIDCounter(
                     startRunnable,
-                    shutdownConsumer,
+                    shutdownFunction,
                     getAndIncrementSupplier,
                     getSupplier,
                     setCountConsumer);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java
index 798ea79..4e9c0d4e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java
@@ -19,16 +19,21 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 
 import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
 
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.concurrent.ExecutionException;
+
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
  * Unit tests for the {@link ZooKeeperCheckpointIDCounter}. The tests are inherited from the test
@@ -38,19 +43,26 @@ class ZooKeeperCheckpointIDCounterITCase extends CheckpointIDCounterTestBase {

-    private static ZooKeeperTestEnvironment zookeeper;
+    /** Created per test in {@link #setup()} and stopped in {@link #tearDown()}. */
+    private ZooKeeperTestEnvironment zookeeper;

-    @BeforeAll
-    public static void setUp() throws Exception {
+    @BeforeEach
+    void setup() {
         zookeeper = new ZooKeeperTestEnvironment(1);
     }

-    @AfterAll
-    private static void tearDown() throws Exception {
-        zookeeper.shutdown();
+    @AfterEach
+    void tearDown() throws Exception {
+        cleanAndStopZooKeeperIfRunning();
     }

-    @BeforeEach
-    private void cleanUp() throws Exception {
-        zookeeper.deleteAll();
+    /**
+     * Calls {@code deleteAll()} and shuts the ZooKeeper test environment down if its client is
+     * still started. Tests may stop the environment themselves, hence the guard.
+     */
+    private void cleanAndStopZooKeeperIfRunning() throws Exception {
+        if (zookeeper != null && zookeeper.getClient().isStarted()) {
+            zookeeper.deleteAll();
+            zookeeper.shutdown();
+        }
     }

     /** Tests that counter node is removed from ZooKeeper after shutdown. */
@@ -62,10 +69,79 @@ class ZooKeeperCheckpointIDCounterITCase extends CheckpointIDCounterTestBase {
         CuratorFramework client = zookeeper.getClient();
         assertThat(client.checkExists().forPath(counter.getPath())).isNotNull();

-        counter.shutdown(JobStatus.FINISHED);
+        counter.shutdown(JobStatus.FINISHED).join();
+        assertThat(client.checkExists().forPath(counter.getPath())).isNull();
+    }
+
+    /** Tests that a repeated shutdown succeeds although the counter node is already gone. */
+    @Test
+    public void testIdempotentShutdown() throws Exception {
+        ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter();
+        counter.start();
+
+        CuratorFramework client = zookeeper.getClient();
+        counter.shutdown(JobStatus.FINISHED).join();
+
+        // shutdown shouldn't fail due to missing path
+        counter.shutdown(JobStatus.FINISHED).join();
         assertThat(client.checkExists().forPath(counter.getPath())).isNull();
     }

+    /** Tests that the shutdown future completes exceptionally if the connection is dropped. */
+    @Test
+    public void testShutdownWithFailureDueToMissingConnection() throws Exception {
+        ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter();
+        counter.start();
+
+        cleanAndStopZooKeeperIfRunning();
+
+        assertThatThrownBy(() -> counter.shutdown(JobStatus.FINISHED).get())
+                .as("The shutdown should fail because of the client connection being dropped.")
+                .isInstanceOf(ExecutionException.class)
+                .hasCauseExactlyInstanceOf(IllegalStateException.class);
+    }
+
+    /**
+     * Tests that the shutdown fails while the counter node still has child nodes and that a retry
+     * succeeds once they are removed.
+     */
+    @Test
+    public void testShutdownWithFailureDueToExistingChildNodes() throws Exception {
+        final ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter();
+        counter.start();
+
+        final CuratorFramework client =
+                ZooKeeperUtils.useNamespaceAndEnsurePath(zookeeper.getClient(), "/");
+        final String counterNodePath = ZooKeeperUtils.generateZookeeperPath(counter.getPath());
+        final String childNodePath =
+                ZooKeeperUtils.generateZookeeperPath(
+                        counterNodePath, "unexpected-child-node-causing-a-failure");
+        client.create().forPath(childNodePath);
+
+        final String namespacedCounterNodePath =
+                ZooKeeperUtils.generateZookeeperPath(client.getNamespace(), counterNodePath);
+        final Throwable expectedRootCause =
+                KeeperException.create(KeeperException.Code.NOTEMPTY, namespacedCounterNodePath);
+        assertThatThrownBy(() -> counter.shutdown(JobStatus.FINISHED).get())
+                .as(
+                        "The shutdown should fail because of a child node being present and the shutdown not performing an explicit recursive deletion.")
+                .isInstanceOf(ExecutionException.class)
+                .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
+                .anySatisfy(
+                        cause ->
+                                assertThat(cause)
+                                        .isInstanceOf(expectedRootCause.getClass())
+                                        .hasMessage(expectedRootCause.getMessage()));
+
+        client.delete().forPath(childNodePath);
+        counter.shutdown(JobStatus.FINISHED).join();
+
+        assertThat(client.checkExists().forPath(counterNodePath))
+                .as(
+                        "A retry of the shutdown should have worked now after the root cause was resolved.")
+                .isNull();
+    }
+
     /** Tests that counter node is NOT removed from ZooKeeper after suspend. */
     @Test
     public void testSuspendKeepsState() throws Exception {
@@ -75,13 +145,14 @@ class ZooKeeperCheckpointIDCounterITCase extends CheckpointIDCounterTestBase {
         CuratorFramework client = zookeeper.getClient();
         assertThat(client.checkExists().forPath(counter.getPath())).isNotNull();
 
-        counter.shutdown(JobStatus.SUSPENDED);
+        counter.shutdown(JobStatus.SUSPENDED).join();
         assertThat(client.checkExists().forPath(counter.getPath())).isNotNull();
     }
 
     @Override
     protected ZooKeeperCheckpointIDCounter createCheckpointIdCounter() throws Exception {
         return new ZooKeeperCheckpointIDCounter(
-                zookeeper.getClient(), new DefaultLastStateConnectionStateListener());
+                ZooKeeperUtils.useNamespaceAndEnsurePath(zookeeper.getClient(), "/"),
+                new DefaultLastStateConnectionStateListener());
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index a63a752..387d667 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -107,6 +107,7 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -1608,7 +1609,8 @@ public class DefaultSchedulerTest extends TestLogger {
                             throw new RuntimeException(e);
                         }
                     },
-                    executorService);
+                    executorService,
+                    log);
         } finally {
             executorService.shutdownNow();
         }
@@ -1620,7 +1622,8 @@ public class DefaultSchedulerTest extends TestLogger {
      */
     public static void doTestCheckpointCleanerIsClosedAfterCheckpointServices(
             BiFunction<CheckpointRecoveryFactory, CheckpointsCleaner, SchedulerNG> schedulerFactory,
-            ScheduledExecutorService executorService)
+            ScheduledExecutorService executorService,
+            Logger logger)
             throws Exception {
         final CountDownLatch checkpointServicesShutdownBlocked = new CountDownLatch(1);
         final CountDownLatch cleanerClosed = new CountDownLatch(1);
@@ -1638,9 +1641,23 @@ public class DefaultSchedulerTest extends TestLogger {
                 new StandaloneCheckpointIDCounter() {

                     @Override
-                    public void shutdown(JobStatus jobStatus) throws Exception {
-                        checkpointServicesShutdownBlocked.await();
-                        super.shutdown(jobStatus);
+                    public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
+                        try {
+                            checkpointServicesShutdownBlocked.await();
+                        } catch (InterruptedException e) {
+                            logger.error(
+                                    "Interrupted while waiting for the CheckpointServices shutdown to be unblocked.",
+                                    e);
+                            Thread.currentThread().interrupt();
+                            // Report the interruption through the returned future instead of
+                            // silently continuing with the shutdown.
+                            final CompletableFuture<Void> interruptedShutdown =
+                                    new CompletableFuture<>();
+                            interruptedShutdown.completeExceptionally(e);
+                            return interruptedShutdown;
+                        }
+
+                        return super.shutdown(jobStatus);
                     }
                 };
         final CheckpointsCleaner checkpointsCleaner =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index e673a28..6e6f6d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -1462,7 +1462,8 @@ public class AdaptiveSchedulerTest extends TestLogger {
                             throw new RuntimeException(e);
                         }
                     },
-                    executorService);
+                    executorService,
+                    log);
         } finally {
             executorService.shutdownNow();
         }