You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/08 16:37:50 UTC

[flink] branch release-1.12 updated: [FLINK-20417][k8s] Create a new watcher when the old one is closed with HTTP_GONE

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 698141d  [FLINK-20417][k8s] Create a new watcher when the old one is closed with HTTP_GONE
698141d is described below

commit 698141d47bb2c06b1858b575d763cb18e5a3ea55
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Tue Feb 2 20:14:33 2021 +0800

    [FLINK-20417][k8s] Create a new watcher when the old one is closed with HTTP_GONE
    
    This closes #14837.
---
 .../KubernetesResourceManagerDriver.java           | 41 ++++++++++++----
 .../KubernetesLeaderElectionDriver.java            | 39 +++++++++++++---
 .../KubernetesLeaderRetrievalDriver.java           | 43 +++++++++++++----
 .../kubernetes/kubeclient/FlinkKubeClient.java     |  2 +-
 .../resources/AbstractKubernetesWatcher.java       | 16 ++++++-
 .../KubernetesTooOldResourceVersionException.java} | 36 ++++-----------
 .../KubernetesResourceManagerDriverTest.java       | 54 +++++++++++++++++++---
 .../KubernetesHighAvailabilityTestBase.java        |  7 ++-
 .../KubernetesLeaderElectionDriverTest.java        | 34 ++++++++++++++
 .../KubernetesLeaderRetrievalDriverTest.java       | 34 ++++++++++++++
 .../kubeclient/TestingFlinkKubeClient.java         |  9 +++-
 .../resources/KubernetesPodsWatcherTest.java       | 24 +++++++++-
 .../resources/NoOpWatchCallbackHandler.java        |  2 +-
 13 files changed, 277 insertions(+), 64 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
index 939890f..92bf348 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
@@ -29,6 +29,7 @@ import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
 import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
 import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -80,6 +81,8 @@ public class KubernetesResourceManagerDriver
 
     private Optional<KubernetesWatch> podsWatchOpt;
 
