You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/12/30 21:43:13 UTC

[GitHub] [airflow] dstandish commented on a change in pull request #13307: Simplify CeleryKubernetesExecutor tests

dstandish commented on a change in pull request #13307:
URL: https://github.com/apache/airflow/pull/13307#discussion_r550335032



##########
File path: tests/executors/test_celery_kubernetes_executor.py
##########
@@ -58,182 +64,104 @@ def test_start(self):
         celery_executor_mock.start.assert_called()
         k8s_executor_mock.start.assert_called()
 
-    def test_queue_command(self):
-        command = ['airflow', 'run', 'dag']
-        priority = 1
-        queue = 'default'
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            k8s_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            celery_executor_mock.queue_command.assert_not_called()
-
-        def when_using_celery_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = 'non-kubernetes-queue'
+    @parameterized.expand(
+        [
+            ('other-queue',),
+            (KUBERNETES_QUEUE,),
+        ]
+    )
+    @mock.patch.object(CeleryExecutor, 'queue_command')
+    @mock.patch.object(KubernetesExecutor, 'queue_command')
+    def test_queue_command(self, test_queue, k8s_queue_cmd, celery_queue_cmd):
+        kwargs = dict(
+            command=['airflow', 'run', 'dag'],
+            priority=1,
+            queue='default',
+        )
+        kwarg_values = kwargs.values()
+        cke = CeleryKubernetesExecutor(CeleryExecutor(), KubernetesExecutor())
+
+        simple_task_instance = mock.MagicMock()
+        simple_task_instance.queue = test_queue
+
+        cke.queue_command(simple_task_instance, **kwargs)
+
+        if test_queue == KUBERNETES_QUEUE:
+            k8s_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            celery_queue_cmd.assert_not_called()
+        else:
+            celery_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            k8s_queue_cmd.assert_not_called()
+
+    @parameterized.expand(
+        [
+            ('non-kubernetes-queue',),
+            (KUBERNETES_QUEUE,),
+        ]
+    )
+    def test_queue_task_instance(self, test_queue):
+        celery_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
 
-            cke.queue_command(simple_task_instance, command, priority, queue)
+        ti = mock.MagicMock()
+        ti.queue = test_queue
+
+        kwargs = dict(
+            task_instance=ti,
+            mark_success=False,
+            pickle_id=None,
+            ignore_all_deps=False,
+            ignore_depends_on_past=False,
+            ignore_task_deps=False,
+            ignore_ti_state=False,
+            pool=None,
+            cfg_path=None,
+        )
+        kwarg_values = kwargs.values()
+        cke.queue_task_instance(**kwargs)
+        if test_queue == KUBERNETES_QUEUE:
+            k8s_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
+            celery_executor_mock.queue_task_instance.assert_not_called()
+        else:
+            celery_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
+            k8s_executor_mock.queue_task_instance.assert_not_called()
 
-            celery_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            k8s_executor_mock.queue_command.assert_not_called()
+    @parameterized.expand(
+        [
+            (True, True, True),
+            (False, True, True),
+            (True, False, True),
+            (False, False, False),
+        ]
+    )
+    def test_has_tasks(self, celery_has, k8s_has, cke_has):
+        celery_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
 
-        when_using_k8s_executor()
-        when_using_celery_executor()
+        celery_executor_mock.has_task.return_value = celery_has
+        k8s_executor_mock.has_task.return_value = k8s_has
 
-    def test_queue_task_instance(self):
-        mark_success = False
-        pickle_id = None
-        ignore_all_deps = False
-        ignore_depends_on_past = False
-        ignore_task_deps = False
-        ignore_ti_state = False
-        pool = None
-        cfg_path = None
+        assert cke.has_task(None) == cke_has

Review comment:
       so i tend to try to remove things that have no impact on the test
   i guess i was thinking that since we're mocking the return value, the `ti` object is of no consequence -- the return value would be the same either way.  but i see now that it's worth verifying not only the return behavior and boolean logic, but also that the arg is forwarded properly.
   
   HOWEVER, on further inspection, I am noticing that neither CeleryExecutor nor KubernetesExecutor implements `has_task`, so celery.has_task is the same as kubernetes.has_task.  And what this means is that there's really no point in calling kubernetes.has_task if celery.has_task is false.
   
   this is another case where, if CeleryKubernetesExecutor simply inherited from BaseExecutor, it would avoid some code duplication and complexity.  do you know why it does not inherit from BaseExecutor?
   
   in any case, i'll update the tests to verify call args.

##########
File path: tests/executors/test_celery_kubernetes_executor.py
##########
@@ -58,182 +64,104 @@ def test_start(self):
         celery_executor_mock.start.assert_called()
         k8s_executor_mock.start.assert_called()
 
-    def test_queue_command(self):
-        command = ['airflow', 'run', 'dag']
-        priority = 1
-        queue = 'default'
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            k8s_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            celery_executor_mock.queue_command.assert_not_called()
-
-        def when_using_celery_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = 'non-kubernetes-queue'
+    @parameterized.expand(
+        [
+            ('other-queue',),

Review comment:
       how about `'any-queue'` or `'any-other-queue'` -- because the behavior we're testing is the difference in handling between _the_ kubernetes queue (there is only one) and any queue besides the kubernetes queue
   
   or `'arbitrarily-named-queue'` or `'could-be-anything'`:) 
   
   my only hesitation with `'non-kubernetes-queue'` is that there's some possibility it could be interpreted to mean that there is a particular designated queue, namely `'non-kubernetes-queue'`, which routes to celery, and that everything else goes to the kubernetes queue
   
   by making the naming more generic, or more obviously arbitrary, i think it adds to readability.
   
   let me know what you think.  happy to defer to your judgment on this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org