You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by xd...@apache.org on 2022/12/10 06:51:21 UTC
[airflow] branch main updated: KubernetesExecutor multi_namespace_mode can use namespace list to avoid requiring cluster role (#28047)
This is an automated email from the ASF dual-hosted git repository.
xddeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c739a6a087 KubernetesExecutor multi_namespace_mode can use namespace list to avoid requiring cluster role (#28047)
c739a6a087 is described below
commit c739a6a08790423ba1e38464a15c4bac078277d3
Author: Xiaodong DENG <xd...@apache.org>
AuthorDate: Fri Dec 9 22:51:00 2022 -0800
KubernetesExecutor multi_namespace_mode can use namespace list to avoid requiring cluster role (#28047)
Co-authored-by: Daniel Standish <15...@users.noreply.github.com>
Co-authored-by: potiuk <ja...@polidea.com>
---
airflow/config_templates/config.yml | 12 ++-
airflow/config_templates/default_airflow.cfg | 8 +-
airflow/executors/kubernetes_executor.py | 143 +++++++++++++++++----------
airflow/kubernetes/kube_config.py | 8 ++
tests/executors/test_kubernetes_executor.py | 124 ++++++++++++++++++++---
5 files changed, 228 insertions(+), 67 deletions(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 8d99ec0470..2759eef2ff 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2429,11 +2429,21 @@
- name: multi_namespace_mode
description: |
Allows users to launch pods in multiple namespaces.
- Will require creating a cluster-role for the scheduler
+ Will require creating a cluster-role for the scheduler,
+ or using the multi_namespace_mode_namespace_list configuration.
version_added: 1.10.12
type: boolean
example: ~
default: "False"
+ - name: multi_namespace_mode_namespace_list
+ description: |
+ If multi_namespace_mode is True while the scheduler does not have a cluster-role,
+ give the comma-separated list of namespaces where the scheduler will schedule jobs.
+ The scheduler needs to have the necessary permissions in these namespaces.
+ version_added: 2.6.0
+ type: string
+ example: ~
+ default: ""
- name: in_cluster
description: |
Use the service account kubernetes gives to pods to connect to kubernetes cluster.
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 9d4e863c6e..365de48f50 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1215,9 +1215,15 @@ delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
# Allows users to launch pods in multiple namespaces.
-# Will require creating a cluster-role for the scheduler
+# Will require creating a cluster-role for the scheduler,
+# or using the multi_namespace_mode_namespace_list configuration.
multi_namespace_mode = False
+# If multi_namespace_mode is True while the scheduler does not have a cluster-role,
+# give the comma-separated list of namespaces where the scheduler will schedule jobs.
+# The scheduler needs to have the necessary permissions in these namespaces.
+multi_namespace_mode_namespace_list =
+
# Use the service account kubernetes gives to pods to connect to kubernetes cluster.
# It's intended for clients that expect to be running inside a pod running on kubernetes.
# It will raise an exception if called from a process not running in a kubernetes environment.
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 788f1b3fec..d933a294e9 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -23,11 +23,11 @@ KubernetesExecutor.
"""
from __future__ import annotations
-import functools
import json
import logging
import multiprocessing
import time
+from collections import defaultdict
from datetime import timedelta
from queue import Empty, Queue
from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Tuple
@@ -52,6 +52,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
from airflow.utils.state import State
+ALL_NAMESPACES = "ALL_NAMESPACES"
+
# TaskInstance key, command, configuration, pod_template_file
KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any, Optional[str]]
@@ -66,7 +68,7 @@ class ResourceVersion:
"""Singleton for tracking resourceVersion from Kubernetes."""
_instance = None
- resource_version = "0"
+ resource_version: dict[str, str] = {}
def __new__(cls):
if cls._instance is None:
@@ -79,8 +81,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
def __init__(
self,
- namespace: str | None,
- multi_namespace_mode: bool,
+ namespace: str,
watcher_queue: Queue[KubernetesWatchType],
resource_version: str | None,
scheduler_job_id: str,
@@ -88,7 +89,6 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
):
super().__init__()
self.namespace = namespace
- self.multi_namespace_mode = multi_namespace_mode
self.scheduler_job_id = scheduler_job_id
self.watcher_queue = watcher_queue
self.resource_version = resource_version
@@ -113,7 +113,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
except Exception:
self.log.exception("Unknown error in KubernetesJobWatcher. Failing")
self.resource_version = "0"
- ResourceVersion().resource_version = "0"
+ ResourceVersion().resource_version[self.namespace] = "0"
raise
else:
self.log.warning(
@@ -121,6 +121,14 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
self.resource_version,
)
+ def _pod_events(self, kube_client: client.CoreV1Api, query_kwargs: dict):
+ watcher = watch.Watch()
+
+ if self.namespace == ALL_NAMESPACES:
+ return watcher.stream(kube_client.list_pod_for_all_namespaces, **query_kwargs)
+ else:
+ return watcher.stream(kube_client.list_namespaced_pod, self.namespace, **query_kwargs)
+
def _run(
self,
kube_client: client.CoreV1Api,
@@ -129,7 +137,6 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
kube_config: Any,
) -> str | None:
self.log.info("Event: and now my watch begins starting at resource_version: %s", resource_version)
- watcher = watch.Watch()
kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"}
if resource_version:
@@ -139,15 +146,8 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
kwargs[key] = value
last_resource_version: str | None = None
- if self.multi_namespace_mode:
- list_worker_pods = functools.partial(
- watcher.stream, kube_client.list_pod_for_all_namespaces, **kwargs
- )
- else:
- list_worker_pods = functools.partial(
- watcher.stream, kube_client.list_namespaced_pod, self.namespace, **kwargs
- )
- for event in list_worker_pods():
+
+ for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):
task = event["object"]
self.log.debug("Event: %s had an event of type %s", task.metadata.name, event["type"])
if event["type"] == "ERROR":
@@ -251,7 +251,7 @@ class AirflowKubernetesScheduler(LoggingMixin):
self._manager = multiprocessing.Manager()
self.watcher_queue = self._manager.Queue()
self.scheduler_job_id = scheduler_job_id
- self.kube_watcher = self._make_kube_watcher()
+ self.kube_watchers = self._make_kube_watchers()
def run_pod_async(self, pod: k8s.V1Pod, **kwargs):
"""Runs POD asynchronously."""
@@ -274,12 +274,11 @@ class AirflowKubernetesScheduler(LoggingMixin):
raise e
return resp
- def _make_kube_watcher(self) -> KubernetesJobWatcher:
- resource_version = ResourceVersion().resource_version
+ def _make_kube_watcher(self, namespace) -> KubernetesJobWatcher:
+ resource_version = ResourceVersion().resource_version.get(namespace, "0")
watcher = KubernetesJobWatcher(
watcher_queue=self.watcher_queue,
- namespace=self.kube_config.kube_namespace,
- multi_namespace_mode=self.kube_config.multi_namespace_mode,
+ namespace=namespace,
resource_version=resource_version,
scheduler_job_id=self.scheduler_job_id,
kube_config=self.kube_config,
@@ -287,15 +286,35 @@ class AirflowKubernetesScheduler(LoggingMixin):
watcher.start()
return watcher
- def _health_check_kube_watcher(self):
- if self.kube_watcher.is_alive():
- self.log.debug("KubeJobWatcher alive, continuing")
- else:
- self.log.error(
- "Error while health checking kube watcher process. Process died for unknown reasons"
+ def _make_kube_watchers(self) -> dict[str, KubernetesJobWatcher]:
+ watchers = {}
+ if self.kube_config.multi_namespace_mode:
+ namespaces_to_watch = (
+ self.kube_config.multi_namespace_mode_namespace_list
+ if self.kube_config.multi_namespace_mode_namespace_list
+ else [ALL_NAMESPACES]
)
- ResourceVersion().resource_version = "0"
- self.kube_watcher = self._make_kube_watcher()
+ else:
+ namespaces_to_watch = [self.kube_config.kube_namespace]
+
+ for namespace in namespaces_to_watch:
+ watchers[namespace] = self._make_kube_watcher(namespace)
+ return watchers
+
+ def _health_check_kube_watchers(self):
+ for namespace, kube_watcher in self.kube_watchers.items():
+ if kube_watcher.is_alive():
+ self.log.debug("KubeJobWatcher for namespace %s alive, continuing", namespace)
+ else:
+ self.log.error(
+ (
+ "Error while health checking kube watcher process for namespace %s. "
+ "Process died for unknown reasons"
+ ),
+ namespace,
+ )
+ ResourceVersion().resource_version[namespace] = "0"
+ self.kube_watchers[namespace] = self._make_kube_watcher(namespace)
def run_next(self, next_job: KubernetesJobType) -> None:
"""Receives the next job to run, builds the pod, and creates it."""
@@ -363,7 +382,7 @@ class AirflowKubernetesScheduler(LoggingMixin):
"""
self.log.debug("Syncing KubernetesExecutor")
- self._health_check_kube_watcher()
+ self._health_check_kube_watchers()
while True:
try:
task = self.watcher_queue.get_nowait()
@@ -399,10 +418,11 @@ class AirflowKubernetesScheduler(LoggingMixin):
def terminate(self) -> None:
"""Terminates the watcher."""
- self.log.debug("Terminating kube_watcher...")
- self.kube_watcher.terminate()
- self.kube_watcher.join()
- self.log.debug("kube_watcher=%s", self.kube_watcher)
+ self.log.debug("Terminating kube_watchers...")
+ for namespace, kube_watcher in self.kube_watchers.items():
+ kube_watcher.terminate()
+ kube_watcher.join()
+ self.log.debug("kube_watcher=%s", kube_watcher)
self.log.debug("Flushing watcher_queue...")
self._flush_watcher_queue()
# Queue should be empty...
@@ -446,6 +466,23 @@ class KubernetesExecutor(BaseExecutor):
self.kubernetes_queue: str | None = None
super().__init__(parallelism=self.kube_config.parallelism)
+ def _list_pods(self, query_kwargs):
+ if self.kube_config.multi_namespace_mode:
+ if self.kube_config.multi_namespace_mode_namespace_list:
+ pods = []
+ for namespace in self.kube_config.multi_namespace_mode_namespace_list:
+ pods.extend(
+ self.kube_client.list_namespaced_pod(namespace=namespace, **query_kwargs).items
+ )
+ else:
+ pods = self.kube_client.list_pod_for_all_namespaces(**query_kwargs).items
+ else:
+ pods = self.kube_client.list_namespaced_pod(
+ namespace=self.kube_config.kube_namespace, **query_kwargs
+ ).items
+
+ return pods
+
@provide_session
def clear_not_launched_queued_tasks(self, session=None) -> None:
"""
@@ -501,16 +538,16 @@ class KubernetesExecutor(BaseExecutor):
# Try run_id first
kwargs["label_selector"] += ",run_id=" + pod_generator.make_safe_label_value(ti.run_id)
- pod_list = self.kube_client.list_namespaced_pod(self.kube_config.kube_namespace, **kwargs)
- if pod_list.items:
+ pod_list = self._list_pods(kwargs)
+ if pod_list:
continue
# Fallback to old style of using execution_date
kwargs["label_selector"] = (
f"{base_label_selector},"
f"execution_date={pod_generator.datetime_to_label_safe_datestring(ti.execution_date)}"
)
- pod_list = self.kube_client.list_namespaced_pod(self.kube_config.kube_namespace, **kwargs)
- if pod_list.items:
+ pod_list = self._list_pods(kwargs)
+ if pod_list:
continue
self.log.info("TaskInstance: %s found in queued state but was not launched, rescheduling", ti)
session.query(TaskInstance).filter(
@@ -597,13 +634,13 @@ class KubernetesExecutor(BaseExecutor):
self.log.debug("self.queued: %s", self.queued_tasks)
self.kube_scheduler.sync()
- last_resource_version = None
+ last_resource_version: dict[str, str] = defaultdict(lambda: "0")
while True:
try:
results = self.result_queue.get_nowait()
try:
key, state, pod_id, namespace, resource_version = results
- last_resource_version = resource_version
+ last_resource_version[namespace] = resource_version
self.log.info("Changing state of %s to %s", results, state)
try:
self._change_state(key, state, pod_id, namespace)
@@ -621,7 +658,10 @@ class KubernetesExecutor(BaseExecutor):
break
resource_instance = ResourceVersion()
- resource_instance.resource_version = last_resource_version or resource_instance.resource_version
+ for ns in resource_instance.resource_version.keys():
+ resource_instance.resource_version[ns] = (
+ last_resource_version[ns] or resource_instance.resource_version[ns]
+ )
for _ in range(self.kube_config.worker_pods_creation_batch_size):
try:
@@ -681,15 +721,10 @@ class KubernetesExecutor(BaseExecutor):
"label_selector": f"airflow-worker={self.scheduler_job_id}",
**self.kube_config.kube_client_request_args,
}
- if self.kube_config.multi_namespace_mode:
- pending_pods = functools.partial(self.kube_client.list_pod_for_all_namespaces, **kwargs)
- else:
- pending_pods = functools.partial(
- self.kube_client.list_namespaced_pod, self.kube_config.kube_namespace, **kwargs
- )
+ pending_pods = self._list_pods(kwargs)
cutoff = timezone.utcnow() - timedelta(seconds=timeout)
- for pod in pending_pods().items:
+ for pod in pending_pods:
self.log.debug(
'Found a pending pod "%s", created "%s"', pod.metadata.name, pod.metadata.creation_timestamp
)
@@ -726,9 +761,9 @@ class KubernetesExecutor(BaseExecutor):
kube_client: client.CoreV1Api = self.kube_client
for scheduler_job_id in scheduler_job_ids:
scheduler_job_id = pod_generator.make_safe_label_value(str(scheduler_job_id))
- kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"}
- pod_list = kube_client.list_namespaced_pod(namespace=self.kube_config.kube_namespace, **kwargs)
- for pod in pod_list.items:
+ query_kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"}
+ pod_list = self._list_pods(query_kwargs)
+ for pod in pod_list:
self.adopt_launched_task(kube_client, pod, pod_ids)
self._adopt_completed_pods(kube_client)
tis_to_flush.extend(pod_ids.values())
@@ -775,12 +810,12 @@ class KubernetesExecutor(BaseExecutor):
assert self.scheduler_job_id
new_worker_id_label = pod_generator.make_safe_label_value(self.scheduler_job_id)
- kwargs = {
+ query_kwargs = {
"field_selector": "status.phase=Succeeded",
"label_selector": f"kubernetes_executor=True,airflow-worker!={new_worker_id_label}",
}
- pod_list = kube_client.list_namespaced_pod(namespace=self.kube_config.kube_namespace, **kwargs)
- for pod in pod_list.items:
+ pod_list = self._list_pods(query_kwargs)
+ for pod in pod_list:
self.log.info("Attempting to adopt pod %s", pod.metadata.name)
pod.metadata.labels["airflow-worker"] = new_worker_id_label
try:
diff --git a/airflow/kubernetes/kube_config.py b/airflow/kubernetes/kube_config.py
index 0285f65208..8d2aa9c2fa 100644
--- a/airflow/kubernetes/kube_config.py
+++ b/airflow/kubernetes/kube_config.py
@@ -57,6 +57,14 @@ class KubeConfig:
# create, watch, get, and delete pods in this namespace.
self.kube_namespace = conf.get(self.kubernetes_section, "namespace")
self.multi_namespace_mode = conf.getboolean(self.kubernetes_section, "multi_namespace_mode")
+ if self.multi_namespace_mode and conf.get(
+ self.kubernetes_section, "multi_namespace_mode_namespace_list"
+ ):
+ self.multi_namespace_mode_namespace_list = conf.get(
+ self.kubernetes_section, "multi_namespace_mode_namespace_list"
+ ).split(",")
+ else:
+ self.multi_namespace_mode_namespace_list = None
# The Kubernetes Namespace in which pods will be created by the executor. Note
# that if your
# cluster has RBAC enabled, your workers may need service account permissions to
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 367f1cb2c4..cee1f77202 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -521,6 +521,27 @@ class TestKubernetesExecutor:
assert executor.event_buffer[key][0] == State.FAILED
mock_delete_pod.assert_not_called()
+ @pytest.mark.parametrize(
+ "multi_namespace_mode_namespace_list, watchers_keys",
+ [
+ pytest.param(["A", "B", "C"], ["A", "B", "C"]),
+ pytest.param(None, ["ALL_NAMESPACES"]),
+ ],
+ )
+ @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
+ def test_watchers_under_multi_namespace_mode(
+ self, mock_get_kube_client, multi_namespace_mode_namespace_list, watchers_keys
+ ):
+ executor = self.kubernetes_executor
+ executor.kube_config.multi_namespace_mode = True
+ executor.kube_config.multi_namespace_mode_namespace_list = multi_namespace_mode_namespace_list
+ executor.start()
+ assert list(executor.kube_scheduler.kube_watchers.keys()) == watchers_keys
+ assert all(
+ isinstance(v, KubernetesJobWatcher) for v in executor.kube_scheduler.kube_watchers.values()
+ )
+ executor.end()
+
@mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
@mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod")
@@ -694,6 +715,27 @@ class TestKubernetesExecutor:
assert not mock_kube_client.patch_namespaced_pod.called
assert pod_ids == {"foobar": {}}
+ @pytest.mark.parametrize(
+ "raw_multi_namespace_mode, raw_value_namespace_list, expected_value_in_kube_config",
+ [
+ pytest.param("true", "A,B,C", ["A", "B", "C"]),
+ pytest.param("true", "", None),
+ pytest.param("false", "A,B,C", None),
+ pytest.param("false", "", None),
+ ],
+ )
+ def test_kube_config_get_namespace_list(
+ self, raw_multi_namespace_mode, raw_value_namespace_list, expected_value_in_kube_config
+ ):
+ config = {
+ ("kubernetes", "multi_namespace_mode"): raw_multi_namespace_mode,
+ ("kubernetes", "multi_namespace_mode_namespace_list"): raw_value_namespace_list,
+ }
+ with conf_vars(config):
+ executor = KubernetesExecutor()
+
+ assert executor.kube_config.multi_namespace_mode_namespace_list == expected_value_in_kube_config
+
@mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
@mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler")
@@ -735,7 +777,7 @@ class TestKubernetesExecutor:
executor._check_worker_pods_pending_timeout()
mock_kube_client.list_namespaced_pod.assert_called_once_with(
- "mynamespace",
+ namespace="mynamespace",
field_selector="status.phase=Pending",
label_selector="airflow-worker=123",
limit=5,
@@ -783,6 +825,65 @@ class TestKubernetesExecutor:
)
mock_delete_pod.assert_called_once_with("foo90", "anothernamespace")
+ @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
+ @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
+ @mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler")
+ def test_pending_pod_timeout_multi_namespace_mode_limited_namespaces(
+ self, mock_kubescheduler, mock_get_kube_client, mock_kubernetes_job_watcher
+ ):
+ mock_delete_pod = mock_kubescheduler.return_value.delete_pod
+ mock_kube_client = mock_get_kube_client.return_value
+ now = timezone.utcnow()
+ pending_pods = [
+ k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(
+ name="foo90",
+ labels={"airflow-worker": "123"},
+ creation_timestamp=now - timedelta(seconds=500),
+ namespace="namespace-2",
+ )
+ ),
+ ]
+
+ def list_namespaced_pod(namespace, *args, **kwargs):
+ if namespace == "namespace-2":
+ return k8s.V1PodList(items=pending_pods)
+ else:
+ return k8s.V1PodList(items=[])
+
+ mock_kube_client.list_namespaced_pod.side_effect = list_namespaced_pod
+
+ config = {
+ ("kubernetes", "namespace"): "mynamespace",
+ ("kubernetes", "multi_namespace_mode"): "true",
+ ("kubernetes", "multi_namespace_mode_namespace_list"): "namespace-1,namespace-2,namespace-3",
+ ("kubernetes", "kube_client_request_args"): '{"sentinel": "foo"}',
+ }
+ with conf_vars(config):
+ executor = KubernetesExecutor()
+ executor.job_id = "123"
+ executor.start()
+ executor._check_worker_pods_pending_timeout()
+ executor.end()
+
+ assert mock_kube_client.list_namespaced_pod.call_count == 3
+ mock_kube_client.list_namespaced_pod.assert_has_calls(
+ [
+ mock.call(
+ namespace=namespace,
+ field_selector="status.phase=Pending",
+ label_selector="airflow-worker=123",
+ limit=100,
+ sentinel="foo",
+ )
+ for namespace in ["namespace-1", "namespace-2", "namespace-3"]
+ ]
+ )
+
+ mock_delete_pod.assert_called_once_with("foo90", "namespace-2")
+ # mock_delete_pod should only be called once in total
+ mock_delete_pod.assert_called_once()
+
def test_clear_not_launched_queued_tasks_not_launched(self, dag_maker, create_dummy_dag, session):
"""If a pod isn't found for a TI, reset the state to scheduled"""
mock_kube_client = mock.MagicMock()
@@ -805,12 +906,12 @@ class TestKubernetesExecutor:
assert ti.state == State.SCHEDULED
assert mock_kube_client.list_namespaced_pod.call_count == 2
mock_kube_client.list_namespaced_pod.assert_any_call(
- "default", label_selector="dag_id=test_clear,task_id=task1,airflow-worker=1,run_id=test"
+ namespace="default", label_selector="dag_id=test_clear,task_id=task1,airflow-worker=1,run_id=test"
)
# also check that we fall back to execution_date if we didn't find the pod with run_id
execution_date_label = pod_generator.datetime_to_label_safe_datestring(ti.execution_date)
mock_kube_client.list_namespaced_pod.assert_called_with(
- "default",
+ namespace="default",
label_selector=(
f"dag_id=test_clear,task_id=task1,airflow-worker=1,execution_date={execution_date_label}"
),
@@ -849,7 +950,7 @@ class TestKubernetesExecutor:
ti.refresh_from_db()
assert ti.state == State.QUEUED
mock_kube_client.list_namespaced_pod.assert_called_once_with(
- "default", label_selector="dag_id=test_clear,task_id=task1,airflow-worker=1,run_id=test"
+ namespace="default", label_selector="dag_id=test_clear,task_id=task1,airflow-worker=1,run_id=test"
)
def test_clear_not_launched_queued_tasks_mapped_task(self, dag_maker, session):
@@ -893,15 +994,15 @@ class TestKubernetesExecutor:
mock_kube_client.list_namespaced_pod.assert_has_calls(
[
mock.call(
- "default",
+ namespace="default",
label_selector="dag_id=test_clear,task_id=bash,airflow-worker=1,map_index=0,run_id=test",
),
mock.call(
- "default",
+ namespace="default",
label_selector="dag_id=test_clear,task_id=bash,airflow-worker=1,map_index=1,run_id=test",
),
mock.call(
- "default",
+ namespace="default",
label_selector=f"dag_id=test_clear,task_id=bash,airflow-worker=1,map_index=1,"
f"execution_date={execution_date_label}",
),
@@ -968,10 +1069,11 @@ class TestKubernetesExecutor:
class TestKubernetesJobWatcher:
+ test_namespace = "airflow"
+
def setup_method(self):
self.watcher = KubernetesJobWatcher(
- namespace="airflow",
- multi_namespace_mode=False,
+ namespace=self.test_namespace,
watcher_queue=mock.MagicMock(),
resource_version="0",
scheduler_job_id="123",
@@ -1108,9 +1210,9 @@ class TestKubernetesJobWatcher:
except Exception as e:
assert e.args == ("sentinel",)
- # both resource_version should be 0 after _run raises and exception
+ # both resource_version should be 0 after _run raises an exception
assert self.watcher.resource_version == "0"
- assert ResourceVersion().resource_version == "0"
+ assert ResourceVersion().resource_version == {self.test_namespace: "0"}
# check that in the next run, _run is invoked with resource_version = 0
mock_underscore_run.reset_mock()