You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/02 23:57:38 UTC
[airflow] 01/05: Add Kubernetes cleanup-pods CLI command for Helm
Chart (#11802)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ab00a2626d5c3de633bc67e684feea91ea2a502c
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Tue Nov 3 15:28:51 2020 +0000
Add Kubernetes cleanup-pods CLI command for Helm Chart (#11802)
closes: https://github.com/apache/airflow/issues/11146
(cherry picked from commit 980c7252c0f28c251e9f87d736cd88d6027f3da3)
---
airflow/bin/cli.py | 81 +++++++++++++++++++++++++++++++++
tests/cli/test_cli.py | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 203 insertions(+)
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index c22e847..d5490f5 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -1464,6 +1464,74 @@ Happy Airflowing!
print(output_string)
@cli_utils.action_logging
def cleanup_pods(args):
    """Clean up k8s pods in evicted/failed/succeeded states.

    Pages through all pods in ``args.namespace`` and deletes every pod that is
    Succeeded, Failed with restartPolicy Never, or Evicted. Deletion failures
    are reported to stderr and do not abort the sweep.
    """
    # NOTE: the docstring must be the *first* statement of the function body;
    # in the previous version it came after the imports and therefore was a
    # no-op string expression (function.__doc__ stayed None).
    from kubernetes.client.rest import ApiException

    from airflow.kubernetes.kube_client import get_kube_client

    namespace = args.namespace

    # https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
    # All Containers in the Pod have terminated in success, and will not be restarted.
    pod_succeeded = 'succeeded'

    # All Containers in the Pod have terminated, and at least one Container has terminated in failure.
    # That is, the Container either exited with non-zero status or was terminated by the system.
    pod_failed = 'failed'

    # https://kubernetes.io/docs/tasks/administer-cluster/out-of-resource/
    pod_reason_evicted = 'evicted'
    # If pod is failed and restartPolicy is:
    # * Always: Restart Container; Pod phase stays Running.
    # * OnFailure: Restart Container; Pod phase stays Running.
    # * Never: Pod phase becomes Failed.
    pod_restart_policy_never = 'never'

    print('Loading Kubernetes configuration')
    kube_client = get_kube_client()
    print('Listing pods in namespace {}'.format(namespace))
    continue_token = None
    while True:  # pylint: disable=too-many-nested-blocks
        # Page through the pods 500 at a time, using the API's continue token.
        pod_list = kube_client.list_namespaced_pod(namespace=namespace, limit=500, _continue=continue_token)
        for pod in pod_list.items:
            pod_name = pod.metadata.name
            print('Inspecting pod {}'.format(pod_name))
            # Phase/reason/policy are compared case-insensitively against the
            # lowercase constants above.
            pod_phase = pod.status.phase.lower()
            pod_reason = pod.status.reason.lower() if pod.status.reason else ''
            pod_restart_policy = pod.spec.restart_policy.lower()

            if (
                pod_phase == pod_succeeded
                or (pod_phase == pod_failed and pod_restart_policy == pod_restart_policy_never)
                or (pod_reason == pod_reason_evicted)
            ):
                print('Deleting pod "{}" phase "{}" and reason "{}", restart policy "{}"'.format(
                    pod_name, pod_phase, pod_reason, pod_restart_policy)
                )
                try:
                    _delete_pod(pod.metadata.name, namespace)
                except ApiException as e:
                    # Best-effort cleanup: report and keep sweeping the rest.
                    print("can't remove POD: {}".format(e), file=sys.stderr)
                continue
            print('No action taken on pod {}'.format(pod_name))
        continue_token = pod_list.metadata._continue  # pylint: disable=protected-access
        if not continue_token:
            break
+
+
+def _delete_pod(name, namespace):
+ """Helper Function for cleanup_pods"""
+ from kubernetes import client
+
+ core_v1 = client.CoreV1Api()
+ delete_options = client.V1DeleteOptions()
+ print('Deleting POD "{}" from "{}" namespace'.format(name, namespace))
+ api_response = core_v1.delete_namespaced_pod(name=name, namespace=namespace, body=delete_options)
+ print(api_response)
+
+
@cli_utils.deprecated_action(new_name='celery worker')
@cli_utils.action_logging
def worker(args):
@@ -2705,6 +2773,13 @@ ARG_SKIP_SERVE_LOGS = Arg(
action="store_true",
)
# kubernetes cleanup-pods: namespace to sweep for finished/evicted pods
ARG_NAMESPACE = Arg(
    ("--namespace",),
    help="Kubernetes Namespace",
    default='default',
)
+
ALTERNATIVE_CONN_SPECS_ARGS = [
ARG_CONN_TYPE,
ARG_CONN_HOST,
@@ -3154,6 +3229,12 @@ CONFIG_COMMANDS = (
KUBERNETES_COMMANDS = (
ActionCommand(
+ name='cleanup-pods',
+ help="Clean up Kubernetes pods in evicted/failed/succeeded states",
+ func=cleanup_pods,
+ args=(ARG_NAMESPACE, ),
+ ),
+ ActionCommand(
name='generate-dag-yaml',
help="Generate YAML files for all tasks in DAG. Useful for debugging tasks without "
"launching into a cluster",
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index 048f802..bb39869 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -23,6 +23,8 @@ import io
import logging
import os
+import kubernetes
+
from airflow.configuration import conf
from parameterized import parameterized
from six import StringIO, PY2
@@ -1026,3 +1028,123 @@ class TestCLIGetNumReadyWorkersRunning(unittest.TestCase):
with mock.patch('psutil.Process', return_value=self.process):
self.assertEqual(self.monitor._get_num_ready_workers_running(), 0)
+
+
class TestCleanUpPodsCommand(unittest.TestCase):
    """Tests for the ``kubernetes cleanup-pods`` CLI command.

    All six scenarios previously duplicated the mock-pod setup and the CLI
    invocation; that boilerplate now lives in ``_make_pod``,
    ``_prepare_pod_list`` and ``_run_cleanup``. The ``load_incluster_config``
    patch target is also unified on ``kubernetes.config.load_incluster_config``
    (two tests patched it via ``airflow.kubernetes.kube_client.config``, which
    is the same module object — the mix was confusing).
    """

    @classmethod
    def setUpClass(cls):
        cls.parser = cli.get_parser()

    @staticmethod
    def _make_pod(name, phase, reason=None, restart_policy=None):
        """Build a mock pod exposing the fields cleanup_pods inspects."""
        pod = MagicMock()
        pod.metadata.name = name
        pod.status.phase = phase
        pod.status.reason = reason
        if restart_policy is not None:
            pod.spec.restart_policy = restart_policy
        return pod

    @staticmethod
    def _prepare_pod_list(list_namespaced_pod, pods):
        """Make the mocked list call return one unpaginated page of ``pods``."""
        pod_list = list_namespaced_pod()
        pod_list.metadata._continue = None
        pod_list.items = pods

    def _run_cleanup(self):
        """Invoke ``airflow kubernetes cleanup-pods`` against the test namespace."""
        cli.cleanup_pods(
            self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace'])
        )

    @mock.patch('kubernetes.client.CoreV1Api.delete_namespaced_pod')
    def test_delete_pod(self, delete_namespaced_pod):
        cli._delete_pod('dummy', 'awesome-namespace')
        delete_namespaced_pod.assert_called_with(body=mock.ANY, name='dummy', namespace='awesome-namespace')

    @mock.patch('airflow.bin.cli._delete_pod')
    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
    @mock.patch('kubernetes.config.load_incluster_config')
    def test_running_pods_are_not_cleaned(self, load_incluster_config, list_namespaced_pod, delete_pod):
        self._prepare_pod_list(list_namespaced_pod, [self._make_pod('dummy', 'Running')])
        self._run_cleanup()
        delete_pod.assert_not_called()
        load_incluster_config.assert_called_once()

    @mock.patch('airflow.bin.cli._delete_pod')
    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
    @mock.patch('kubernetes.config.load_incluster_config')
    def test_cleanup_succeeded_pods(self, load_incluster_config, list_namespaced_pod, delete_pod):
        self._prepare_pod_list(list_namespaced_pod, [self._make_pod('dummy', 'Succeeded')])
        self._run_cleanup()
        delete_pod.assert_called_with('dummy', 'awesome-namespace')
        load_incluster_config.assert_called_once()

    @mock.patch('airflow.bin.cli._delete_pod')
    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
    @mock.patch('kubernetes.config.load_incluster_config')
    def test_no_cleanup_failed_pods_wo_restart_policy_never(
        self, load_incluster_config, list_namespaced_pod, delete_pod
    ):
        # Failed + restartPolicy Always: kubelet restarts the container, keep the pod.
        self._prepare_pod_list(
            list_namespaced_pod, [self._make_pod('dummy2', 'Failed', restart_policy='Always')]
        )
        self._run_cleanup()
        delete_pod.assert_not_called()
        load_incluster_config.assert_called_once()

    @mock.patch('airflow.bin.cli._delete_pod')
    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
    @mock.patch('kubernetes.config.load_incluster_config')
    def test_cleanup_failed_pods_w_restart_policy_never(
        self, load_incluster_config, list_namespaced_pod, delete_pod
    ):
        self._prepare_pod_list(
            list_namespaced_pod, [self._make_pod('dummy3', 'Failed', restart_policy='Never')]
        )
        self._run_cleanup()
        delete_pod.assert_called_with('dummy3', 'awesome-namespace')
        load_incluster_config.assert_called_once()

    @mock.patch('airflow.bin.cli._delete_pod')
    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
    @mock.patch('kubernetes.config.load_incluster_config')
    def test_cleanup_evicted_pods(self, load_incluster_config, list_namespaced_pod, delete_pod):
        self._prepare_pod_list(
            list_namespaced_pod,
            [self._make_pod('dummy4', 'Failed', reason='Evicted', restart_policy='Never')],
        )
        self._run_cleanup()
        delete_pod.assert_called_with('dummy4', 'awesome-namespace')
        load_incluster_config.assert_called_once()

    @mock.patch('airflow.bin.cli._delete_pod')
    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
    @mock.patch('kubernetes.config.load_incluster_config')
    def test_cleanup_api_exception_continue(self, load_incluster_config, list_namespaced_pod, delete_pod):
        # A failed deletion must not abort the sweep.
        delete_pod.side_effect = kubernetes.client.rest.ApiException(status=0)
        self._prepare_pod_list(list_namespaced_pod, [self._make_pod('dummy', 'Succeeded')])
        self._run_cleanup()
        load_incluster_config.assert_called_once()