You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/09/28 17:21:57 UTC

[GitHub] [airflow] potiuk commented on a change in pull request #18571: Operator help code optimisation

potiuk commented on a change in pull request #18571:
URL: https://github.com/apache/airflow/pull/18571#discussion_r717799840



##########
File path: airflow/utils/operator_helpers.py
##########
@@ -58,37 +59,32 @@ def context_to_airflow_vars(context, in_env_var_format=False):
         name_format = 'env_var_format'
     else:
         name_format = 'default'
+
     task = context.get('task')
-    if task and task.email:
-        if isinstance(task.email, str):
-            params[AIRFLOW_VAR_NAME_FORMAT_MAPPING['AIRFLOW_CONTEXT_DAG_EMAIL'][name_format]] = task.email
-        elif isinstance(task.email, list):
-            # os env variable value needs to be string
-            params[AIRFLOW_VAR_NAME_FORMAT_MAPPING['AIRFLOW_CONTEXT_DAG_EMAIL'][name_format]] = ','.join(
-                task.email
-            )
-    if task and task.owner:
-        if isinstance(task.owner, str):
-            params[AIRFLOW_VAR_NAME_FORMAT_MAPPING['AIRFLOW_CONTEXT_DAG_OWNER'][name_format]] = task.owner
-        elif isinstance(task.owner, list):
-            # os env variable value needs to be string
-            params[AIRFLOW_VAR_NAME_FORMAT_MAPPING['AIRFLOW_CONTEXT_DAG_OWNER'][name_format]] = ','.join(
-                task.owner
-            )
     task_instance = context.get('task_instance')
-    if task_instance and task_instance.dag_id:
-        params[AIRFLOW_VAR_NAME_FORMAT_MAPPING['AIRFLOW_CONTEXT_DAG_ID'][name_format]] = task_instance.dag_id
-    if task_instance and task_instance.task_id:
-        params[
-            AIRFLOW_VAR_NAME_FORMAT_MAPPING['AIRFLOW_CONTEXT_TASK_ID'][name_format]
-        ] = task_instance.task_id
-    if task_instance and task_instance.execution_date:
-        params[
-            AIRFLOW_VAR_NAME_FORMAT_MAPPING['AIRFLOW_CONTEXT_EXECUTION_DATE'][name_format]
-        ] = task_instance.execution_date.isoformat()
     dag_run = context.get('dag_run')
-    if dag_run and dag_run.run_id:
-        params[AIRFLOW_VAR_NAME_FORMAT_MAPPING['AIRFLOW_CONTEXT_DAG_RUN_ID'][name_format]] = dag_run.run_id
+
+    ops = [
+        (task, 'email', 'AIRFLOW_CONTEXT_DAG_EMAIL'),
+        (task, 'owner', 'AIRFLOW_CONTEXT_DAG_OWNER'),
+        (task_instance, 'dag_id', 'AIRFLOW_CONTEXT_DAG_ID'),
+        (task_instance, 'task_id', 'AIRFLOW_CONTEXT_TASK_ID'),
+        (task_instance, 'execution_date', 'AIRFLOW_CONTEXT_EXECUTION_DATE'),
+        (dag_run, 'run_id', 'AIRFLOW_CONTEXT_DAG_RUN_ID'),
+    ]
+
+    for subject, attr, mapping_key in ops:
+        _attr = getattr(subject, attr, None)
+        if subject and _attr:
+            mapping_value = AIRFLOW_VAR_NAME_FORMAT_MAPPING[mapping_key][name_format]
+            if isinstance(_attr, str):
+                params[mapping_value] = _attr
+            if isinstance(_attr, datetime):

Review comment:
       ```suggestion
               elif isinstance(_attr, datetime):
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org