+    private volatile boolean running;
+
     /**
      * Incompletion of this future indicates that there was a pod creation failure recently and the
      * driver should not retry creating pods until the future become completed again. It's
@@ -98,6 +101,7 @@ public class KubernetesResourceManagerDriver
         this.kubeClientFactory = Preconditions.checkNotNull(kubeClientFactory);
         this.requestResourceFutures = new HashMap<>();
         this.podCreationCoolDown = FutureUtils.completedVoidFuture();
+        this.running = false;
     }
 
     // ------------------------------------------------------------------------
@@ -108,17 +112,18 @@ public class KubernetesResourceManagerDriver
     protected void initializeInternal() throws Exception {
         kubeClientOpt =
                 Optional.of(kubeClientFactory.fromConfiguration(flinkConfig, getIoExecutor()));
-        podsWatchOpt =
-                Optional.of(
-                        getKubeClient()
-                                .watchPodsAndDoCallback(
-                                        KubernetesUtils.getTaskManagerLabels(clusterId),
-                                        new PodCallbackHandlerImpl()));
+        podsWatchOpt = watchTaskManagerPods();
         recoverWorkerNodesFromPreviousAttempts();
+        this.running = true;
     }
 
     @Override
     public CompletableFuture<Void> terminate() {
+        if (!running) {
+            return FutureUtils.completedVoidFuture();
+        }
+        running = false;
+
         // shut down all components
         Exception exception = null;
 
@@ -339,6 +344,14 @@ public class KubernetesResourceManagerDriver
         return kubeClientOpt.get();
     }
 
+    private Optional<KubernetesWatch> watchTaskManagerPods() {
+        return Optional.of(
+                getKubeClient()
+                        .watchPodsAndDoCallback(
+                                KubernetesUtils.getTaskManagerLabels(clusterId),
+                                new PodCallbackHandlerImpl()));
+    }
+
     // ------------------------------------------------------------------------
     //  FlinkKubeClient.WatchCallbackHandler
     // ------------------------------------------------------------------------
@@ -386,8 +399,20 @@ public class KubernetesResourceManagerDriver
         }
 
         @Override
-        public void handleFatalError(Throwable throwable) {
-            getResourceEventHandler().onError(throwable);
+        public void handleError(Throwable throwable) {
+            if (throwable instanceof KubernetesTooOldResourceVersionException) {
+                getMainThreadExecutor()
+                        .execute(
+                                () -> {
+                                    if (running) {
+                                        podsWatchOpt.ifPresent(KubernetesWatch::close);
+                                        log.info("Creating a new watch on TaskManager pods.");
+                                        podsWatchOpt = watchTaskManagerPods();
+                                    }
+                                });
+            } else {
+                getResourceEventHandler().onError(throwable);
+            }
         }
     }
 }
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java
index 2a47b1d..f622f4b 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java
@@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.runtime.leaderelection.LeaderElectionDriver;
@@ -35,6 +36,8 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.GuardedBy;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -59,6 +62,8 @@ public class KubernetesLeaderElectionDriver implements LeaderElectionDriver {
 
     private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderElectionDriver.class);
 
+    private final Object watchLock = new Object();
+
     private final FlinkKubeClient kubeClient;
 
     private final String configMapName;
@@ -72,14 +77,15 @@ public class KubernetesLeaderElectionDriver implements LeaderElectionDriver {
 
     private final LeaderElectionEventHandler leaderElectionEventHandler;
 
-    private final KubernetesWatch kubernetesWatch;
-
     private final FatalErrorHandler fatalErrorHandler;
 
     private final CountDownLatch configMapLatch = new CountDownLatch(1);
 
     private volatile boolean running;
 
+    @GuardedBy("watchLock")
+    private KubernetesWatch kubernetesWatch;
+
     public KubernetesLeaderElectionDriver(
             FlinkKubeClient kubeClient,
             KubernetesLeaderElectionConfiguration leaderConfig,
@@ -122,7 +128,12 @@ public class KubernetesLeaderElectionDriver implements LeaderElectionDriver {
 
         LOG.info("Closing {}.", this);
         leaderElector.stop();
-        kubernetesWatch.close();
+
+        synchronized (watchLock) {
+            if (kubernetesWatch != null) {
+                kubernetesWatch.close();
+            }
+        }
     }
 
     @Override
@@ -245,10 +256,24 @@ public class KubernetesLeaderElectionDriver implements LeaderElectionDriver {
         }
 
         @Override
-        public void handleFatalError(Throwable throwable) {
-            fatalErrorHandler.onFatalError(
-                    new LeaderElectionException(
-                            "Error while watching the ConfigMap " + configMapName, throwable));
+        public void handleError(Throwable throwable) {
+            if (throwable instanceof KubernetesTooOldResourceVersionException) {
+                synchronized (watchLock) {
+                    if (running) {
+                        if (kubernetesWatch != null) {
+                            kubernetesWatch.close();
+                        }
+                        LOG.info("Creating a new watch on ConfigMap {}.", configMapName);
+                        kubernetesWatch =
+                                kubeClient.watchConfigMaps(
+                                        configMapName, new ConfigMapCallbackHandlerImpl());
+                    }
+                }
+            } else {
+                fatalErrorHandler.onFatalError(
+                        new LeaderElectionException(
+                                "Error while watching the ConfigMap " + configMapName, throwable));
+            }
         }
     }
 
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
index be9335e..f866515 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.highavailability;
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalDriver;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalEventHandler;
@@ -29,6 +30,8 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.GuardedBy;
+
 import java.util.List;
 
 import static org.apache.flink.kubernetes.utils.KubernetesUtils.checkConfigMaps;
@@ -46,22 +49,27 @@ public class KubernetesLeaderRetrievalDriver implements LeaderRetrievalDriver {
     private static final Logger LOG =
             LoggerFactory.getLogger(KubernetesLeaderRetrievalDriver.class);
 
+    private final Object watchLock = new Object();
+
+    private final FlinkKubeClient kubeClient;
+
     private final String configMapName;
 
     private final LeaderRetrievalEventHandler leaderRetrievalEventHandler;
 
-    private final KubernetesWatch kubernetesWatch;
-
     private final FatalErrorHandler fatalErrorHandler;
 
     private volatile boolean running;
 
+    @GuardedBy("watchLock")
+    private KubernetesWatch kubernetesWatch;
+
     public KubernetesLeaderRetrievalDriver(
             FlinkKubeClient kubeClient,
             String configMapName,
             LeaderRetrievalEventHandler leaderRetrievalEventHandler,
             FatalErrorHandler fatalErrorHandler) {
-        checkNotNull(kubeClient, "Kubernetes client");
+        this.kubeClient = checkNotNull(kubeClient, "Kubernetes client");
         this.configMapName = checkNotNull(configMapName, "ConfigMap name");
         this.leaderRetrievalEventHandler =
                 checkNotNull(leaderRetrievalEventHandler, "LeaderRetrievalEventHandler");
@@ -81,7 +89,12 @@ public class KubernetesLeaderRetrievalDriver implements LeaderRetrievalDriver {
         running = false;
 
         LOG.info("Stopping {}.", this);
-        kubernetesWatch.close();
+
+        synchronized (watchLock) {
+            if (kubernetesWatch != null) {
+                kubernetesWatch.close();
+            }
+        }
     }
 
     private class ConfigMapCallbackHandlerImpl
@@ -114,10 +127,24 @@ public class KubernetesLeaderRetrievalDriver implements LeaderRetrievalDriver {
         }
 
         @Override
-        public void handleFatalError(Throwable throwable) {
-            fatalErrorHandler.onFatalError(
-                    new LeaderRetrievalException(
-                            "Error while watching the ConfigMap " + configMapName));
+        public void handleError(Throwable throwable) {
+            if (throwable instanceof KubernetesTooOldResourceVersionException) {
+                synchronized (watchLock) {
+                    if (running) {
+                        if (kubernetesWatch != null) {
+                            kubernetesWatch.close();
+                        }
+                        LOG.info("Creating a new watch on ConfigMap {}.", configMapName);
+                        kubernetesWatch =
+                                kubeClient.watchConfigMaps(
+                                        configMapName, new ConfigMapCallbackHandlerImpl());
+                    }
+                }
+            } else {
+                fatalErrorHandler.onFatalError(
+                        new LeaderRetrievalException(
+                                "Error while watching the ConfigMap " + configMapName, throwable));
+            }
         }
     }
 
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
index b405494..6897ac1 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
@@ -200,6 +200,6 @@ public interface FlinkKubeClient extends AutoCloseable {
 
         void onError(List<T> resources);
 
-        void handleFatalError(Throwable throwable);
+        void handleError(Throwable throwable);
     }
 }
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/AbstractKubernetesWatcher.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/AbstractKubernetesWatcher.java
index 9eae5df..2dfb19f 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/AbstractKubernetesWatcher.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/AbstractKubernetesWatcher.java
@@ -26,6 +26,8 @@ import io.fabric8.kubernetes.client.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.net.HttpURLConnection.HTTP_GONE;
+
 /** Watcher for resources in Kubernetes. */
 public abstract class AbstractKubernetesWatcher<
                 T extends HasMetadata, K extends KubernetesResource<T>>
