You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/12/24 18:23:58 UTC

[GitHub] [airflow] dstandish opened a new pull request #13307: Simplify CeleryKubernetesExecutor tests

dstandish opened a new pull request #13307:
URL: https://github.com/apache/airflow/pull/13307


   Existing tests for CeleryKubernetesExecutor had some code duplication that made it somewhat more difficult to understand what was being tested.
   
   Here I use parameterization to reduce code, simplify, and make intention more clear.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on pull request #13307: Simplify CeleryKubernetesExecutor tests

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #13307:
URL: https://github.com/apache/airflow/pull/13307#issuecomment-751504871


   there are also 3 related PRs... related because they all came up as i was checking out CeleryKubernetesExecutor
   * https://github.com/apache/airflow/pull/13302
   * https://github.com/apache/airflow/pull/13247
   * https://github.com/apache/airflow/pull/13209
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] XD-DENG merged pull request #13307: Simplify CeleryKubernetesExecutor tests

Posted by GitBox <gi...@apache.org>.
XD-DENG merged pull request #13307:
URL: https://github.com/apache/airflow/pull/13307


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] XD-DENG commented on pull request #13307: Simplify CeleryKubernetesExecutor tests

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on pull request #13307:
URL: https://github.com/apache/airflow/pull/13307#issuecomment-752905005


   Apparently the CI failure is due to un-related reason. Will merge this first.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #13307: Simplify CeleryKubernetesExecutor tests

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #13307:
URL: https://github.com/apache/airflow/pull/13307#discussion_r550349084



##########
File path: tests/executors/test_celery_kubernetes_executor.py
##########
@@ -58,182 +64,104 @@ def test_start(self):
         celery_executor_mock.start.assert_called()
         k8s_executor_mock.start.assert_called()
 
