You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this copy.
Posted to commits@airflow.apache.org by as...@apache.org on 2022/12/23 09:24:39 UTC

[airflow] branch main updated: Ensure that pod_mutation_hook is called before logging the pod name (#28534)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a6b6655936 Ensure that pod_mutation_hook is called before logging the pod name (#28534)
a6b6655936 is described below

commit a6b66559365928084b1f4c477275cb0fc14e0884
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Fri Dec 23 09:24:24 2022 +0000

    Ensure that pod_mutation_hook is called before logging the pod name (#28534)
    
    Otherwise, if the pod mutation hook changes the pod's name, the log
    message from the KubernetesExecutor would show the wrong name. As a
    drive-by refactor, I have moved the application of pod_mutation_hook
    into PodGenerator.construct_pod.
    
    I think the default for `with_mutation_hook` _should_ be True, but it's
    not clear to me whether anything outside of Airflow core might call
    these methods. Such a caller may already apply the hook itself, so a
    default of True would apply it twice. To avoid that, I have left the
    default as `False`.
---
 airflow/cli/commands/kubernetes_command.py  |  3 +--
 airflow/executors/kubernetes_executor.py    |  9 ++-------
 airflow/kubernetes/pod_generator.py         | 21 +++++++++++++++++++--
 airflow/models/taskinstance.py              |  2 +-
 tests/executors/test_kubernetes_executor.py |  4 ++--
 5 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/airflow/cli/commands/kubernetes_command.py b/airflow/cli/commands/kubernetes_command.py
index 052bf339c0..efdcd6aec6 100644
--- a/airflow/cli/commands/kubernetes_command.py
+++ b/airflow/cli/commands/kubernetes_command.py
@@ -30,7 +30,6 @@ from airflow.kubernetes import pod_generator
 from airflow.kubernetes.kube_client import get_kube_client
 from airflow.kubernetes.pod_generator import PodGenerator
 from airflow.models import DagRun, TaskInstance
-from airflow.settings import pod_mutation_hook
 from airflow.utils import cli as cli_utils, yaml
 from airflow.utils.cli import get_dag
 
@@ -58,8 +57,8 @@ def generate_pod_yaml(args):
             scheduler_job_id="worker-config",
             namespace=kube_config.executor_namespace,
             base_worker_pod=PodGenerator.deserialize_model_file(kube_config.pod_template_file),
+            with_mutation_hook=True,
         )
-        pod_mutation_hook(pod)
         api_client = ApiClient()
         date_string = pod_generator.datetime_to_label_safe_datestring(execution_date)
         yaml_file_name = f"{args.dag_id}_{ti.task_id}_{date_string}.yml"
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index d933a294e9..42d3633a6c 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -45,7 +45,6 @@ from airflow.kubernetes.kube_config import KubeConfig
 from airflow.kubernetes.kubernetes_helper_functions import annotations_to_key, create_pod_id
 from airflow.kubernetes.pod_generator import PodGenerator
 from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
-from airflow.settings import pod_mutation_hook
 from airflow.utils import timezone
 from airflow.utils.event_scheduler import EventScheduler
 from airflow.utils.log.logging_mixin import LoggingMixin
@@ -255,11 +254,6 @@ class AirflowKubernetesScheduler(LoggingMixin):
 
     def run_pod_async(self, pod: k8s.V1Pod, **kwargs):
         """Runs POD asynchronously."""
-        try:
-            pod_mutation_hook(pod)
-        except Exception as e:
-            raise PodMutationHookException(e)
-
         sanitized_pod = self.kube_client.api_client.sanitize_for_serialization(pod)
         json_pod = json.dumps(sanitized_pod, indent=2)
 
@@ -346,6 +340,7 @@ class AirflowKubernetesScheduler(LoggingMixin):
             args=command,
             pod_override_object=kube_executor_config,
             base_worker_pod=base_worker_pod,
+            with_mutation_hook=True,
         )
         # Reconcile the pod generated by the Operator and the Pod
         # generated by the .cfg file
@@ -695,7 +690,7 @@ class KubernetesExecutor(BaseExecutor):
                     self.log.error(
                         "Pod Mutation Hook failed for the task %s. Failing task. Details: %s",
                         key,
-                        e,
+                        e.__cause__,
                     )
                     self.fail(key, e)
                 finally:
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 777fa640eb..64b7965a4c 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -37,7 +37,12 @@ from dateutil import parser
 from kubernetes.client import models as k8s
 from kubernetes.client.api_client import ApiClient
 
-from airflow.exceptions import AirflowConfigException, PodReconciliationError, RemovedInAirflow3Warning
+from airflow.exceptions import (
+    AirflowConfigException,
+    PodMutationHookException,
+    PodReconciliationError,
+    RemovedInAirflow3Warning,
+)
 from airflow.kubernetes.kubernetes_helper_functions import add_pod_suffix, rand_str
 from airflow.kubernetes.pod_generator_deprecated import PodDefaults, PodGenerator as PodGeneratorDeprecated
 from airflow.utils import yaml
@@ -349,6 +354,8 @@ class PodGenerator:
         scheduler_job_id: str,
         run_id: str | None = None,
         map_index: int = -1,
+        *,
+        with_mutation_hook: bool = False,
     ) -> k8s.V1Pod:
         """
         Create a Pod.
@@ -426,10 +433,20 @@ class PodGenerator:
         pod_list = [base_worker_pod, pod_override_object, dynamic_pod]
 
         try:
-            return reduce(PodGenerator.reconcile_pods, pod_list)
+            pod = reduce(PodGenerator.reconcile_pods, pod_list)
         except Exception as e:
             raise PodReconciliationError from e
 
+        if with_mutation_hook:
+            from airflow.settings import pod_mutation_hook
+
+            try:
+                pod_mutation_hook(pod)
+            except Exception as e:
+                raise PodMutationHookException from e
+
+        return pod
+
     @staticmethod
     def serialize_pod(pod: k8s.V1Pod) -> dict:
         """
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 62040b1a67..13722a3a8e 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2147,8 +2147,8 @@ class TaskInstance(Base, LoggingMixin):
             scheduler_job_id="0",
             namespace=kube_config.executor_namespace,
             base_worker_pod=PodGenerator.deserialize_model_file(kube_config.pod_template_file),
+            with_mutation_hook=True,
         )
-        settings.pod_mutation_hook(pod)
         sanitized_pod = ApiClient().sanitize_for_serialization(pod)
         return sanitized_pod
 
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index e48d2087cb..1385324d51 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -303,7 +303,7 @@ class TestKubernetesExecutor:
     @pytest.mark.skipif(
         AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
     )
-    @mock.patch("airflow.executors.kubernetes_executor.pod_mutation_hook")
+    @mock.patch("airflow.settings.pod_mutation_hook")
     @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
     def test_run_next_pmh_error(self, mock_get_kube_client, mock_pmh):
         """
@@ -335,7 +335,7 @@ class TestKubernetesExecutor:
             # The task is not re-queued and there is the failed record in event_buffer
             assert kubernetes_executor.task_queue.empty()
             assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
-            assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == exception_in_pmh
+            assert kubernetes_executor.event_buffer[task_instance_key][1].__cause__ == exception_in_pmh
         finally:
             kubernetes_executor.end()