@@ -45,7 +47,19 @@ public abstract class AbstractKubernetesWatcher<
         if (cause == null) {
             logger.info("The watcher is closing.");
         } else {
-            callbackHandler.handleFatalError(cause);
+            // The Fabric8 Kubernetes client closes the watcher directly when it receives an
+            // HTTP_GONE status code, so this case must be handled by the caller. Refer to
+            // https://github.com/fabric8io/kubernetes-client/blob/v4.9.2/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java#L255
+            // for more information about the implementation.
+            if (cause.getCode() == HTTP_GONE) {
+                logger.debug(
+                        "Got a http code 'HTTP_GONE' which means the Kubernetes client has the "
+                                + "too old resource version.",
+                        cause);
+                callbackHandler.handleError(new KubernetesTooOldResourceVersionException(cause));
+            } else {
+                callbackHandler.handleError(cause);
+            }
         }
     }
 }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesTooOldResourceVersionException.java
similarity index 55%
copy from flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java
copy to flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesTooOldResourceVersionException.java
index 6e64a4d..a978286 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesTooOldResourceVersionException.java
@@ -18,38 +18,18 @@
 
 package org.apache.flink.kubernetes.kubeclient.resources;
 
-import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.util.FlinkException;
 
-import java.util.List;
+/** Exception signalling that the Kubernetes client has a too old resource version. */
+public class KubernetesTooOldResourceVersionException extends FlinkException {
 
-/**
- * Empty implementation of {@link FlinkKubeClient.WatchCallbackHandler}.
- *
- * @param <T> Type of resource to be watched
- */
-public class NoOpWatchCallbackHandler<T> implements FlinkKubeClient.WatchCallbackHandler<T> {
-    @Override
-    public void onAdded(List<T> resources) {
-        // noop
-    }
-
-    @Override
-    public void onModified(List<T> resources) {
-        // noop
-    }
-
-    @Override
-    public void onDeleted(List<T> resources) {
-        // noop
-    }
+    private static final long serialVersionUID = 1L;
 
-    @Override
-    public void onError(List<T> resources) {
-        // noop
+    public KubernetesTooOldResourceVersionException(Throwable cause) {
+        super(cause);
     }
 
-    @Override
-    public void handleFatalError(Throwable throwable) {
-        // noop
+    public KubernetesTooOldResourceVersionException(String message, Throwable cause) {
+        super(message, cause);
     }
 }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java
index 330ae55..d02eb41 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.kubernetes;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -27,6 +28,7 @@ import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandl
 import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.TestingKubeClientFactory;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
 import org.apache.flink.kubernetes.kubeclient.resources.TestingKubernetesPod;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
@@ -34,10 +36,12 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
 import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriverTestBase;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 
 import io.fabric8.kubernetes.api.model.ResourceRequirements;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -149,7 +153,7 @@ public class KubernetesResourceManagerDriverTest
                 runTest(
                         () -> {
                             final Throwable testingError = new Throwable("testing error");
-                            getPodCallbackHandler().handleFatalError(testingError);
+                            getPodCallbackHandler().handleError(testingError);
                             assertThat(
                                     onErrorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
                                     is(testingError));
@@ -240,6 +244,31 @@ public class KubernetesResourceManagerDriverTest
         };
     }
 
+    @Test
+    public void testNewWatchCreationWhenKubernetesTooOldResourceVersionException()
+            throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            getPodCallbackHandler()
+                                    .handleError(
+                                            new KubernetesTooOldResourceVersionException(
+                                                    new Exception("too old resource version")));
+                            // Verify the old watch is closed and a new one is created
+                            CommonTestUtils.waitUntilCondition(
+                                    () -> getPodsWatches().size() == 2,
+                                    Deadline.fromNow(Duration.ofSeconds(TIMEOUT_SEC)),
+                                    String.format(
+                                            "New watch is not created in %s seconds.",
+                                            TIMEOUT_SEC));
+                            assertThat(getPodsWatches().get(0).isClosed(), is(true));
+                            assertThat(getPodsWatches().get(1).isClosed(), is(false));
+                        });
+            }
+        };
+    }
+
     @Override
     protected ResourceManagerDriverTestBase<KubernetesWorkerNode>.Context createContext() {
         return new Context();
@@ -253,6 +282,9 @@ public class KubernetesResourceManagerDriverTest
                 setWatchPodsAndDoCallbackFuture = new CompletableFuture<>();
         private final CompletableFuture<Void> closeKubernetesWatchFuture =
                 new CompletableFuture<>();
+
+        private final List<TestingFlinkKubeClient.MockKubernetesWatch> podsWatches =
+                new ArrayList<>();
         private final CompletableFuture<String> stopAndCleanupClusterFuture =
                 new CompletableFuture<>();
         private final CompletableFuture<KubernetesPod> createTaskManagerPodFuture =
@@ -264,12 +296,16 @@ public class KubernetesResourceManagerDriverTest
                         .setWatchPodsAndDoCallbackFunction(
                                 (ignore, handler) -> {
                                     setWatchPodsAndDoCallbackFuture.complete(handler);
-                                    return new TestingFlinkKubeClient.MockKubernetesWatch() {
-                                        @Override
-                                        public void close() {
-                                            closeKubernetesWatchFuture.complete(null);
-                                        }
-                                    };
+                                    final TestingFlinkKubeClient.MockKubernetesWatch watch =
+                                            new TestingFlinkKubeClient.MockKubernetesWatch() {
+                                                @Override
+                                                public void close() {
+                                                    super.close();
+                                                    closeKubernetesWatchFuture.complete(null);
+                                                }
+                                            };
+                                    podsWatches.add(watch);
+                                    return watch;
                                 })
                         .setStopAndCleanupClusterConsumer(stopAndCleanupClusterFuture::complete)
                         .setCreateTaskManagerPodFunction(
@@ -295,6 +331,10 @@ public class KubernetesResourceManagerDriverTest
             return null;
         }
 
+        List<TestingFlinkKubeClient.MockKubernetesWatch> getPodsWatches() {
+            return podsWatches;
+        }
+
         @Override
         protected void prepareRunTest() {
             flinkConfig.setString(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
index cecb861..4d7928b 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
@@ -92,6 +92,8 @@ public class KubernetesHighAvailabilityTestBase extends TestLogger {
         final List<CompletableFuture<FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap>>>
                 configMapCallbackFutures = new ArrayList<>();
 
+        final List<TestingFlinkKubeClient.MockKubernetesWatch> configMapWatches = new ArrayList<>();
+
         final CompletableFuture<Map<String, String>> deleteConfigMapByLabelsFuture =
                 new CompletableFuture<>();
         final CompletableFuture<Void> closeKubeClientFuture = new CompletableFuture<>();
@@ -205,7 +207,10 @@ public class KubernetesHighAvailabilityTestBase extends TestLogger {
                                                 new TestingFlinkKubeClient.MockKubernetesConfigMap(
                                                         LEADER_CONFIGMAP_NAME)));
                                 configMapCallbackFutures.add(future);
-                                return new TestingFlinkKubeClient.MockKubernetesWatch();
+                                final TestingFlinkKubeClient.MockKubernetesWatch watch =
+                                        new TestingFlinkKubeClient.MockKubernetesWatch();
+                                configMapWatches.add(watch);
+                                return watch;
                             })
                     .setDeleteConfigMapFunction(
                             name -> {
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java
index 049e2f9..c681d34 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java
@@ -20,7 +20,9 @@ package org.apache.flink.kubernetes.highavailability;
 
 import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
 import org.apache.flink.runtime.leaderelection.LeaderInformation;
 
 import org.junit.Test;
@@ -28,11 +30,13 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
 import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_KEY;
 import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
 import static org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
@@ -254,4 +258,34 @@ public class KubernetesLeaderElectionDriverTest extends KubernetesHighAvailabili
             }
         };
     }
