Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/06/08 20:07:45 UTC

[GitHub] [airflow] jordanjeremy opened a new issue, #24338: TaskFlow AirflowSkipException causes downstream step to fail

jordanjeremy opened a new issue, #24338:
URL: https://github.com/apache/airflow/issues/24338

   ### Apache Airflow version
   
   2.3.2 (latest released)
   
   ### What happened
   
   I am using the TaskFlow API and have two tasks that feed the same downstream task. Each upstream task checks for new data and, when it finds some, sets an XCom entry with the new filename for the downstream task to handle. If no data is found, the upstream task raises `AirflowSkipException`.
   The downstream task has `trigger_rule="none_failed_min_one_success"`.
   
   The problem is that a skipped task does not set any XCom. When the downstream task starts, it raises this error:
   `airflow.exceptions.AirflowException: XComArg result from task2 at airflow_2_3_xcomarg_render_error with key="return_value" is not found!`
   
   ### What you think should happen instead
   
   Given the trigger rule `none_failed_min_one_success`, I expect that an upstream task can be skipped and the downstream task will still run. The downstream task does try to start based on the trigger rule, but it never actually runs because the error is raised while its arguments are being rendered.
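
For reference, the documented meaning of `none_failed_min_one_success` is that no upstream task is in the `failed` or `upstream_failed` state and at least one has succeeded. A minimal plain-Python sketch of that check follows. The function name and the string state labels are illustrative assumptions, not Airflow's internal implementation.

```python
from collections import Counter


def none_failed_min_one_success(upstream_states):
    """Return True if a downstream task with this rule may start.

    `upstream_states` is a list of state labels such as "success",
    "skipped", "failed" or "upstream_failed" (labels assumed for
    illustration only).
    """
    counts = Counter(upstream_states)
    # No upstream task may have failed, directly or transitively.
    no_failures = counts["failed"] == 0 and counts["upstream_failed"] == 0
    # At least one upstream task must have succeeded.
    return no_failures and counts["success"] >= 1
```

With task1 succeeding and task2 skipped, the rule is satisfied, which matches the report: the downstream task is started, and the failure only appears later, while its arguments are rendered.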
   
   ### How to reproduce
   
   The following example DAG produces the error when run.
   
   ```
   from airflow.decorators import dag, task
   from airflow.exceptions import AirflowSkipException
   
   @task
   def task1():
       return "example.csv"
   
   @task
   def task2():
       raise AirflowSkipException()
   
   @task(trigger_rule="none_failed_min_one_success")
   def downstream_task(t1, t2):
       print("task ran")
   
   @dag(
       default_args={"owner": "Airflow", "start_date": "2022-06-07"},
       schedule_interval=None,
   )
   def airflow_2_3_xcomarg_render_error():
       t1 = task1()
       t2 = task2()
       downstream_task(t1, t2)
   
   example_dag = airflow_2_3_xcomarg_render_error()
   ```
   
   ### Operating System
   
   Ubuntu 20.04.4 LTS
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] hterik commented on issue #24338: TaskFlow AirflowSkipException causes downstream step to fail

Posted by GitBox <gi...@apache.org>.
hterik commented on issue #24338:
URL: https://github.com/apache/airflow/issues/24338#issuecomment-1163938179

   Same issue when using `BranchPythonOperator`, e.g.:
   ```
   t1 = task1()
   t2 = task2()
   t_agg = combine_results(t1,t2)
   BranchPythonOperator(....) >> [t1, t2]
   ```
   We want to run either t1 or t2, or both, and then combine the results, but if either of them is skipped we observe the same issue: `XComArg result from .... with key="return_value" is not found!` Previously we would get null values as input arguments if the upstream was skipped. That's not ideal either; some placeholder value for skipped tasks would be best, in my opinion.




[GitHub] [airflow] jordanjeremy commented on issue #24338: TaskFlow AirflowSkipException causes downstream step to fail

Posted by GitBox <gi...@apache.org>.
jordanjeremy commented on issue #24338:
URL: https://github.com/apache/airflow/issues/24338#issuecomment-1175185410

   @ashb I provided additional detail in [https://github.com/apache/airflow/pull/24401#issuecomment-1154461709](https://github.com/apache/airflow/pull/24401#issuecomment-1154461709) showing a more complete example of how this was being used.
   
   In short, the XComArg was passed to the downstream task because doing so establishes the relationship between the tasks automatically, instead of having to set it explicitly through bitshift operators or by calling `set_upstream`/`set_downstream`.




[GitHub] [airflow] jordanjeremy commented on issue #24338: TaskFlow AirflowSkipException causes downstream step to fail

Posted by GitBox <gi...@apache.org>.
jordanjeremy commented on issue #24338:
URL: https://github.com/apache/airflow/issues/24338#issuecomment-1175197065

   @ashb Previously this did return `None`. However, I do see that this could be ambiguous, since an XCom value of `None` may have been set explicitly or may never have been set at all. Would it work better to return the NOTSET object instead of raising an error? That way, anyone who needs to tell the difference between an XCom that is `None` and one that was never set could do so.
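
To illustrate the distinction discussed above, here is a minimal plain-Python sketch of a sentinel that separates "value is `None`" from "no value was ever pushed". The `_NotSet` class, the `NOTSET` instance, the `resolve_xcom` helper and the dictionary-backed store are all hypothetical names defined here for illustration; they are not Airflow's implementation.

```python
class _NotSet:
    """Hypothetical marker type meaning 'no XCom was pushed'."""

    def __repr__(self):
        return "NOTSET"

    def __bool__(self):
        # Falsy, so a plain `if value:` check treats it like a missing value.
        return False


# Single shared instance; compare against it with `is`.
NOTSET = _NotSet()


def resolve_xcom(store, task_id, key="return_value"):
    """Look up a value in a dict-backed store (an assumption for this sketch).

    Returns NOTSET instead of raising when nothing was pushed.
    """
    return store.get((task_id, key), NOTSET)


# task1 pushed a filename, task3 explicitly pushed None, task2 was skipped.
example_store = {
    ("task1", "return_value"): "example.csv",
    ("task3", "return_value"): None,
}
```

A downstream function could then test `value is NOTSET` to detect a skipped upstream task while still accepting a legitimate `None`.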




[GitHub] [airflow] vanchaxy commented on issue #24338: TaskFlow AirflowSkipException causes downstream step to fail

Posted by GitBox <gi...@apache.org>.
vanchaxy commented on issue #24338:
URL: https://github.com/apache/airflow/issues/24338#issuecomment-1152917153

   As a workaround, you can remove t2 from the downstream_task args and check whether the task2 result exists in the function body. Alternatively, you can add one more task between task2 and downstream_task that checks whether the upstream was skipped and returns either None or the upstream value.
   
   I think downstream_task should run in your case, but I'm not sure what value we expect to see in t2: None, a skip exception, or some special value for that case?
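
A minimal sketch of the first workaround described above, assuming the downstream function pulls upstream results itself rather than receiving them as arguments. The helper name `collect_upstream_results` and the `FakeTaskInstance` class are hypothetical and exist only so the logic can be run outside Airflow. In a real task, `ti` would be the task instance taken from the task context, and its `xcom_pull` returns `None` when no XCom was pushed.

```python
def collect_upstream_results(ti, upstream_task_ids):
    """Return {task_id: value} for upstream tasks that pushed a return value.

    `ti` only needs an `xcom_pull(task_ids=...)` method that yields None
    when nothing was pushed, as a skipped task leaves no XCom behind.
    """
    results = {}
    for task_id in upstream_task_ids:
        value = ti.xcom_pull(task_ids=task_id)
        if value is not None:
            results[task_id] = value
    return results


class FakeTaskInstance:
    """Stand-in for a task instance, for illustration only."""

    def __init__(self, xcoms):
        self._xcoms = xcoms

    def xcom_pull(self, task_ids, key="return_value"):
        # Mirrors the "None when missing" behaviour assumed above.
        return self._xcoms.get(task_ids)


# task1 returned a filename; task2 was skipped and pushed nothing.
fake_ti = FakeTaskInstance({"task1": "example.csv"})
found = collect_upstream_results(fake_ti, ["task1", "task2"])
```

Because the values are no longer passed as arguments, the task dependencies would have to be declared explicitly. This approach also cannot tell a skipped task apart from one that returned `None`.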




[GitHub] [airflow] ashb commented on issue #24338: TaskFlow AirflowSkipException causes downstream step to fail

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #24338:
URL: https://github.com/apache/airflow/issues/24338#issuecomment-1175149892

   @jordanjeremy What value would you expect to be passed to `downstream_task` for the `t2` argument?




[GitHub] [airflow] uranusjr closed issue #24338: TaskFlow AirflowSkipException causes downstream step to fail

Posted by GitBox <gi...@apache.org>.
uranusjr closed issue #24338: TaskFlow AirflowSkipException causes downstream step to fail
URL: https://github.com/apache/airflow/issues/24338




[GitHub] [airflow] robinsinghstudios commented on issue #24338: TaskFlow AirflowSkipException causes downstream step to fail

Posted by GitBox <gi...@apache.org>.
robinsinghstudios commented on issue #24338:
URL: https://github.com/apache/airflow/issues/24338#issuecomment-1151962041

   The same issue has been bothering me for a while now and preventing me from upgrading to the latest airflow version.




[GitHub] [airflow] mariotaddeucci commented on issue #24338: TaskFlow AirflowSkipException causes downstream step to fail

Posted by GitBox <gi...@apache.org>.
mariotaddeucci commented on issue #24338:
URL: https://github.com/apache/airflow/issues/24338#issuecomment-1154544262

   I have the same issue. After upgrading from 2.1.3 to 2.3.2, some DAGs with the trigger rule `all_done` started raising this exception.




[GitHub] [airflow] ashb commented on issue #24338: TaskFlow AirflowSkipException causes downstream step to fail

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #24338:
URL: https://github.com/apache/airflow/issues/24338#issuecomment-1175189569

   @jordanjeremy That explains the use case (I agree it makes sense) but not the expected runtime behaviour. I guess you're expecting `None` to be passed?




[GitHub] [airflow] markhatch commented on issue #24338: TaskFlow AirflowSkipException causes downstream step to fail

Posted by GitBox <gi...@apache.org>.
markhatch commented on issue #24338:
URL: https://github.com/apache/airflow/issues/24338#issuecomment-1185160671

   Unsure if helpful - but tossing in my vote for this as well and thought to share my use case.
   
   I expected `trigger_rule` to be respected rather than downstream tasks failing automatically. I have a downstream task that picks a random choice from any successful upstream task.
   
   ```
       @task(trigger_rule=TriggerRule.ALL_DONE)
       def choose_cluster_to_run_on(acceptable_systems):
   ```
   
   ![Screen Shot 2022-07-15 at 12 18 03 PM](https://user-images.githubusercontent.com/1576203/179149531-41ced461-e8e5-4a86-9ffa-2e88973447f4.jpg)
   
   In the example above, I would expect the `choose_cluster` task to pick and pass `cluster_b`. Instead it throws `key="return_value" is not found`, as mentioned.

