You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/08 16:29:07 UTC
[flink] branch master updated: [FLINK-20417][k8s] Create a new watcher when the old one is closed with HTTP_GONE
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new eccbfe6 [FLINK-20417][k8s] Create a new watcher when the old one is closed with HTTP_GONE
eccbfe6 is described below
commit eccbfe670f9c65325baec8d2f0af6a0a71715b82
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Tue Feb 2 20:14:33 2021 +0800
[FLINK-20417][k8s] Create a new watcher when the old one is closed with HTTP_GONE
This closes #14837.
---
.../KubernetesResourceManagerDriver.java | 41 ++++++++++++----
.../KubernetesLeaderElectionDriver.java | 39 ++++++++++++---
.../KubernetesLeaderRetrievalDriver.java | 43 +++++++++++++----
.../kubernetes/kubeclient/FlinkKubeClient.java | 2 +-
.../resources/AbstractKubernetesWatcher.java | 16 ++++++-
.../KubernetesTooOldResourceVersionException.java} | 36 ++++----------
.../KubernetesResourceManagerDriverTest.java | 55 +++++++++++++++++++---
.../KubernetesHighAvailabilityTestBase.java | 7 ++-
.../KubernetesLeaderElectionDriverTest.java | 34 +++++++++++++
.../KubernetesLeaderRetrievalDriverTest.java | 34 +++++++++++++
.../kubeclient/TestingFlinkKubeClient.java | 9 +++-
.../resources/KubernetesPodsWatcherTest.java | 24 +++++++++-
.../resources/NoOpWatchCallbackHandler.java | 2 +-
13 files changed, 278 insertions(+), 64 deletions(-)
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
index bbf5d04..0ec7aad 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
@@ -28,6 +28,7 @@ import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -77,6 +78,8 @@ public class KubernetesResourceManagerDriver
private Optional<KubernetesWatch> podsWatchOpt;
+ private volatile boolean running;
+
/**
* Incompletion of this future indicates that there was a pod creation failure recently and the
* driver should not retry creating pods until the future become completed again. It's
@@ -93,6 +96,7 @@ public class KubernetesResourceManagerDriver
this.kubeClientFactory = Preconditions.checkNotNull(kubeClientFactory);
this.requestResourceFutures = new HashMap<>();
this.podCreationCoolDown = FutureUtils.completedVoidFuture();
+ this.running = false;
}
// ------------------------------------------------------------------------
@@ -103,17 +107,18 @@ public class KubernetesResourceManagerDriver
protected void initializeInternal() throws Exception {
kubeClientOpt =
Optional.of(kubeClientFactory.fromConfiguration(flinkConfig, getIoExecutor()));
- podsWatchOpt =
- Optional.of(
- getKubeClient()
- .watchPodsAndDoCallback(
- KubernetesUtils.getTaskManagerLabels(clusterId),
- new PodCallbackHandlerImpl()));
+ podsWatchOpt = watchTaskManagerPods();
recoverWorkerNodesFromPreviousAttempts();
+ this.running = true;
}
@Override
public CompletableFuture<Void> terminate() {
+ if (!running) {
+ return FutureUtils.completedVoidFuture();
+ }
+ running = false;
+
// shut down all components
Exception exception = null;
@@ -312,6 +317,14 @@ public class KubernetesResourceManagerDriver
return kubeClientOpt.get();
}
+ private Optional<KubernetesWatch> watchTaskManagerPods() {
+ return Optional.of(
+ getKubeClient()
+ .watchPodsAndDoCallback(
+ KubernetesUtils.getTaskManagerLabels(clusterId),
+ new PodCallbackHandlerImpl()));
+ }
+
// ------------------------------------------------------------------------
// FlinkKubeClient.WatchCallbackHandler
// ------------------------------------------------------------------------
@@ -359,8 +372,20 @@ public class KubernetesResourceManagerDriver
}
@Override
- public void handleFatalError(Throwable throwable) {
- getResourceEventHandler().onError(throwable);
+ public void handleError(Throwable throwable) {
+ if (throwable instanceof KubernetesTooOldResourceVersionException) {
+ getMainThreadExecutor()
+ .execute(
+ () -> {
+ if (running) {
+ podsWatchOpt.ifPresent(KubernetesWatch::close);
+ log.info("Creating a new watch on TaskManager pods.");
+ podsWatchOpt = watchTaskManagerPods();
+ }
+ });
+ } else {
+ getResourceEventHandler().onError(throwable);
+ }
}
}
}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java
index 2ee59f2..c5ab048 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java
@@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriver;
@@ -34,6 +35,8 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.concurrent.GuardedBy;
+
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -57,6 +60,8 @@ public class KubernetesLeaderElectionDriver implements LeaderElectionDriver {
private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderElectionDriver.class);
+ private final Object watchLock = new Object();
+
private final FlinkKubeClient kubeClient;
private final String configMapName;
@@ -70,12 +75,13 @@ public class KubernetesLeaderElectionDriver implements LeaderElectionDriver {
private final LeaderElectionEventHandler leaderElectionEventHandler;
- private final KubernetesWatch kubernetesWatch;
-
private final FatalErrorHandler fatalErrorHandler;
private volatile boolean running;
+ @GuardedBy("watchLock")
+ private KubernetesWatch kubernetesWatch;
+
public KubernetesLeaderElectionDriver(
FlinkKubeClient kubeClient,
KubernetesLeaderElectionConfiguration leaderConfig,
@@ -111,7 +117,12 @@ public class KubernetesLeaderElectionDriver implements LeaderElectionDriver {
LOG.info("Closing {}.", this);
leaderElector.stop();
- kubernetesWatch.close();
+
+ synchronized (watchLock) {
+ if (kubernetesWatch != null) {
+ kubernetesWatch.close();
+ }
+ }
}
@Override
@@ -234,10 +245,24 @@ public class KubernetesLeaderElectionDriver implements LeaderElectionDriver {
}
@Override
- public void handleFatalError(Throwable throwable) {
- fatalErrorHandler.onFatalError(
- new LeaderElectionException(
- "Error while watching the ConfigMap " + configMapName, throwable));
+ public void handleError(Throwable throwable) {
+ if (throwable instanceof KubernetesTooOldResourceVersionException) {
+ synchronized (watchLock) {
+ if (running) {
+ if (kubernetesWatch != null) {
+ kubernetesWatch.close();
+ }
+ LOG.info("Creating a new watch on ConfigMap {}.", configMapName);
+ kubernetesWatch =
+ kubeClient.watchConfigMaps(
+ configMapName, new ConfigMapCallbackHandlerImpl());
+ }
+ }
+ } else {
+ fatalErrorHandler.onFatalError(
+ new LeaderElectionException(
+ "Error while watching the ConfigMap " + configMapName, throwable));
+ }
}
}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
index be9335e..f866515 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.highavailability;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalDriver;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalEventHandler;
@@ -29,6 +30,8 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.concurrent.GuardedBy;
+
import java.util.List;
import static org.apache.flink.kubernetes.utils.KubernetesUtils.checkConfigMaps;
@@ -46,22 +49,27 @@ public class KubernetesLeaderRetrievalDriver implements LeaderRetrievalDriver {
private static final Logger LOG =
LoggerFactory.getLogger(KubernetesLeaderRetrievalDriver.class);
+ private final Object watchLock = new Object();
+
+ private final FlinkKubeClient kubeClient;
+
private final String configMapName;
private final LeaderRetrievalEventHandler leaderRetrievalEventHandler;
- private final KubernetesWatch kubernetesWatch;
-
private final FatalErrorHandler fatalErrorHandler;
private volatile boolean running;
+ @GuardedBy("watchLock")
+ private KubernetesWatch kubernetesWatch;
+
public KubernetesLeaderRetrievalDriver(
FlinkKubeClient kubeClient,
String configMapName,
LeaderRetrievalEventHandler leaderRetrievalEventHandler,
FatalErrorHandler fatalErrorHandler) {
- checkNotNull(kubeClient, "Kubernetes client");
+ this.kubeClient = checkNotNull(kubeClient, "Kubernetes client");
this.configMapName = checkNotNull(configMapName, "ConfigMap name");
this.leaderRetrievalEventHandler =
checkNotNull(leaderRetrievalEventHandler, "LeaderRetrievalEventHandler");
@@ -81,7 +89,12 @@ public class KubernetesLeaderRetrievalDriver implements LeaderRetrievalDriver {
running = false;
LOG.info("Stopping {}.", this);
- kubernetesWatch.close();
+
+ synchronized (watchLock) {
+ if (kubernetesWatch != null) {
+ kubernetesWatch.close();
+ }
+ }
}
private class ConfigMapCallbackHandlerImpl
@@ -114,10 +127,24 @@ public class KubernetesLeaderRetrievalDriver implements LeaderRetrievalDriver {
}
@Override
- public void handleFatalError(Throwable throwable) {
- fatalErrorHandler.onFatalError(
- new LeaderRetrievalException(
- "Error while watching the ConfigMap " + configMapName));
+ public void handleError(Throwable throwable) {
+ if (throwable instanceof KubernetesTooOldResourceVersionException) {
+ synchronized (watchLock) {
+ if (running) {
+ if (kubernetesWatch != null) {
+ kubernetesWatch.close();
+ }
+ LOG.info("Creating a new watch on ConfigMap {}.", configMapName);
+ kubernetesWatch =
+ kubeClient.watchConfigMaps(
+ configMapName, new ConfigMapCallbackHandlerImpl());
+ }
+ }
+ } else {
+ fatalErrorHandler.onFatalError(
+ new LeaderRetrievalException(
+ "Error while watching the ConfigMap " + configMapName, throwable));
+ }
}
}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
index 5f899f8..8b4d4fe 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
@@ -197,6 +197,6 @@ public interface FlinkKubeClient extends AutoCloseable {
void onError(List<T> resources);
- void handleFatalError(Throwable throwable);
+ void handleError(Throwable throwable);
}
}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/AbstractKubernetesWatcher.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/AbstractKubernetesWatcher.java
index 9eae5df..2dfb19f 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/AbstractKubernetesWatcher.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/AbstractKubernetesWatcher.java
@@ -26,6 +26,8 @@ import io.fabric8.kubernetes.client.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.net.HttpURLConnection.HTTP_GONE;
+
/** Watcher for resources in Kubernetes. */
public abstract class AbstractKubernetesWatcher<
T extends HasMetadata, K extends KubernetesResource<T>>
@@ -45,7 +47,19 @@ public abstract class AbstractKubernetesWatcher<
if (cause == null) {
logger.info("The watcher is closing.");
} else {
- callbackHandler.handleFatalError(cause);
+ // Fabric8 Kubernetes client will directly close the watcher when received a HTTP_GONE
+ // status code, so this should be handled by the caller. Refer to
+ // https://github.com/fabric8io/kubernetes-client/blob/v4.9.2/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java#L255
+ // for more information about the implementation.
+ if (cause.getCode() == HTTP_GONE) {
+ logger.debug(
+ "Got a http code 'HTTP_GONE' which means the Kubernetes client has the "
+ + "too old resource version.",
+ cause);
+ callbackHandler.handleError(new KubernetesTooOldResourceVersionException(cause));
+ } else {
+ callbackHandler.handleError(cause);
+ }
}
}
}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesTooOldResourceVersionException.java
similarity index 55%
copy from flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java
copy to flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesTooOldResourceVersionException.java
index 6e64a4d..a978286 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesTooOldResourceVersionException.java
@@ -18,38 +18,18 @@
package org.apache.flink.kubernetes.kubeclient.resources;
-import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.util.FlinkException;
-import java.util.List;
+/** Kubernetes too old resource version exception. */
+public class KubernetesTooOldResourceVersionException extends FlinkException {
-/**
- * Empty implementation of {@link FlinkKubeClient.WatchCallbackHandler}.
- *
- * @param <T> Type of resource to be watched
- */
-public class NoOpWatchCallbackHandler<T> implements FlinkKubeClient.WatchCallbackHandler<T> {
- @Override
- public void onAdded(List<T> resources) {
- // noop
- }
-
- @Override
- public void onModified(List<T> resources) {
- // noop
- }
-
- @Override
- public void onDeleted(List<T> resources) {
- // noop
- }
+ private static final long serialVersionUID = 1L;
- @Override
- public void onError(List<T> resources) {
- // noop
+ public KubernetesTooOldResourceVersionException(Throwable cause) {
+ super(cause);
}
- @Override
- public void handleFatalError(Throwable throwable) {
- // noop
+ public KubernetesTooOldResourceVersionException(String message, Throwable cause) {
+ super(message, cause);
}
}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java
index 6d9b869..5406321 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerDriverConfiguration;
@@ -26,6 +27,7 @@ import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandl
import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.TestingKubeClientFactory;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
import org.apache.flink.kubernetes.kubeclient.resources.TestingKubernetesPod;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
@@ -33,10 +35,13 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriverTestBase;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import org.junit.Test;
+import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -143,7 +148,7 @@ public class KubernetesResourceManagerDriverTest
runTest(
() -> {
final Throwable testingError = new Throwable("testing error");
- getPodCallbackHandler().handleFatalError(testingError);
+ getPodCallbackHandler().handleError(testingError);
assertThat(
onErrorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
is(testingError));
@@ -189,6 +194,31 @@ public class KubernetesResourceManagerDriverTest
};
}
+ @Test
+ public void testNewWatchCreationWhenKubernetesTooOldResourceVersionException()
+ throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ getPodCallbackHandler()
+ .handleError(
+ new KubernetesTooOldResourceVersionException(
+ new Exception("too old resource version")));
+ // Verify the old watch is closed and a new one is created
+ CommonTestUtils.waitUntilCondition(
+ () -> getPodsWatches().size() == 2,
+ Deadline.fromNow(Duration.ofSeconds(TIMEOUT_SEC)),
+ String.format(
+ "New watch is not created in %s seconds.",
+ TIMEOUT_SEC));
+ assertThat(getPodsWatches().get(0).isClosed(), is(true));
+ assertThat(getPodsWatches().get(1).isClosed(), is(false));
+ });
+ }
+ };
+ }
+
@Override
protected ResourceManagerDriverTestBase<KubernetesWorkerNode>.Context createContext() {
return new Context();
@@ -202,6 +232,9 @@ public class KubernetesResourceManagerDriverTest
setWatchPodsAndDoCallbackFuture = new CompletableFuture<>();
private final CompletableFuture<Void> closeKubernetesWatchFuture =
new CompletableFuture<>();
+
+ private final List<TestingFlinkKubeClient.MockKubernetesWatch> podsWatches =
+ new ArrayList<>();
private final CompletableFuture<String> stopAndCleanupClusterFuture =
new CompletableFuture<>();
private final CompletableFuture<KubernetesPod> createTaskManagerPodFuture =
@@ -213,12 +246,16 @@ public class KubernetesResourceManagerDriverTest
.setWatchPodsAndDoCallbackFunction(
(ignore, handler) -> {
setWatchPodsAndDoCallbackFuture.complete(handler);
- return new TestingFlinkKubeClient.MockKubernetesWatch() {
- @Override
- public void close() {
- closeKubernetesWatchFuture.complete(null);
- }
- };
+ final TestingFlinkKubeClient.MockKubernetesWatch watch =
+ new TestingFlinkKubeClient.MockKubernetesWatch() {
+ @Override
+ public void close() {
+ super.close();
+ closeKubernetesWatchFuture.complete(null);
+ }
+ };
+ podsWatches.add(watch);
+ return watch;
})
.setStopAndCleanupClusterConsumer(stopAndCleanupClusterFuture::complete)
.setCreateTaskManagerPodFunction(
@@ -244,6 +281,10 @@ public class KubernetesResourceManagerDriverTest
return null;
}
+ List<TestingFlinkKubeClient.MockKubernetesWatch> getPodsWatches() {
+ return podsWatches;
+ }
+
@Override
protected void prepareRunTest() {
flinkConfig.setString(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
index efb0cf0..18aad55 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
@@ -91,6 +91,8 @@ public class KubernetesHighAvailabilityTestBase extends TestLogger {
final List<CompletableFuture<FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap>>>
configMapCallbackFutures = new ArrayList<>();
+ final List<TestingFlinkKubeClient.MockKubernetesWatch> configMapWatches = new ArrayList<>();
+
final CompletableFuture<Map<String, String>> deleteConfigMapByLabelsFuture =
new CompletableFuture<>();
final CompletableFuture<Void> closeKubeClientFuture = new CompletableFuture<>();
@@ -200,7 +202,10 @@ public class KubernetesHighAvailabilityTestBase extends TestLogger {
KubernetesConfigMap>>
future = CompletableFuture.completedFuture(handler);
configMapCallbackFutures.add(future);
- return new TestingFlinkKubeClient.MockKubernetesWatch();
+ final TestingFlinkKubeClient.MockKubernetesWatch watch =
+ new TestingFlinkKubeClient.MockKubernetesWatch();
+ configMapWatches.add(watch);
+ return watch;
})
.setDeleteConfigMapFunction(
name -> {
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java
index 049e2f9..c681d34 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java
@@ -20,7 +20,9 @@ package org.apache.flink.kubernetes.highavailability;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.junit.Test;
@@ -28,11 +30,13 @@ import org.junit.Test;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.Collectors;
import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_KEY;
import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
import static org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@@ -254,4 +258,34 @@ public class KubernetesLeaderElectionDriverTest extends KubernetesHighAvailabili
}
};
}
+
+ @Test
+ public void testNewWatchCreationWhenKubernetesTooOldResourceVersionException()
+ throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap>
+ callbackHandler = getLeaderElectionConfigMapCallback();
+ callbackHandler.handleError(
+ new KubernetesTooOldResourceVersionException(
+ new Exception("too old resource version")));
+ // Verify the old watch is closed and a new one is created
+ assertThat(configMapWatches.size(), is(3));
+ // The three watches are [old-leader-election-watch,
+ // leader-retrieval-watch, new-leader-election-watch]
+ assertThat(
+ configMapWatches.stream()
+ .map(
+ TestingFlinkKubeClient.MockKubernetesWatch
+ ::isClosed)
+ .collect(Collectors.toList()),
+ contains(true, false, false));
+ });
+ }
+ };
+ }
}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverTest.java
index ef97652..8febce4 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverTest.java
@@ -20,13 +20,17 @@ package org.apache.flink.kubernetes.highavailability;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
import org.apache.flink.kubernetes.utils.Constants;
import org.junit.Test;
import java.util.Collections;
+import java.util.stream.Collectors;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
@@ -103,4 +107,34 @@ public class KubernetesLeaderRetrievalDriverTest extends KubernetesHighAvailabil
}
};
}
+
+ @Test
+ public void testNewWatchCreationWhenKubernetesTooOldResourceVersionException()
+ throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap>
+ callbackHandler = getLeaderRetrievalConfigMapCallback();
+ callbackHandler.handleError(
+ new KubernetesTooOldResourceVersionException(
+ new Exception("too old resource version")));
+ // Verify the old watch is closed and a new one is created
+ assertThat(configMapWatches.size(), is(3));
+ // The three watches are [leader-election-watch,
+ // old-leader-retrieval-watch, new-leader-retrieval-watch]
+ assertThat(
+ configMapWatches.stream()
+ .map(
+ TestingFlinkKubeClient.MockKubernetesWatch
+ ::isClosed)
+ .collect(Collectors.toList()),
+ contains(false, true, false));
+ });
+ }
+ };
+ }
}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
index d637955..ae0093d 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
@@ -356,13 +356,20 @@ public class TestingFlinkKubeClient implements FlinkKubeClient {
/** Testing implementation of {@link KubernetesWatch}. */
public static class MockKubernetesWatch extends KubernetesWatch {
+ private boolean isClosed;
+
public MockKubernetesWatch() {
super(null);
+ this.isClosed = false;
}
@Override
public void close() {
- // noop
+ this.isClosed = true;
+ }
+
+ public boolean isClosed() {
+ return isClosed;
}
}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java
index 2edf7cc..936c8c3 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java
@@ -18,12 +18,15 @@
package org.apache.flink.kubernetes.kubeclient.resources;
+import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.util.TestLogger;
+import io.fabric8.kubernetes.api.model.StatusBuilder;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watcher;
+import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
@@ -32,6 +35,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
+import static java.net.HttpURLConnection.HTTP_GONE;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
@@ -76,6 +80,24 @@ public class KubernetesPodsWatcherTest extends TestLogger {
assertThat(podErrorList.size(), is(1));
}
+ @Test
+ public void testClosingWithTooOldResourceVersion() {
+ final String errMsg = "too old resource version";
+ final KubernetesPodsWatcher podsWatcher =
+ new KubernetesPodsWatcher(
+ new TestingCallbackHandler(
+ e -> {
+ assertThat(
+ e,
+ Matchers.instanceOf(
+ KubernetesTooOldResourceVersionException
+ .class));
+ assertThat(e, FlinkMatchers.containsMessage(errMsg));
+ }));
+ podsWatcher.onClose(
+ new KubernetesClientException(errMsg, HTTP_GONE, new StatusBuilder().build()));
+ }
+
private class TestingCallbackHandler
implements FlinkKubeClient.WatchCallbackHandler<KubernetesPod> {
@@ -106,7 +128,7 @@ public class KubernetesPodsWatcherTest extends TestLogger {
}
@Override
- public void handleFatalError(Throwable throwable) {
+ public void handleError(Throwable throwable) {
consumer.accept(throwable);
}
}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java
index 6e64a4d..8598886 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java
@@ -49,7 +49,7 @@ public class NoOpWatchCallbackHandler<T> implements FlinkKubeClient.WatchCallbac
}
@Override
- public void handleFatalError(Throwable throwable) {
+ public void handleError(Throwable throwable) {
// noop
}
}