-    def test_queue_command(self):
-        command = ['airflow', 'run', 'dag']
-        priority = 1
-        queue = 'default'
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            k8s_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            celery_executor_mock.queue_command.assert_not_called()
-
-        def when_using_celery_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = 'non-kubernetes-queue'
+    @parameterized.expand(
+        [
+            ('other-queue',),

Review comment:
       i went with `'any-other-queue'`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] XD-DENG commented on a change in pull request #13307: Simplify CeleryKubernetesExecutor tests

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #13307:
URL: https://github.com/apache/airflow/pull/13307#discussion_r550317450



##########
File path: tests/executors/test_celery_kubernetes_executor.py
##########
@@ -58,182 +64,104 @@ def test_start(self):
         celery_executor_mock.start.assert_called()
         k8s_executor_mock.start.assert_called()
 
-    def test_queue_command(self):
-        command = ['airflow', 'run', 'dag']
-        priority = 1
-        queue = 'default'
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            k8s_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            celery_executor_mock.queue_command.assert_not_called()
-
-        def when_using_celery_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = 'non-kubernetes-queue'
+    @parameterized.expand(
+        [
+            ('other-queue',),

Review comment:
       nit: let's use the original queue name "'non-kubernetes-queue"? to be consistent with other related test cases, also being more explicit about what's tested here.

##########
File path: tests/executors/test_celery_kubernetes_executor.py
##########
@@ -58,182 +64,104 @@ def test_start(self):
         celery_executor_mock.start.assert_called()
         k8s_executor_mock.start.assert_called()
 
-    def test_queue_command(self):
-        command = ['airflow', 'run', 'dag']
-        priority = 1
-        queue = 'default'
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            k8s_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            celery_executor_mock.queue_command.assert_not_called()
-
-        def when_using_celery_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = 'non-kubernetes-queue'
+    @parameterized.expand(
+        [
+            ('other-queue',),
+            (KUBERNETES_QUEUE,),
+        ]
+    )
+    @mock.patch.object(CeleryExecutor, 'queue_command')
+    @mock.patch.object(KubernetesExecutor, 'queue_command')
+    def test_queue_command(self, test_queue, k8s_queue_cmd, celery_queue_cmd):
+        kwargs = dict(
+            command=['airflow', 'run', 'dag'],
+            priority=1,
+            queue='default',
+        )
+        kwarg_values = kwargs.values()
+        cke = CeleryKubernetesExecutor(CeleryExecutor(), KubernetesExecutor())
+
+        simple_task_instance = mock.MagicMock()
+        simple_task_instance.queue = test_queue
+
+        cke.queue_command(simple_task_instance, **kwargs)
+
+        if test_queue == KUBERNETES_QUEUE:
+            k8s_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            celery_queue_cmd.assert_not_called()
+        else:
+            celery_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            k8s_queue_cmd.assert_not_called()
+
+    @parameterized.expand(
+        [
+            ('non-kubernetes-queue',),
+            (KUBERNETES_QUEUE,),
+        ]
+    )
+    def test_queue_task_instance(self, test_queue):
+        celery_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
 
-            cke.queue_command(simple_task_instance, command, priority, queue)
+        ti = mock.MagicMock()
+        ti.queue = test_queue
+
+        kwargs = dict(
+            task_instance=ti,
+            mark_success=False,
+            pickle_id=None,
+            ignore_all_deps=False,
+            ignore_depends_on_past=False,
+            ignore_task_deps=False,
+            ignore_ti_state=False,
+            pool=None,
+            cfg_path=None,
+        )
+        kwarg_values = kwargs.values()
+        cke.queue_task_instance(**kwargs)
+        if test_queue == KUBERNETES_QUEUE:
+            k8s_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
+            celery_executor_mock.queue_task_instance.assert_not_called()
+        else:
+            celery_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
+            k8s_executor_mock.queue_task_instance.assert_not_called()
 
-            celery_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            k8s_executor_mock.queue_command.assert_not_called()
+    @parameterized.expand(
+        [
+            (True, True, True),
+            (False, True, True),
+            (True, False, True),
+            (False, False, False),
+        ]
+    )
+    def test_has_tasks(self, celery_has, k8s_has, cke_has):
+        celery_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
 
-        when_using_k8s_executor()
-        when_using_celery_executor()
+        celery_executor_mock.has_task.return_value = celery_has
+        k8s_executor_mock.has_task.return_value = k8s_has
 
-    def test_queue_task_instance(self):
-        mark_success = False
-        pickle_id = None
-        ignore_all_deps = False
-        ignore_depends_on_past = False
-        ignore_task_deps = False
-        ignore_ti_state = False
-        pool = None
-        cfg_path = None
+        assert cke.has_task(None) == cke_has

Review comment:
       This is different from the original assertion (`assert cke.has_task(ti)`). May you clarify a bit ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #13307: Simplify CeleryKubernetesExecutor tests

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #13307:
URL: https://github.com/apache/airflow/pull/13307#discussion_r550335032



##########
File path: tests/executors/test_celery_kubernetes_executor.py
##########
@@ -58,182 +64,104 @@ def test_start(self):
         celery_executor_mock.start.assert_called()
         k8s_executor_mock.start.assert_called()
 
-    def test_queue_command(self):
-        command = ['airflow', 'run', 'dag']
-        priority = 1
-        queue = 'default'
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            k8s_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            celery_executor_mock.queue_command.assert_not_called()
-
-        def when_using_celery_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = 'non-kubernetes-queue'
+    @parameterized.expand(
+        [
+            ('other-queue',),
+            (KUBERNETES_QUEUE,),
+        ]
+    )
+    @mock.patch.object(CeleryExecutor, 'queue_command')
+    @mock.patch.object(KubernetesExecutor, 'queue_command')
+    def test_queue_command(self, test_queue, k8s_queue_cmd, celery_queue_cmd):
+        kwargs = dict(
+            command=['airflow', 'run', 'dag'],
+            priority=1,
+            queue='default',
+        )
+        kwarg_values = kwargs.values()
+        cke = CeleryKubernetesExecutor(CeleryExecutor(), KubernetesExecutor())
+
+        simple_task_instance = mock.MagicMock()
+        simple_task_instance.queue = test_queue
+
+        cke.queue_command(simple_task_instance, **kwargs)
+
+        if test_queue == KUBERNETES_QUEUE:
+            k8s_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            celery_queue_cmd.assert_not_called()
+        else:
+            celery_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            k8s_queue_cmd.assert_not_called()
+
+    @parameterized.expand(
+        [
+            ('non-kubernetes-queue',),
+            (KUBERNETES_QUEUE,),
+        ]
+    )
+    def test_queue_task_instance(self, test_queue):
+        celery_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
 
-            cke.queue_command(simple_task_instance, command, priority, queue)
+        ti = mock.MagicMock()
+        ti.queue = test_queue
+
+        kwargs = dict(
+            task_instance=ti,
+            mark_success=False,
+            pickle_id=None,
+            ignore_all_deps=False,
+            ignore_depends_on_past=False,
+            ignore_task_deps=False,
+            ignore_ti_state=False,
+            pool=None,
+            cfg_path=None,
+        )
+        kwarg_values = kwargs.values()
+        cke.queue_task_instance(**kwargs)
+        if test_queue == KUBERNETES_QUEUE:
+            k8s_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
+            celery_executor_mock.queue_task_instance.assert_not_called()
+        else:
+            celery_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
+            k8s_executor_mock.queue_task_instance.assert_not_called()
 
-            celery_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            k8s_executor_mock.queue_command.assert_not_called()
+    @parameterized.expand(
+        [
+            (True, True, True),
+            (False, True, True),
+            (True, False, True),
+            (False, False, False),
+        ]
+    )
+    def test_has_tasks(self, celery_has, k8s_has, cke_has):
+        celery_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
 
-        when_using_k8s_executor()
-        when_using_celery_executor()
+        celery_executor_mock.has_task.return_value = celery_has
+        k8s_executor_mock.has_task.return_value = k8s_has
 
-    def test_queue_task_instance(self):
-        mark_success = False
-        pickle_id = None
-        ignore_all_deps = False
-        ignore_depends_on_past = False
-        ignore_task_deps = False
-        ignore_ti_state = False
-        pool = None
-        cfg_path = None
+        assert cke.has_task(None) == cke_has

Review comment:
       i will update




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #13307: Simplify CeleryKubernetesExecutor tests

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #13307:
URL: https://github.com/apache/airflow/pull/13307#discussion_r550335032



##########
File path: tests/executors/test_celery_kubernetes_executor.py
##########
@@ -58,182 +64,104 @@ def test_start(self):
         celery_executor_mock.start.assert_called()
         k8s_executor_mock.start.assert_called()
 
-    def test_queue_command(self):
-        command = ['airflow', 'run', 'dag']
-        priority = 1
-        queue = 'default'
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            k8s_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            celery_executor_mock.queue_command.assert_not_called()
-
-        def when_using_celery_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = 'non-kubernetes-queue'
+    @parameterized.expand(
+        [
+            ('other-queue',),
+            (KUBERNETES_QUEUE,),
+        ]
+    )
+    @mock.patch.object(CeleryExecutor, 'queue_command')
+    @mock.patch.object(KubernetesExecutor, 'queue_command')
+    def test_queue_command(self, test_queue, k8s_queue_cmd, celery_queue_cmd):
+        kwargs = dict(
+            command=['airflow', 'run', 'dag'],
+            priority=1,
+            queue='default',
+        )
+        kwarg_values = kwargs.values()
+        cke = CeleryKubernetesExecutor(CeleryExecutor(), KubernetesExecutor())
+
+        simple_task_instance = mock.MagicMock()
+        simple_task_instance.queue = test_queue
+
+        cke.queue_command(simple_task_instance, **kwargs)
+
+        if test_queue == KUBERNETES_QUEUE:
+            k8s_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            celery_queue_cmd.assert_not_called()
+        else:
+            celery_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            k8s_queue_cmd.assert_not_called()
+
+    @parameterized.expand(
+        [
+            ('non-kubernetes-queue',),
+            (KUBERNETES_QUEUE,),
+        ]
+    )
+    def test_queue_task_instance(self, test_queue):
+        celery_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
 
-            cke.queue_command(simple_task_instance, command, priority, queue)
+        ti = mock.MagicMock()
+        ti.queue = test_queue
+
+        kwargs = dict(
+            task_instance=ti,
+            mark_success=False,
+            pickle_id=None,
+            ignore_all_deps=False,
+            ignore_depends_on_past=False,
+            ignore_task_deps=False,
+            ignore_ti_state=False,
+            pool=None,
+            cfg_path=None,
+        )
+        kwarg_values = kwargs.values()
+        cke.queue_task_instance(**kwargs)
+        if test_queue == KUBERNETES_QUEUE:
+            k8s_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
+            celery_executor_mock.queue_task_instance.assert_not_called()
+        else:
+            celery_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
+            k8s_executor_mock.queue_task_instance.assert_not_called()
 
-            celery_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            k8s_executor_mock.queue_command.assert_not_called()
+    @parameterized.expand(
+        [
+            (True, True, True),
+            (False, True, True),
+            (True, False, True),
+            (False, False, False),
+        ]
+    )
+    def test_has_tasks(self, celery_has, k8s_has, cke_has):
+        celery_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
 
-        when_using_k8s_executor()
-        when_using_celery_executor()
+        celery_executor_mock.has_task.return_value = celery_has
+        k8s_executor_mock.has_task.return_value = k8s_has
 
-    def test_queue_task_instance(self):
-        mark_success = False
-        pickle_id = None
-        ignore_all_deps = False
-        ignore_depends_on_past = False
-        ignore_task_deps = False
-        ignore_ti_state = False
-        pool = None
-        cfg_path = None
+        assert cke.has_task(None) == cke_has

Review comment:
       so i tend to try to remove things that have no impact on the test
   i guess i was thinking that since we're mocking return value, the `ti` object is of no consequence -- the return value would be the same either way.  but i see now that it's worth verifying not only the return behavior and boolean logic, but also just check that the arg is forwarded properly.
   
   HOWEVER, on further inspection, I am noticing that neither CeleryExecutor nor KubernetesExecutor implements `has_task`, so celery.has_task is the same as kubernetes.has_task.  And what this means is that there's really no point in calling k8s.has_task if cel.has_task is false.
   
   this is another case where, if CeleryKubernetesExecutor simply inherited from BaseExecutor, it would avoid some code dupe and complexity.  do you know why it does not inherit from BaseExecutor?
   
   in any case, i'll update the tests to verify call args.

##########
File path: tests/executors/test_celery_kubernetes_executor.py
##########
@@ -58,182 +64,104 @@ def test_start(self):
         celery_executor_mock.start.assert_called()
         k8s_executor_mock.start.assert_called()
 
-    def test_queue_command(self):
-        command = ['airflow', 'run', 'dag']
-        priority = 1
-        queue = 'default'
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            k8s_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            celery_executor_mock.queue_command.assert_not_called()
-
-        def when_using_celery_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = 'non-kubernetes-queue'
+    @parameterized.expand(
+        [
+            ('other-queue',),

Review comment:
       how about `'any-queue'` or `'any-other-queue'` -- because the behavior were testing is the difference in handling between _the_ kubernetes queue (there is only one) and any queue besides the kubernetes queue
   
   or `'arbitrarily-named-queue'` or `'could-be-anything'`:) 
   
   my only hesitation with `'non-kubernetes-queue'` is that there's some possibility it could be interpreted to mean that there is a particular designated queue, namely `'non-kubernetes-queue'` which routes to celery, and otherwise it goes to kubernetes queue
   
   my making the naming more generic, or more obviously arbitrary, i think it adds to readability.
   
   let me know what you think.  happy to defer to your judgment on this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] XD-DENG commented on pull request #13307: Simplify CeleryKubernetesExecutor tests

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on pull request #13307:
URL: https://github.com/apache/airflow/pull/13307#issuecomment-751455005


   > Also related @XD-DENG: can you please kindly take a look at #13323. this should enormously help in analysing logs of failed kubernetes jobs in CI. I added groups/sections to logs and they should load many times faster and by default hide irrelevant details (but they are easy to unfold :)).
   
   Sure. Will be a bit slow checking it if you don't mind ;-) But should be getting back to you within today.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #13307: Simplify CeleryKubernetesExecutor tests

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #13307:
URL: https://github.com/apache/airflow/pull/13307#discussion_r550336776



##########
File path: tests/executors/test_celery_kubernetes_executor.py
##########
@@ -58,182 +64,104 @@ def test_start(self):
         celery_executor_mock.start.assert_called()
         k8s_executor_mock.start.assert_called()
 
-    def test_queue_command(self):
-        command = ['airflow', 'run', 'dag']
-        priority = 1
-        queue = 'default'
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            k8s_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            celery_executor_mock.queue_command.assert_not_called()
-
-        def when_using_celery_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = 'non-kubernetes-queue'
+    @parameterized.expand(
+        [
+            ('other-queue',),
+            (KUBERNETES_QUEUE,),
+        ]
+    )
+    @mock.patch.object(CeleryExecutor, 'queue_command')
+    @mock.patch.object(KubernetesExecutor, 'queue_command')
+    def test_queue_command(self, test_queue, k8s_queue_cmd, celery_queue_cmd):
+        kwargs = dict(
+            command=['airflow', 'run', 'dag'],
+            priority=1,
+            queue='default',
+        )
+        kwarg_values = kwargs.values()
+        cke = CeleryKubernetesExecutor(CeleryExecutor(), KubernetesExecutor())
+
+        simple_task_instance = mock.MagicMock()
+        simple_task_instance.queue = test_queue
+
+        cke.queue_command(simple_task_instance, **kwargs)
+
+        if test_queue == KUBERNETES_QUEUE:
+            k8s_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            celery_queue_cmd.assert_not_called()
+        else:
+            celery_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            k8s_queue_cmd.assert_not_called()
+
+    @parameterized.expand(
+        [
+            ('non-kubernetes-queue',),
+            (KUBERNETES_QUEUE,),
+        ]
+    )
+    def test_queue_task_instance(self, test_queue):
+        celery_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
 
-            cke.queue_command(simple_task_instance, command, priority, queue)
+        ti = mock.MagicMock()
+        ti.queue = test_queue
+
+        kwargs = dict(
+            task_instance=ti,
+            mark_success=False,
+            pickle_id=None,
+            ignore_all_deps=False,
+            ignore_depends_on_past=False,
+            ignore_task_deps=False,
+            ignore_ti_state=False,
+            pool=None,
+            cfg_path=None,
+        )
+        kwarg_values = kwargs.values()
+        cke.queue_task_instance(**kwargs)
+        if test_queue == KUBERNETES_QUEUE:
+            k8s_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
+            celery_executor_mock.queue_task_instance.assert_not_called()
+        else:
+            celery_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
+            k8s_executor_mock.queue_task_instance.assert_not_called()
 
-            celery_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            k8s_executor_mock.queue_command.assert_not_called()
+    @parameterized.expand(
+        [
+            (True, True, True),
+            (False, True, True),
+            (True, False, True),
+            (False, False, False),
+        ]
+    )
+    def test_has_tasks(self, celery_has, k8s_has, cke_has):
+        celery_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
 
-        when_using_k8s_executor()
-        when_using_celery_executor()
+        celery_executor_mock.has_task.return_value = celery_has
+        k8s_executor_mock.has_task.return_value = k8s_has
 
-    def test_queue_task_instance(self):
-        mark_success = False
-        pickle_id = None
-        ignore_all_deps = False
-        ignore_depends_on_past = False
-        ignore_task_deps = False
-        ignore_ti_state = False
-        pool = None
-        cfg_path = None
+        assert cke.has_task(None) == cke_has

Review comment:
       ok i see....  there are instance attributes that could differ between the two executors, even though the method is taken from BaseExecutor ----- so it can make sense to check both....  but i still don't understand why this doesn't inherit from BaseExecutor... shouldn't all executors do thisL?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] XD-DENG commented on pull request #13307: Simplify CeleryKubernetesExecutor tests

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on pull request #13307:
URL: https://github.com/apache/airflow/pull/13307#issuecomment-751453435


   Hi @dstandish mind checking the failures in K8S tests (https://github.com/apache/airflow/runs/1607640815) before we can further review?
   
   It should be related to the change introduced in https://github.com/apache/airflow/pull/13289, and @potiuk has fixed it in https://github.com/apache/airflow/pull/13316 . So simply rebasing to the latest master should help. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on pull request #13307: Simplify CeleryKubernetesExecutor tests

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #13307:
URL: https://github.com/apache/airflow/pull/13307#issuecomment-751496699


   thanks @XD-DENG, done


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] XD-DENG commented on a change in pull request #13307: Simplify CeleryKubernetesExecutor tests

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #13307:
URL: https://github.com/apache/airflow/pull/13307#discussion_r550356005



##########
File path: tests/executors/test_celery_kubernetes_executor.py
##########
@@ -58,182 +64,107 @@ def test_start(self):
         celery_executor_mock.start.assert_called()
         k8s_executor_mock.start.assert_called()
 
-    def test_queue_command(self):
-        command = ['airflow', 'run', 'dag']
-        priority = 1
-        queue = 'default'
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            k8s_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            celery_executor_mock.queue_command.assert_not_called()
-
-        def when_using_celery_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = 'non-kubernetes-queue'
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            celery_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            k8s_executor_mock.queue_command.assert_not_called()
-
-        when_using_k8s_executor()
-        when_using_celery_executor()
-
-    def test_queue_task_instance(self):
-        mark_success = False
-        pickle_id = None
-        ignore_all_deps = False
-        ignore_depends_on_past = False
-        ignore_task_deps = False
-        ignore_ti_state = False
-        pool = None
-        cfg_path = None
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
+    @parameterized.expand(
+        [
+            ('any-other-queue',),
+            (KUBERNETES_QUEUE,),
+        ]
+    )
+    @mock.patch.object(CeleryExecutor, 'queue_command')
+    @mock.patch.object(KubernetesExecutor, 'queue_command')
+    def test_queue_command(self, test_queue, k8s_queue_cmd, celery_queue_cmd):
+        kwargs = dict(
+            command=['airflow', 'run', 'dag'],
+            priority=1,
+            queue='default',
+        )
+        kwarg_values = kwargs.values()
+        cke = CeleryKubernetesExecutor(CeleryExecutor(), KubernetesExecutor())
+
+        simple_task_instance = mock.MagicMock()
+        simple_task_instance.queue = test_queue
+
+        cke.queue_command(simple_task_instance, **kwargs)
+
+        if test_queue == KUBERNETES_QUEUE:
+            k8s_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            celery_queue_cmd.assert_not_called()
+        else:
+            celery_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            k8s_queue_cmd.assert_not_called()
+
+    @parameterized.expand(
+        [
+            ('non-kubernetes-queue',),

Review comment:
       Cool. 
   
   I was about to request for a rebase to the latest master, but I see you have already done that 👍
   
   Overall in good shape now. It's a great PR which does make the tests much cleaner. I will do a final check and get back to you tomorrow. Cheers.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] XD-DENG commented on a change in pull request #13307: Simplify CeleryKubernetesExecutor tests

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #13307:
URL: https://github.com/apache/airflow/pull/13307#discussion_r550353772



##########
File path: tests/executors/test_celery_kubernetes_executor.py
##########
@@ -58,182 +64,107 @@ def test_start(self):
         celery_executor_mock.start.assert_called()
         k8s_executor_mock.start.assert_called()
 
-    def test_queue_command(self):
-        command = ['airflow', 'run', 'dag']
-        priority = 1
-        queue = 'default'
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            k8s_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            celery_executor_mock.queue_command.assert_not_called()
-
-        def when_using_celery_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = 'non-kubernetes-queue'
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            celery_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            k8s_executor_mock.queue_command.assert_not_called()
-
-        when_using_k8s_executor()
-        when_using_celery_executor()
-
-    def test_queue_task_instance(self):
-        mark_success = False
-        pickle_id = None
-        ignore_all_deps = False
-        ignore_depends_on_past = False
-        ignore_task_deps = False
-        ignore_ti_state = False
-        pool = None
-        cfg_path = None
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
+    @parameterized.expand(
+        [
+            ('any-other-queue',),
+            (KUBERNETES_QUEUE,),
+        ]
+    )
+    @mock.patch.object(CeleryExecutor, 'queue_command')
+    @mock.patch.object(KubernetesExecutor, 'queue_command')
+    def test_queue_command(self, test_queue, k8s_queue_cmd, celery_queue_cmd):
+        kwargs = dict(
+            command=['airflow', 'run', 'dag'],
+            priority=1,
+            queue='default',
+        )
+        kwarg_values = kwargs.values()
+        cke = CeleryKubernetesExecutor(CeleryExecutor(), KubernetesExecutor())
+
+        simple_task_instance = mock.MagicMock()
+        simple_task_instance.queue = test_queue
+
+        cke.queue_command(simple_task_instance, **kwargs)
+
+        if test_queue == KUBERNETES_QUEUE:
+            k8s_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            celery_queue_cmd.assert_not_called()
+        else:
+            celery_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            k8s_queue_cmd.assert_not_called()
+
+    @parameterized.expand(
+        [
+            ('non-kubernetes-queue',),

Review comment:
       @dstandish I agree with your proposal. Then I assume this line should be updated as well?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #13307: Simplify CeleryKubernetesExecutor tests

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #13307:
URL: https://github.com/apache/airflow/pull/13307#discussion_r550354747



##########
File path: tests/executors/test_celery_kubernetes_executor.py
##########
@@ -58,182 +64,107 @@ def test_start(self):
         celery_executor_mock.start.assert_called()
         k8s_executor_mock.start.assert_called()
 
-    def test_queue_command(self):
-        command = ['airflow', 'run', 'dag']
-        priority = 1
-        queue = 'default'
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            k8s_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            celery_executor_mock.queue_command.assert_not_called()
-
-        def when_using_celery_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = 'non-kubernetes-queue'
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            celery_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            k8s_executor_mock.queue_command.assert_not_called()
-
-        when_using_k8s_executor()
-        when_using_celery_executor()
-
-    def test_queue_task_instance(self):
-        mark_success = False
-        pickle_id = None
-        ignore_all_deps = False
-        ignore_depends_on_past = False
-        ignore_task_deps = False
-        ignore_ti_state = False
-        pool = None
-        cfg_path = None
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
+    @parameterized.expand(
+        [
+            ('any-other-queue',),
+            (KUBERNETES_QUEUE,),
+        ]
+    )
+    @mock.patch.object(CeleryExecutor, 'queue_command')
+    @mock.patch.object(KubernetesExecutor, 'queue_command')
+    def test_queue_command(self, test_queue, k8s_queue_cmd, celery_queue_cmd):
+        kwargs = dict(
+            command=['airflow', 'run', 'dag'],
+            priority=1,
+            queue='default',
+        )
+        kwarg_values = kwargs.values()
+        cke = CeleryKubernetesExecutor(CeleryExecutor(), KubernetesExecutor())
+
+        simple_task_instance = mock.MagicMock()
+        simple_task_instance.queue = test_queue
+
+        cke.queue_command(simple_task_instance, **kwargs)
+
+        if test_queue == KUBERNETES_QUEUE:
+            k8s_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            celery_queue_cmd.assert_not_called()
+        else:
+            celery_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            k8s_queue_cmd.assert_not_called()
+
+    @parameterized.expand(
+        [
+            ('non-kubernetes-queue',),

Review comment:
       yup. didn't see it. done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #13307: Simplify CeleryKubernetesExecutor tests

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #13307:
URL: https://github.com/apache/airflow/pull/13307#discussion_r550336776



##########
File path: tests/executors/test_celery_kubernetes_executor.py
##########
@@ -58,182 +64,104 @@ def test_start(self):
         celery_executor_mock.start.assert_called()
         k8s_executor_mock.start.assert_called()
 
-    def test_queue_command(self):
-        command = ['airflow', 'run', 'dag']
-        priority = 1
-        queue = 'default'
-
-        def when_using_k8s_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE
-
-            cke.queue_command(simple_task_instance, command, priority, queue)
-
-            k8s_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            celery_executor_mock.queue_command.assert_not_called()
-
-        def when_using_celery_executor():
-            celery_executor_mock = mock.MagicMock()
-            k8s_executor_mock = mock.MagicMock()
-            cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
-            simple_task_instance = mock.MagicMock()
-            simple_task_instance.queue = 'non-kubernetes-queue'
+    @parameterized.expand(
+        [
+            ('other-queue',),
+            (KUBERNETES_QUEUE,),
+        ]
+    )
+    @mock.patch.object(CeleryExecutor, 'queue_command')
+    @mock.patch.object(KubernetesExecutor, 'queue_command')
+    def test_queue_command(self, test_queue, k8s_queue_cmd, celery_queue_cmd):
+        kwargs = dict(
+            command=['airflow', 'run', 'dag'],
+            priority=1,
+            queue='default',
+        )
+        kwarg_values = kwargs.values()
+        cke = CeleryKubernetesExecutor(CeleryExecutor(), KubernetesExecutor())
+
+        simple_task_instance = mock.MagicMock()
+        simple_task_instance.queue = test_queue
+
+        cke.queue_command(simple_task_instance, **kwargs)
+
+        if test_queue == KUBERNETES_QUEUE:
+            k8s_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            celery_queue_cmd.assert_not_called()
+        else:
+            celery_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+            k8s_queue_cmd.assert_not_called()
+
+    @parameterized.expand(
+        [
+            ('non-kubernetes-queue',),
+            (KUBERNETES_QUEUE,),
+        ]
+    )
+    def test_queue_task_instance(self, test_queue):
+        celery_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
 
-            cke.queue_command(simple_task_instance, command, priority, queue)
+        ti = mock.MagicMock()
+        ti.queue = test_queue
+
+        kwargs = dict(
+            task_instance=ti,
+            mark_success=False,
+            pickle_id=None,
+            ignore_all_deps=False,
+            ignore_depends_on_past=False,
+            ignore_task_deps=False,
+            ignore_ti_state=False,
+            pool=None,
+            cfg_path=None,
+        )
+        kwarg_values = kwargs.values()
+        cke.queue_task_instance(**kwargs)
+        if test_queue == KUBERNETES_QUEUE:
+            k8s_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
+            celery_executor_mock.queue_task_instance.assert_not_called()
+        else:
+            celery_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
+            k8s_executor_mock.queue_task_instance.assert_not_called()
 
-            celery_executor_mock.queue_command.assert_called_once_with(
-                simple_task_instance, command, priority, queue
-            )
-            k8s_executor_mock.queue_command.assert_not_called()
+    @parameterized.expand(
+        [
+            (True, True, True),
+            (False, True, True),
+            (True, False, True),
+            (False, False, False),
+        ]
+    )
+    def test_has_tasks(self, celery_has, k8s_has, cke_has):
+        celery_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
 
-        when_using_k8s_executor()
-        when_using_celery_executor()
+        celery_executor_mock.has_task.return_value = celery_has
+        k8s_executor_mock.has_task.return_value = k8s_has
 
-    def test_queue_task_instance(self):
-        mark_success = False
-        pickle_id = None
-        ignore_all_deps = False
-        ignore_depends_on_past = False
-        ignore_task_deps = False
-        ignore_ti_state = False
-        pool = None
-        cfg_path = None
+        assert cke.has_task(None) == cke_has

Review comment:
       ok i see....  there are instance attributes that could differ between the two executors, even though the method is taken from BaseExecutor ----- so it can make sense to check both....  but i still don't understand why this doesn't inherit from BaseExecutor... shouldn't all executors do this?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #13307: Simplify CeleryKubernetesExecutor tests

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #13307:
URL: https://github.com/apache/airflow/pull/13307#issuecomment-751454203


   Also related @XD-DENG:  can you please kindly take a look at #13323. this should enormously help in analysing logs of failed kubernetes jobs in CI. I added groups/sections to logs and they should load many times faster and by default hide irrelevant details (but they are easy to unfold :)).  


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org