You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/03/05 21:46:55 UTC

[airflow] branch main updated: Local kubernetes executor (#19729)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 652b859  Local kubernetes executor (#19729)
652b859 is described below

commit 652b85998d1464316069c3774e49b4777168315d
Author: Kanthi <su...@gmail.com>
AuthorDate: Sat Mar 5 16:46:00 2022 -0500

    Local kubernetes executor (#19729)
---
 BREEZE.rst                                         |   1 +
 airflow/config_templates/config.yml                |  15 ++
 airflow/config_templates/default_airflow.cfg       |  10 +
 .../example_local_kubernetes_executor.py           |  67 +++++++
 airflow/executors/executor_constants.py            |   1 +
 airflow/executors/executor_loader.py               |  14 ++
 airflow/executors/local_kubernetes_executor.py     | 204 +++++++++++++++++++++
 airflow/settings.py                                |   1 +
 breeze-complete                                    |   2 +-
 docs/apache-airflow/executor/index.rst             |   1 +
 docs/apache-airflow/executor/local_kubernetes.rst  |  30 +++
 tests/core/test_config_templates.py                |   1 +
 tests/executors/test_local_kubernetes_executor.py  |  62 +++++++
 13 files changed, 408 insertions(+), 1 deletion(-)

diff --git a/BREEZE.rst b/BREEZE.rst
index 574b5fe..1283473 100644
--- a/BREEZE.rst
+++ b/BREEZE.rst
@@ -2534,6 +2534,7 @@ This is the current syntax for  `./breeze <./breeze>`_:
           One of:
 
                  KubernetesExecutor CeleryExecutor LocalExecutor CeleryKubernetesExecutor
+                 LocalKubernetesExecutor
 
           Default: KubernetesExecutor
 
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index bb563ff..28ddaf7 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1528,6 +1528,21 @@
       type: string
       example: ~
       default: ~
+- name: local_kubernetes_executor
+  description: |
+    This section only applies if you are using the ``LocalKubernetesExecutor`` in
+    ``[core]`` section above
+  options:
+    - name: kubernetes_queue
+      description: |
+        Define when to send a task to ``KubernetesExecutor`` when using ``LocalKubernetesExecutor``.
+        When the queue of a task is the value of ``kubernetes_queue`` (default ``kubernetes``),
+        the task is executed via ``KubernetesExecutor``,
+        otherwise via ``LocalExecutor``
+      version_added: 2.3.0
+      type: string
+      example: ~
+      default: "kubernetes"
 - name: celery_kubernetes_executor
   description: |
     This section only applies if you are using the ``CeleryKubernetesExecutor`` in
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index b9ea3a1..4dfdb05 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -760,6 +760,16 @@ sentry_dsn =
 # Dotted path to a before_send function that the sentry SDK should be configured to use.
 # before_send =
 
+[local_kubernetes_executor]
+
+# This section only applies if you are using the ``LocalKubernetesExecutor`` in
+# ``[core]`` section above
+# Define when to send a task to ``KubernetesExecutor`` when using ``LocalKubernetesExecutor``.
+# When the queue of a task is the value of ``kubernetes_queue`` (default ``kubernetes``),
+# the task is executed via ``KubernetesExecutor``,
+# otherwise via ``LocalExecutor``
+kubernetes_queue = kubernetes
+
 [celery_kubernetes_executor]
 
 # This section only applies if you are using the ``CeleryKubernetesExecutor`` in
diff --git a/airflow/example_dags/example_local_kubernetes_executor.py b/airflow/example_dags/example_local_kubernetes_executor.py
new file mode 100644
index 0000000..03fbd06
--- /dev/null
+++ b/airflow/example_dags/example_local_kubernetes_executor.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example dag for using a Local Kubernetes Executor Configuration.
+"""
+import logging
+from datetime import datetime
+
+from airflow import DAG
+from airflow.configuration import conf
+from airflow.decorators import task
+from airflow.example_dags.libs.helper import print_stuff
+
+log = logging.getLogger(__name__)
+
+worker_container_repository = conf.get('kubernetes', 'worker_container_repository')
+worker_container_tag = conf.get('kubernetes', 'worker_container_tag')
+
+try:
+    from kubernetes.client import models as k8s
+except ImportError:
+    log.warning("Could not import DAGs in example_local_kubernetes_executor.py", exc_info=True)
+    log.warning("Install Kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]")
+
+with DAG(
+    dag_id='example_local_kubernetes_executor',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=['example3'],
+) as dag:
+    # You can use annotations on your kubernetes pods!
+    start_task_executor_config = {
+        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+    }
+
+    @task(
+        executor_config=start_task_executor_config,
+        queue='kubernetes',
+        task_id='task_with_kubernetes_executor',
+    )
+    def task_with_template():
+        print_stuff()
+
+    @task(task_id='task_with_local_executor')
+    def task_with_local(ds=None, **kwargs):
+        """Print the Airflow context and ds variable from the context."""
+        print(kwargs)
+        print(ds)
+        return 'Whatever you return gets printed in the logs'
+
+    task_with_local() >> task_with_template()
diff --git a/airflow/executors/executor_constants.py b/airflow/executors/executor_constants.py
index fed8e27..55a3a7f 100644
--- a/airflow/executors/executor_constants.py
+++ b/airflow/executors/executor_constants.py
@@ -16,6 +16,7 @@
 # under the License.
 
 LOCAL_EXECUTOR = "LocalExecutor"
+LOCAL_KUBERNETES_EXECUTOR = "LocalKubernetesExecutor"
 SEQUENTIAL_EXECUTOR = "SequentialExecutor"
 CELERY_EXECUTOR = "CeleryExecutor"
 CELERY_KUBERNETES_EXECUTOR = "CeleryKubernetesExecutor"
diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py
index 36b4042..b98d2a8 100644
--- a/airflow/executors/executor_loader.py
+++ b/airflow/executors/executor_loader.py
@@ -28,6 +28,7 @@ from airflow.executors.executor_constants import (
     DEBUG_EXECUTOR,
     KUBERNETES_EXECUTOR,
     LOCAL_EXECUTOR,
+    LOCAL_KUBERNETES_EXECUTOR,
     SEQUENTIAL_EXECUTOR,
 )
 from airflow.utils.module_loading import import_string
@@ -53,6 +54,7 @@ class ExecutorLoader:
     _default_executor: Optional["BaseExecutor"] = None
     executors = {
         LOCAL_EXECUTOR: 'airflow.executors.local_executor.LocalExecutor',
+        LOCAL_KUBERNETES_EXECUTOR: 'airflow.executors.local_kubernetes_executor.LocalKubernetesExecutor',
         SEQUENTIAL_EXECUTOR: 'airflow.executors.sequential_executor.SequentialExecutor',
         CELERY_EXECUTOR: 'airflow.executors.celery_executor.CeleryExecutor',
         CELERY_KUBERNETES_EXECUTOR: 'airflow.executors.celery_kubernetes_executor.CeleryKubernetesExecutor',
@@ -89,6 +91,9 @@ class ExecutorLoader:
         """
         if executor_name == CELERY_KUBERNETES_EXECUTOR:
             return cls.__load_celery_kubernetes_executor()
+        elif executor_name == LOCAL_KUBERNETES_EXECUTOR:
+            return cls.__load_local_kubernetes_executor()
+
         try:
             executor_cls, import_source = cls.import_executor_cls(executor_name)
             log.debug("Loading executor %s from %s", executor_name, import_source.value)
@@ -137,6 +142,15 @@ class ExecutorLoader:
         celery_kubernetes_executor_cls = import_string(cls.executors[CELERY_KUBERNETES_EXECUTOR])
         return celery_kubernetes_executor_cls(celery_executor, kubernetes_executor)
 
+    @classmethod
+    def __load_local_kubernetes_executor(cls) -> "BaseExecutor":
+        """:return: an instance of LocalKubernetesExecutor"""
+        local_executor = import_string(cls.executors[LOCAL_EXECUTOR])()
+        kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()
+
+        local_kubernetes_executor_cls = import_string(cls.executors[LOCAL_KUBERNETES_EXECUTOR])
+        return local_kubernetes_executor_cls(local_executor, kubernetes_executor)
+
 
 UNPICKLEABLE_EXECUTORS = (
     LOCAL_EXECUTOR,
diff --git a/airflow/executors/local_kubernetes_executor.py b/airflow/executors/local_kubernetes_executor.py
new file mode 100644
index 0000000..57968ce
--- /dev/null
+++ b/airflow/executors/local_kubernetes_executor.py
@@ -0,0 +1,204 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Dict, List, Optional, Set, Union
+
+from airflow.configuration import conf
+from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType
+from airflow.executors.kubernetes_executor import KubernetesExecutor
+from airflow.executors.local_executor import LocalExecutor
+from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class LocalKubernetesExecutor(LoggingMixin):
+    """
+    LocalKubernetesExecutor consists of LocalExecutor and KubernetesExecutor.
+    It chooses the executor to use based on the queue defined on the task.
+    When the task's queue is the value of ``kubernetes_queue`` in section ``[local_kubernetes_executor]``
+    of the configuration (default value: `kubernetes`), KubernetesExecutor is selected to run the task,
+    otherwise, LocalExecutor is used.
+    """
+
+    supports_ad_hoc_ti_run: bool = True
+
+    KUBERNETES_QUEUE = conf.get('local_kubernetes_executor', 'kubernetes_queue')
+
+    def __init__(self, local_executor: LocalExecutor, kubernetes_executor: KubernetesExecutor):
+        super().__init__()
+        self._job_id: Optional[str] = None
+        self.local_executor = local_executor
+        self.kubernetes_executor = kubernetes_executor
+
+    @property
+    def queued_tasks(self) -> Dict[TaskInstanceKey, QueuedTaskInstanceType]:
+        """Return queued tasks from local and kubernetes executor"""
+        queued_tasks = self.local_executor.queued_tasks.copy()
+        queued_tasks.update(self.kubernetes_executor.queued_tasks)
+
+        return queued_tasks
+
+    @property
+    def running(self) -> Set[TaskInstanceKey]:
+        """Return running tasks from local and kubernetes executor"""
+        return self.local_executor.running.union(self.kubernetes_executor.running)
+
+    @property
+    def job_id(self) -> Optional[str]:
+        """
+        This is a class attribute in BaseExecutor but since this is not really an executor, but a wrapper
+        of executors we implement as property so we can have custom setter.
+        """
+        return self._job_id
+
+    @job_id.setter
+    def job_id(self, value: Optional[str]) -> None:
+        """job_id is manipulated by SchedulerJob.  We must propagate the job_id to wrapped executors."""
+        self._job_id = value
+        self.kubernetes_executor.job_id = value
+        self.local_executor.job_id = value
+
+    def start(self) -> None:
+        self.log.info("Starting local and Kubernetes Executor")
+        """Start local and kubernetes executor"""
+        self.local_executor.start()
+        self.kubernetes_executor.start()
+
+    @property
+    def slots_available(self) -> int:
+        """Number of new tasks this executor instance can accept"""
+        return self.local_executor.slots_available
+
+    def queue_command(
+        self,
+        task_instance: TaskInstance,
+        command: CommandType,
+        priority: int = 1,
+        queue: Optional[str] = None,
+    ) -> None:
+        """Queues command via local or kubernetes executor"""
+        executor = self._router(task_instance)
+        self.log.debug("Using executor: %s for %s", executor.__class__.__name__, task_instance.key)
+        executor.queue_command(task_instance, command, priority, queue)
+
+    def queue_task_instance(
+        self,
+        task_instance: TaskInstance,
+        mark_success: bool = False,
+        pickle_id: Optional[str] = None,
+        ignore_all_deps: bool = False,
+        ignore_depends_on_past: bool = False,
+        ignore_task_deps: bool = False,
+        ignore_ti_state: bool = False,
+        pool: Optional[str] = None,
+        cfg_path: Optional[str] = None,
+    ) -> None:
+        """Queues task instance via local or kubernetes executor"""
+        executor = self._router(SimpleTaskInstance(task_instance))
+        self.log.debug(
+            "Using executor: %s to queue_task_instance for %s", executor.__class__.__name__, task_instance.key
+        )
+        executor.queue_task_instance(
+            task_instance,
+            mark_success,
+            pickle_id,
+            ignore_all_deps,
+            ignore_depends_on_past,
+            ignore_task_deps,
+            ignore_ti_state,
+            pool,
+            cfg_path,
+        )
+
+    def has_task(self, task_instance: TaskInstance) -> bool:
+        """
+        Checks if a task is either queued or running in either local or kubernetes executor.
+
+        :param task_instance: TaskInstance
+        :return: True if the task is known to this executor
+        """
+        return self.local_executor.has_task(task_instance) or self.kubernetes_executor.has_task(task_instance)
+
+    def heartbeat(self) -> None:
+        """Heartbeat sent to trigger new jobs in local and kubernetes executor"""
+        self.local_executor.heartbeat()
+        self.kubernetes_executor.heartbeat()
+
+    def get_event_buffer(
+        self, dag_ids: Optional[List[str]] = None
+    ) -> Dict[TaskInstanceKey, EventBufferValueType]:
+        """
+        Returns and flush the event buffer from local and kubernetes executor
+
+        :param dag_ids: dag_ids to return events for, if None returns all
+        :return: a dict of events
+        """
+        cleared_events_from_local = self.local_executor.get_event_buffer(dag_ids)
+        cleared_events_from_kubernetes = self.kubernetes_executor.get_event_buffer(dag_ids)
+
+        return {**cleared_events_from_local, **cleared_events_from_kubernetes}
+
+    def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
+        """
+        Try to adopt running task instances that have been abandoned by a SchedulerJob dying.
+
+        Anything that is not adopted will be cleared by the scheduler (and then become eligible for
+        re-scheduling)
+
+        :return: any TaskInstances that were unable to be adopted
+        :rtype: list[airflow.models.TaskInstance]
+        """
+        local_tis = []
+        kubernetes_tis = []
+        abandoned_tis = []
+        for ti in tis:
+            if ti.queue == self.KUBERNETES_QUEUE:
+                kubernetes_tis.append(ti)
+            else:
+                local_tis.append(ti)
+        abandoned_tis.extend(self.local_executor.try_adopt_task_instances(local_tis))
+        abandoned_tis.extend(self.kubernetes_executor.try_adopt_task_instances(kubernetes_tis))
+        return abandoned_tis
+
+    def end(self) -> None:
+        """End local and kubernetes executor"""
+        self.local_executor.end()
+        self.kubernetes_executor.end()
+
+    def terminate(self) -> None:
+        """Terminate local and kubernetes executor"""
+        self.local_executor.terminate()
+        self.kubernetes_executor.terminate()
+
+    def _router(self, simple_task_instance: SimpleTaskInstance) -> Union[LocalExecutor, KubernetesExecutor]:
+        """
+        Return either local_executor or kubernetes_executor
+
+        :param simple_task_instance: SimpleTaskInstance
+        :return: local_executor or kubernetes_executor
+        :rtype: Union[LocalExecutor, KubernetesExecutor]
+        """
+        if simple_task_instance.queue == self.KUBERNETES_QUEUE:
+            return self.kubernetes_executor
+        return self.local_executor
+
+    def debug_dump(self) -> None:
+        """Called in response to SIGUSR2 by the scheduler"""
+        self.log.info("Dumping LocalExecutor state")
+        self.local_executor.debug_dump()
+        self.log.info("Dumping KubernetesExecutor state")
+        self.kubernetes_executor.debug_dump()
diff --git a/airflow/settings.py b/airflow/settings.py
index cebf543..fdb5d80 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -599,6 +599,7 @@ LAZY_LOAD_PROVIDERS = conf.getboolean('core', 'lazy_discover_providers', fallbac
 IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get('core', 'EXECUTOR') in {
     executor_constants.KUBERNETES_EXECUTOR,
     executor_constants.CELERY_KUBERNETES_EXECUTOR,
+    executor_constants.LOCAL_KUBERNETES_EXECUTOR,
 }
 
 HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean('core', 'hide_sensitive_var_conn_fields')
diff --git a/breeze-complete b/breeze-complete
index ea0cc95..f9fbc4d 100644
--- a/breeze-complete
+++ b/breeze-complete
@@ -35,7 +35,7 @@ _breeze_allowed_mysql_versions="5.7 8"
 _breeze_allowed_mssql_versions="2017-latest 2019-latest"
 _breeze_allowed_postgres_versions="10 11 12 13"
 _breeze_allowed_kind_operations="start stop restart status deploy test shell k9s"
-_breeze_allowed_executors="KubernetesExecutor CeleryExecutor LocalExecutor CeleryKubernetesExecutor"
+_breeze_allowed_executors="KubernetesExecutor CeleryExecutor LocalExecutor CeleryKubernetesExecutor LocalKubernetesExecutor"
 _breeze_allowed_test_types="All Always Core Providers API CLI Integration Other WWW Postgres MySQL Helm Quarantined"
 _breeze_allowed_package_formats="both sdist wheel"
 _breeze_allowed_installation_methods=". apache-airflow"
diff --git a/docs/apache-airflow/executor/index.rst b/docs/apache-airflow/executor/index.rst
index 831b2f2..fc82336 100644
--- a/docs/apache-airflow/executor/index.rst
+++ b/docs/apache-airflow/executor/index.rst
@@ -72,6 +72,7 @@ There are two types of executor - those that run tasks *locally* (inside the ``s
     celery_kubernetes
     dask
     kubernetes
+    local_kubernetes
 
 
 .. note::
diff --git a/docs/apache-airflow/executor/local_kubernetes.rst b/docs/apache-airflow/executor/local_kubernetes.rst
new file mode 100644
index 0000000..ecb0dc5
--- /dev/null
+++ b/docs/apache-airflow/executor/local_kubernetes.rst
@@ -0,0 +1,30 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+.. _executor:LocalKubernetesExecutor:
+
+LocalKubernetes Executor
+=========================
+
+The :class:`~airflow.executors.local_kubernetes_executor.LocalKubernetesExecutor` allows users
+to simultaneously run a ``LocalExecutor`` and a ``KubernetesExecutor``.
+An executor is chosen to run a task based on the task's queue.
+
+``LocalKubernetesExecutor`` provides the capability of running tasks with either ``LocalExecutor``,
+which runs tasks within the scheduler service, or with ``KubernetesExecutor``, which runs each task
+in its own pod on a kubernetes cluster.
diff --git a/tests/core/test_config_templates.py b/tests/core/test_config_templates.py
index d2e2623..e69f38f 100644
--- a/tests/core/test_config_templates.py
+++ b/tests/core/test_config_templates.py
@@ -40,6 +40,7 @@ DEFAULT_AIRFLOW_SECTIONS = [
     'email',
     'smtp',
     'sentry',
+    'local_kubernetes_executor',
     'celery_kubernetes_executor',
     'celery',
     'celery_broker_transport_options',
diff --git a/tests/executors/test_local_kubernetes_executor.py b/tests/executors/test_local_kubernetes_executor.py
new file mode 100644
index 0000000..b32e119
--- /dev/null
+++ b/tests/executors/test_local_kubernetes_executor.py
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from unittest import mock
+
+from airflow.configuration import conf
+from airflow.executors.local_executor import LocalExecutor
+from airflow.executors.local_kubernetes_executor import LocalKubernetesExecutor
+
+
+class TestLocalKubernetesExecutor:
+    def test_queued_tasks(self):
+        local_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        local_kubernetes_executor = LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock)
+
+        local_queued_tasks = {('dag_id', 'task_id', '2020-08-30', 1): 'queued_command'}
+        k8s_queued_tasks = {('dag_id_2', 'task_id_2', '2020-08-30', 2): 'queued_command'}
+
+        local_executor_mock.queued_tasks = local_queued_tasks
+        k8s_executor_mock.queued_tasks = k8s_queued_tasks
+
+        expected_queued_tasks = {**local_queued_tasks, **k8s_queued_tasks}
+
+        assert local_kubernetes_executor.queued_tasks == expected_queued_tasks
+        assert len(local_kubernetes_executor.queued_tasks) == 2
+
+    def test_running(self):
+        local_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        local_kubernetes_executor = LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock)
+
+        local_running_tasks = {('dag_id', 'task_id', '2020-08-30', 1)}
+        k8s_running_tasks = {}
+
+        local_executor_mock.running = local_running_tasks
+        k8s_executor_mock.running = k8s_running_tasks
+
+        assert local_kubernetes_executor.running == local_running_tasks.union(k8s_running_tasks)
+        assert len(local_kubernetes_executor.running) == 1
+
+    def test_slots_available(self):
+        local_executor = LocalExecutor()
+        k8s_executor_mock = mock.MagicMock()
+        local_kubernetes_executor = LocalKubernetesExecutor(local_executor, k8s_executor_mock)
+
+        # Should be equal to Local Executor default parallelism.
+        assert local_kubernetes_executor.slots_available == conf.getint('core', 'PARALLELISM')