You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/05/16 12:57:28 UTC

[flink] branch release-1.10 updated: [FLINK-15836][k8s] Throw fatal error in KubernetesResourceManager when the pods watcher is closed with exception

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 8a2b13c  [FLINK-15836][k8s] Throw fatal error in KubernetesResourceManager when the pods watcher is closed with exception
8a2b13c is described below

commit 8a2b13c6308a5a2ce6657c940baa3b20da8b85c0
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Mon Mar 16 16:08:13 2020 +0800

    [FLINK-15836][k8s] Throw fatal error in KubernetesResourceManager when the pods watcher is closed with exception
    
    By default, the Kubernetes client always reconnects the watcher internally. However, if users configure watchReconnectLimit via Java properties or environment variables, the watcher may be stopped, and subsequent pod changes will then not be processed properly. The watcher is usually closed exceptionally because of network problems or port abuse. The correct way to handle this is to fail the jobmanager pod and retry in a new one.
    
    This closes #11010.
---
 .../kubernetes/KubernetesResourceManager.java      |  23 +++-
 .../kubeclient/Fabric8FlinkKubeClient.java         |  40 ++-----
 .../kubernetes/kubeclient/FlinkKubeClient.java     |   8 +-
 .../resources/KubernetesPodsWatcher.java           |  79 ++++++++++++++
 .../kubeclient/resources/KubernetesWatch.java      |  37 +++++++
 .../resources/KubernetesPodsWatcherTest.java       | 121 +++++++++++++++++++++
 6 files changed, 270 insertions(+), 38 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
index 04ac00b..35cd598 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
@@ -26,6 +26,7 @@ import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfig
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
 import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
@@ -47,6 +48,7 @@ import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerExcept
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,6 +98,8 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
 	/** The number of pods requested, but not yet granted. */
 	private int numPendingPodRequests = 0;
 
+	private KubernetesWatch podsWatch;
+
 	public KubernetesResourceManager(
 			RpcService rpcService,
 			String resourceManagerEndpointId,
@@ -144,21 +148,27 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
 	protected void initialize() throws ResourceManagerException {
 		recoverWorkerNodesFromPreviousAttempts();
 
-		kubeClient.watchPodsAndDoCallback(getTaskManagerLabels(), this);
+		podsWatch = kubeClient.watchPodsAndDoCallback(getTaskManagerLabels(), this);
 	}
 
 	@Override
 	public CompletableFuture<Void> onStop() {
 		// shut down all components
-		Throwable exception = null;
+		Throwable throwable = null;
+
+		try {
+			podsWatch.close();
+		} catch (Throwable t) {
+			throwable = t;
+		}
 
 		try {
 			kubeClient.close();
 		} catch (Throwable t) {
-			exception = t;
+			throwable = ExceptionUtils.firstOrSuppressed(t, throwable);
 		}
 
-		return getStopTerminationFutureOrCompletedExceptionally(exception);
+		return getStopTerminationFutureOrCompletedExceptionally(throwable);
 	}
 
 	@Override
@@ -224,6 +234,11 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
 		runAsync(() -> pods.forEach(this::removePodIfTerminated));
 	}
 
+	@Override
+	public void handleFatalError(Throwable throwable) {
+		onFatalError(throwable);
+	}
+
 	@VisibleForTesting
 	Map<ResourceID, KubernetesWorkerNode> getWorkerNodes() {
 		return workerNodes;
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
index 7051b95..096ce60 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
@@ -34,7 +34,9 @@ import org.apache.flink.kubernetes.kubeclient.resources.ActionWatcher;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesDeployment;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPodsWatcher;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.TimeUtils;
@@ -47,7 +49,6 @@ import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.api.model.ServicePort;
 import io.fabric8.kubernetes.api.model.apps.Deployment;
 import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.KubernetesClientException;
 import io.fabric8.kubernetes.client.Watch;
 import io.fabric8.kubernetes.client.Watcher;
 import org.slf4j.Logger;
@@ -55,7 +56,6 @@ import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -241,36 +241,12 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
 	}
 
 	@Override
