You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/04/15 11:28:46 UTC

[airflow] 05/08: BugFix: CLI 'kubernetes cleanup-pods' should only clean up Airflow-created Pods (#15204)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 315005b2ffe7b180f5d4f873278606fd8ffaf1ca
Author: Xiaodong DENG <xd...@apache.org>
AuthorDate: Thu Apr 8 12:11:08 2021 +0200

    BugFix: CLI 'kubernetes cleanup-pods' should only clean up Airflow-created Pods (#15204)
    
    closes: #15193
    
    Currently, whether a pod was created by Airflow is not considered when cleaning up pods. This commit fixes that.
    
    We decide whether a Pod was created by Airflow by checking that it has all the labels added in PodGenerator.construct_pod() or KubernetesPodOperator.create_labels_for_pod().
    
    (cherry picked from commit c594d9cfb32bbcfe30af3f5dcb452c6053cacc95)
---
 airflow/cli/cli_parser.py                     |  6 +++-
 airflow/cli/commands/kubernetes_command.py    | 18 +++++++++++-
 tests/cli/commands/test_kubernetes_command.py | 40 +++++++++++++++++++++------
 3 files changed, 54 insertions(+), 10 deletions(-)

diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index c33a854..b5384b4 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -1341,7 +1341,11 @@ CONFIG_COMMANDS = (
 KUBERNETES_COMMANDS = (
     ActionCommand(
         name='cleanup-pods',
-        help="Clean up Kubernetes pods in evicted/failed/succeeded states",
+        help=(
+            "Clean up Kubernetes pods "
+            "(created by KubernetesExecutor/KubernetesPodOperator) "
+            "in evicted/failed/succeeded states"
+        ),
         func=lazy_load_command('airflow.cli.commands.kubernetes_command.cleanup_pods'),
         args=(ARG_NAMESPACE,),
     ),
diff --git a/airflow/cli/commands/kubernetes_command.py b/airflow/cli/commands/kubernetes_command.py
index f98c45e..daf11a3 100644
--- a/airflow/cli/commands/kubernetes_command.py
+++ b/airflow/cli/commands/kubernetes_command.py
@@ -90,7 +90,23 @@ def cleanup_pods(args):
     print('Loading Kubernetes configuration')
     kube_client = get_kube_client()
     print(f'Listing pods in namespace {namespace}')
-    list_kwargs = {"namespace": namespace, "limit": 500}
+    airflow_pod_labels = [
+        'dag_id',
+        'task_id',
+        'execution_date',
+        'try_number',
+        'airflow_version',
+    ]
+    list_kwargs = {
+        "namespace": namespace,
+        "limit": 500,
+        "label_selector": client.V1LabelSelector(
+            match_expressions=[
+                client.V1LabelSelectorRequirement(key=label, operator="Exists")
+                for label in airflow_pod_labels
+            ]
+        ),
+    }
     while True:  # pylint: disable=too-many-nested-blocks
         pod_list = kube_client.list_namespaced_pod(**list_kwargs)
         for pod in pod_list.items:
diff --git a/tests/cli/commands/test_kubernetes_command.py b/tests/cli/commands/test_kubernetes_command.py
index 8ae2eef..707eb55 100644
--- a/tests/cli/commands/test_kubernetes_command.py
+++ b/tests/cli/commands/test_kubernetes_command.py
@@ -55,6 +55,13 @@ class TestGenerateDagYamlCommand(unittest.TestCase):
 
 
 class TestCleanUpPodsCommand(unittest.TestCase):
+    label_selector = kubernetes.client.V1LabelSelector(
+        match_expressions=[
+            kubernetes.client.V1LabelSelectorRequirement(key=label, operator="Exists")
+            for label in ['dag_id', 'task_id', 'execution_date', 'try_number', 'airflow_version']
+        ]
+    )
+
     @classmethod
     def setUpClass(cls):
         cls.parser = cli_parser.get_parser()
@@ -79,7 +86,9 @@ class TestCleanUpPodsCommand(unittest.TestCase):
         kubernetes_command.cleanup_pods(
             self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace'])
         )
-        list_namespaced_pod.assert_called_once_with(namespace='awesome-namespace', limit=500)
+        list_namespaced_pod.assert_called_once_with(
+            namespace='awesome-namespace', limit=500, label_selector=self.label_selector
+        )
         delete_pod.assert_not_called()
         load_incluster_config.assert_called_once()
 
@@ -98,7 +107,9 @@ class TestCleanUpPodsCommand(unittest.TestCase):
         kubernetes_command.cleanup_pods(
             self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace'])
         )
-        list_namespaced_pod.assert_called_once_with(namespace='awesome-namespace', limit=500)
+        list_namespaced_pod.assert_called_once_with(
+            namespace='awesome-namespace', limit=500, label_selector=self.label_selector
+        )
         delete_pod.assert_called_with('dummy', 'awesome-namespace')
         load_incluster_config.assert_called_once()
 
@@ -120,7 +131,9 @@ class TestCleanUpPodsCommand(unittest.TestCase):
         kubernetes_command.cleanup_pods(
             self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace'])
         )
-        list_namespaced_pod.assert_called_once_with(namespace='awesome-namespace', limit=500)
+        list_namespaced_pod.assert_called_once_with(
+            namespace='awesome-namespace', limit=500, label_selector=self.label_selector
+        )
         delete_pod.assert_not_called()
         load_incluster_config.assert_called_once()
 
@@ -142,7 +155,9 @@ class TestCleanUpPodsCommand(unittest.TestCase):
         kubernetes_command.cleanup_pods(
             self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace'])
         )
-        list_namespaced_pod.assert_called_once_with(namespace='awesome-namespace', limit=500)
+        list_namespaced_pod.assert_called_once_with(
+            namespace='awesome-namespace', limit=500, label_selector=self.label_selector
+        )
         delete_pod.assert_called_with('dummy3', 'awesome-namespace')
         load_incluster_config.assert_called_once()
 
@@ -162,7 +177,9 @@ class TestCleanUpPodsCommand(unittest.TestCase):
         kubernetes_command.cleanup_pods(
             self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace'])
         )
-        list_namespaced_pod.assert_called_once_with(namespace='awesome-namespace', limit=500)
+        list_namespaced_pod.assert_called_once_with(
+            namespace='awesome-namespace', limit=500, label_selector=self.label_selector
+        )
         delete_pod.assert_called_with('dummy4', 'awesome-namespace')
         load_incluster_config.assert_called_once()
 
@@ -182,7 +199,9 @@ class TestCleanUpPodsCommand(unittest.TestCase):
         kubernetes_command.cleanup_pods(
             self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace'])
         )
-        list_namespaced_pod.assert_called_once_with(namespace='awesome-namespace', limit=500)
+        list_namespaced_pod.assert_called_once_with(
+            namespace='awesome-namespace', limit=500, label_selector=self.label_selector
+        )
         load_incluster_config.assert_called_once()
 
     @mock.patch('airflow.cli.commands.kubernetes_command._delete_pod')
@@ -204,8 +223,13 @@ class TestCleanUpPodsCommand(unittest.TestCase):
             self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace'])
         )
         calls = [
-            call.first(namespace='awesome-namespace', limit=500),
-            call.second(namespace='awesome-namespace', limit=500, _continue='dummy-token'),
+            call.first(namespace='awesome-namespace', limit=500, label_selector=self.label_selector),
+            call.second(
+                namespace='awesome-namespace',
+                limit=500,
+                label_selector=self.label_selector,
+                _continue='dummy-token',
+            ),
         ]
         list_namespaced_pod.assert_has_calls(calls)
         delete_pod.assert_called_with('dummy', 'awesome-namespace')