Posted to commits@airflow.apache.org by "zhangw (via GitHub)" <gi...@apache.org> on 2023/12/01 10:36:59 UTC

[I] dag_hash changed when using the XComArgs feature [airflow]

zhangw opened a new issue, #35998:
URL: https://github.com/apache/airflow/issues/35998

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   ## Airflow version 2.1.2
   
   The DAG file is simple but uses the XComArgs feature, and I noticed the dag_hash changes every time the file is parsed and serialized. I thought the hash should be stable in this case.
   
   ## The DAG file for testing
   
   ```
   import logging
   
   from airflow import DAG
   from airflow.decorators import task
   from airflow.operators.bash import BashOperator
   from airflow.operators.python import PythonOperator, get_current_context
   from airflow.utils.dates import days_ago
   
   log = logging.getLogger(__name__)
   
   
   def generate_value():
       """Dummy function"""
       return "Bring me a shrubbery!"
   
   
   @task()
   def print_value(value):
       """Dummy function"""
       ctx = get_current_context()
       log.info("The knights of Ni say: %s (at %s)", value, ctx['ts'])
   
   
   with DAG(
       dag_id='example_xcom_args',
       default_args={'owner': 'airflow'},
       start_date=days_ago(2),
       schedule_interval=None,
       tags=['example'],
   ) as dag:
       task1 = PythonOperator(
           task_id='generate_value',
           python_callable=generate_value,
       )
   
       print_value(task1.output)
   
   
   with DAG(
       "example_xcom_args_with_operators",
       default_args={'owner': 'airflow'},
       start_date=days_ago(2),
       schedule_interval=None,
       tags=['example'],
   ) as dag2:
       bash_op1 = BashOperator(task_id="c", bash_command="echo c")
       bash_op2 = BashOperator(task_id="d", bash_command="echo c")
       xcom_args_a = print_value("first!")
       xcom_args_b = print_value("second!")
   
       bash_op1 >> xcom_args_a >> xcom_args_b >> bash_op2
   ```
   
   ## One of the serialized payloads
   
   ```
   {"dag": {"tags": ["example"], "tasks": [{"pool": "default_pool", "label": "generate_value", "owner": "airflow", "_inlets": [], "op_args": [], "task_id": "generate_value", "_outlets": [], "ui_color": "#ffefeb", "_is_dummy": false, "op_kwargs": {}, "_task_type": "PythonOperator", "ui_fgcolor": "#000", "_task_module": "airflow.operators.python", "template_fields": ["templates_dict", "op_args", "op_kwargs"], "_downstream_task_ids": ["print_value"], "template_fields_renderers": {"op_args": "py", "op_kwargs": "py", "templates_dict": "json"}}, {"pool": "default_pool", "label": "print_value", "owner": "airflow", "doc_md": "Dummy function", "_inlets": [], "op_args": "(<airflow.models.xcom_arg.XComArg object at 0x107415d30>,)", "task_id": "print_value", "_outlets": [], "ui_color": "#ffefeb", "_is_dummy": false, "op_kwargs": {}, "_task_type": "_PythonDecoratedOperator", "ui_fgcolor": "#000", "_task_module": "airflow.decorators.python", "template_fields": ["op_args", "op_kwargs"], "_downstrea
 m_task_ids": [], "template_fields_renderers": {"op_args": "py", "op_kwargs": "py"}}], "_dag_id": "example_xcom_args", "fileloc": "/Users/vincent/Codes/Workspace/webull/airflow/airflow/airflow/example_dags/example_xcomargs.py", "timezone": "UTC", "edge_info": {}, "start_date": 1701216000.0, "_task_group": {"tooltip": "", "children": {"print_value": ["operator", "print_value"], "generate_value": ["operator", "generate_value"]}, "ui_color": "CornflowerBlue", "_group_id": null, "ui_fgcolor": "#000", "prefix_group_id": true, "upstream_task_ids": [], "upstream_group_ids": [], "downstream_task_ids": [], "downstream_group_ids": []}, "default_args": {"__var": {"owner": "airflow"}, "__type": "dict"}, "dag_dependencies": [], "schedule_interval": null}, "__version": 1}
   ```
   
   ## Another serialized payload
   
   ```
   {"dag": {"tags": ["example"], "tasks": [{"pool": "default_pool", "label": "generate_value", "owner": "airflow", "_inlets": [], "op_args": [], "task_id": "generate_value", "_outlets": [], "ui_color": "#ffefeb", "_is_dummy": false, "op_kwargs": {}, "_task_type": "PythonOperator", "ui_fgcolor": "#000", "_task_module": "airflow.operators.python", "template_fields": ["templates_dict", "op_args", "op_kwargs"], "_downstream_task_ids": ["print_value"], "template_fields_renderers": {"op_args": "py", "op_kwargs": "py", "templates_dict": "json"}}, {"pool": "default_pool", "label": "print_value", "owner": "airflow", "doc_md": "Dummy function", "_inlets": [], "op_args": "(<airflow.models.xcom_arg.XComArg object at 0x112a51d60>,)", "task_id": "print_value", "_outlets": [], "ui_color": "#ffefeb", "_is_dummy": false, "op_kwargs": {}, "_task_type": "_PythonDecoratedOperator", "ui_fgcolor": "#000", "_task_module": "airflow.decorators.python", "template_fields": ["op_args", "op_kwargs"], "_downstrea
 m_task_ids": [], "template_fields_renderers": {"op_args": "py", "op_kwargs": "py"}}], "_dag_id": "example_xcom_args", "fileloc": "/Users/vincent/Codes/Workspace/webull/airflow/airflow/airflow/example_dags/example_xcomargs.py", "timezone": "UTC", "edge_info": {}, "start_date": 1701216000.0, "_task_group": {"tooltip": "", "children": {"print_value": ["operator", "print_value"], "generate_value": ["operator", "generate_value"]}, "ui_color": "CornflowerBlue", "_group_id": null, "ui_fgcolor": "#000", "prefix_group_id": true, "upstream_task_ids": [], "upstream_group_ids": [], "downstream_task_ids": [], "downstream_group_ids": []}, "default_args": {"__var": {"owner": "airflow"}, "__type": "dict"}, "dag_dependencies": [], "schedule_interval": null}, "__version": 1}
   ```
   
   ## The only difference between them
   
   The value of `op_args`: the `XComArg` is serialized via its default `repr()`, which embeds the object's memory address (`0x107415d30` vs `0x112a51d60` above).
   
   ![image-20231201173700350](https://github.com/apache/airflow/assets/196561/ae1ebf10-4a76-448d-a0a2-551d60f7a666)
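
   Because the address comes from the default `repr()` of a plain Python object, it changes between interpreter runs - which is exactly what happens on every parse of the DAG file. A minimal, purely illustrative sketch of the effect (not Airflow code):

   ```
   # Two live objects of the same class have different default reprs,
   # because the default repr embeds the object's memory address.
   class XComArgLike:
       pass


   a = XComArgLike()
   b = XComArgLike()
   print(repr(a))  # e.g. <__main__.XComArgLike object at 0x107415d30>
   print(repr(b))  # a different address

   # The address also differs between interpreter runs, so any serialized
   # blob that stringifies such an object this way changes on every parse,
   # and so does any hash computed over that blob.
   ```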
   
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   1. Copy the above DAG file into your Airflow environment and make sure the scheduler is running.
   2. Run the SQL below several times, about 30 seconds apart (the exact interval depends on your `min_serialized_dag_update_interval` configuration setting).
   
   ```
   select `dag_id`, `dag_hash`, `last_updated`, `data` from `serialized_dag` where `dag_id` = 'example_xcom_args';
   ```
   
   Then compare the rows returned across these executions.
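
   Alternatively, the same check can be done through Airflow's ORM from inside the Airflow environment. This is only a hedged sketch - it assumes the `SerializedDagModel` table exposes the `dag_hash` and `last_updated` columns in your Airflow version:

   ```
   # Sketch: read dag_hash / last_updated for one DAG via Airflow's ORM
   # instead of querying the metadata database with raw SQL.
   from airflow.models.serialized_dag import SerializedDagModel
   from airflow.utils.session import create_session

   with create_session() as session:
       row = (
           session.query(SerializedDagModel)
           .filter(SerializedDagModel.dag_id == "example_xcom_args")
           .one_or_none()
       )
       if row is not None:
           print(row.dag_id, row.dag_hash, row.last_updated)
   ```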
   
   ### Operating System
   
   macOS 14.1.1 (23B81), Apple M1 (MacBook Pro)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   




Re: [I] dag_hash changed when using the XComArgs feature [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on issue #35998:
URL: https://github.com/apache/airflow/issues/35998#issuecomment-1837425016

   What you see is likely an effect of how serialization was implemented 2.5 years ago - with thousands of bug fixes released since, including tens of them released in the 2.1 line you are using (2.1.3 and 2.1.4).
   
   Hash calculation in Airflow only uses these fields:
   
   ```
       _comps = {
           "task_id",
           "dag_id",
           "owner",
           "email",
           "email_on_retry",
           "retry_delay",
           "retry_exponential_backoff",
           "max_retry_delay",
           "start_date",
           "end_date",
           "depends_on_past",
           "wait_for_downstream",
           "priority_weight",
           "sla",
           "execution_timeout",
           "on_execute_callback",
           "on_failure_callback",
           "on_success_callback",
           "on_retry_callback",
           "do_xcom_push",
       }
   ```
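
   As a rough illustration (a minimal sketch under stated assumptions, not Airflow's actual dag_hash code): a hash computed only over a fixed set of stable fields like the above stays constant across parses, while a hash over the full serialized document changes as soon as any field embeds a per-process value such as an object repr.

   ```
   # Illustrative sketch only - NOT Airflow's implementation.
   import hashlib
   import json

   # A fixed set of stable, comparable fields, similar in spirit to _comps above.
   STABLE_FIELDS = ("task_id", "dag_id", "owner", "start_date", "do_xcom_push")


   def stable_task_hash(task_dict: dict) -> str:
       # Only the stable fields participate, so a volatile value elsewhere
       # (e.g. an object repr containing a memory address) cannot move the hash.
       payload = {key: task_dict.get(key) for key in STABLE_FIELDS}
       return hashlib.md5(json.dumps(payload, sort_keys=True, default=str).encode()).hexdigest()


   def full_document_hash(serialized_dag: dict) -> str:
       # Hashing the whole serialized document is only stable if every field
       # serializes deterministically between parses.
       return hashlib.md5(json.dumps(serialized_dag, sort_keys=True, default=str).encode()).hexdigest()
   ```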
   
   But since 2.1.2 there have probably been tens of changes in this area.
   
   > Please advise further.
   
   Upgrade to the latest version of Airflow. This is the fastest way. If you want to see whether there are any bug fixes related to serialization, to be sure that it is worth it, you can go through the release notes https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html and check the few thousand fixes since, but I strongly advise you to just upgrade - even if there was a bug in the hash implementation back then, the only way to fix it is to upgrade anyway, so you can save a lot of time by just upgrading.
   
   If you see similar problems after upgrading, please report it here.




Re: [I] dag_hash changed when using the XComArgs feature [airflow]

Posted by "boring-cyborg[bot] (via GitHub)" <gi...@apache.org>.
boring-cyborg[bot] commented on issue #35998:
URL: https://github.com/apache/airflow/issues/35998#issuecomment-1835858719

   Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; there is no need to wait for approval.
   




Re: [I] dag_hash changed when using the XComArgs feature [airflow]

Posted by "zhangw (via GitHub)" <gi...@apache.org>.
zhangw commented on issue #35998:
URL: https://github.com/apache/airflow/issues/35998#issuecomment-1837308308

   > Don't use `days_ago`. It's a bad practice from Airflow 1.10 which we have since fixed in (I believe) all of our examples and documentation. `days_ago` effectively calculates a new start_date for the DAG every time the DAG file is parsed, which means that yes - the DAG is different every time.
   > 
   > Generally, when you create a DAG you should decide WHEN its life should start (a fixed date) rather than continue moving the start date over and over again - which is effectively what `days_ago` does.
   
   In my case, the value of start_date calculated by `days_ago` has not changed; what changes is the value of op_args. Please advise further.
   
   ![image](https://github.com/apache/airflow/assets/196561/eba1a6ab-c150-4e4e-82e1-d87dbe62a805)




Re: [I] dag_hash changed when using the XComArgs feature [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on issue #35998:
URL: https://github.com/apache/airflow/issues/35998#issuecomment-1836241002

   Don't use `days_ago`. It's a bad practice from Airflow 1.10 which we have since fixed in (I believe) all of our examples and documentation. `days_ago` effectively calculates a new start_date for the DAG every time the DAG file is parsed, which means that yes - the DAG is different every time.
   
   Generally, when you create a DAG you should decide WHEN its life should start rather than continue moving the start date over and over again - which is effectively what `days_ago` does.
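
   For example, a minimal sketch of the same DAG header with a pinned, timezone-aware start date (the exact date here is only illustrative - pick the date your DAG should really start from):

   ```
   import pendulum

   from airflow import DAG

   with DAG(
       dag_id="example_xcom_args",
       default_args={"owner": "airflow"},
       # A fixed start date: parsing the file no longer moves it forward.
       start_date=pendulum.datetime(2023, 12, 1, tz="UTC"),
       schedule_interval=None,
       tags=["example"],
   ) as dag:
       ...
   ```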




Re: [I] dag_hash changed when using the XComArgs feature [airflow]

Posted by "zhangw (via GitHub)" <gi...@apache.org>.
zhangw commented on issue #35998:
URL: https://github.com/apache/airflow/issues/35998#issuecomment-1837725670

   > What you see is likely an effect of how serialization was implemented 2.5 years ago - with thousands of bug fixes released since, including tens of them released in the 2.1 line you are using (2.1.3 and 2.1.4).
   > 
   > Hash calculation in Airflow only uses these fields:
   > 
   > ```
   >     _comps = {
   >         "task_id",
   >         "dag_id",
   >         "owner",
   >         "email",
   >         "email_on_retry",
   >         "retry_delay",
   >         "retry_exponential_backoff",
   >         "max_retry_delay",
   >         "start_date",
   >         "end_date",
   >         "depends_on_past",
   >         "wait_for_downstream",
   >         "priority_weight",
   >         "sla",
   >         "execution_timeout",
   >         "on_execute_callback",
   >         "on_failure_callback",
   >         "on_success_callback",
   >         "on_retry_callback",
   >         "do_xcom_push",
   >     }
   > ```
   > 
   > But since 2.1.2 there have probably been tens of changes in this area.
   > 
   > > Please advise further.
   > 
   > Upgrade to the latest version of Airflow. This is the fastest way. If you want to see whether there are any bug fixes related to serialization, to be sure that it is worth it, you can go through the release notes https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html and check the few thousand fixes since, but I strongly advise you to just upgrade - even if there was a bug in the hash implementation back then, the only way to fix it is to upgrade anyway, so you can save a lot of time by just upgrading.
   > 
   > If you see similar problems after upgrading, please report it here.
   
   Thanks for your suggestions!




Re: [I] dag_hash changed when using the XComArgs feature [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk closed issue #35998: dag_hash changed when using the XComArgs feature
URL: https://github.com/apache/airflow/issues/35998