+
+    @Test
+    public void testNewWatchCreationWhenKubernetesTooOldResourceVersionException()
+            throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap>
+                                    callbackHandler = getLeaderElectionConfigMapCallback();
+                            callbackHandler.handleError(
+                                    new KubernetesTooOldResourceVersionException(
+                                            new Exception("too old resource version")));
+                            // Verify the old watch is closed and a new one is created
+                            assertThat(configMapWatches.size(), is(3));
+                            // The three watches are [old-leader-election-watch,
+                            // leader-retrieval-watch, new-leader-election-watch]
+                            assertThat(
+                                    configMapWatches.stream()
+                                            .map(
+                                                    TestingFlinkKubeClient.MockKubernetesWatch
+                                                            ::isClosed)
+                                            .collect(Collectors.toList()),
+                                    contains(true, false, false));
+                        });
+            }
+        };
+    }
 }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverTest.java
index ef97652..8febce4 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverTest.java
@@ -20,13 +20,17 @@ package org.apache.flink.kubernetes.highavailability;
 
 import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
 import org.apache.flink.kubernetes.utils.Constants;
 
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.stream.Collectors;
 
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
@@ -103,4 +107,34 @@ public class KubernetesLeaderRetrievalDriverTest extends KubernetesHighAvailabil
             }
         };
     }
