You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2023/07/11 23:21:24 UTC
[airflow] branch main updated: Kubernetes Executor Load Time Optimizations (#30727)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f7d4d98f2b Kubernetes Executor Load Time Optimizations (#30727)
f7d4d98f2b is described below
commit f7d4d98f2b531378613a180de86620a2b0e5d925
Author: Niko Oliveira <on...@amazon.com>
AuthorDate: Tue Jul 11 16:21:17 2023 -0700
Kubernetes Executor Load Time Optimizations (#30727)
Import optimizations to decrease the amount of time it takes to
load/import the kubernetes executor.
Move some expensive typing related imports to be under TYPE_CHECKING.
Refactor expensive classes (other than the KubernetesExecutor) out of the
kubernetes_executor.py module into a utils module so they are only
loaded at runtime.
Also move some imports to runtime imports, closer to their usage.
---
airflow/cli/commands/kubernetes_command.py | 3 +-
airflow/executors/kubernetes_executor.py | 525 +++-------------------
airflow/executors/kubernetes_executor_types.py | 35 ++
airflow/executors/kubernetes_executor_utils.py | 477 ++++++++++++++++++++
airflow/kubernetes/kubernetes_helper_functions.py | 15 +-
tests/executors/test_kubernetes_executor.py | 94 ++--
6 files changed, 628 insertions(+), 521 deletions(-)
diff --git a/airflow/cli/commands/kubernetes_command.py b/airflow/cli/commands/kubernetes_command.py
index 25cfecc2b4..c367d4be87 100644
--- a/airflow/cli/commands/kubernetes_command.py
+++ b/airflow/cli/commands/kubernetes_command.py
@@ -25,9 +25,10 @@ from kubernetes import client
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException
-from airflow.executors.kubernetes_executor import KubeConfig, create_pod_id
+from airflow.executors.kubernetes_executor import KubeConfig
from airflow.kubernetes import pod_generator
from airflow.kubernetes.kube_client import get_kube_client
+from airflow.kubernetes.kubernetes_helper_functions import create_pod_id
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.models import DagRun, TaskInstance
from airflow.utils import cli as cli_utils, yaml
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index ea84ad6dab..c1f8861f89 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -29,477 +29,36 @@ import multiprocessing
import time
from collections import defaultdict
from contextlib import suppress
+from datetime import datetime
from queue import Empty, Queue
-from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Tuple
+from typing import TYPE_CHECKING, Any, Sequence
-from kubernetes import client, watch
-from kubernetes.client import Configuration, models as k8s
-from kubernetes.client.rest import ApiException
from sqlalchemy.orm import Session
-from urllib3.exceptions import ReadTimeoutError
from airflow.configuration import conf
-from airflow.exceptions import AirflowException, PodMutationHookException, PodReconciliationError
+from airflow.exceptions import PodMutationHookException, PodReconciliationError
from airflow.executors.base_executor import BaseExecutor
-from airflow.kubernetes import pod_generator
-from airflow.kubernetes.kube_client import get_kube_client
+from airflow.executors.kubernetes_executor_types import POD_EXECUTOR_DONE_KEY
from airflow.kubernetes.kube_config import KubeConfig
-from airflow.kubernetes.kubernetes_helper_functions import (
- annotations_for_logging_task_metadata,
- annotations_to_key,
- create_pod_id,
-)
-from airflow.kubernetes.pod_generator import PodGenerator
+from airflow.kubernetes.kubernetes_helper_functions import annotations_to_key
from airflow.utils.event_scheduler import EventScheduler
-from airflow.utils.log.logging_mixin import LoggingMixin, remove_escape_codes
+from airflow.utils.log.logging_mixin import remove_escape_codes
from airflow.utils.session import NEW_SESSION, provide_session
-from airflow.utils.singleton import Singleton
from airflow.utils.state import State, TaskInstanceState
if TYPE_CHECKING:
+ from kubernetes import client
+ from kubernetes.client import models as k8s
+
from airflow.executors.base_executor import CommandType
+ from airflow.executors.kubernetes_executor_types import (
+ KubernetesJobType,
+ KubernetesResultsType,
+ )
+ from airflow.executors.kubernetes_executor_utils import AirflowKubernetesScheduler
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
- # TaskInstance key, command, configuration, pod_template_file
- KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any, Optional[str]]
-
- # key, pod state, pod_name, namespace, resource_version
- KubernetesResultsType = Tuple[TaskInstanceKey, Optional[str], str, str, str]
-
- # pod_name, namespace, pod state, annotations, resource_version
- KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
-
-ALL_NAMESPACES = "ALL_NAMESPACES"
-POD_EXECUTOR_DONE_KEY = "airflow_executor_done"
-
-
-class ResourceVersion(metaclass=Singleton):
- """Singleton for tracking resourceVersion from Kubernetes."""
-
- resource_version: dict[str, str] = {}
-
-
-class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
- """Watches for Kubernetes jobs."""
-
- def __init__(
- self,
- namespace: str,
- watcher_queue: Queue[KubernetesWatchType],
- resource_version: str | None,
- scheduler_job_id: str,
- kube_config: Configuration,
- ):
- super().__init__()
- self.namespace = namespace
- self.scheduler_job_id = scheduler_job_id
- self.watcher_queue = watcher_queue
- self.resource_version = resource_version
- self.kube_config = kube_config
-
- def run(self) -> None:
- """Performs watching."""
- if TYPE_CHECKING:
- assert self.scheduler_job_id
-
- kube_client: client.CoreV1Api = get_kube_client()
- while True:
- try:
- self.resource_version = self._run(
- kube_client, self.resource_version, self.scheduler_job_id, self.kube_config
- )
- except ReadTimeoutError:
- self.log.warning(
- "There was a timeout error accessing the Kube API. Retrying request.", exc_info=True
- )
- time.sleep(1)
- except Exception:
- self.log.exception("Unknown error in KubernetesJobWatcher. Failing")
- self.resource_version = "0"
- ResourceVersion().resource_version[self.namespace] = "0"
- raise
- else:
- self.log.warning(
- "Watch died gracefully, starting back up with: last resource_version: %s",
- self.resource_version,
- )
-
- def _pod_events(self, kube_client: client.CoreV1Api, query_kwargs: dict):
- watcher = watch.Watch()
- try:
- if self.namespace == ALL_NAMESPACES:
- return watcher.stream(kube_client.list_pod_for_all_namespaces, **query_kwargs)
- else:
- return watcher.stream(kube_client.list_namespaced_pod, self.namespace, **query_kwargs)
- except ApiException as e:
- if e.status == 410: # Resource version is too old
- if self.namespace == ALL_NAMESPACES:
- pods = kube_client.list_pod_for_all_namespaces(watch=False)
- else:
- pods = kube_client.list_namespaced_pod(namespace=self.namespace, watch=False)
- resource_version = pods.metadata.resource_version
- query_kwargs["resource_version"] = resource_version
- return self._pod_events(kube_client=kube_client, query_kwargs=query_kwargs)
- else:
- raise
-
- def _run(
- self,
- kube_client: client.CoreV1Api,
- resource_version: str | None,
- scheduler_job_id: str,
- kube_config: Any,
- ) -> str | None:
- self.log.info("Event: and now my watch begins starting at resource_version: %s", resource_version)
-
- kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"}
- if resource_version:
- kwargs["resource_version"] = resource_version
- if kube_config.kube_client_request_args:
- for key, value in kube_config.kube_client_request_args.items():
- kwargs[key] = value
-
- last_resource_version: str | None = None
-
- for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):
- task = event["object"]
- self.log.debug("Event: %s had an event of type %s", task.metadata.name, event["type"])
- if event["type"] == "ERROR":
- return self.process_error(event)
- annotations = task.metadata.annotations
- task_instance_related_annotations = {
- "dag_id": annotations["dag_id"],
- "task_id": annotations["task_id"],
- "execution_date": annotations.get("execution_date"),
- "run_id": annotations.get("run_id"),
- "try_number": annotations["try_number"],
- }
- map_index = annotations.get("map_index")
- if map_index is not None:
- task_instance_related_annotations["map_index"] = map_index
-
- self.process_status(
- pod_name=task.metadata.name,
- namespace=task.metadata.namespace,
- status=task.status.phase,
- annotations=task_instance_related_annotations,
- resource_version=task.metadata.resource_version,
- event=event,
- )
- last_resource_version = task.metadata.resource_version
-
- return last_resource_version
-
- def process_error(self, event: Any) -> str:
- """Process error response."""
- self.log.error("Encountered Error response from k8s list namespaced pod stream => %s", event)
- raw_object = event["raw_object"]
- if raw_object["code"] == 410:
- self.log.info(
- "Kubernetes resource version is too old, must reset to 0 => %s", (raw_object["message"],)
- )
- # Return resource version 0
- return "0"
- raise AirflowException(
- f"Kubernetes failure for {raw_object['reason']} with code {raw_object['code']} and message: "
- f"{raw_object['message']}"
- )
-
- def process_status(
- self,
- pod_name: str,
- namespace: str,
- status: str,
- annotations: dict[str, str],
- resource_version: str,
- event: Any,
- ) -> None:
- pod = event["object"]
- annotations_string = annotations_for_logging_task_metadata(annotations)
- """Process status response."""
- if status == "Pending":
- # deletion_timestamp is set by kube server when a graceful deletion is requested.
- # since kube server have received request to delete pod set TI state failed
- if event["type"] == "DELETED" and pod.metadata.deletion_timestamp:
- self.log.info("Event: Failed to start pod %s, annotations: %s", pod_name, annotations_string)
- self.watcher_queue.put((pod_name, namespace, State.FAILED, annotations, resource_version))
- else:
- self.log.debug("Event: %s Pending, annotations: %s", pod_name, annotations_string)
- elif status == "Failed":
- self.log.error("Event: %s Failed, annotations: %s", pod_name, annotations_string)
- self.watcher_queue.put((pod_name, namespace, State.FAILED, annotations, resource_version))
- elif status == "Succeeded":
- # We get multiple events once the pod hits a terminal state, and we only want to
- # send it along to the scheduler once.
- # If our event type is DELETED, we have the POD_EXECUTOR_DONE_KEY, or the pod has
- # a deletion timestamp, we've already seen the initial Succeeded event and sent it
- # along to the scheduler.
- if (
- event["type"] == "DELETED"
- or POD_EXECUTOR_DONE_KEY in pod.metadata.labels
- or pod.metadata.deletion_timestamp
- ):
- self.log.info(
- "Skipping event for Succeeded pod %s - event for this pod already sent to executor",
- pod_name,
- )
- return
- self.log.info("Event: %s Succeeded, annotations: %s", pod_name, annotations_string)
- self.watcher_queue.put((pod_name, namespace, None, annotations, resource_version))
- elif status == "Running":
- # deletion_timestamp is set by kube server when a graceful deletion is requested.
- # since kube server have received request to delete pod set TI state failed
- if event["type"] == "DELETED" and pod.metadata.deletion_timestamp:
- self.log.info(
- "Event: Pod %s deleted before it could complete, annotations: %s",
- pod_name,
- annotations_string,
- )
- self.watcher_queue.put((pod_name, namespace, State.FAILED, annotations, resource_version))
- else:
- self.log.info("Event: %s is Running, annotations: %s", pod_name, annotations_string)
- else:
- self.log.warning(
- "Event: Invalid state: %s on pod: %s in namespace %s with annotations: %s with "
- "resource_version: %s",
- status,
- pod_name,
- namespace,
- annotations,
- resource_version,
- )
-
-
-class AirflowKubernetesScheduler(LoggingMixin):
- """Airflow Scheduler for Kubernetes."""
-
- def __init__(
- self,
- kube_config: Any,
- result_queue: Queue[KubernetesResultsType],
- kube_client: client.CoreV1Api,
- scheduler_job_id: str,
- ):
- super().__init__()
- self.log.debug("Creating Kubernetes executor")
- self.kube_config = kube_config
- self.result_queue = result_queue
- self.namespace = self.kube_config.kube_namespace
- self.log.debug("Kubernetes using namespace %s", self.namespace)
- self.kube_client = kube_client
- self._manager = multiprocessing.Manager()
- self.watcher_queue = self._manager.Queue()
- self.scheduler_job_id = scheduler_job_id
- self.kube_watchers = self._make_kube_watchers()
-
- def run_pod_async(self, pod: k8s.V1Pod, **kwargs):
- """Runs POD asynchronously."""
- sanitized_pod = self.kube_client.api_client.sanitize_for_serialization(pod)
- json_pod = json.dumps(sanitized_pod, indent=2)
-
- self.log.debug("Pod Creation Request: \n%s", json_pod)
- try:
- resp = self.kube_client.create_namespaced_pod(
- body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
- )
- self.log.debug("Pod Creation Response: %s", resp)
- except Exception as e:
- self.log.exception("Exception when attempting to create Namespaced Pod: %s", json_pod)
- raise e
- return resp
-
- def _make_kube_watcher(self, namespace) -> KubernetesJobWatcher:
- resource_version = ResourceVersion().resource_version.get(namespace, "0")
- watcher = KubernetesJobWatcher(
- watcher_queue=self.watcher_queue,
- namespace=namespace,
- resource_version=resource_version,
- scheduler_job_id=self.scheduler_job_id,
- kube_config=self.kube_config,
- )
- watcher.start()
- return watcher
-
- def _make_kube_watchers(self) -> dict[str, KubernetesJobWatcher]:
- watchers = {}
- if self.kube_config.multi_namespace_mode:
- namespaces_to_watch = (
- self.kube_config.multi_namespace_mode_namespace_list
- if self.kube_config.multi_namespace_mode_namespace_list
- else [ALL_NAMESPACES]
- )
- else:
- namespaces_to_watch = [self.kube_config.kube_namespace]
-
- for namespace in namespaces_to_watch:
- watchers[namespace] = self._make_kube_watcher(namespace)
- return watchers
-
- def _health_check_kube_watchers(self):
- for namespace, kube_watcher in self.kube_watchers.items():
- if kube_watcher.is_alive():
- self.log.debug("KubeJobWatcher for namespace %s alive, continuing", namespace)
- else:
- self.log.error(
- (
- "Error while health checking kube watcher process for namespace %s. "
- "Process died for unknown reasons"
- ),
- namespace,
- )
- ResourceVersion().resource_version[namespace] = "0"
- self.kube_watchers[namespace] = self._make_kube_watcher(namespace)
-
- def run_next(self, next_job: KubernetesJobType) -> None:
- """Receives the next job to run, builds the pod, and creates it."""
- key, command, kube_executor_config, pod_template_file = next_job
-
- dag_id, task_id, run_id, try_number, map_index = key
-
- if command[0:3] != ["airflow", "tasks", "run"]:
- raise ValueError('The command must start with ["airflow", "tasks", "run"].')
-
- base_worker_pod = get_base_pod_from_template(pod_template_file, self.kube_config)
-
- if not base_worker_pod:
- raise AirflowException(
- f"could not find a valid worker template yaml at {self.kube_config.pod_template_file}"
- )
-
- pod = PodGenerator.construct_pod(
- namespace=self.namespace,
- scheduler_job_id=self.scheduler_job_id,
- pod_id=create_pod_id(dag_id, task_id),
- dag_id=dag_id,
- task_id=task_id,
- kube_image=self.kube_config.kube_image,
- try_number=try_number,
- map_index=map_index,
- date=None,
- run_id=run_id,
- args=command,
- pod_override_object=kube_executor_config,
- base_worker_pod=base_worker_pod,
- with_mutation_hook=True,
- )
- # Reconcile the pod generated by the Operator and the Pod
- # generated by the .cfg file
- self.log.info(
- "Creating kubernetes pod for job is %s, with pod name %s, annotations: %s",
- key,
- pod.metadata.name,
- annotations_for_logging_task_metadata(pod.metadata.annotations),
- )
- self.log.debug("Kubernetes running for command %s", command)
- self.log.debug("Kubernetes launching image %s", pod.spec.containers[0].image)
-
- # the watcher will monitor pods, so we do not block.
- self.run_pod_async(pod, **self.kube_config.kube_client_request_args)
- self.log.debug("Kubernetes Job created!")
-
- def delete_pod(self, pod_name: str, namespace: str) -> None:
- """Deletes Pod from a namespace. Does not raise if it does not exist."""
- try:
- self.log.debug("Deleting pod %s in namespace %s", pod_name, namespace)
- self.kube_client.delete_namespaced_pod(
- pod_name,
- namespace,
- body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
- **self.kube_config.kube_client_request_args,
- )
- except ApiException as e:
- # If the pod is already deleted
- if e.status != 404:
- raise
-
- def patch_pod_executor_done(self, *, pod_name: str, namespace: str):
- """Add a "done" annotation to ensure we don't continually adopt pods."""
- self.log.debug("Patching pod %s in namespace %s to mark it as done", pod_name, namespace)
- try:
- self.kube_client.patch_namespaced_pod(
- name=pod_name,
- namespace=namespace,
- body={"metadata": {"labels": {POD_EXECUTOR_DONE_KEY: "True"}}},
- )
- except ApiException as e:
- self.log.info("Failed to patch pod %s with done annotation. Reason: %s", pod_name, e)
-
- def sync(self) -> None:
- """
- Checks the status of all currently running kubernetes jobs.
-
- If a job is completed, its status is placed in the result queue to be sent back to the scheduler.
- """
- self.log.debug("Syncing KubernetesExecutor")
- self._health_check_kube_watchers()
- while True:
- try:
- task = self.watcher_queue.get_nowait()
- try:
- self.log.debug("Processing task %s", task)
- self.process_watcher_task(task)
- finally:
- self.watcher_queue.task_done()
- except Empty:
- break
-
- def process_watcher_task(self, task: KubernetesWatchType) -> None:
- """Process the task by watcher."""
- pod_name, namespace, state, annotations, resource_version = task
- self.log.debug(
- "Attempting to finish pod; pod_name: %s; state: %s; annotations: %s",
- pod_name,
- state,
- annotations_for_logging_task_metadata(annotations),
- )
- key = annotations_to_key(annotations=annotations)
- if key:
- self.log.debug("finishing job %s - %s (%s)", key, state, pod_name)
- self.result_queue.put((key, state, pod_name, namespace, resource_version))
-
- def _flush_watcher_queue(self) -> None:
- self.log.debug("Executor shutting down, watcher_queue approx. size=%d", self.watcher_queue.qsize())
- while True:
- try:
- task = self.watcher_queue.get_nowait()
- # Ignoring it since it can only have either FAILED or SUCCEEDED pods
- self.log.warning("Executor shutting down, IGNORING watcher task=%s", task)
- self.watcher_queue.task_done()
- except Empty:
- break
-
- def terminate(self) -> None:
- """Terminates the watcher."""
- self.log.debug("Terminating kube_watchers...")
- for namespace, kube_watcher in self.kube_watchers.items():
- kube_watcher.terminate()
- kube_watcher.join()
- self.log.debug("kube_watcher=%s", kube_watcher)
- self.log.debug("Flushing watcher_queue...")
- self._flush_watcher_queue()
- # Queue should be empty...
- self.watcher_queue.join()
- self.log.debug("Shutting down manager...")
- self._manager.shutdown()
-
-
-def get_base_pod_from_template(pod_template_file: str | None, kube_config: Any) -> k8s.V1Pod:
- """
- Get base pod from template.
-
- Reads either the pod_template_file set in the executor_config or the base pod_template_file
- set in the airflow.cfg to craft a "base pod" that will be used by the KubernetesExecutor
-
- :param pod_template_file: absolute path to a pod_template_file.yaml or None
- :param kube_config: The KubeConfig class generated by airflow that contains all kube metadata
- :return: a V1Pod that can be used as the base pod for k8s tasks
- """
- if pod_template_file:
- return PodGenerator.deserialize_model_file(pod_template_file)
- else:
- return PodGenerator.deserialize_model_file(kube_config.pod_template_file)
-
class KubernetesExecutor(BaseExecutor):
"""Executor for Kubernetes."""
@@ -536,6 +95,19 @@ class KubernetesExecutor(BaseExecutor):
return pods
+ def _make_safe_label_value(self, input_value: str | datetime) -> str:
+ """
+ Normalize a provided label to be of valid length and characters.
+ See airflow.kubernetes.pod_generator.make_safe_label_value for more details.
+ """
+ # airflow.kubernetes is an expensive import, locally import it here to
+ # speed up load times of the kubernetes_executor module.
+ from airflow.kubernetes import pod_generator
+
+ if isinstance(input_value, datetime):
+ return pod_generator.datetime_to_label_safe_datestring(input_value)
+ return pod_generator.make_safe_label_value(input_value)
+
@provide_session
def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION) -> None:
"""
@@ -580,9 +152,9 @@ class KubernetesExecutor(BaseExecutor):
# Build the pod selector
base_label_selector = (
- f"dag_id={pod_generator.make_safe_label_value(ti.dag_id)},"
- f"task_id={pod_generator.make_safe_label_value(ti.task_id)},"
- f"airflow-worker={pod_generator.make_safe_label_value(str(ti.queued_by_job_id))}"
+ f"dag_id={self._make_safe_label_value(ti.dag_id)},"
+ f"task_id={self._make_safe_label_value(ti.task_id)},"
+ f"airflow-worker={self._make_safe_label_value(str(ti.queued_by_job_id))}"
)
if ti.map_index >= 0:
# Old tasks _couldn't_ be mapped, so we don't have to worry about compat
@@ -592,15 +164,14 @@ class KubernetesExecutor(BaseExecutor):
kwargs.update(**self.kube_config.kube_client_request_args)
# Try run_id first
- kwargs["label_selector"] += ",run_id=" + pod_generator.make_safe_label_value(ti.run_id)
+ kwargs["label_selector"] += ",run_id=" + self._make_safe_label_value(ti.run_id)
pod_list = self._list_pods(kwargs)
if pod_list:
continue
# Fallback to old style of using execution_date
- kwargs["label_selector"] = (
- f"{base_label_selector},"
- f"execution_date={pod_generator.datetime_to_label_safe_datestring(ti.execution_date)}"
- )
+ kwargs[
+ "label_selector"
+ ] = f"{base_label_selector},execution_date={self._make_safe_label_value(ti.execution_date)}"
pod_list = self._list_pods(kwargs)
if pod_list:
continue
@@ -617,6 +188,9 @@ class KubernetesExecutor(BaseExecutor):
self.log.info("Start Kubernetes executor")
self.scheduler_job_id = str(self.job_id)
self.log.debug("Start with scheduler_job_id: %s", self.scheduler_job_id)
+ from airflow.executors.kubernetes_executor_utils import AirflowKubernetesScheduler
+ from airflow.kubernetes.kube_client import get_kube_client
+
self.kube_client = get_kube_client()
self.kube_scheduler = AirflowKubernetesScheduler(
kube_config=self.kube_config,
@@ -650,6 +224,8 @@ class KubernetesExecutor(BaseExecutor):
else:
self.log.info("Add task %s with command %s", key, command)
+ from airflow.kubernetes.pod_generator import PodGenerator
+
try:
kube_executor_config = PodGenerator.from_obj(executor_config)
except Exception:
@@ -706,15 +282,20 @@ class KubernetesExecutor(BaseExecutor):
except Empty:
break
+ from airflow.executors.kubernetes_executor_utils import ResourceVersion
+
resource_instance = ResourceVersion()
for ns in resource_instance.resource_version.keys():
resource_instance.resource_version[ns] = (
last_resource_version[ns] or resource_instance.resource_version[ns]
)
+ from kubernetes.client.rest import ApiException
+
for _ in range(self.kube_config.worker_pods_creation_batch_size):
try:
task = self.task_queue.get_nowait()
+
try:
self.kube_scheduler.run_next(task)
except PodReconciliationError as e:
@@ -766,7 +347,6 @@ class KubernetesExecutor(BaseExecutor):
) -> None:
if TYPE_CHECKING:
assert self.kube_scheduler
- from airflow.models.taskinstance import TaskInstance
if state == State.RUNNING:
self.event_buffer[key] = state, None
@@ -787,6 +367,8 @@ class KubernetesExecutor(BaseExecutor):
# If we don't have a TI state, look it up from the db. event_buffer expects the TI state
if state is None:
+ from airflow.models.taskinstance import TaskInstance
+
state = session.query(TaskInstance.state).filter(TaskInstance.filter_for_tis([key])).scalar()
self.event_buffer[key] = state, None
@@ -803,6 +385,7 @@ class KubernetesExecutor(BaseExecutor):
messages = []
log = []
try:
+
from airflow.kubernetes.kube_client import get_kube_client
from airflow.kubernetes.pod_generator import PodGenerator
@@ -849,7 +432,7 @@ class KubernetesExecutor(BaseExecutor):
tis_to_flush_by_key = {ti.key: ti for ti in tis if ti.queued_by_job_id}
kube_client: client.CoreV1Api = self.kube_client
for scheduler_job_id in scheduler_job_ids:
- scheduler_job_id = pod_generator.make_safe_label_value(str(scheduler_job_id))
+ scheduler_job_id = self._make_safe_label_value(str(scheduler_job_id))
# We will look for any pods owned by the no-longer-running scheduler,
# but will exclude only successful pods, as those TIs will have a terminal state
# and not be up for adoption!
@@ -880,6 +463,8 @@ class KubernetesExecutor(BaseExecutor):
:param tis: List of Task Instances to clean up
:return: List of readable task instances for a warning message
"""
+ from airflow.kubernetes.pod_generator import PodGenerator
+
if TYPE_CHECKING:
assert self.kube_client
assert self.kube_scheduler
@@ -930,8 +515,11 @@ class KubernetesExecutor(BaseExecutor):
self.log.error("attempting to adopt taskinstance which was not specified by database: %s", ti_key)
return
- new_worker_id_label = pod_generator.make_safe_label_value(self.scheduler_job_id)
+ new_worker_id_label = self._make_safe_label_value(self.scheduler_job_id)
+ from kubernetes.client.rest import ApiException
+
try:
+
kube_client.patch_namespaced_pod(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
@@ -953,7 +541,7 @@ class KubernetesExecutor(BaseExecutor):
if TYPE_CHECKING:
assert self.scheduler_job_id
- new_worker_id_label = pod_generator.make_safe_label_value(self.scheduler_job_id)
+ new_worker_id_label = self._make_safe_label_value(self.scheduler_job_id)
query_kwargs = {
"field_selector": "status.phase=Succeeded",
"label_selector": (
@@ -964,7 +552,10 @@ class KubernetesExecutor(BaseExecutor):
pod_list = self._list_pods(query_kwargs)
for pod in pod_list:
self.log.info("Attempting to adopt pod %s", pod.metadata.name)
+ from kubernetes.client.rest import ApiException
+
try:
+
kube_client.patch_namespaced_pod(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
diff --git a/airflow/executors/kubernetes_executor_types.py b/airflow/executors/kubernetes_executor_types.py
new file mode 100644
index 0000000000..a13cd35f8d
--- /dev/null
+++ b/airflow/executors/kubernetes_executor_types.py
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
+
+if TYPE_CHECKING:
+ from airflow.executors.base_executor import CommandType
+ from airflow.models.taskinstance import TaskInstanceKey
+
+ # TaskInstance key, command, configuration, pod_template_file
+ KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any, Optional[str]]
+
+ # key, pod state, pod_name, namespace, resource_version
+ KubernetesResultsType = Tuple[TaskInstanceKey, Optional[str], str, str, str]
+
+ # pod_name, namespace, pod state, annotations, resource_version
+ KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
+
+ALL_NAMESPACES = "ALL_NAMESPACES"
+POD_EXECUTOR_DONE_KEY = "airflow_executor_done"
diff --git a/airflow/executors/kubernetes_executor_utils.py b/airflow/executors/kubernetes_executor_utils.py
new file mode 100644
index 0000000000..b1ee49cb69
--- /dev/null
+++ b/airflow/executors/kubernetes_executor_utils.py
@@ -0,0 +1,477 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import multiprocessing
+import time
+from queue import Empty, Queue
+from typing import TYPE_CHECKING, Any
+
+from kubernetes import client, watch
+from kubernetes.client import Configuration, models as k8s
+from kubernetes.client.rest import ApiException
+from urllib3.exceptions import ReadTimeoutError
+
+from airflow.exceptions import AirflowException
+from airflow.kubernetes.kube_client import get_kube_client
+from airflow.kubernetes.kubernetes_helper_functions import (
+ annotations_for_logging_task_metadata,
+ annotations_to_key,
+ create_pod_id,
+)
+from airflow.kubernetes.pod_generator import PodGenerator
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.singleton import Singleton
+from airflow.utils.state import State
+
+if TYPE_CHECKING:
+ from airflow.executors.kubernetes_executor_types import (
+ KubernetesJobType,
+ KubernetesResultsType,
+ KubernetesWatchType,
+ )
+
+
+from airflow.executors.kubernetes_executor_types import ALL_NAMESPACES, POD_EXECUTOR_DONE_KEY
+
+
class ResourceVersion(metaclass=Singleton):
    """Singleton for tracking resourceVersion from Kubernetes."""

    # Maps namespace name -> last resourceVersion seen by that namespace's pod
    # watcher. "0" means "start the watch from scratch"; watchers reset their
    # entry to "0" after an unrecoverable error or a 410 (Gone) response.
    resource_version: dict[str, str] = {}
+
+
class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
    """
    Watches for Kubernetes jobs.

    Runs as a child process: streams pod events for a single namespace (or for
    all namespaces when ``namespace`` is the ``ALL_NAMESPACES`` sentinel) and
    forwards task-relevant state transitions to the executor through
    ``watcher_queue``.
    """

    def __init__(
        self,
        namespace: str,
        watcher_queue: Queue[KubernetesWatchType],
        resource_version: str | None,
        scheduler_job_id: str,
        kube_config: Configuration,
    ):
        """
        :param namespace: namespace to watch, or the ALL_NAMESPACES sentinel
        :param watcher_queue: queue the watcher pushes (pod_name, namespace,
            state, annotations, resource_version) tuples onto
        :param resource_version: resourceVersion to resume the watch from, or None
        :param scheduler_job_id: used to select only pods launched by this scheduler
        :param kube_config: Airflow's KubeConfig wrapper (request kwargs etc.)
        """
        super().__init__()
        self.namespace = namespace
        self.scheduler_job_id = scheduler_job_id
        self.watcher_queue = watcher_queue
        self.resource_version = resource_version
        self.kube_config = kube_config

    def run(self) -> None:
        """Performs watching."""
        if TYPE_CHECKING:
            assert self.scheduler_job_id

        kube_client: client.CoreV1Api = get_kube_client()
        while True:
            try:
                self.resource_version = self._run(
                    kube_client, self.resource_version, self.scheduler_job_id, self.kube_config
                )
            except ReadTimeoutError:
                self.log.warning(
                    "There was a timeout error accessing the Kube API. Retrying request.", exc_info=True
                )
                time.sleep(1)
            except Exception:
                self.log.exception("Unknown error in KubernetesJobWatcher. Failing")
                # Reset the shared resource version so a replacement watcher
                # starts from scratch instead of a possibly-corrupt version.
                self.resource_version = "0"
                ResourceVersion().resource_version[self.namespace] = "0"
                raise
            else:
                self.log.warning(
                    "Watch died gracefully, starting back up with: last resource_version: %s",
                    self.resource_version,
                )

    def _pod_events(self, kube_client: client.CoreV1Api, query_kwargs: dict):
        """Open a watch stream of pod events, transparently recovering from HTTP 410."""
        watcher = watch.Watch()
        try:
            if self.namespace == ALL_NAMESPACES:
                return watcher.stream(kube_client.list_pod_for_all_namespaces, **query_kwargs)
            else:
                return watcher.stream(kube_client.list_namespaced_pod, self.namespace, **query_kwargs)
        except ApiException as e:
            if e.status == 410:  # Resource version is too old
                # Re-list (watch=False) to learn the current resourceVersion,
                # then retry the stream from there.
                if self.namespace == ALL_NAMESPACES:
                    pods = kube_client.list_pod_for_all_namespaces(watch=False)
                else:
                    pods = kube_client.list_namespaced_pod(namespace=self.namespace, watch=False)
                resource_version = pods.metadata.resource_version
                query_kwargs["resource_version"] = resource_version
                return self._pod_events(kube_client=kube_client, query_kwargs=query_kwargs)
            else:
                raise

    def _run(
        self,
        kube_client: client.CoreV1Api,
        resource_version: str | None,
        scheduler_job_id: str,
        kube_config: Any,
    ) -> str | None:
        """
        Consume one watch stream until it ends.

        :return: the last resourceVersion seen, or None if no events arrived,
            or "0" when the server reported our version as too old.
        """
        self.log.info("Event: and now my watch begins starting at resource_version: %s", resource_version)

        # Only watch pods labelled as launched by this scheduler instance.
        kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"}
        if resource_version:
            kwargs["resource_version"] = resource_version
        if kube_config.kube_client_request_args:
            for key, value in kube_config.kube_client_request_args.items():
                kwargs[key] = value

        last_resource_version: str | None = None

        for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):
            task = event["object"]
            self.log.debug("Event: %s had an event of type %s", task.metadata.name, event["type"])
            if event["type"] == "ERROR":
                return self.process_error(event)
            annotations = task.metadata.annotations
            # Keep only the annotations that identify the task instance.
            task_instance_related_annotations = {
                "dag_id": annotations["dag_id"],
                "task_id": annotations["task_id"],
                "execution_date": annotations.get("execution_date"),
                "run_id": annotations.get("run_id"),
                "try_number": annotations["try_number"],
            }
            map_index = annotations.get("map_index")
            if map_index is not None:
                task_instance_related_annotations["map_index"] = map_index

            self.process_status(
                pod_name=task.metadata.name,
                namespace=task.metadata.namespace,
                status=task.status.phase,
                annotations=task_instance_related_annotations,
                resource_version=task.metadata.resource_version,
                event=event,
            )
            last_resource_version = task.metadata.resource_version

        return last_resource_version

    def process_error(self, event: Any) -> str:
        """Process error response."""
        self.log.error("Encountered Error response from k8s list namespaced pod stream => %s", event)
        raw_object = event["raw_object"]
        if raw_object["code"] == 410:
            self.log.info(
                "Kubernetes resource version is too old, must reset to 0 => %s", (raw_object["message"],)
            )
            # Return resource version 0
            return "0"
        raise AirflowException(
            f"Kubernetes failure for {raw_object['reason']} with code {raw_object['code']} and message: "
            f"{raw_object['message']}"
        )

    def process_status(
        self,
        pod_name: str,
        namespace: str,
        status: str,
        annotations: dict[str, str],
        resource_version: str,
        event: Any,
    ) -> None:
        """Process status response."""
        # NOTE: in the original, the docstring above was misplaced after the
        # first two statements, making it a dead string expression; it is now a
        # real docstring. No runtime behavior changes.
        pod = event["object"]
        annotations_string = annotations_for_logging_task_metadata(annotations)
        if status == "Pending":
            # deletion_timestamp is set by kube server when a graceful deletion is requested.
            # since kube server have received request to delete pod set TI state failed
            if event["type"] == "DELETED" and pod.metadata.deletion_timestamp:
                self.log.info("Event: Failed to start pod %s, annotations: %s", pod_name, annotations_string)
                self.watcher_queue.put((pod_name, namespace, State.FAILED, annotations, resource_version))
            else:
                self.log.debug("Event: %s Pending, annotations: %s", pod_name, annotations_string)
        elif status == "Failed":
            self.log.error("Event: %s Failed, annotations: %s", pod_name, annotations_string)
            self.watcher_queue.put((pod_name, namespace, State.FAILED, annotations, resource_version))
        elif status == "Succeeded":
            # We get multiple events once the pod hits a terminal state, and we only want to
            # send it along to the scheduler once.
            # If our event type is DELETED, we have the POD_EXECUTOR_DONE_KEY, or the pod has
            # a deletion timestamp, we've already seen the initial Succeeded event and sent it
            # along to the scheduler.
            if (
                event["type"] == "DELETED"
                or POD_EXECUTOR_DONE_KEY in pod.metadata.labels
                or pod.metadata.deletion_timestamp
            ):
                self.log.info(
                    "Skipping event for Succeeded pod %s - event for this pod already sent to executor",
                    pod_name,
                )
                return
            self.log.info("Event: %s Succeeded, annotations: %s", pod_name, annotations_string)
            # None state signals success to the executor.
            self.watcher_queue.put((pod_name, namespace, None, annotations, resource_version))
        elif status == "Running":
            # deletion_timestamp is set by kube server when a graceful deletion is requested.
            # since kube server have received request to delete pod set TI state failed
            if event["type"] == "DELETED" and pod.metadata.deletion_timestamp:
                self.log.info(
                    "Event: Pod %s deleted before it could complete, annotations: %s",
                    pod_name,
                    annotations_string,
                )
                self.watcher_queue.put((pod_name, namespace, State.FAILED, annotations, resource_version))
            else:
                self.log.info("Event: %s is Running, annotations: %s", pod_name, annotations_string)
        else:
            self.log.warning(
                "Event: Invalid state: %s on pod: %s in namespace %s with annotations: %s with "
                "resource_version: %s",
                status,
                pod_name,
                namespace,
                annotations,
                resource_version,
            )
+
+
class AirflowKubernetesScheduler(LoggingMixin):
    """
    Airflow Scheduler for Kubernetes.

    Owns the pod lifecycle for the KubernetesExecutor: builds and creates
    worker pods, runs one KubernetesJobWatcher subprocess per watched
    namespace, and relays finished-pod events from the watcher queue to the
    executor's result queue.
    """

    def __init__(
        self,
        kube_config: Any,
        result_queue: Queue[KubernetesResultsType],
        kube_client: client.CoreV1Api,
        scheduler_job_id: str,
    ):
        """
        :param kube_config: Airflow's KubeConfig wrapper
        :param result_queue: queue the executor consumes finished-task results from
        :param kube_client: a CoreV1Api client
        :param scheduler_job_id: id used to label/select pods launched by this scheduler
        """
        super().__init__()
        self.log.debug("Creating Kubernetes executor")
        self.kube_config = kube_config
        self.result_queue = result_queue
        self.namespace = self.kube_config.kube_namespace
        self.log.debug("Kubernetes using namespace %s", self.namespace)
        self.kube_client = kube_client
        # Manager-backed queue so the watcher child processes can push events
        # back to this process.
        self._manager = multiprocessing.Manager()
        self.watcher_queue = self._manager.Queue()
        self.scheduler_job_id = scheduler_job_id
        self.kube_watchers = self._make_kube_watchers()

    def run_pod_async(self, pod: k8s.V1Pod, **kwargs):
        """Runs POD asynchronously."""
        sanitized_pod = self.kube_client.api_client.sanitize_for_serialization(pod)
        json_pod = json.dumps(sanitized_pod, indent=2)

        self.log.debug("Pod Creation Request: \n%s", json_pod)
        try:
            resp = self.kube_client.create_namespaced_pod(
                body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
            )
            self.log.debug("Pod Creation Response: %s", resp)
        except Exception:
            self.log.exception("Exception when attempting to create Namespaced Pod: %s", json_pod)
            # Bare raise (instead of the original `raise e`) keeps the
            # original traceback intact.
            raise
        return resp

    def _make_kube_watcher(self, namespace) -> KubernetesJobWatcher:
        """Start a watcher subprocess for one namespace, resuming from the last known version."""
        resource_version = ResourceVersion().resource_version.get(namespace, "0")
        watcher = KubernetesJobWatcher(
            watcher_queue=self.watcher_queue,
            namespace=namespace,
            resource_version=resource_version,
            scheduler_job_id=self.scheduler_job_id,
            kube_config=self.kube_config,
        )
        watcher.start()
        return watcher

    def _make_kube_watchers(self) -> dict[str, KubernetesJobWatcher]:
        """Create one watcher per namespace to monitor, keyed by namespace name."""
        watchers = {}
        if self.kube_config.multi_namespace_mode:
            # An explicit namespace list wins; otherwise watch the whole cluster.
            namespaces_to_watch = (
                self.kube_config.multi_namespace_mode_namespace_list
                if self.kube_config.multi_namespace_mode_namespace_list
                else [ALL_NAMESPACES]
            )
        else:
            namespaces_to_watch = [self.kube_config.kube_namespace]

        for namespace in namespaces_to_watch:
            watchers[namespace] = self._make_kube_watcher(namespace)
        return watchers

    def _health_check_kube_watchers(self):
        """Restart any watcher subprocess that has died, resetting its resource version."""
        for namespace, kube_watcher in self.kube_watchers.items():
            if kube_watcher.is_alive():
                self.log.debug("KubeJobWatcher for namespace %s alive, continuing", namespace)
            else:
                self.log.error(
                    (
                        "Error while health checking kube watcher process for namespace %s. "
                        "Process died for unknown reasons"
                    ),
                    namespace,
                )
                ResourceVersion().resource_version[namespace] = "0"
                self.kube_watchers[namespace] = self._make_kube_watcher(namespace)

    def run_next(self, next_job: KubernetesJobType) -> None:
        """Receives the next job to run, builds the pod, and creates it."""
        key, command, kube_executor_config, pod_template_file = next_job

        dag_id, task_id, run_id, try_number, map_index = key

        if command[0:3] != ["airflow", "tasks", "run"]:
            raise ValueError('The command must start with ["airflow", "tasks", "run"].')

        base_worker_pod = get_base_pod_from_template(pod_template_file, self.kube_config)

        if not base_worker_pod:
            raise AirflowException(
                f"could not find a valid worker template yaml at {self.kube_config.pod_template_file}"
            )

        # Reconcile the pod generated by the Operator and the Pod
        # generated by the .cfg file
        pod = PodGenerator.construct_pod(
            namespace=self.namespace,
            scheduler_job_id=self.scheduler_job_id,
            pod_id=create_pod_id(dag_id, task_id),
            dag_id=dag_id,
            task_id=task_id,
            kube_image=self.kube_config.kube_image,
            try_number=try_number,
            map_index=map_index,
            date=None,
            run_id=run_id,
            args=command,
            pod_override_object=kube_executor_config,
            base_worker_pod=base_worker_pod,
            with_mutation_hook=True,
        )
        self.log.info(
            "Creating kubernetes pod for job is %s, with pod name %s, annotations: %s",
            key,
            pod.metadata.name,
            annotations_for_logging_task_metadata(pod.metadata.annotations),
        )
        self.log.debug("Kubernetes running for command %s", command)
        self.log.debug("Kubernetes launching image %s", pod.spec.containers[0].image)

        # the watcher will monitor pods, so we do not block.
        self.run_pod_async(pod, **self.kube_config.kube_client_request_args)
        self.log.debug("Kubernetes Job created!")

    def delete_pod(self, pod_name: str, namespace: str) -> None:
        """Deletes Pod from a namespace. Does not raise if it does not exist."""
        try:
            self.log.debug("Deleting pod %s in namespace %s", pod_name, namespace)
            self.kube_client.delete_namespaced_pod(
                pod_name,
                namespace,
                body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
                **self.kube_config.kube_client_request_args,
            )
        except ApiException as e:
            # If the pod is already deleted
            if e.status != 404:
                raise

    def patch_pod_executor_done(self, *, pod_name: str, namespace: str):
        """Add a "done" annotation to ensure we don't continually adopt pods."""
        self.log.debug("Patching pod %s in namespace %s to mark it as done", pod_name, namespace)
        try:
            self.kube_client.patch_namespaced_pod(
                name=pod_name,
                namespace=namespace,
                body={"metadata": {"labels": {POD_EXECUTOR_DONE_KEY: "True"}}},
            )
        except ApiException as e:
            # Best-effort: a failed patch only means the pod may be re-adopted.
            self.log.info("Failed to patch pod %s with done annotation. Reason: %s", pod_name, e)

    def sync(self) -> None:
        """
        Checks the status of all currently running kubernetes jobs.

        If a job is completed, its status is placed in the result queue to be sent back to the scheduler.
        """
        self.log.debug("Syncing KubernetesExecutor")
        self._health_check_kube_watchers()
        while True:
            try:
                task = self.watcher_queue.get_nowait()
                try:
                    self.log.debug("Processing task %s", task)
                    self.process_watcher_task(task)
                finally:
                    self.watcher_queue.task_done()
            except Empty:
                break

    def process_watcher_task(self, task: KubernetesWatchType) -> None:
        """Process the task by watcher."""
        pod_name, namespace, state, annotations, resource_version = task
        self.log.debug(
            "Attempting to finish pod; pod_name: %s; state: %s; annotations: %s",
            pod_name,
            state,
            annotations_for_logging_task_metadata(annotations),
        )
        key = annotations_to_key(annotations=annotations)
        if key:
            self.log.debug("finishing job %s - %s (%s)", key, state, pod_name)
            self.result_queue.put((key, state, pod_name, namespace, resource_version))

    def _flush_watcher_queue(self) -> None:
        """Drain and discard any remaining watcher events during shutdown."""
        self.log.debug("Executor shutting down, watcher_queue approx. size=%d", self.watcher_queue.qsize())
        while True:
            try:
                task = self.watcher_queue.get_nowait()
                # Ignoring it since it can only have either FAILED or SUCCEEDED pods
                self.log.warning("Executor shutting down, IGNORING watcher task=%s", task)
                self.watcher_queue.task_done()
            except Empty:
                break

    def terminate(self) -> None:
        """Terminates the watcher."""
        self.log.debug("Terminating kube_watchers...")
        for namespace, kube_watcher in self.kube_watchers.items():
            kube_watcher.terminate()
            kube_watcher.join()
            self.log.debug("kube_watcher=%s", kube_watcher)
        self.log.debug("Flushing watcher_queue...")
        self._flush_watcher_queue()
        # Queue should be empty...
        self.watcher_queue.join()
        self.log.debug("Shutting down manager...")
        self._manager.shutdown()
+
+
def get_base_pod_from_template(pod_template_file: str | None, kube_config: Any) -> k8s.V1Pod:
    """
    Get base pod from template.

    Reads either the pod_template_file set in the executor_config or the base pod_template_file
    set in the airflow.cfg to craft a "base pod" that will be used by the KubernetesExecutor

    :param pod_template_file: absolute path to a pod_template_file.yaml or None
    :param kube_config: The KubeConfig class generated by airflow that contains all kube metadata
    :return: a V1Pod that can be used as the base pod for k8s tasks
    """
    # A per-task template path (truthy) overrides the airflow.cfg default.
    template_path = pod_template_file if pod_template_file else kube_config.pod_template_file
    return PodGenerator.deserialize_model_file(template_path)
diff --git a/airflow/kubernetes/kubernetes_helper_functions.py b/airflow/kubernetes/kubernetes_helper_functions.py
index fdb76b0aa8..4cd3422cb6 100644
--- a/airflow/kubernetes/kubernetes_helper_functions.py
+++ b/airflow/kubernetes/kubernetes_helper_functions.py
@@ -19,13 +19,16 @@ from __future__ import annotations
import logging
import secrets
import string
+from typing import TYPE_CHECKING
import pendulum
from slugify import slugify
from airflow.compat.functools import cache
from airflow.configuration import conf
-from airflow.models.taskinstancekey import TaskInstanceKey
+
+if TYPE_CHECKING:
+ from airflow.models.taskinstancekey import TaskInstanceKey
log = logging.getLogger(__name__)
@@ -91,12 +94,12 @@ def annotations_to_key(annotations: dict[str, str]) -> TaskInstanceKey:
annotation_run_id = annotations.get("run_id")
map_index = int(annotations.get("map_index", -1))
- if not annotation_run_id and "execution_date" in annotations:
- # Compat: Look up the run_id from the TI table!
- from airflow.models.dagrun import DagRun
- from airflow.models.taskinstance import TaskInstance
- from airflow.settings import Session
+ # Compat: Look up the run_id from the TI table!
+ from airflow.models.dagrun import DagRun
+ from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
+ from airflow.settings import Session
+ if not annotation_run_id and "execution_date" in annotations:
execution_date = pendulum.parse(annotations["execution_date"])
# Do _not_ use create-session, we don't want to expunge
session = Session()
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index ba9c33a36e..4b4685e079 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -40,10 +40,10 @@ from airflow.utils.state import State, TaskInstanceState
from tests.test_utils.config import conf_vars
try:
- from airflow.executors.kubernetes_executor import (
- POD_EXECUTOR_DONE_KEY,
+ from airflow.executors.kubernetes_executor import KubernetesExecutor
+ from airflow.executors.kubernetes_executor_types import POD_EXECUTOR_DONE_KEY
+ from airflow.executors.kubernetes_executor_utils import (
AirflowKubernetesScheduler,
- KubernetesExecutor,
KubernetesJobWatcher,
ResourceVersion,
create_pod_id,
@@ -160,9 +160,9 @@ class TestAirflowKubernetesScheduler:
@pytest.mark.skipif(
AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
)
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
- @mock.patch("airflow.executors.kubernetes_executor.client")
- @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.client")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.KubernetesJobWatcher")
def test_delete_pod_successfully(self, mock_watcher, mock_client, mock_kube_client):
pod_name = "my-pod-1"
namespace = "my-namespace-1"
@@ -182,9 +182,9 @@ class TestAirflowKubernetesScheduler:
@pytest.mark.skipif(
AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
)
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
- @mock.patch("airflow.executors.kubernetes_executor.client")
- @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.client")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.KubernetesJobWatcher")
def test_delete_pod_raises_404(self, mock_watcher, mock_client, mock_kube_client):
pod_name = "my-pod-1"
namespace = "my-namespace-2"
@@ -205,9 +205,9 @@ class TestAirflowKubernetesScheduler:
@pytest.mark.skipif(
AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
)
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
- @mock.patch("airflow.executors.kubernetes_executor.client")
- @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.client")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.KubernetesJobWatcher")
def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_client):
pod_name = "my-pod-1"
namespace = "my-namespace-3"
@@ -249,8 +249,8 @@ class TestKubernetesExecutor:
pytest.param(400, False, id="400 BadRequest"),
],
)
- @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.KubernetesJobWatcher")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_run_next_exception_requeue(
self, mock_get_kube_client, mock_kubernetes_job_watcher, status, should_requeue
):
@@ -319,7 +319,7 @@ class TestKubernetesExecutor:
AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
)
@mock.patch("airflow.settings.pod_mutation_hook")
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_run_next_pmh_error(self, mock_get_kube_client, mock_pmh):
"""
Exception during Pod Mutation Hook execution should be handled gracefully.
@@ -357,8 +357,8 @@ class TestKubernetesExecutor:
@pytest.mark.skipif(
AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
)
- @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.KubernetesJobWatcher")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_run_next_pod_reconciliation_error(self, mock_get_kube_client, mock_kubernetes_job_watcher):
"""
When construct_pod raises PodReconciliationError, we should fail the task.
@@ -417,8 +417,8 @@ class TestKubernetesExecutor:
]
mock_stats_gauge.assert_has_calls(calls)
- @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.KubernetesJobWatcher")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job_watcher):
executor = self.kubernetes_executor
executor.start()
@@ -443,8 +443,8 @@ class TestKubernetesExecutor:
@pytest.mark.skipif(
AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
)
- @mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.run_pod_async")
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.run_pod_async")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_pod_template_file_override_in_executor_config(self, mock_get_kube_client, mock_run_pod_async):
current_folder = pathlib.Path(__file__).parent.resolve()
template_file = str(
@@ -528,8 +528,8 @@ class TestKubernetesExecutor:
finally:
executor.end()
- @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.KubernetesJobWatcher")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_watcher):
executor = self.kubernetes_executor
executor.start()
@@ -542,9 +542,9 @@ class TestKubernetesExecutor:
finally:
executor.end()
- @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
- @mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.KubernetesJobWatcher")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.delete_pod")
def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
executor = self.kubernetes_executor
executor.start()
@@ -558,9 +558,9 @@ class TestKubernetesExecutor:
finally:
executor.end()
- @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
- @mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.KubernetesJobWatcher")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.AirflowKubernetesScheduler")
def test_change_state_failed_no_deletion(
self, mock_kubescheduler, mock_get_kube_client, mock_kubernetes_job_watcher
):
@@ -584,9 +584,9 @@ class TestKubernetesExecutor:
@pytest.mark.parametrize(
"ti_state", [TaskInstanceState.SUCCESS, TaskInstanceState.FAILED, TaskInstanceState.DEFERRED]
)
- @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
- @mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.KubernetesJobWatcher")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.delete_pod")
def test_change_state_none(
self,
mock_delete_pod,
@@ -616,7 +616,7 @@ class TestKubernetesExecutor:
pytest.param(None, ["ALL_NAMESPACES"]),
],
)
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_watchers_under_multi_namespace_mode(
self, mock_get_kube_client, multi_namespace_mode_namespace_list, watchers_keys
):
@@ -632,9 +632,9 @@ class TestKubernetesExecutor:
finally:
executor.end()
- @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
- @mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.KubernetesJobWatcher")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.AirflowKubernetesScheduler")
def test_change_state_skip_pod_deletion(
self, mock_kubescheduler, mock_get_kube_client, mock_kubernetes_job_watcher
):
@@ -656,9 +656,9 @@ class TestKubernetesExecutor:
finally:
executor.end()
- @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
- @mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.KubernetesJobWatcher")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.AirflowKubernetesScheduler")
def test_change_state_failed_pod_deletion(
self, mock_kubescheduler, mock_get_kube_client, mock_kubernetes_job_watcher
):
@@ -778,7 +778,7 @@ class TestKubernetesExecutor:
mock_adopt_launched_task.assert_not_called()
mock_adopt_completed_pods.assert_called_once()
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_adopt_launched_task(self, mock_kube_client):
executor = self.kubernetes_executor
executor.scheduler_job_id = "modified"
@@ -803,7 +803,7 @@ class TestKubernetesExecutor:
assert tis_to_flush_by_key == {}
assert executor.running == {ti_key}
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_adopt_launched_task_api_exception(self, mock_kube_client):
"""We shouldn't think we are running the task if aren't able to patch the pod"""
executor = self.kubernetes_executor
@@ -828,7 +828,7 @@ class TestKubernetesExecutor:
assert tis_to_flush_by_key == {ti_key: {}}
assert executor.running == set()
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_adopt_completed_pods(self, mock_kube_client):
"""We should adopt all completed pods from other schedulers"""
executor = self.kubernetes_executor
@@ -878,7 +878,7 @@ class TestKubernetesExecutor:
)
assert executor.running == expected_running_ti_keys
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_not_adopt_unassigned_task(self, mock_kube_client):
"""
We should not adopt any tasks that were not assigned by the scheduler.
@@ -904,8 +904,8 @@ class TestKubernetesExecutor:
assert not mock_kube_client.patch_namespaced_pod.called
assert tis_to_flush_by_key == {"foobar": {}}
- @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
- @mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod")
+ @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+ @mock.patch("airflow.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.delete_pod")
def test_cleanup_stuck_queued_tasks(self, mock_delete_pod, mock_kube_client, dag_maker, session):
"""Delete any pods associated with a task stuck in queued."""
executor = KubernetesExecutor()
@@ -1232,7 +1232,7 @@ class TestKubernetesJobWatcher:
self.events = []
def _run(self):
- with mock.patch("airflow.executors.kubernetes_executor.watch") as mock_watch:
+ with mock.patch("airflow.executors.kubernetes_executor_utils.watch") as mock_watch:
mock_watch.Watch.return_value.stream.return_value = self.events
latest_resource_version = self.watcher._run(
self.kube_client,
@@ -1362,7 +1362,7 @@ class TestKubernetesJobWatcher:
self.watcher._run = mock_underscore_run
- with mock.patch("airflow.executors.kubernetes_executor.get_kube_client"):
+ with mock.patch("airflow.executors.kubernetes_executor_utils.get_kube_client"):
try:
# self.watcher._run() is mocked and return "500" as last resource_version
self.watcher.run()