You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by xd...@apache.org on 2020/12/31 09:43:44 UTC
[airflow] branch master updated: Simplify CeleryKubernetesExecutor tests (#13307)
This is an automated email from the ASF dual-hosted git repository.
xddeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 10be375 Simplify CeleryKubernetesExecutor tests (#13307)
10be375 is described below
commit 10be37513c420515251996602ca9362dfeeb095f
Author: dstandish <ds...@users.noreply.github.com>
AuthorDate: Thu Dec 31 01:43:19 2020 -0800
Simplify CeleryKubernetesExecutor tests (#13307)
* Simplify CeleryKubernetesExecutor tests
Co-authored-by: Daniel Standish <ds...@techstyle.com>
---
tests/executors/test_celery_kubernetes_executor.py | 265 ++++++++-------------
1 file changed, 98 insertions(+), 167 deletions(-)
diff --git a/tests/executors/test_celery_kubernetes_executor.py b/tests/executors/test_celery_kubernetes_executor.py
index cc8a958..13dd0e9 100644
--- a/tests/executors/test_celery_kubernetes_executor.py
+++ b/tests/executors/test_celery_kubernetes_executor.py
@@ -17,7 +17,13 @@
# under the License.
from unittest import mock
+from parameterized import parameterized
+
+from airflow.executors.celery_executor import CeleryExecutor
from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
+from airflow.executors.kubernetes_executor import KubernetesExecutor
+
+KUBERNETES_QUEUE = CeleryKubernetesExecutor.KUBERNETES_QUEUE
class TestCeleryKubernetesExecutor:
@@ -58,182 +64,107 @@ class TestCeleryKubernetesExecutor:
celery_executor_mock.start.assert_called()
k8s_executor_mock.start.assert_called()
- def test_queue_command(self):
- command = ['airflow', 'run', 'dag']
- priority = 1
- queue = 'default'
-
- def when_using_k8s_executor():
- celery_executor_mock = mock.MagicMock()
- k8s_executor_mock = mock.MagicMock()
- cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
- simple_task_instance = mock.MagicMock()
- simple_task_instance.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE
-
- cke.queue_command(simple_task_instance, command, priority, queue)
-
- k8s_executor_mock.queue_command.assert_called_once_with(
- simple_task_instance, command, priority, queue
- )
- celery_executor_mock.queue_command.assert_not_called()
-
- def when_using_celery_executor():
- celery_executor_mock = mock.MagicMock()
- k8s_executor_mock = mock.MagicMock()
- cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
- simple_task_instance = mock.MagicMock()
- simple_task_instance.queue = 'non-kubernetes-queue'
-
- cke.queue_command(simple_task_instance, command, priority, queue)
-
- celery_executor_mock.queue_command.assert_called_once_with(
- simple_task_instance, command, priority, queue
- )
- k8s_executor_mock.queue_command.assert_not_called()
-
- when_using_k8s_executor()
- when_using_celery_executor()
-
- def test_queue_task_instance(self):
- mark_success = False
- pickle_id = None
- ignore_all_deps = False
- ignore_depends_on_past = False
- ignore_task_deps = False
- ignore_ti_state = False
- pool = None
- cfg_path = None
-
- def when_using_k8s_executor():
- celery_executor_mock = mock.MagicMock()
- k8s_executor_mock = mock.MagicMock()
- cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
+ @parameterized.expand(
+ [
+ ('any-other-queue',),
+ (KUBERNETES_QUEUE,),
+ ]
+ )
+ @mock.patch.object(CeleryExecutor, 'queue_command')
+ @mock.patch.object(KubernetesExecutor, 'queue_command')
+ def test_queue_command(self, test_queue, k8s_queue_cmd, celery_queue_cmd):
+ kwargs = dict(
+ command=['airflow', 'run', 'dag'],
+ priority=1,
+ queue='default',
+ )
+ kwarg_values = kwargs.values()
+ cke = CeleryKubernetesExecutor(CeleryExecutor(), KubernetesExecutor())
+
+ simple_task_instance = mock.MagicMock()
+ simple_task_instance.queue = test_queue
+
+ cke.queue_command(simple_task_instance, **kwargs)
+
+ if test_queue == KUBERNETES_QUEUE:
+ k8s_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+ celery_queue_cmd.assert_not_called()
+ else:
+ celery_queue_cmd.assert_called_once_with(simple_task_instance, *kwarg_values)
+ k8s_queue_cmd.assert_not_called()
+
+ @parameterized.expand(
+ [
+ ('any-other-queue',),
+ (KUBERNETES_QUEUE,),
+ ]
+ )
+ def test_queue_task_instance(self, test_queue):
+ celery_executor_mock = mock.MagicMock()
+ k8s_executor_mock = mock.MagicMock()
+ cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
- ti = mock.MagicMock()
- ti.queue = CeleryKubernetesExecutor.KUBERNETES_QUEUE
-
- cke.queue_task_instance(
- ti,
- mark_success,
- pickle_id,
- ignore_all_deps,
- ignore_depends_on_past,
- ignore_task_deps,
- ignore_ti_state,
- pool,
- cfg_path,
- )
-
- k8s_executor_mock.queue_task_instance.assert_called_once_with(
- ti,
- mark_success,
- pickle_id,
- ignore_all_deps,
- ignore_depends_on_past,
- ignore_task_deps,
- ignore_ti_state,
- pool,
- cfg_path,
- )
+ ti = mock.MagicMock()
+ ti.queue = test_queue
+
+ kwargs = dict(
+ task_instance=ti,
+ mark_success=False,
+ pickle_id=None,
+ ignore_all_deps=False,
+ ignore_depends_on_past=False,
+ ignore_task_deps=False,
+ ignore_ti_state=False,
+ pool=None,
+ cfg_path=None,
+ )
+ kwarg_values = kwargs.values()
+ cke.queue_task_instance(**kwargs)
+ if test_queue == KUBERNETES_QUEUE:
+ k8s_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
celery_executor_mock.queue_task_instance.assert_not_called()
-
- def when_using_celery_executor():
- celery_executor_mock = mock.MagicMock()
- k8s_executor_mock = mock.MagicMock()
- cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
- ti = mock.MagicMock()
- ti.queue = 'non-kubernetes-queue'
-
- cke.queue_task_instance(
- ti,
- mark_success,
- pickle_id,
- ignore_all_deps,
- ignore_depends_on_past,
- ignore_task_deps,
- ignore_ti_state,
- pool,
- cfg_path,
- )
-
+ else:
+ celery_executor_mock.queue_task_instance.assert_called_once_with(*kwarg_values)
k8s_executor_mock.queue_task_instance.assert_not_called()
- celery_executor_mock.queue_task_instance.assert_called_once_with(
- ti,
- mark_success,
- pickle_id,
- ignore_all_deps,
- ignore_depends_on_past,
- ignore_task_deps,
- ignore_ti_state,
- pool,
- cfg_path,
- )
-
- when_using_k8s_executor()
- when_using_celery_executor()
-
- def test_has_tasks(self):
- ti = mock.MagicMock
-
- def when_ti_in_k8s_executor():
- celery_executor_mock = mock.MagicMock()
- k8s_executor_mock = mock.MagicMock()
- cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
- celery_executor_mock.has_task.return_value = False
- k8s_executor_mock.has_task.return_value = True
-
- assert cke.has_task(ti)
- celery_executor_mock.has_task.assert_called_once_with(ti)
- k8s_executor_mock.has_task.assert_called_once_with(ti)
-
- def when_ti_in_celery_executor():
- celery_executor_mock = mock.MagicMock()
- k8s_executor_mock = mock.MagicMock()
- cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
- celery_executor_mock.has_task.return_value = True
-
- assert cke.has_task(ti)
- celery_executor_mock.has_task.assert_called_once_with(ti)
-
- when_ti_in_k8s_executor()
- when_ti_in_celery_executor()
-
- def test_adopt_tasks(self):
- ti = mock.MagicMock
- def when_ti_in_k8s_executor():
- celery_executor_mock = mock.MagicMock()
- k8s_executor_mock = mock.MagicMock()
- ti.queue = "kubernetes"
- cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
-
- celery_executor_mock.try_adopt_task_instances.return_value = []
- k8s_executor_mock.try_adopt_task_instances.return_value = []
+ @parameterized.expand(
+ [
+ (True, True, True),
+ (False, True, True),
+ (True, False, True),
+ (False, False, False),
+ ]
+ )
+ def test_has_tasks(self, celery_has, k8s_has, cke_has):
+ celery_executor_mock = mock.MagicMock()
+ k8s_executor_mock = mock.MagicMock()
+ cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
- cke.try_adopt_task_instances([ti])
- celery_executor_mock.try_adopt_task_instances.assert_called_once_with([])
- k8s_executor_mock.try_adopt_task_instances.assert_called_once_with([ti])
+ celery_executor_mock.has_task.return_value = celery_has
+ k8s_executor_mock.has_task.return_value = k8s_has
+ ti = mock.MagicMock()
+ assert cke.has_task(ti) == cke_has
+ celery_executor_mock.has_task.assert_called_once_with(ti)
+ if not celery_has:
+ k8s_executor_mock.has_task.assert_called_once_with(ti)
- def when_ti_in_celery_executor():
- celery_executor_mock = mock.MagicMock()
- k8s_executor_mock = mock.MagicMock()
- ti.queue = "default"
- cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
+ @parameterized.expand([(1, 0), (0, 1), (2, 1)])
+ def test_adopt_tasks(self, num_k8s, num_celery):
+ celery_executor_mock = mock.MagicMock()
+ k8s_executor_mock = mock.MagicMock()
- celery_executor_mock.try_adopt_task_instances.return_value = []
- k8s_executor_mock.try_adopt_task_instances.return_value = []
+ def mock_ti(queue):
+ ti = mock.MagicMock()
+ ti.queue = queue
+ return ti
- cke.try_adopt_task_instances([ti])
- celery_executor_mock.try_adopt_task_instances.assert_called_once_with([ti])
- k8s_executor_mock.try_adopt_task_instances.assert_called_once_with([])
+ celery_tis = [mock_ti('default') for _ in range(num_celery)]
+ k8s_tis = [mock_ti(KUBERNETES_QUEUE) for _ in range(num_k8s)]
- when_ti_in_k8s_executor()
- when_ti_in_celery_executor()
+ cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
+ cke.try_adopt_task_instances(celery_tis + k8s_tis)
+ celery_executor_mock.try_adopt_task_instances.assert_called_once_with(celery_tis)
+ k8s_executor_mock.try_adopt_task_instances.assert_called_once_with(k8s_tis)
def test_get_event_buffer(self):
celery_executor_mock = mock.MagicMock()