You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2022/12/23 09:24:39 UTC
[airflow] branch main updated: Ensure that pod_mutation_hook is called before logging the pod name (#28534)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a6b6655936 Ensure that pod_mutation_hook is called before logging the pod name (#28534)
a6b6655936 is described below
commit a6b66559365928084b1f4c477275cb0fc14e0884
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Fri Dec 23 09:24:24 2022 +0000
Ensure that pod_mutation_hook is called before logging the pod name (#28534)
Otherwise, if the pod mutation hook changes the name, the log message from
the KubeExecutor would be incorrect. As somewhat of a drive-by refactor, I
have moved the application of the pod_mutation_hook inside
PodGenerator.construct_pod.
I think the default for `with_mutation_hook` _should_ be True, but it's
not clear to me whether anything outside of the Airflow core might ever
call these functions, and I don't want to apply the hook multiple times,
so I have left the default as `False`.
---
airflow/cli/commands/kubernetes_command.py | 3 +--
airflow/executors/kubernetes_executor.py | 9 ++-------
airflow/kubernetes/pod_generator.py | 21 +++++++++++++++++++--
airflow/models/taskinstance.py | 2 +-
tests/executors/test_kubernetes_executor.py | 4 ++--
5 files changed, 25 insertions(+), 14 deletions(-)
diff --git a/airflow/cli/commands/kubernetes_command.py b/airflow/cli/commands/kubernetes_command.py
index 052bf339c0..efdcd6aec6 100644
--- a/airflow/cli/commands/kubernetes_command.py
+++ b/airflow/cli/commands/kubernetes_command.py
@@ -30,7 +30,6 @@ from airflow.kubernetes import pod_generator
from airflow.kubernetes.kube_client import get_kube_client
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.models import DagRun, TaskInstance
-from airflow.settings import pod_mutation_hook
from airflow.utils import cli as cli_utils, yaml
from airflow.utils.cli import get_dag
@@ -58,8 +57,8 @@ def generate_pod_yaml(args):
scheduler_job_id="worker-config",
namespace=kube_config.executor_namespace,
base_worker_pod=PodGenerator.deserialize_model_file(kube_config.pod_template_file),
+ with_mutation_hook=True,
)
- pod_mutation_hook(pod)
api_client = ApiClient()
date_string = pod_generator.datetime_to_label_safe_datestring(execution_date)
yaml_file_name = f"{args.dag_id}_{ti.task_id}_{date_string}.yml"
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index d933a294e9..42d3633a6c 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -45,7 +45,6 @@ from airflow.kubernetes.kube_config import KubeConfig
from airflow.kubernetes.kubernetes_helper_functions import annotations_to_key, create_pod_id
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
-from airflow.settings import pod_mutation_hook
from airflow.utils import timezone
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import LoggingMixin
@@ -255,11 +254,6 @@ class AirflowKubernetesScheduler(LoggingMixin):
def run_pod_async(self, pod: k8s.V1Pod, **kwargs):
"""Runs POD asynchronously."""
- try:
- pod_mutation_hook(pod)
- except Exception as e:
- raise PodMutationHookException(e)
-
sanitized_pod = self.kube_client.api_client.sanitize_for_serialization(pod)
json_pod = json.dumps(sanitized_pod, indent=2)
@@ -346,6 +340,7 @@ class AirflowKubernetesScheduler(LoggingMixin):
args=command,
pod_override_object=kube_executor_config,
base_worker_pod=base_worker_pod,
+ with_mutation_hook=True,
)
# Reconcile the pod generated by the Operator and the Pod
# generated by the .cfg file
@@ -695,7 +690,7 @@ class KubernetesExecutor(BaseExecutor):
self.log.error(
"Pod Mutation Hook failed for the task %s. Failing task. Details: %s",
key,
- e,
+ e.__cause__,
)
self.fail(key, e)
finally:
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 777fa640eb..64b7965a4c 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -37,7 +37,12 @@ from dateutil import parser
from kubernetes.client import models as k8s
from kubernetes.client.api_client import ApiClient
-from airflow.exceptions import AirflowConfigException, PodReconciliationError, RemovedInAirflow3Warning
+from airflow.exceptions import (
+ AirflowConfigException,
+ PodMutationHookException,
+ PodReconciliationError,
+ RemovedInAirflow3Warning,
+)
from airflow.kubernetes.kubernetes_helper_functions import add_pod_suffix, rand_str
from airflow.kubernetes.pod_generator_deprecated import PodDefaults, PodGenerator as PodGeneratorDeprecated
from airflow.utils import yaml
@@ -349,6 +354,8 @@ class PodGenerator:
scheduler_job_id: str,
run_id: str | None = None,
map_index: int = -1,
+ *,
+ with_mutation_hook: bool = False,
) -> k8s.V1Pod:
"""
Create a Pod.
@@ -426,10 +433,20 @@ class PodGenerator:
pod_list = [base_worker_pod, pod_override_object, dynamic_pod]
try:
- return reduce(PodGenerator.reconcile_pods, pod_list)
+ pod = reduce(PodGenerator.reconcile_pods, pod_list)
except Exception as e:
raise PodReconciliationError from e
+ if with_mutation_hook:
+ from airflow.settings import pod_mutation_hook
+
+ try:
+ pod_mutation_hook(pod)
+ except Exception as e:
+ raise PodMutationHookException from e
+
+ return pod
+
@staticmethod
def serialize_pod(pod: k8s.V1Pod) -> dict:
"""
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 62040b1a67..13722a3a8e 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2147,8 +2147,8 @@ class TaskInstance(Base, LoggingMixin):
scheduler_job_id="0",
namespace=kube_config.executor_namespace,
base_worker_pod=PodGenerator.deserialize_model_file(kube_config.pod_template_file),
+ with_mutation_hook=True,
)
- settings.pod_mutation_hook(pod)
sanitized_pod = ApiClient().sanitize_for_serialization(pod)
return sanitized_pod
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index e48d2087cb..1385324d51 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -303,7 +303,7 @@ class TestKubernetesExecutor:
@pytest.mark.skipif(
AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
)
- @mock.patch("airflow.executors.kubernetes_executor.pod_mutation_hook")
+ @mock.patch("airflow.settings.pod_mutation_hook")
@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
def test_run_next_pmh_error(self, mock_get_kube_client, mock_pmh):
"""
@@ -335,7 +335,7 @@ class TestKubernetesExecutor:
# The task is not re-queued and there is the failed record in event_buffer
assert kubernetes_executor.task_queue.empty()
assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
- assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == exception_in_pmh
+ assert kubernetes_executor.event_buffer[task_instance_key][1].__cause__ == exception_in_pmh
finally:
kubernetes_executor.end()