Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/13 16:26:45 UTC

[GitHub] [airflow] dondaum commented on issue #17604: Add an option to remove or mask environment variables in the KubernetesPodOperator task instance logs on failure or error events

dondaum commented on issue #17604:
URL: https://github.com/apache/airflow/issues/17604#issuecomment-898580071


   We run version v2.1.2.
   
    So I had another look at the "Secrets Masker" feature, as I had already configured it without any positive effect on the masking of environment variables.
    
    Most likely our setup or configuration is wrong, so let me give some context:
    
    We have a custom logging config that uses Azure Blob Storage for remote logging, where we have added the new filter on the remote task handler:
   ```python
   DEFAULT_LOGGING_CONFIG: Dict[str, Any] = {
       "version": 1,
       "disable_existing_loggers": False,
       "formatters": {
           "airflow": {"format": LOG_FORMAT},
           "airflow_coloured": {
               "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
               "class": COLORED_FORMATTER_CLASS
               if COLORED_LOG
               else "logging.Formatter",
           },
       },
       "filters": {
           "mask_secrets": {
               "()": "airflow.utils.log.secrets_masker.SecretsMasker",
           },
       },
       "handlers": {
           "console": {
               "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
               "formatter": "airflow_coloured",
               "stream": "sys.stdout",
               "filters": ["mask_secrets"],
           },
           "task": {
               "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
               "formatter": "airflow",
               "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
               "filename_template": FILENAME_TEMPLATE,
               "filters": ["mask_secrets"],
           },
           "processor": {
               "class": "airflow.utils.log.file_processor_handler.FileProcessorHandler",  # noqa: E501
               "formatter": "airflow",
               "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
               "filename_template": PROCESSOR_FILENAME_TEMPLATE,
               "filters": ["mask_secrets"],
           },
       },
       "loggers": {
           "airflow.processor": {
               "handlers": ["processor"],
               "level": LOG_LEVEL,
               "propagate": False,
           },
           "airflow.task": {
               "handlers": ["task"],
               "level": LOG_LEVEL,
               "propagate": False,
               "filters": ["mask_secrets"],
           },
           "flask_appbuilder": {
            "handlers": ["console"],
               "level": FAB_LOG_LEVEL,
               "propagate": True,
           },
       },
       "root": {
           "handlers": ["console"],
           "level": LOG_LEVEL,
           "filters": ["mask_secrets"],
       },
   }
   
   EXTRA_LOGGER_NAMES: str = conf.get(
       "logging", "EXTRA_LOGGER_NAMES", fallback=None
   )
   if EXTRA_LOGGER_NAMES:
       new_loggers = {
           logger_name.strip(): {
            "handlers": ["console"],
               "level": LOG_LEVEL,
               "propagate": True,
           }
           for logger_name in EXTRA_LOGGER_NAMES.split(",")
       }
       DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers)
   
   DEFAULT_DAG_PARSING_LOGGING_CONFIG: Dict[str, Dict[str, Dict[str, Any]]] = {
       "handlers": {
           "processor_manager": {
               "class": "logging.handlers.RotatingFileHandler",
               "formatter": "airflow",
               "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
               "mode": "a",
               "maxBytes": 104857600,  # 100MB
               "backupCount": 5,
           }
       },
       "loggers": {
           "airflow.processor_manager": {
               "handlers": ["processor_manager"],
               "level": LOG_LEVEL,
               "propagate": False,
           }
       },
   }
   
   ...
   
       elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
           REMOTE_BASE_LOG_FOLDER = TARGET_HOSTNAME
           WASB_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = {
               "task": {
                   "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",  # noqa: E501
                   "formatter": "airflow",
                   "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                   "wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
                   "wasb_container": "airflow-logs",
                   "filename_template": FILENAME_TEMPLATE,
                   "delete_local_copy": False,
                   "filters": ["mask_secrets"],
               },
           }
   ```
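    As a sanity check on the `"()"` filter wiring above, here is a minimal stdlib-only `dictConfig` that instantiates a filter the same way (the filter class is a toy stand-in for illustration, not Airflow's actual `SecretsMasker`):

```python
import logging
import logging.config


class DemoMasker(logging.Filter):
    """Toy stand-in for a secrets masker: blanks a fixed token in each record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.msg = str(record.msg).replace("s3cr3t", "***")
        return True


LOGGING_DEMO_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "filters": {
        # "()" tells dictConfig to treat the value as a factory callable;
        # Airflow's config passes the dotted path
        # "airflow.utils.log.secrets_masker.SecretsMasker" instead.
        "mask_secrets": {"()": DemoMasker},
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "filters": ["mask_secrets"],
        },
    },
    "root": {"handlers": ["console"], "level": "INFO"},
}

logging.config.dictConfig(LOGGING_DEMO_CONFIG)
logging.getLogger("demo").info("password=s3cr3t")  # emitted as password=***
```

    This at least confirms that a filter attached to a handler rewrites the record before the handler formats it, which is the mechanism the `mask_secrets` entries rely on.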
   
   In our Airflow configuration we have (we are using helm charts):
   
    ```
    AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES: "DBT_PWD,DBT_WAREHOUSE"
    ```
   
    I also checked the Airflow docs to make sure I am not missing any other important config. I found AIRFLOW__CORE__HIDE_SENSITIVE_VAR_CONN_FIELDS, but it defaults to true and we do not change it.
   
   
    In an example DAG I set an environment variable:
   ```python
   sec = Variable.get("testvar", deserialize_json=True)
   
   with dag:
       k = KubernetesPodOperator(
           namespace=aks_kube_config.aks_namespace,
           in_cluster=True,
           image="python:3.6",
           cmds=["python", "-c"],
           arguments=["import sys; sys.exit(1)"],
           env_vars=[
               k8s.V1EnvVar(
                   name="DBT_WAREHOUSE",
                   value=sec["DBT_WAREHOUSE"],
               )
           ],
           labels={"foo": "bar"},
           name="passing-test",
           is_delete_operator_pod=True,
           task_id="passing-task",
           get_logs=True,
           resources=aks_kube_config.get_dbt_config(),
           node_selector=aks_kube_config.get_dbt_node_selector(),
       )
   ```
   
    However, the value of the env variable still appears in the task logs:
   
   ```python
   [2021-08-13 15:21:38,366] {{taskinstance.py:1501}} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 368, in execute
       raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}')
   airflow.exceptions.AirflowException: Pod passing-test.786a2b1e2b3e4b60accdf901645c6af0 returned a failure: {'api_version': 'v1',
    'kind': 'Pod',
    'metadata': {'annotations': None,
                 'cluster_name': None,
                 'creation_timestamp': datetime.datetime(2021, 8, 13, 15, 21, 6, tzinfo=tzlocal()),
                 'deletion_grace_period_seconds': None,
                 'deletion_timestamp': None,
                 'finalizers': None,
                 'generate_name': None,
                 'generation': None,
                 'initializers': None,
                 'labels': {'airflow_version': '2.1.2',
                            'dag_id': 'k8s_v3',
                            'execution_date': '2021-08-13T151421.6018540000-bfa4f3805',
                            'foo': 'bar',
                            'kubernetes_pod_operator': 'True',
                            'task_id': 'passing-task',
                            'try_number': '2'},
                 'managed_fields': [{'api_version': 'v1',
                                     'fields': None,
                                     'manager': 'OpenAPI-Generator',
                                     'operation': 'Update',
                                     'time': datetime.datetime(2021, 8, 13, 15, 21, 6, tzinfo=tzlocal())},
                                    {'api_version': 'v1',
                                     'fields': None,
                                     'manager': 'kubelet',
                                     'operation': 'Update',
                                     'time': datetime.datetime(2021, 8, 13, 15, 21, 36, tzinfo=tzlocal())}],
                 'name': 'passing-test.786a2b1e2b3e4b60accdf901645c6af0',
                 'namespace': 'XXXX',
                 'owner_references': None,
                 'resource_version': '90185518',
                 'self_link': None,
                 'uid': '6bb3ef39-0d53-4b65-8e4b-b0a142304ef4'},
    'spec': {'active_deadline_seconds': None,
             'affinity': {'node_affinity': None,
                          'pod_affinity': None,
                          'pod_anti_affinity': None},
             'automount_service_account_token': None,
             'containers': [{'args': ['import sys; sys.exit(1)'],
                             'command': ['python', '-c'],
                             'env': [{'name': 'DBT_WAREHOUSE',
                                      'value': 'secret',
                                      'value_from': None}],
                             'env_from': None,
                             'image': 'python:3.6',
                             'image_pull_policy': 'IfNotPresent',
                             'lifecycle': None,
                             'liveness_probe': None,
                             'name': 'base',
                             'ports': None,
                             'readiness_probe': None,
                             'resources': {'limits': {'cpu': '1500m',
                                                      'memory': '512Mi'},
                                           'requests': {'cpu': '512m',
                                                        'memory': '256Mi'}},
                             'security_context': None,
                             'stdin': None,
                             'stdin_once': None,
                             'termination_message_path': '/dev/termination-log',
                             'termination_message_policy': 'File',
                             'tty': None,
                             'volume_devices': None,
                             'volume_mounts': [{'mount_path': '/var/run/secrets/kubernetes.io/serviceaccount',
                                                'mount_propagation': None,
                                                'name': 'default-token-k59nb',
                                                'read_only': True,
                                                'sub_path': None,
                                                'sub_path_expr': None}],
                             'working_dir': None}],
   ```
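    As far as I can tell, the value leaks because the whole pod spec is stringified into the `AirflowException` message, so the masker can only catch it if it knows the literal secret value (or redacts the structure before it is turned into a string). A stdlib-only sketch of value-based redaction over a nested structure like the pod spec above (an illustration, not Airflow's implementation):

```python
from typing import Any, Set


def redact(item: Any, secrets: Set[str]) -> Any:
    """Recursively replace known secret values anywhere in a nested structure."""
    if isinstance(item, str):
        for secret in secrets:
            item = item.replace(secret, "***")
        return item
    if isinstance(item, dict):
        return {k: redact(v, secrets) for k, v in item.items()}
    if isinstance(item, (list, tuple)):
        return type(item)(redact(v, secrets) for v in item)
    return item


pod_fragment = {"containers": [{"env": [{"name": "DBT_WAREHOUSE",
                                         "value": "secret",
                                         "value_from": None}]}]}
redact(pod_fragment, {"secret"})
# {'containers': [{'env': [{'name': 'DBT_WAREHOUSE', 'value': '***',
#                           'value_from': None}]}]}
```

    Note that masking by key name alone cannot work here: in the pod spec the secret sits under the generic key `'value'`, with the sensitive name `DBT_WAREHOUSE` only appearing as a sibling `'name'` entry.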
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org