You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/03 15:57:06 UTC

[airflow] 01/06: Add Kubernetes cleanup-pods CLI command for Helm Chart (#11802)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2b8b8a85e04f4ee632601be4524faaaf5a04ce6d
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Tue Nov 3 15:28:51 2020 +0000

    Add Kubernetes cleanup-pods CLI command for Helm Chart (#11802)
    
    closes: https://github.com/apache/airflow/issues/11146
    (cherry picked from commit 980c7252c0f28c251e9f87d736cd88d6027f3da3)
---
 airflow/bin/cli.py    |  81 +++++++++++++++++++++++++++++++++
 tests/cli/test_cli.py | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 203 insertions(+)

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index a155cff..4f23038 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -1464,6 +1464,74 @@ Happy Airflowing!
     print(output_string)
 
 
+@cli_utils.action_logging
+def cleanup_pods(args):
+    """Delete pods in ``args.namespace`` that succeeded, were evicted, or failed with restartPolicy Never."""
+    from kubernetes.client.rest import ApiException
+
+    from airflow.kubernetes.kube_client import get_kube_client
+
+    namespace = args.namespace
+
+    # https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
+    # All Containers in the Pod have terminated in success, and will not be restarted.
+    pod_succeeded = 'succeeded'
+
+    # All Containers in the Pod have terminated, and at least one Container has terminated in failure.
+    # That is, the Container either exited with non-zero status or was terminated by the system.
+    pod_failed = 'failed'
+
+    # https://kubernetes.io/docs/tasks/administer-cluster/out-of-resource/
+    pod_reason_evicted = 'evicted'
+    # If pod is failed and restartPolicy is:
+    # * Always: Restart Container; Pod phase stays Running.
+    # * OnFailure: Restart Container; Pod phase stays Running.
+    # * Never: Pod phase becomes Failed.
+    pod_restart_policy_never = 'never'
+
+    print('Loading Kubernetes configuration')
+    kube_client = get_kube_client()
+    print('Listing pods in namespace {}'.format(namespace))
+    continue_token = None  # pagination cursor; None requests the first page
+    while True:  # pylint: disable=too-many-nested-blocks
+        pod_list = kube_client.list_namespaced_pod(namespace=namespace, limit=500, _continue=continue_token)
+        for pod in pod_list.items:
+            pod_name = pod.metadata.name
+            print('Inspecting pod {}'.format(pod_name))
+            # Status/spec fields are optional in the API model; ``or ''`` keeps .lower() from hitting None.
+            pod_phase = (pod.status.phase or '').lower()
+            pod_reason = (pod.status.reason or '').lower()
+            pod_restart_policy = (pod.spec.restart_policy or '').lower()
+            if (
+                pod_phase == pod_succeeded
+                or (pod_phase == pod_failed and pod_restart_policy == pod_restart_policy_never)
+                or (pod_reason == pod_reason_evicted)
+            ):
+                print('Deleting pod "{}" phase "{}" and reason "{}", restart policy "{}"'.format(
+                    pod_name, pod_phase, pod_reason, pod_restart_policy)
+                )
+                try:
+                    _delete_pod(pod_name, namespace)
+                except ApiException as e:  # best effort: report and move on to the next pod
+                    print("can't remove POD: {}".format(e), file=sys.stderr)
+                continue
+            print('No action taken on pod {}'.format(pod_name))
+        continue_token = pod_list.metadata._continue  # pylint: disable=protected-access
+        if not continue_token:  # an empty token means the last page has been processed
+            break
+
+
+def _delete_pod(name, namespace):
+    """Delete pod ``name`` in ``namespace`` (helper for cleanup_pods) and print the API response."""
+    from kubernetes import client
+    # No api_client is passed, so CoreV1Api uses the client library's default configuration.
+    api = client.CoreV1Api()
+    options = client.V1DeleteOptions()
+    print('Deleting POD "{}" from "{}" namespace'.format(name, namespace))
+    response = api.delete_namespaced_pod(name=name, namespace=namespace, body=options)
+    print(response)
+
+
 @cli_utils.deprecated_action(new_name='celery worker')
 @cli_utils.action_logging
 def worker(args):
@@ -2705,6 +2773,13 @@ ARG_SKIP_SERVE_LOGS = Arg(
     action="store_true",
 )
 
+# kubernetes cleanup-pods: namespace whose pods are inspected; defaults to the "default" namespace
+ARG_NAMESPACE = Arg(
+    ("--namespace",),
+    help="Kubernetes Namespace",
+    default='default',
+)
+
 ALTERNATIVE_CONN_SPECS_ARGS = [
     ARG_CONN_TYPE,
     ARG_CONN_HOST,
@@ -3154,6 +3229,12 @@ CONFIG_COMMANDS = (
 
 KUBERNETES_COMMANDS = (
     ActionCommand(
+        name='cleanup-pods',
+        help="Clean up Kubernetes pods in evicted/failed/succeeded states",
+        func=cleanup_pods,
+        args=(ARG_NAMESPACE, ),
+    ),
+    ActionCommand(
         name='generate-dag-yaml',
         help="Generate YAML files for all tasks in DAG. Useful for debugging tasks without "
         "launching into a cluster",
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index 048f802..07d31ac 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -23,6 +23,8 @@ import io
 import logging
 import os
 
+import kubernetes
+
 from airflow.configuration import conf
 from parameterized import parameterized
 from six import StringIO, PY2
@@ -1026,3 +1028,123 @@ class TestCLIGetNumReadyWorkersRunning(unittest.TestCase):
 
         with mock.patch('psutil.Process', return_value=self.process):
             self.assertEqual(self.monitor._get_num_ready_workers_running(), 0)
+
+
+class TestCleanUpPodsCommand(unittest.TestCase):
+    """Tests for the ``kubernetes cleanup-pods`` CLI command."""
+
+    @classmethod
+    def setUpClass(cls):
+        cls.parser = cli.get_parser()
+
+    @staticmethod
+    def _make_pod(name, phase, reason=None, restart_policy=None):
+        """Build a mock pod; ``spec.restart_policy`` stays an auto-created MagicMock unless given."""
+        pod = MagicMock()
+        pod.metadata.name = name
+        pod.status.phase = phase
+        pod.status.reason = reason
+        if restart_policy is not None:
+            pod.spec.restart_policy = restart_policy
+        return pod
+
+    def _run_cleanup(self, list_namespaced_pod, *pods):
+        """Serve ``pods`` as the only page of the mocked list call, then run the command."""
+        pod_list = list_namespaced_pod.return_value
+        pod_list.metadata._continue = None  # no continue token -> the command stops after one page
+        pod_list.items = list(pods)
+        self._run_command()
+
+    def _run_command(self):
+        """Run ``airflow kubernetes cleanup-pods --namespace awesome-namespace``."""
+        cli.cleanup_pods(
+            self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace'])
+        )
+
+    @mock.patch('kubernetes.client.CoreV1Api.delete_namespaced_pod')
+    def test_delete_pod(self, delete_namespaced_pod):
+        """_delete_pod forwards the pod name and namespace to the Kubernetes delete API."""
+        cli._delete_pod('dummy', 'awesome-namespace')
+        delete_namespaced_pod.assert_called_with(body=mock.ANY, name='dummy', namespace='awesome-namespace')
+
+    @mock.patch('airflow.bin.cli._delete_pod')
+    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
+    @mock.patch('airflow.kubernetes.kube_client.config.load_incluster_config')
+    def test_running_pods_are_not_cleaned(self, load_incluster_config, list_namespaced_pod, delete_pod):
+        """Pods still in the Running phase are left alone."""
+        self._run_cleanup(list_namespaced_pod, self._make_pod('dummy', 'Running'))
+        delete_pod.assert_not_called()
+        load_incluster_config.assert_called_once_with()
+
+    @mock.patch('airflow.bin.cli._delete_pod')
+    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
+    @mock.patch('airflow.kubernetes.kube_client.config.load_incluster_config')
+    def test_cleanup_succeeded_pods(self, load_incluster_config, list_namespaced_pod, delete_pod):
+        """Succeeded pods are deleted."""
+        self._run_cleanup(list_namespaced_pod, self._make_pod('dummy', 'Succeeded'))
+        delete_pod.assert_called_with('dummy', 'awesome-namespace')
+        load_incluster_config.assert_called_once_with()
+
+    @mock.patch('airflow.bin.cli._delete_pod')
+    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
+    @mock.patch('airflow.kubernetes.kube_client.config.load_incluster_config')
+    def test_no_cleanup_failed_pods_wo_restart_policy_never(
+        self, load_incluster_config, list_namespaced_pod, delete_pod
+    ):
+        """Failed pods whose restartPolicy is not Never are kept."""
+        self._run_cleanup(list_namespaced_pod, self._make_pod('dummy2', 'Failed', restart_policy='Always'))
+        delete_pod.assert_not_called()
+        load_incluster_config.assert_called_once_with()
+
+    @mock.patch('airflow.bin.cli._delete_pod')
+    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
+    @mock.patch('airflow.kubernetes.kube_client.config.load_incluster_config')
+    def test_cleanup_failed_pods_w_restart_policy_never(
+        self, load_incluster_config, list_namespaced_pod, delete_pod
+    ):
+        """Failed pods with restartPolicy Never are deleted."""
+        self._run_cleanup(list_namespaced_pod, self._make_pod('dummy3', 'Failed', restart_policy='Never'))
+        delete_pod.assert_called_with('dummy3', 'awesome-namespace')
+        load_incluster_config.assert_called_once_with()
+
+    @mock.patch('airflow.bin.cli._delete_pod')
+    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
+    @mock.patch('airflow.kubernetes.kube_client.config.load_incluster_config')
+    def test_cleanup_evicted_pods(self, load_incluster_config, list_namespaced_pod, delete_pod):
+        """Evicted pods are deleted."""
+        evicted = self._make_pod('dummy4', 'Failed', reason='Evicted', restart_policy='Never')
+        self._run_cleanup(list_namespaced_pod, evicted)
+        delete_pod.assert_called_with('dummy4', 'awesome-namespace')
+        load_incluster_config.assert_called_once_with()
+
+    @mock.patch('airflow.bin.cli._delete_pod')
+    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
+    @mock.patch('airflow.kubernetes.kube_client.config.load_incluster_config')
+    def test_cleanup_api_exception_continue(self, load_incluster_config, list_namespaced_pod, delete_pod):
+        """An ApiException on one pod is reported, and the remaining pods are still processed."""
+        delete_pod.side_effect = [kubernetes.client.rest.ApiException(status=0), None]
+        self._run_cleanup(
+            list_namespaced_pod, self._make_pod('dummy', 'Succeeded'), self._make_pod('dummy5', 'Succeeded')
+        )
+        delete_pod.assert_has_calls(
+            [mock.call('dummy', 'awesome-namespace'), mock.call('dummy5', 'awesome-namespace')]
+        )
+        load_incluster_config.assert_called_once_with()
+
+    @mock.patch('airflow.bin.cli._delete_pod')
+    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
+    @mock.patch('airflow.kubernetes.kube_client.config.load_incluster_config')
+    def test_cleanup_follows_continue_token(self, load_incluster_config, list_namespaced_pod, delete_pod):
+        """Pages are fetched until the API stops returning a continue token."""
+        first_page, last_page = MagicMock(), MagicMock()
+        first_page.metadata._continue = 'tok'
+        first_page.items = [self._make_pod('dummy6', 'Succeeded')]
+        last_page.metadata._continue = None
+        last_page.items = [self._make_pod('dummy7', 'Succeeded')]
+        list_namespaced_pod.side_effect = [first_page, last_page]
+        self._run_command()
+        list_namespaced_pod.assert_called_with(namespace='awesome-namespace', limit=mock.ANY, _continue='tok')
+        delete_pod.assert_has_calls(
+            [mock.call('dummy6', 'awesome-namespace'), mock.call('dummy7', 'awesome-namespace')]
+        )
+        load_incluster_config.assert_called_once_with()