You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by xd...@apache.org on 2020/12/31 09:43:44 UTC

[airflow] branch master updated: Simplify CeleryKubernetesExecutor tests (#13307)

This is an automated email from the ASF dual-hosted git repository.

xddeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 10be375  Simplify CeleryKubernetesExecutor tests (#13307)
10be375 is described below

commit 10be37513c420515251996602ca9362dfeeb095f
Author: dstandish <ds...@users.noreply.github.com>
AuthorDate: Thu Dec 31 01:43:19 2020 -0800

    Simplify CeleryKubernetesExecutor tests (#13307)
    
    * Simplify CeleryKubernetesExecutor tests
    
    Co-authored-by: Daniel Standish <ds...@techstyle.com>
---
 tests/executors/test_celery_kubernetes_executor.py | 265 ++++++++-------------
 1 file changed, 98 insertions(+), 167 deletions(-)

diff --git a/tests/executors/test_celery_kubernetes_executor.py b/tests/executors/test_celery_kubernetes_executor.py
index cc8a958..13dd0e9 100644
--- a/tests/executors/test_celery_kubernetes_executor.py
+++ b/tests/executors/test_celery_kubernetes_executor.py
@@ -17,7 +17,13 @@
 # under the License.
 from unittest import mock
 
+from parameterized import parameterized
+
+from airflow.executors.celery_executor import CeleryExecutor
 from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
+from airflow.executors.kubernetes_executor import KubernetesExecutor
+
+KUBERNETES_QUEUE = CeleryKubernetesExecutor.KUBERNETES_QUEUE
 
 
 class TestCeleryKubernetesExecutor:
@@ -58,182 +64,107 @@ class TestCeleryKubernetesExecutor:
         celery_executor_mock.start.assert_called()
         k8s_executor_mock.start.assert_called()
 
-    def test_queue_command(self):
-        command = ['airflow', 'run', 'dag']
-        priority = 1
-        queue = 'default'
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            k8s_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            celery_executor_mock.queue_command.assert_not_called()
-
-        def when_using_celery_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = 'non-kubernetes-queue'
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            celery_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            k8s_executor_mock.queue_command.assert_not_called()
-
-        when_using_k8s_executor()
-        when_using_celery_executor()
-
-    def test_queue_task_instance(self):
-        mark_success = False
-        pickle_id = None
-        ignore_all_deps = False
-        ignore_depends_on_past = False
-        ignore_task_deps = False
-        ignore_ti_state = False
-        pool = None
-        cfg_path = None
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
+    @parameterized.expand(
+        [
+            ('any-other-queue',),
+            (KUBERNETES_QUEUE,),
+        ]
+    )
+    @mock.patch.object(CeleryExecutor, 'queue_command')
+    @mock.patch.object(KubernetesExecutor, 'queue_command')
+    def test_queue_command(self, test_queue, k8s_queue_cmd, celery_queue_cmd):
+        kwargs = dict(
+            command=['airflow', 'run', 'dag'],
+            priority=1,
+            queue='default',
+        )
+        kwarg_values = kwargs.values()
+        cke = CeleryKubernetesExecutor(CeleryExecutor(), KubernetesExecutor())
+
+        simple_task_instance = mock.MagicMock()
+        simple_task_instance.queue = test_queue
+
+        cke.queue_command(simple_task_instance, **kwargs)
+
+        if test_queue == KUBERNETES_QUEUE:
+            k8s_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            celery_queue_cmd.assert_not_called()
+        else:
+            celery_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            k8s_queue_cmd.assert_not_called()
+
+    @parameterized.expand(
+        [
+            ('any-other-queue',),
+            (KUBERNETES_QUEUE,),
+        ]
+    )
+    def test_queue_task_instance(self, test_queue):
+        celery_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
 
-            ti = mock.MagicMock()
-            ti.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE
-
-            cke.queue_task_instance(
-                ti,
-                mark_success,
-                pickle_id,
-                ignore_all_deps,
-                ignore_depends_on_past,
-                ignore_task_deps,
-                ignore_ti_state,
-                pool,
-                cfg_path,
-            )
-
-            k8s_executor_mock.queue_task_instance.assert_called_once_with(
-                ti,
-                mark_success,
-                pickle_id,
-                ignore_all_deps,
-                ignore_depends_on_past,
-                ignore_task_deps,
-                ignore_ti_state,
-                pool,
-                cfg_path,
-            )
+        ti = mock.MagicMock()
+        ti.queue = test_queue
+
+        kwargs = dict(
+            task_instance=ti,
+            mark_success=False,
+            pickle_id=None,
+            ignore_all_deps=False,
+            ignore_depends_on_past=False,
+            ignore_task_deps=False,
+            ignore_ti_state=False,
+            pool=None,
+            cfg_path=None,
+        )
+        kwarg_values = kwargs.values()
+        cke.queue_task_instance(**kwargs)
+        if test_queue == KUBERNETES_QUEUE:
+            k8s_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
             celery_executor_mock.queue_task_instance.assert_not_called()
-
-        def when_using_celery_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            ti = mock.MagicMock()
-            ti.queue = 'non-kubernetes-queue'
-
-            cke.queue_task_instance(
-                ti,
-                mark_success,
-                pickle_id,
-                ignore_all_deps,
-                ignore_depends_on_past,
-                ignore_task_deps,
-                ignore_ti_state,
-                pool,
-                cfg_path,
-            )
-
+        else:
+            celery_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
             k8s_executor_mock.queue_task_instance.assert_not_called()
-            celery_executor_mock.queue_task_instance.assert_called_once_with(
-                ti,
-                mark_success,
-                pickle_id,
-                ignore_all_deps,
-                ignore_depends_on_past,
-                ignore_task_deps,
-                ignore_ti_state,
-                pool,
-                cfg_path,
-            )
-
-        when_using_k8s_executor()
-        when_using_celery_executor()
-
-    def test_has_tasks(self):
-        ti = mock.MagicMock
-
-        def when_ti_in_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            celery_executor_mock.has_task.return_value = False
-            k8s_executor_mock.has_task.return_value = True
-
-            assert cke.has_task(ti)
-            celery_executor_mock.has_task.assert_called_once_with(ti)
-            k8s_executor_mock.has_task.assert_called_once_with(ti)
-
-        def when_ti_in_celery_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            celery_executor_mock.has_task.return_value = True
-
-            assert cke.has_task(ti)
-            celery_executor_mock.has_task.assert_called_once_with(ti)
-
-        when_ti_in_k8s_executor()
-        when_ti_in_celery_executor()
-
-    def test_adopt_tasks(self):
-        ti = mock.MagicMock
 
-        def when_ti_in_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            ti.queue = "kubernetes"
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            celery_executor_mock.try_adopt_task_instances.return_value = []
-            k8s_executor_mock.try_adopt_task_instances.return_value = []
+    @parameterized.expand(
+        [
+            (True, True, True),
+            (False, True, True),
+            (True, False, True),
+            (False, False, False),
+        ]
+    )
+    def test_has_tasks(self, celery_has, k8s_has, cke_has):
+        celery_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
 
-            cke.try_adopt_task_instances([ti])
-            celery_executor_mock.try_adopt_task_instances.assert_called_once_with([])
-            k8s_executor_mock.try_adopt_task_instances.assert_called_once_with([ti])
+        celery_executor_mock.has_task.return_value = celery_has
+        k8s_executor_mock.has_task.return_value = k8s_has
+        ti = mock.MagicMock()
+        assert cke.has_task(ti) == cke_has
+        celery_executor_mock.has_task.assert_called_once_with(ti)
+        if not celery_has:
+            k8s_executor_mock.has_task.assert_called_once_with(ti)
 
-        def when_ti_in_celery_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            ti.queue = "default"
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
+    @parameterized.expand([(1, 0), (0, 1), (2, 1)])
+    def test_adopt_tasks(self, num_k8s, num_celery):
+        celery_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
 
-            celery_executor_mock.try_adopt_task_instances.return_value = []
-            k8s_executor_mock.try_adopt_task_instances.return_value = []
+        def mock_ti(queue):
+            ti = mock.MagicMock()
+            ti.queue = queue
+            return ti
 
-            cke.try_adopt_task_instances([ti])
-            celery_executor_mock.try_adopt_task_instances.assert_called_once_with([ti])
-            k8s_executor_mock.try_adopt_task_instances.assert_called_once_with([])
+        celery_tis = [mock_ti('default') for _ in range(num_celery)]
+        k8s_tis = [mock_ti(KUBERNETES_QUEUE) for _ in range(num_k8s)]
 
-        when_ti_in_k8s_executor()
-        when_ti_in_celery_executor()
+        cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
+        cke.try_adopt_task_instances(celery_tis + k8s_tis)
+        celery_executor_mock.try_adopt_task_instances.assert_called_once_with(celery_tis)
+        k8s_executor_mock.try_adopt_task_instances.assert_called_once_with(k8s_tis)
 
     def test_get_event_buffer(self):
         celery_executor_mock = mock.MagicMock()