You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2023/02/23 07:14:27 UTC

[airflow] branch main updated: Fix leak sensitive field via V1EnvVar on exception (#29016)

This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 78115c5ca9 Fix leak sensitive field via V1EnvVar on exception (#29016)
78115c5ca9 is described below

commit 78115c5ca995be6f0e06f68b4d97f7c270c16685
Author: Ruben Laguna <ru...@gmail.com>
AuthorDate: Thu Feb 23 08:14:18 2023 +0100

    Fix leak sensitive field via V1EnvVar on exception (#29016)
    
    Co-authored-by: Andrey Anshin <An...@taragol.is>
---
 airflow/models/abstractoperator.py               |  4 ++-
 airflow/utils/log/secrets_masker.py              | 30 ++++++++++++++++++++--
 kubernetes_tests/test_kubernetes_pod_operator.py | 32 ++++++++++++++++++++++++
 3 files changed, 63 insertions(+), 3 deletions(-)

diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py
index fe037b984c..f0ec59eb2a 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -28,6 +28,7 @@ from airflow.models.expandinput import NotFullyPopulated
 from airflow.models.taskmixin import DAGNode
 from airflow.template.templater import Templater
 from airflow.utils.context import Context
+from airflow.utils.log.secrets_masker import redact
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import skip_locked, with_row_locks
 from airflow.utils.state import State, TaskInstanceState
@@ -573,11 +574,12 @@ class AbstractOperator(Templater, DAGNode):
                     seen_oids,
                 )
             except Exception:
+                value_masked = redact(name=attr_name, value=value)
                 self.log.exception(
                     "Exception rendering Jinja template for task '%s', field '%s'. Template: %r",
                     self.task_id,
                     attr_name,
-                    value,
+                    value_masked,
                 )
                 raise
             else:
diff --git a/airflow/utils/log/secrets_masker.py b/airflow/utils/log/secrets_masker.py
index 23a57b45d5..cbbcf8ac17 100644
--- a/airflow/utils/log/secrets_masker.py
+++ b/airflow/utils/log/secrets_masker.py
@@ -21,11 +21,29 @@ import collections
 import logging
 import re
 import sys
-from typing import Any, Callable, Dict, Generator, Iterable, List, TextIO, Tuple, TypeVar, Union
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Generator,
+    Iterable,
+    List,
+    TextIO,
+    Tuple,
+    TypeVar,
+    Union,
+)
 
 from airflow import settings
 from airflow.compat.functools import cache, cached_property
 
+try:
+    # the kubernetes client package is optional and may not be installed
+    from kubernetes.client import V1EnvVar
+except ImportError:
+    V1EnvVar = type("V1EnvVar", (), {})  # placeholder class so the isinstance(item, V1EnvVar) check still works (and mypy stays happy)
+
+
 Redactable = TypeVar("Redactable", str, Dict[Any, Any], Tuple[Any, ...], List[Any])
 Redacted = Union[Redactable, str]
 
@@ -200,10 +218,18 @@ class SecretsMasker(logging.Filter):
             if name and should_hide_value_for_key(name):
                 return self._redact_all(item, depth)
             if isinstance(item, dict):
-                return {
+                to_return = {
                     dict_key: self._redact(subval, name=dict_key, depth=(depth + 1))
                     for dict_key, subval in item.items()
                 }
+                return to_return
+            elif isinstance(item, V1EnvVar):
+                tmp: dict = item.to_dict()
+                if should_hide_value_for_key(tmp.get("name", "")) and "value" in tmp:
+                    tmp["value"] = "***"
+                else:
+                    return self._redact(item=tmp, name=name, depth=depth)
+                return tmp
             elif isinstance(item, str):
                 if self.replacer:
                     # We can't replace specific values, but the key-based redacting
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index b31fe024bb..4d2fcd1d3f 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -1283,3 +1283,35 @@ class TestKubernetesPodOperatorSystem:
             == "apple-sauce"
         )
         assert MyK8SPodOperator(task_id=str(uuid4())).base_container_name == "tomato-sauce"
+
+
+def test_hide_sensitive_field_in_templated_fields_on_error(caplog, monkeypatch):  # regression test: a failed template render must not log the rendered secret
+    logger = logging.getLogger("airflow.task")
+    monkeypatch.setattr(logger, "propagate", True)  # presumably needed so caplog sees these records -- TODO confirm the logger's default propagate setting
+
+    class Var:  # stand-in for the "var" entry of the template context
+        def __getattr__(self, name):
+            raise KeyError(name)  # any attribute lookup fails; this is the KeyError the test expects from rendering
+
+    context = {
+        "password": "secretpassword",  # sentinel value that must not show up in the captured log
+        "var": Var(),
+    }
+    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
+        KubernetesPodOperator,
+    )
+
+    task = KubernetesPodOperator(
+        task_id="dry_run_demo",
+        name="hello-dry-run",
+        image="python:3.8-slim-buster",
+        cmds=["printenv"],
+        env_vars={
+            "password": "{{ password }}",  # sensitive-looking key whose template refers to the sentinel in context
+            "VAR2": "{{ var.value.nonexisting}}",  # expected to fail via Var.__getattr__ and abort rendering
+        },
+    )
+    with pytest.raises(KeyError):
+        task.render_template_fields(context=context)
+    assert "password" in caplog.text  # the string "password" (the env var key) is expected in the log
+    assert "secretpassword" not in caplog.text  # ...but the sentinel secret value must not be