+
+    @Test
+    public void testNewWatchCreationWhenKubernetesTooOldResourceVersionException()
+            throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap>
+                                    retrievalCallback = getLeaderRetrievalConfigMapCallback();
+                            final KubernetesTooOldResourceVersionException tooOldVersion =
+                                    new KubernetesTooOldResourceVersionException(
+                                            new Exception("too old resource version"));
+                            retrievalCallback.handleError(tooOldVersion);
+                            // Old watch closed, new one created: [leader-election-watch,
+                            // old-leader-retrieval-watch, new-leader-retrieval-watch]
+                            assertThat(configMapWatches.size(), is(3));
+                            assertThat(
+                                    configMapWatches.stream()
+                                            .map(
+                                                    TestingFlinkKubeClient.MockKubernetesWatch
+                                                            ::isClosed)
+                                            .collect(Collectors.toList()),
+                                    contains(false, true, false));
+                        });
+            }
+        };
+    }
 }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
index c046613..90b4f5d 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
@@ -361,13 +361,20 @@ public class TestingFlinkKubeClient implements FlinkKubeClient {
 
     /** Testing implementation of {@link KubernetesWatch}. */
     public static class MockKubernetesWatch extends KubernetesWatch {
+        private boolean closed; // becomes true once close() has been called
+
         public MockKubernetesWatch() {
             super(null);
+            closed = false;
         }
 
         @Override
         public void close() {
-            // noop
+            closed = true;
+        }
+
+        public boolean isClosed() {
+            return closed;
         }
     }
 
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java
index 2edf7cc..936c8c3 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java
@@ -18,12 +18,15 @@
 
 package org.apache.flink.kubernetes.kubeclient.resources;
 
+import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import org.apache.flink.util.TestLogger;
 
+import io.fabric8.kubernetes.api.model.StatusBuilder;
 import io.fabric8.kubernetes.client.KubernetesClientException;
 import io.fabric8.kubernetes.client.Watcher;
+import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -32,6 +35,7 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
+import static java.net.HttpURLConnection.HTTP_GONE;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
@@ -76,6 +80,24 @@ public class KubernetesPodsWatcherTest extends TestLogger {
         assertThat(podErrorList.size(), is(1));
     }
 
+    @Test
+    public void testClosingWithTooOldResourceVersion() {
+        final String errMsg = "too old resource version";
+        final Class<?> expectedType = KubernetesTooOldResourceVersionException.class;
+        final AtomicBoolean errorHandled = new AtomicBoolean(false);
+        final KubernetesPodsWatcher podsWatcher =
+                new KubernetesPodsWatcher(
+                        new TestingCallbackHandler(
+                                e -> {
+                                    assertThat(e, Matchers.instanceOf(expectedType));
+                                    assertThat(e, FlinkMatchers.containsMessage(errMsg));
+                                    errorHandled.set(true); // record that the handler ran
+                                }));
+        podsWatcher.onClose(
+                new KubernetesClientException(errMsg, HTTP_GONE, new StatusBuilder().build()));
+        assertThat(errorHandled.get(), is(true)); // otherwise the checks above never executed
+    }
+
     private class TestingCallbackHandler
             implements FlinkKubeClient.WatchCallbackHandler<KubernetesPod> {
 
@@ -106,7 +128,7 @@ public class KubernetesPodsWatcherTest extends TestLogger {
         }
 
         @Override
-        public void handleFatalError(Throwable throwable) {
+        public void handleError(Throwable throwable) {
             consumer.accept(throwable);
         }
     }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java
index 6e64a4d..8598886 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java
@@ -49,7 +49,7 @@ public class NoOpWatchCallbackHandler<T> implements FlinkKubeClient.WatchCallbac
     }
 
     @Override
-    public void handleFatalError(Throwable throwable) {
+    public void handleError(Throwable throwable) {
         // noop
     }
 }