You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by xd...@apache.org on 2022/12/19 03:29:28 UTC

[airflow] branch main updated: Log FileTaskHandler to work with KubernetesExecutor's multi_namespace_mode (#28436)

This is an automated email from the ASF dual-hosted git repository.

xddeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 497c2c243d Log FileTaskHandler to work with KubernetesExecutor's multi_namespace_mode (#28436)
497c2c243d is described below

commit 497c2c243dd168639d34ff35e02e62d5177de338
Author: Xiaodong DENG <xd...@apache.org>
AuthorDate: Mon Dec 19 04:29:09 2022 +0100

    Log FileTaskHandler to work with KubernetesExecutor's multi_namespace_mode (#28436)
---
 airflow/utils/log/file_task_handler.py |  8 +++--
 tests/utils/test_log_handlers.py       | 61 +++++++++++++++++++++++++++++++++-
 2 files changed, 66 insertions(+), 3 deletions(-)

diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 6aee75ee33..b8feb2997b 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -191,16 +191,20 @@ class FileTaskHandler(logging.Handler):
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
         elif self._should_check_k8s(ti.queue):
+            pod_override = ti.executor_config.get("pod_override")
+            if pod_override and pod_override.metadata and pod_override.metadata.namespace:
+                namespace = pod_override.metadata.namespace
+            else:
+                namespace = conf.get("kubernetes_executor", "namespace")
             try:
                 from airflow.kubernetes.kube_client import get_kube_client
 
                 kube_client = get_kube_client()
 
                 log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
-
                 res = kube_client.read_namespaced_pod_log(
                     name=ti.hostname,
-                    namespace=conf.get("kubernetes_executor", "namespace"),
+                    namespace=namespace,
                     container="base",
                     follow=False,
                     tail_lines=100,
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index ee2ff2d9ce..8b7f0145de 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -21,9 +21,10 @@ import logging
 import logging.config
 import os
 import re
-from unittest.mock import patch
+from unittest.mock import MagicMock, patch
 
 import pytest
+from kubernetes.client import models as k8s
 
 from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
 from airflow.models import DAG, DagRun, TaskInstance
@@ -219,6 +220,64 @@ class TestFileTaskLogHandler:
         # Remove the generated tmp log file.
         os.remove(log_filename)
 
+    @pytest.mark.parametrize(
+        "pod_override, namespace_to_call",
+        [
+            pytest.param(k8s.V1Pod(metadata=k8s.V1ObjectMeta(namespace="namespace-A")), "namespace-A"),
+            pytest.param(k8s.V1Pod(metadata=k8s.V1ObjectMeta(namespace="namespace-B")), "namespace-B"),
+            pytest.param(k8s.V1Pod(), "default"),
+            pytest.param(None, "default"),
+            pytest.param(k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="pod-name-xxx")), "default"),
+        ],
+    )
+    @patch.dict("os.environ", AIRFLOW__CORE__EXECUTOR="KubernetesExecutor")
+    @patch("airflow.kubernetes.kube_client.get_kube_client")
+    def test_read_from_k8s_under_multi_namespace_mode(
+        self, mock_kube_client, pod_override, namespace_to_call
+    ):
+        mock_read_namespaced_pod_log = MagicMock()
+        mock_kube_client.return_value.read_namespaced_pod_log = mock_read_namespaced_pod_log
+
+        def task_callable(ti):
+            ti.log.info("test")
+
+        dag = DAG("dag_for_testing_file_task_handler", start_date=DEFAULT_DATE)
+        dagrun = dag.create_dagrun(
+            run_type=DagRunType.MANUAL,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE,
+        )
+        executor_config_pod = pod_override
+        task = PythonOperator(
+            task_id="task_for_testing_file_log_handler",
+            dag=dag,
+            python_callable=task_callable,
+            executor_config={"pod_override": executor_config_pod},
+        )
+        ti = TaskInstance(task=task, run_id=dagrun.run_id)
+        ti.try_number = 3
+
+        logger = ti.log
+        ti.log.disabled = False
+
+        file_handler = next(
+            (handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None
+        )
+        set_context(logger, ti)
+        ti.run(ignore_ti_state=True)
+
+        file_handler.read(ti, 3)
+
+        # Check if kube_client.read_namespaced_pod_log() is called with the namespace we expect
+        mock_read_namespaced_pod_log.assert_called_once_with(
+            name=ti.hostname,
+            namespace=namespace_to_call,
+            container="base",
+            follow=False,
+            tail_lines=100,
+            _preload_content=False,
+        )
+
 
 class TestFilenameRendering:
     def test_python_formatting(self, create_log_template, create_task_instance):