You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/03/05 21:46:55 UTC
[airflow] branch main updated: Local kubernetes executor (#19729)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 652b859 Local kubernetes executor (#19729)
652b859 is described below
commit 652b85998d1464316069c3774e49b4777168315d
Author: Kanthi <su...@gmail.com>
AuthorDate: Sat Mar 5 16:46:00 2022 -0500
Local kubernetes executor (#19729)
---
BREEZE.rst | 1 +
airflow/config_templates/config.yml | 15 ++
airflow/config_templates/default_airflow.cfg | 10 +
.../example_local_kubernetes_executor.py | 67 +++++++
airflow/executors/executor_constants.py | 1 +
airflow/executors/executor_loader.py | 14 ++
airflow/executors/local_kubernetes_executor.py | 204 +++++++++++++++++++++
airflow/settings.py | 1 +
breeze-complete | 2 +-
docs/apache-airflow/executor/index.rst | 1 +
docs/apache-airflow/executor/local_kubernetes.rst | 30 +++
tests/core/test_config_templates.py | 1 +
tests/executors/test_local_kubernetes_executor.py | 62 +++++++
13 files changed, 408 insertions(+), 1 deletion(-)
diff --git a/BREEZE.rst b/BREEZE.rst
index 574b5fe..1283473 100644
--- a/BREEZE.rst
+++ b/BREEZE.rst
@@ -2534,6 +2534,7 @@ This is the current syntax for `./breeze <./breeze>`_:
One of:
KubernetesExecutor CeleryExecutor LocalExecutor CeleryKubernetesExecutor
+ LocalKubernetesExecutor
Default: KubernetesExecutor
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index bb563ff..28ddaf7 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1528,6 +1528,21 @@
type: string
example: ~
default: ~
+- name: local_kubernetes_executor
+ description: |
+ This section only applies if you are using the ``LocalKubernetesExecutor`` in
+ ``[core]`` section above
+ options:
+ - name: kubernetes_queue
+ description: |
+ Define when to send a task to ``KubernetesExecutor`` when using ``LocalKubernetesExecutor``.
+ When the queue of a task is the value of ``kubernetes_queue`` (default ``kubernetes``),
+ the task is executed via ``KubernetesExecutor``,
+ otherwise via ``LocalExecutor``
+ version_added: 2.3.0
+ type: string
+ example: ~
+ default: "kubernetes"
- name: celery_kubernetes_executor
description: |
This section only applies if you are using the ``CeleryKubernetesExecutor`` in
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index b9ea3a1..4dfdb05 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -760,6 +760,16 @@ sentry_dsn =
# Dotted path to a before_send function that the sentry SDK should be configured to use.
# before_send =
+[local_kubernetes_executor]
+
+# This section only applies if you are using the ``LocalKubernetesExecutor`` in
+# ``[core]`` section above
+# Define when to send a task to ``KubernetesExecutor`` when using ``LocalKubernetesExecutor``.
+# When the queue of a task is the value of ``kubernetes_queue`` (default ``kubernetes``),
+# the task is executed via ``KubernetesExecutor``,
+# otherwise via ``LocalExecutor``
+kubernetes_queue = kubernetes
+
[celery_kubernetes_executor]
# This section only applies if you are using the ``CeleryKubernetesExecutor`` in
diff --git a/airflow/example_dags/example_local_kubernetes_executor.py b/airflow/example_dags/example_local_kubernetes_executor.py
new file mode 100644
index 0000000..03fbd06
--- /dev/null
+++ b/airflow/example_dags/example_local_kubernetes_executor.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example dag for using a Local Kubernetes Executor Configuration.
+"""
+import logging
+from datetime import datetime
+
+from airflow import DAG
+from airflow.configuration import conf
+from airflow.decorators import task
+from airflow.example_dags.libs.helper import print_stuff
+
+log = logging.getLogger(__name__)
+
+worker_container_repository = conf.get('kubernetes', 'worker_container_repository')
+worker_container_tag = conf.get('kubernetes', 'worker_container_tag')
+
+try:
+ from kubernetes.client import models as k8s
+except ImportError:
+ log.warning("Could not import DAGs in example_local_kubernetes_executor.py", exc_info=True)
+ log.warning("Install Kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]")
+
+with DAG(
+ dag_id='example_local_kubernetes_executor',
+ schedule_interval=None,
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+ tags=['example3'],
+) as dag:
+ # You can use annotations on your kubernetes pods!
+ start_task_executor_config = {
+ "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+ }
+
+ @task(
+ executor_config=start_task_executor_config,
+ queue='kubernetes',
+ task_id='task_with_kubernetes_executor',
+ )
+ def task_with_template():
+ print_stuff()
+
+ @task(task_id='task_with_local_executor')
+ def task_with_local(ds=None, **kwargs):
+ """Print the Airflow context and ds variable from the context."""
+ print(kwargs)
+ print(ds)
+ return 'Whatever you return gets printed in the logs'
+
+ task_with_local() >> task_with_template()
diff --git a/airflow/executors/executor_constants.py b/airflow/executors/executor_constants.py
index fed8e27..55a3a7f 100644
--- a/airflow/executors/executor_constants.py
+++ b/airflow/executors/executor_constants.py
@@ -16,6 +16,7 @@
# under the License.
LOCAL_EXECUTOR = "LocalExecutor"
+LOCAL_KUBERNETES_EXECUTOR = "LocalKubernetesExecutor"
SEQUENTIAL_EXECUTOR = "SequentialExecutor"
CELERY_EXECUTOR = "CeleryExecutor"
CELERY_KUBERNETES_EXECUTOR = "CeleryKubernetesExecutor"
diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py
index 36b4042..b98d2a8 100644
--- a/airflow/executors/executor_loader.py
+++ b/airflow/executors/executor_loader.py
@@ -28,6 +28,7 @@ from airflow.executors.executor_constants import (
DEBUG_EXECUTOR,
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
+ LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
)
from airflow.utils.module_loading import import_string
@@ -53,6 +54,7 @@ class ExecutorLoader:
_default_executor: Optional["BaseExecutor"] = None
executors = {
LOCAL_EXECUTOR: 'airflow.executors.local_executor.LocalExecutor',
+ LOCAL_KUBERNETES_EXECUTOR: 'airflow.executors.local_kubernetes_executor.LocalKubernetesExecutor',
SEQUENTIAL_EXECUTOR: 'airflow.executors.sequential_executor.SequentialExecutor',
CELERY_EXECUTOR: 'airflow.executors.celery_executor.CeleryExecutor',
CELERY_KUBERNETES_EXECUTOR: 'airflow.executors.celery_kubernetes_executor.CeleryKubernetesExecutor',
@@ -89,6 +91,9 @@ class ExecutorLoader:
"""
if executor_name == CELERY_KUBERNETES_EXECUTOR:
return cls.__load_celery_kubernetes_executor()
+ elif executor_name == LOCAL_KUBERNETES_EXECUTOR:
+ return cls.__load_local_kubernetes_executor()
+
try:
executor_cls, import_source = cls.import_executor_cls(executor_name)
log.debug("Loading executor %s from %s", executor_name, import_source.value)
@@ -137,6 +142,15 @@ class ExecutorLoader:
celery_kubernetes_executor_cls = import_string(cls.executors[CELERY_KUBERNETES_EXECUTOR])
return celery_kubernetes_executor_cls(celery_executor, kubernetes_executor)
+ @classmethod
+ def __load_local_kubernetes_executor(cls) -> "BaseExecutor":
+ """:return: an instance of LocalKubernetesExecutor"""
+ local_executor = import_string(cls.executors[LOCAL_EXECUTOR])()
+ kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()
+
+ local_kubernetes_executor_cls = import_string(cls.executors[LOCAL_KUBERNETES_EXECUTOR])
+ return local_kubernetes_executor_cls(local_executor, kubernetes_executor)
+
UNPICKLEABLE_EXECUTORS = (
LOCAL_EXECUTOR,
diff --git a/airflow/executors/local_kubernetes_executor.py b/airflow/executors/local_kubernetes_executor.py
new file mode 100644
index 0000000..57968ce
--- /dev/null
+++ b/airflow/executors/local_kubernetes_executor.py
@@ -0,0 +1,204 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Dict, List, Optional, Set, Union
+
+from airflow.configuration import conf
+from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType
+from airflow.executors.kubernetes_executor import KubernetesExecutor
+from airflow.executors.local_executor import LocalExecutor
+from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class LocalKubernetesExecutor(LoggingMixin):
+ """
+ LocalKubernetesExecutor consists of LocalExecutor and KubernetesExecutor.
+ It chooses the executor to use based on the queue defined on the task.
+ When the task's queue is the value of ``kubernetes_queue`` in section ``[local_kubernetes_executor]``
+ of the configuration (default value: `kubernetes`), KubernetesExecutor is selected to run the task,
+ otherwise, LocalExecutor is used.
+ """
+
+ supports_ad_hoc_ti_run: bool = True
+
+ KUBERNETES_QUEUE = conf.get('local_kubernetes_executor', 'kubernetes_queue')
+
+ def __init__(self, local_executor: LocalExecutor, kubernetes_executor: KubernetesExecutor):
+ super().__init__()
+ self._job_id: Optional[str] = None
+ self.local_executor = local_executor
+ self.kubernetes_executor = kubernetes_executor
+
+ @property
+ def queued_tasks(self) -> Dict[TaskInstanceKey, QueuedTaskInstanceType]:
+ """Return queued tasks from local and kubernetes executor"""
+ queued_tasks = self.local_executor.queued_tasks.copy()
+ queued_tasks.update(self.kubernetes_executor.queued_tasks)
+
+ return queued_tasks
+
+ @property
+ def running(self) -> Set[TaskInstanceKey]:
+ """Return running tasks from local and kubernetes executor"""
+ return self.local_executor.running.union(self.kubernetes_executor.running)
+
+ @property
+ def job_id(self) -> Optional[str]:
+ """
+ This is a class attribute in BaseExecutor but since this is not really an executor, but a wrapper
+ of executors we implement as property so we can have custom setter.
+ """
+ return self._job_id
+
+ @job_id.setter
+ def job_id(self, value: Optional[str]) -> None:
+ """job_id is manipulated by SchedulerJob. We must propagate the job_id to wrapped executors."""
+ self._job_id = value
+ self.kubernetes_executor.job_id = value
+ self.local_executor.job_id = value
+
+ def start(self) -> None:
+ self.log.info("Starting local and Kubernetes Executor")
+ """Start local and kubernetes executor"""
+ self.local_executor.start()
+ self.kubernetes_executor.start()
+
+ @property
+ def slots_available(self) -> int:
+ """Number of new tasks this executor instance can accept"""
+ return self.local_executor.slots_available
+
+ def queue_command(
+ self,
+ task_instance: TaskInstance,
+ command: CommandType,
+ priority: int = 1,
+ queue: Optional[str] = None,
+ ) -> None:
+ """Queues command via local or kubernetes executor"""
+ executor = self._router(task_instance)
+ self.log.debug("Using executor: %s for %s", executor.__class__.__name__, task_instance.key)
+ executor.queue_command(task_instance, command, priority, queue)
+
+ def queue_task_instance(
+ self,
+ task_instance: TaskInstance,
+ mark_success: bool = False,
+ pickle_id: Optional[str] = None,
+ ignore_all_deps: bool = False,
+ ignore_depends_on_past: bool = False,
+ ignore_task_deps: bool = False,
+ ignore_ti_state: bool = False,
+ pool: Optional[str] = None,
+ cfg_path: Optional[str] = None,
+ ) -> None:
+ """Queues task instance via local or kubernetes executor"""
+ executor = self._router(SimpleTaskInstance(task_instance))
+ self.log.debug(
+ "Using executor: %s to queue_task_instance for %s", executor.__class__.__name__, task_instance.key
+ )
+ executor.queue_task_instance(
+ task_instance,
+ mark_success,
+ pickle_id,
+ ignore_all_deps,
+ ignore_depends_on_past,
+ ignore_task_deps,
+ ignore_ti_state,
+ pool,
+ cfg_path,
+ )
+
+ def has_task(self, task_instance: TaskInstance) -> bool:
+ """
+ Checks if a task is either queued or running in either local or kubernetes executor.
+
+ :param task_instance: TaskInstance
+ :return: True if the task is known to this executor
+ """
+ return self.local_executor.has_task(task_instance) or self.kubernetes_executor.has_task(task_instance)
+
+ def heartbeat(self) -> None:
+ """Heartbeat sent to trigger new jobs in local and kubernetes executor"""
+ self.local_executor.heartbeat()
+ self.kubernetes_executor.heartbeat()
+
+ def get_event_buffer(
+ self, dag_ids: Optional[List[str]] = None
+ ) -> Dict[TaskInstanceKey, EventBufferValueType]:
+ """
+ Returns and flush the event buffer from local and kubernetes executor
+
+ :param dag_ids: dag_ids to return events for, if None returns all
+ :return: a dict of events
+ """
+ cleared_events_from_local = self.local_executor.get_event_buffer(dag_ids)
+ cleared_events_from_kubernetes = self.kubernetes_executor.get_event_buffer(dag_ids)
+
+ return {**cleared_events_from_local, **cleared_events_from_kubernetes}
+
+ def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
+ """
+ Try to adopt running task instances that have been abandoned by a SchedulerJob dying.
+
+ Anything that is not adopted will be cleared by the scheduler (and then become eligible for
+ re-scheduling)
+
+ :return: any TaskInstances that were unable to be adopted
+ :rtype: list[airflow.models.TaskInstance]
+ """
+ local_tis = []
+ kubernetes_tis = []
+ abandoned_tis = []
+ for ti in tis:
+ if ti.queue == self.KUBERNETES_QUEUE:
+ kubernetes_tis.append(ti)
+ else:
+ local_tis.append(ti)
+ abandoned_tis.extend(self.local_executor.try_adopt_task_instances(local_tis))
+ abandoned_tis.extend(self.kubernetes_executor.try_adopt_task_instances(kubernetes_tis))
+ return abandoned_tis
+
+ def end(self) -> None:
+ """End local and kubernetes executor"""
+ self.local_executor.end()
+ self.kubernetes_executor.end()
+
+ def terminate(self) -> None:
+ """Terminate local and kubernetes executor"""
+ self.local_executor.terminate()
+ self.kubernetes_executor.terminate()
+
+ def _router(self, simple_task_instance: SimpleTaskInstance) -> Union[LocalExecutor, KubernetesExecutor]:
+ """
+ Return either local_executor or kubernetes_executor
+
+ :param simple_task_instance: SimpleTaskInstance
+ :return: local_executor or kubernetes_executor
+ :rtype: Union[LocalExecutor, KubernetesExecutor]
+ """
+ if simple_task_instance.queue == self.KUBERNETES_QUEUE:
+ return self.kubernetes_executor
+ return self.local_executor
+
+ def debug_dump(self) -> None:
+ """Called in response to SIGUSR2 by the scheduler"""
+ self.log.info("Dumping LocalExecutor state")
+ self.local_executor.debug_dump()
+ self.log.info("Dumping KubernetesExecutor state")
+ self.kubernetes_executor.debug_dump()
diff --git a/airflow/settings.py b/airflow/settings.py
index cebf543..fdb5d80 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -599,6 +599,7 @@ LAZY_LOAD_PROVIDERS = conf.getboolean('core', 'lazy_discover_providers', fallbac
IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get('core', 'EXECUTOR') in {
executor_constants.KUBERNETES_EXECUTOR,
executor_constants.CELERY_KUBERNETES_EXECUTOR,
+ executor_constants.LOCAL_KUBERNETES_EXECUTOR,
}
HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean('core', 'hide_sensitive_var_conn_fields')
diff --git a/breeze-complete b/breeze-complete
index ea0cc95..f9fbc4d 100644
--- a/breeze-complete
+++ b/breeze-complete
@@ -35,7 +35,7 @@ _breeze_allowed_mysql_versions="5.7 8"
_breeze_allowed_mssql_versions="2017-latest 2019-latest"
_breeze_allowed_postgres_versions="10 11 12 13"
_breeze_allowed_kind_operations="start stop restart status deploy test shell k9s"
-_breeze_allowed_executors="KubernetesExecutor CeleryExecutor LocalExecutor CeleryKubernetesExecutor"
+_breeze_allowed_executors="KubernetesExecutor CeleryExecutor LocalExecutor CeleryKubernetesExecutor LocalKubernetesExecutor"
_breeze_allowed_test_types="All Always Core Providers API CLI Integration Other WWW Postgres MySQL Helm Quarantined"
_breeze_allowed_package_formats="both sdist wheel"
_breeze_allowed_installation_methods=". apache-airflow"
diff --git a/docs/apache-airflow/executor/index.rst b/docs/apache-airflow/executor/index.rst
index 831b2f2..fc82336 100644
--- a/docs/apache-airflow/executor/index.rst
+++ b/docs/apache-airflow/executor/index.rst
@@ -72,6 +72,7 @@ There are two types of executor - those that run tasks *locally* (inside the ``s
celery_kubernetes
dask
kubernetes
+ local_kubernetes
.. note::
diff --git a/docs/apache-airflow/executor/local_kubernetes.rst b/docs/apache-airflow/executor/local_kubernetes.rst
new file mode 100644
index 0000000..ecb0dc5
--- /dev/null
+++ b/docs/apache-airflow/executor/local_kubernetes.rst
@@ -0,0 +1,30 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+.. _executor:LocalKubernetesExecutor:
+
+LocalKubernetes Executor
+=========================
+
+The :class:`~airflow.executors.local_kubernetes_executor.LocalKubernetesExecutor` allows users
+to simultaneously run a ``LocalExecutor`` and a ``KubernetesExecutor``.
+An executor is chosen to run a task based on the task's queue.
+
+``LocalKubernetesExecutor`` provides the capability of running tasks with either ``LocalExecutor``,
+which runs tasks within the scheduler service, or with ``KubernetesExecutor``, which runs each task
+in its own pod on a kubernetes cluster.
diff --git a/tests/core/test_config_templates.py b/tests/core/test_config_templates.py
index d2e2623..e69f38f 100644
--- a/tests/core/test_config_templates.py
+++ b/tests/core/test_config_templates.py
@@ -40,6 +40,7 @@ DEFAULT_AIRFLOW_SECTIONS = [
'email',
'smtp',
'sentry',
+ 'local_kubernetes_executor',
'celery_kubernetes_executor',
'celery',
'celery_broker_transport_options',
diff --git a/tests/executors/test_local_kubernetes_executor.py b/tests/executors/test_local_kubernetes_executor.py
new file mode 100644
index 0000000..b32e119
--- /dev/null
+++ b/tests/executors/test_local_kubernetes_executor.py
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from unittest import mock
+
+from airflow.configuration import conf
+from airflow.executors.local_executor import LocalExecutor
+from airflow.executors.local_kubernetes_executor import LocalKubernetesExecutor
+
+
+class TestLocalKubernetesExecutor:
+ def test_queued_tasks(self):
+ local_executor_mock = mock.MagicMock()
+ k8s_executor_mock = mock.MagicMock()
+ local_kubernetes_executor = LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock)
+
+ local_queued_tasks = {('dag_id', 'task_id', '2020-08-30', 1): 'queued_command'}
+ k8s_queued_tasks = {('dag_id_2', 'task_id_2', '2020-08-30', 2): 'queued_command'}
+
+ local_executor_mock.queued_tasks = local_queued_tasks
+ k8s_executor_mock.queued_tasks = k8s_queued_tasks
+
+ expected_queued_tasks = {**local_queued_tasks, **k8s_queued_tasks}
+
+ assert local_kubernetes_executor.queued_tasks == expected_queued_tasks
+ assert len(local_kubernetes_executor.queued_tasks) == 2
+
+ def test_running(self):
+ local_executor_mock = mock.MagicMock()
+ k8s_executor_mock = mock.MagicMock()
+ local_kubernetes_executor = LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock)
+
+ local_running_tasks = {('dag_id', 'task_id', '2020-08-30', 1)}
+ k8s_running_tasks = {}
+
+ local_executor_mock.running = local_running_tasks
+ k8s_executor_mock.running = k8s_running_tasks
+
+ assert local_kubernetes_executor.running == local_running_tasks.union(k8s_running_tasks)
+ assert len(local_kubernetes_executor.running) == 1
+
+ def test_slots_available(self):
+ local_executor = LocalExecutor()
+ k8s_executor_mock = mock.MagicMock()
+ local_kubernetes_executor = LocalKubernetesExecutor(local_executor, k8s_executor_mock)
+
+ # Should be equal to Local Executor default parallelism.
+ assert local_kubernetes_executor.slots_available == conf.getint('core', 'PARALLELISM')