You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/12/30 20:25:26 UTC

[GitHub] [airflow] XD-DENG commented on a change in pull request #13307: Simplify CeleryKubernetesExecutor tests

XD-DENG commented on a change in pull request #13307:
URL: https://github.com/apache/airflow/pull/13307#discussion_r550317450



##########
File path: tests/executors/test_celery_kubernetes_executor.py
##########
@@ -58,182 +64,104 @@ def test_start(self):
         celery_executor_mock.start.assert_called()
         k8s_executor_mock.start.assert_called()
 
-    def test_queue_command(self):
-        command = ['airflow', 'run', 'dag']
-        priority = 1
-        queue = 'default'
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            k8s_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            celery_executor_mock.queue_command.assert_not_called()
-
-        def when_using_celery_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = 'non-kubernetes-queue'
+    @parameterized.expand(
+        [
+            ('other-queue',),

Review comment:
       nit: let's use the original queue name "'non-kubernetes-queue"? to be consistent with other related test cases, also being more explicit about what's tested here.

##########
File path: tests/executors/test_celery_kubernetes_executor.py
##########
@@ -58,182 +64,104 @@ def test_start(self):
         celery_executor_mock.start.assert_called()
         k8s_executor_mock.start.assert_called()
 
-    def test_queue_command(self):
-        command = ['airflow', 'run', 'dag']
-        priority = 1
-        queue = 'default'
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            k8s_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            celery_executor_mock.queue_command.assert_not_called()
-
-        def when_using_celery_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = 'non-kubernetes-queue'
+    @parameterized.expand(
+        [
+            ('other-queue',),
+            (KUBERNETES_QUEUE,),
+        ]
+    )
+    @mock.patch.object(CeleryExecutor, 'queue_command')
+    @mock.patch.object(KubernetesExecutor, 'queue_command')
+    def test_queue_command(self, test_queue, k8s_queue_cmd, celery_queue_cmd):
+        kwargs = dict(
+            command=['airflow', 'run', 'dag'],
+            priority=1,
+            queue='default',
+        )
+        kwarg_values = kwargs.values()
+        cke = CeleryKubernetesExecutor(CeleryExecutor(), KubernetesExecutor())
+
+        simple_task_instance = mock.MagicMock()
+        simple_task_instance.queue = test_queue
+
+        cke.queue_command(simple_task_instance, **kwargs)
+
+        if test_queue == KUBERNETES_QUEUE:
+            k8s_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            celery_queue_cmd.assert_not_called()
+        else:
+            celery_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            k8s_queue_cmd.assert_not_called()
+
+    @parameterized.expand(
+        [
+            ('non-kubernetes-queue',),
+            (KUBERNETES_QUEUE,),
+        ]
+    )
+    def test_queue_task_instance(self, test_queue):
+        celery_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
 
-            cke.queue_command(simple_task_instance, command, priority, queue)
+        ti = mock.MagicMock()
+        ti.queue = test_queue
+
+        kwargs = dict(
+            task_instance=ti,
+            mark_success=False,
+            pickle_id=None,
+            ignore_all_deps=False,
+            ignore_depends_on_past=False,
+            ignore_task_deps=False,
+            ignore_ti_state=False,
+            pool=None,
+            cfg_path=None,
+        )
+        kwarg_values = kwargs.values()
+        cke.queue_task_instance(**kwargs)
+        if test_queue == KUBERNETES_QUEUE:
+            k8s_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
+            celery_executor_mock.queue_task_instance.assert_not_called()
+        else:
+            celery_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
+            k8s_executor_mock.queue_task_instance.assert_not_called()
 
-            celery_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            k8s_executor_mock.queue_command.assert_not_called()
+    @parameterized.expand(
+        [
+            (True, True, True),
+            (False, True, True),
+            (True, False, True),
+            (False, False, False),
+        ]
+    )
+    def test_has_tasks(self, celery_has, k8s_has, cke_has):
+        celery_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
 
-        when_using_k8s_executor()
-        when_using_celery_executor()
+        celery_executor_mock.has_task.return_value = celery_has
+        k8s_executor_mock.has_task.return_value = k8s_has
 
-    def test_queue_task_instance(self):
-        mark_success = False
-        pickle_id = None
-        ignore_all_deps = False
-        ignore_depends_on_past = False
-        ignore_task_deps = False
-        ignore_ti_state = False
-        pool = None
-        cfg_path = None
+        assert cke.has_task(None) == cke_has

Review comment:
       This is different from the original assertion (`assert cke.has_task(ti)`). May you clarify a bit ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org