You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by xd...@apache.org on 2022/12/19 03:29:28 UTC
[airflow] branch main updated: Log FileTaskHandler to work with KubernetesExecutor's multi_namespace_mode (#28436)
This is an automated email from the ASF dual-hosted git repository.
xddeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 497c2c243d Log FileTaskHandler to work with KubernetesExecutor's multi_namespace_mode (#28436)
497c2c243d is described below
commit 497c2c243dd168639d34ff35e02e62d5177de338
Author: Xiaodong DENG <xd...@apache.org>
AuthorDate: Mon Dec 19 04:29:09 2022 +0100
Log FileTaskHandler to work with KubernetesExecutor's multi_namespace_mode (#28436)
---
airflow/utils/log/file_task_handler.py | 8 +++--
tests/utils/test_log_handlers.py | 61 +++++++++++++++++++++++++++++++++-
2 files changed, 66 insertions(+), 3 deletions(-)
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 6aee75ee33..b8feb2997b 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -191,16 +191,20 @@ class FileTaskHandler(logging.Handler):
log += f"*** {str(e)}\n"
return log, {"end_of_log": True}
elif self._should_check_k8s(ti.queue):
+ pod_override = ti.executor_config.get("pod_override")
+ if pod_override and pod_override.metadata and pod_override.metadata.namespace:
+ namespace = pod_override.metadata.namespace
+ else:
+ namespace = conf.get("kubernetes_executor", "namespace")
try:
from airflow.kubernetes.kube_client import get_kube_client
kube_client = get_kube_client()
log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
-
res = kube_client.read_namespaced_pod_log(
name=ti.hostname,
- namespace=conf.get("kubernetes_executor", "namespace"),
+ namespace=namespace,
container="base",
follow=False,
tail_lines=100,
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index ee2ff2d9ce..8b7f0145de 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -21,9 +21,10 @@ import logging
import logging.config
import os
import re
-from unittest.mock import patch
+from unittest.mock import MagicMock, patch
import pytest
+from kubernetes.client import models as k8s
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.models import DAG, DagRun, TaskInstance
@@ -219,6 +220,64 @@ class TestFileTaskLogHandler:
# Remove the generated tmp log file.
os.remove(log_filename)
+ @pytest.mark.parametrize(
+ "pod_override, namespace_to_call",
+ [
+ pytest.param(k8s.V1Pod(metadata=k8s.V1ObjectMeta(namespace="namespace-A")), "namespace-A"),
+ pytest.param(k8s.V1Pod(metadata=k8s.V1ObjectMeta(namespace="namespace-B")), "namespace-B"),
+ pytest.param(k8s.V1Pod(), "default"),
+ pytest.param(None, "default"),
+ pytest.param(k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="pod-name-xxx")), "default"),
+ ],
+ )
+ @patch.dict("os.environ", AIRFLOW__CORE__EXECUTOR="KubernetesExecutor")
+ @patch("airflow.kubernetes.kube_client.get_kube_client")
+ def test_read_from_k8s_under_multi_namespace_mode(
+ self, mock_kube_client, pod_override, namespace_to_call
+ ):
+ mock_read_namespaced_pod_log = MagicMock()
+ mock_kube_client.return_value.read_namespaced_pod_log = mock_read_namespaced_pod_log
+
+ def task_callable(ti):
+ ti.log.info("test")
+
+ dag = DAG("dag_for_testing_file_task_handler", start_date=DEFAULT_DATE)
+ dagrun = dag.create_dagrun(
+ run_type=DagRunType.MANUAL,
+ state=State.RUNNING,
+ execution_date=DEFAULT_DATE,
+ )
+ executor_config_pod = pod_override
+ task = PythonOperator(
+ task_id="task_for_testing_file_log_handler",
+ dag=dag,
+ python_callable=task_callable,
+ executor_config={"pod_override": executor_config_pod},
+ )
+ ti = TaskInstance(task=task, run_id=dagrun.run_id)
+ ti.try_number = 3
+
+ logger = ti.log
+ ti.log.disabled = False
+
+ file_handler = next(
+ (handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None
+ )
+ set_context(logger, ti)
+ ti.run(ignore_ti_state=True)
+
+ file_handler.read(ti, 3)
+
+ # Check if kube_client.read_namespaced_pod_log() is called with the namespace we expect
+ mock_read_namespaced_pod_log.assert_called_once_with(
+ name=ti.hostname,
+ namespace=namespace_to_call,
+ container="base",
+ follow=False,
+ tail_lines=100,
+ _preload_content=False,
+ )
+
class TestFilenameRendering:
def test_python_formatting(self, create_log_template, create_task_instance):