-	public void watchPodsAndDoCallback(Map<String, String> labels, PodCallbackHandler callbackHandler) {
-		final Watcher<Pod> watcher = new Watcher<Pod>() {
-			@Override
-			public void eventReceived(Action action, Pod pod) {
-				LOG.debug("Received {} event for pod {}, details: {}", action, pod.getMetadata().getName(), pod.getStatus());
-				switch (action) {
-					case ADDED:
-						callbackHandler.onAdded(Collections.singletonList(new KubernetesPod(flinkConfig, pod)));
-						break;
-					case MODIFIED:
-						callbackHandler.onModified(Collections.singletonList(new KubernetesPod(flinkConfig, pod)));
-						break;
-					case ERROR:
-						callbackHandler.onError(Collections.singletonList(new KubernetesPod(flinkConfig, pod)));
-						break;
-					case DELETED:
-						callbackHandler.onDeleted(Collections.singletonList(new KubernetesPod(flinkConfig, pod)));
-						break;
-					default:
-						LOG.debug("Ignore handling {} event for pod {}", action, pod.getMetadata().getName());
-						break;
-				}
-			}
-
-			@Override
-			public void onClose(KubernetesClientException e) {
-				LOG.error("The pods watcher is closing.", e);
-			}
-		};
-		this.internalClient.pods().withLabels(labels).watch(watcher);
+	public KubernetesWatch watchPodsAndDoCallback(Map<String, String> labels, PodCallbackHandler podCallbackHandler) {
+		return new KubernetesWatch(
+			flinkConfig,
+			this.internalClient.pods()
+				.withLabels(labels)
+				.watch(new KubernetesPodsWatcher(flinkConfig, podCallbackHandler)));
 	}
 
 	@Override
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
index 357f2af..f35835f 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
@@ -21,6 +21,7 @@ package org.apache.flink.kubernetes.kubeclient;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
 
 import java.util.List;
 import java.util.Map;
@@ -127,9 +128,10 @@ public interface FlinkKubeClient extends AutoCloseable {
 	 * Watch the pods selected by labels and do the {@link PodCallbackHandler}.
 	 *
 	 * @param labels labels to filter the pods to watch
-	 * @param callbackHandler {@link PodCallbackHandler} will be called when the watcher receive the corresponding events.
+	 * @param podCallbackHandler podCallbackHandler which reacts to pod events
+	 * @return Return a watch for pods. It needs to be closed after use.
 	 */
-	void watchPodsAndDoCallback(Map<String, String> labels, PodCallbackHandler callbackHandler);
+	KubernetesWatch watchPodsAndDoCallback(Map<String, String> labels, PodCallbackHandler podCallbackHandler);
 
 	/**
 	 * Callback handler for kubernetes pods.
@@ -143,6 +145,8 @@ public interface FlinkKubeClient extends AutoCloseable {
 		void onDeleted(List<KubernetesPod> pods);
 
 		void onError(List<KubernetesPod> pods);
+
+		void handleFatalError(Throwable throwable);
 	}
 
 }
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcher.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcher.java
new file mode 100644
index 0000000..3dbddc9
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcher.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/**
+ * Watcher for pods in Kubernetes.
+ */
+public class KubernetesPodsWatcher implements Watcher<Pod> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KubernetesPodsWatcher.class);
+
+	private final Configuration configuration;
+
+	private final FlinkKubeClient.PodCallbackHandler podsCallbackHandler;
+
+	public KubernetesPodsWatcher(Configuration configuration, FlinkKubeClient.PodCallbackHandler callbackHandler) {
+		this.configuration = configuration;
+		this.podsCallbackHandler = callbackHandler;
+	}
+
+	@Override
+	public void eventReceived(Action action, Pod pod) {
+		LOG.debug("Received {} event for pod {}, details: {}", action, pod.getMetadata().getName(), pod.getStatus());
+		switch (action) {
+			case ADDED:
+				podsCallbackHandler.onAdded(Collections.singletonList(new KubernetesPod(configuration, pod)));
+				break;
+			case MODIFIED:
+				podsCallbackHandler.onModified(Collections.singletonList(new KubernetesPod(configuration, pod)));
+				break;
+			case ERROR:
+				podsCallbackHandler.onError(Collections.singletonList(new KubernetesPod(configuration, pod)));
+				break;
+			case DELETED:
+				podsCallbackHandler.onDeleted(Collections.singletonList(new KubernetesPod(configuration, pod)));
+				break;
+			default:
+				LOG.debug("Ignore handling {} event for pod {}", action, pod.getMetadata().getName());
+				break;
+		}
+	}
+
+	@Override
+	public void onClose(KubernetesClientException cause) {
+		// A null cause means the watcher was closed as expected.
+		if (cause == null) {
+			LOG.info("The pods watcher is closing.");
+		} else {
+			podsCallbackHandler.handleFatalError(cause);
+		}
+	}
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesWatch.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesWatch.java
new file mode 100644
index 0000000..3389690
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesWatch.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.configuration.Configuration;
+
+import io.fabric8.kubernetes.client.Watch;
+
+/**
+ * Watch resource in Kubernetes.
+ */
+public class KubernetesWatch extends KubernetesResource<Watch> {
+
+	public KubernetesWatch(Configuration configuration, Watch watch) {
+		super(configuration, watch);
+	}
+
+	public void close() {
+		getInternalResource().close();
+	}
+}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java
new file mode 100644
index 0000000..c368676
--- /dev/null
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.util.TestLogger;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watcher;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link KubernetesPodsWatcher}.
+ */
+public class KubernetesPodsWatcherTest extends TestLogger {
+
+	private final List<KubernetesPod> podAddedList = new ArrayList<>();
+	private final List<KubernetesPod> podModifiedList = new ArrayList<>();
+	private final List<KubernetesPod> podDeletedList = new ArrayList<>();
+	private final List<KubernetesPod> podErrorList = new ArrayList<>();
+
+	@Test
+	public void testClosingWithNullException() {
+		final KubernetesPodsWatcher podsWatcher = new KubernetesPodsWatcher(
+			new Configuration(),
+			new TestingCallbackHandler(e -> Assert.fail("Should not reach here.")));
+		podsWatcher.onClose(null);
+	}
+
+	@Test
+	public void testClosingWithException() {
+		final AtomicBoolean called = new AtomicBoolean(false);
+		final KubernetesPodsWatcher podsWatcher = new KubernetesPodsWatcher(
+			new Configuration(),
+			new TestingCallbackHandler(e -> called.set(true)));
+		podsWatcher.onClose(new KubernetesClientException("exception"));
+		assertThat(called.get(), is(true));
+	}
+
+	@Test
+	public void testCallbackHandler() {
+		Pod pod = new PodBuilder()
+			.withNewMetadata()
+			.endMetadata()
+			.withNewSpec()
+			.endSpec()
+			.build();
+		final KubernetesPodsWatcher podsWatcher = new KubernetesPodsWatcher(new Configuration(), new TestingCallbackHandler(e -> {}));
+		podsWatcher.eventReceived(Watcher.Action.ADDED, pod);
+		podsWatcher.eventReceived(Watcher.Action.MODIFIED, pod);
+		podsWatcher.eventReceived(Watcher.Action.DELETED, pod);
+		podsWatcher.eventReceived(Watcher.Action.ERROR, pod);
+
+		assertThat(podAddedList.size(), is(1));
+		assertThat(podModifiedList.size(), is(1));
+		assertThat(podDeletedList.size(), is(1));
+		assertThat(podErrorList.size(), is(1));
+	}
+
+	private class TestingCallbackHandler implements FlinkKubeClient.PodCallbackHandler {
+
+		final Consumer<Throwable> consumer;
+
+		TestingCallbackHandler(Consumer<Throwable> consumer) {
+			this.consumer = consumer;
+		}
+
+		@Override
+		public void onAdded(List<KubernetesPod> pods) {
+			podAddedList.addAll(pods);
+		}
+
+		@Override
+		public void onModified(List<KubernetesPod> pods) {
+			podModifiedList.addAll(pods);
+		}
+
+		@Override
+		public void onDeleted(List<KubernetesPod> pods) {
+			podDeletedList.addAll(pods);
+		}
+
+		@Override
+		public void onError(List<KubernetesPod> pods) {
+			podErrorList.addAll(pods);
+		}
+
+		@Override
+		public void handleFatalError(Throwable throwable) {
+			consumer.accept(throwable);
+		}
+	}
+}