Posted to commits@airflow.apache.org by "danielcham (via GitHub)" <gi...@apache.org> on 2023/08/16 15:21:56 UTC

[GitHub] [airflow] danielcham opened a new issue, #33446: Task group gets marked as upstream_failed when dynamically mapped with expand_kwargs even though all upstream tasks were skipped or successfully finished.

danielcham opened a new issue, #33446:
URL: https://github.com/apache/airflow/issues/33446

   ### Apache Airflow version
   
   2.6.3
   
   ### What happened
   
   I am writing a DAG that transfers data from MSSQL to BigQuery. The part of the ETL process that actually fetches the data from MSSQL and moves it to BigQuery needs to be parallelized.
   I am trying to write it as a task group where the first task moves data from MSSQL to GCS and the second task loads the file into BigQuery.
   For some reason, when I expand the task group it is automatically marked as `upstream_failed` at the very moment the DAG is triggered.
   
   I also tested this with a simple DAG (provided below) and was able to reproduce the bug.
   
   I found a similar issue [here](https://github.com/apache/airflow/issues/27449), but the bug still occurs even after setting `AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=False`.
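For reference, Airflow maps an environment variable named `AIRFLOW__{SECTION}__{KEY}` onto the `key` option of the `[section]` section of `airflow.cfg`, so the variable above sets `schedule_after_task_execution` under `[scheduler]`. A small sketch of that naming convention (the helper `split_airflow_env_var` is hypothetical and ignores Airflow's `_CMD` / `_SECRET` variants):

```python
from typing import Tuple


def split_airflow_env_var(name: str) -> Tuple[str, str]:
    """Hypothetical helper: map AIRFLOW__{SECTION}__{KEY} to (section, key)."""
    prefix = "AIRFLOW__"
    if not name.startswith(prefix):
        raise ValueError(f"not an Airflow config variable: {name}")
    # Split on the first double underscore after the prefix.
    section, sep, key = name[len(prefix):].partition("__")
    if not sep or not section or not key:
        raise ValueError(f"expected AIRFLOW__{{SECTION}}__{{KEY}}, got {name}")
    # Section and option names in airflow.cfg are lower-case.
    return section.lower(), key.lower()


print(split_airflow_env_var("AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION"))
# ('scheduler', 'schedule_after_task_execution')
```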
   
   ### What you think should happen instead
   
   The task group should be expanded dynamically **after all upstream tasks have finished**, since `expand_kwargs` needs the previous task's output.
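As a toy model of that expectation (the function `plan_mapped_group` is hypothetical, not Airflow's scheduler code): while the upstream task has not produced its output, the mapped group should simply stay unscheduled and be re-checked on a later scheduler pass; only then should it fan out into one instance per dict.

```python
from typing import Any, Dict, List, Optional, Tuple


def plan_mapped_group(
    upstream_done: bool, upstream_output: Optional[List[Dict[str, Any]]]
) -> Tuple[str, List[Dict[str, Any]]]:
    """Hypothetical toy model of the ordering the report expects."""
    if not upstream_done or upstream_output is None:
        # Nothing to expand over yet: leave the group unscheduled and
        # re-check on a later scheduler pass instead of failing it.
        return "waiting", []
    # One mapped instance of the group per dict returned upstream.
    return "expanded", list(upstream_output)


print(plan_mapped_group(False, None))
# ('waiting', [])
print(plan_mapped_group(True, [{"interval_start": 0, "interval_end": 10_000}]))
# ('expanded', [{'interval_start': 0, 'interval_end': 10000}])
```

In the run reported here, the group ends up `upstream_failed` during that first pass instead of waiting.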
   
   ### How to reproduce
   
   ```python
   from datetime import timedelta
   
   from airflow.decorators import dag, task, task_group
   from airflow.operators.bash import BashOperator
   from pendulum import datetime
   
   
   @dag(
       dag_id="example_task_group_expansion",
       schedule="@once",
       default_args={
           "depends_on_past": False,
           "email": ["airflow@example.com"],
           "email_on_failure": True,
           "email_on_retry": True,
           "retries": 0,
           "retry_delay": timedelta(minutes=5),
       },
       start_date=datetime(2023, 8, 1),
       catchup=False,
   )
   def example_dag():
       @task(task_id="TaskDistributer")
       def task_distributer():
           step = 10_000
           return [dict(interval_start=i, interval_end=i + step) for i in range(0, 100_000, step)]
   
       @task_group(group_id="tg1")
       def tg(interval_start, interval_end):
           task1 = BashOperator(
               task_id="task1",
               bash_command="echo $interval_start -- $interval_end",
               env={"interval_start": str(interval_start), "interval_end": str(interval_end)},
           )
   
           task2 = BashOperator(
               task_id="task2",
               bash_command="echo $interval_start -- $interval_end",
               env={"interval_start": str(interval_start), "interval_end": str(interval_end)},
           )
   
           task1 >> task2
   
           return task2
   
       tg.expand_kwargs(task_distributer())
   
   
   example_dag()
   ```

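To make the intended fan-out concrete, here is a plain-Python sketch (no Airflow required) of what `expand_kwargs` should do with `TaskDistributer`'s return value: one instance of `tg1` per dict, with each dict unpacked as keyword arguments. The stand-in `tg` function is hypothetical and only renders the text the `echo` command would print.

```python
from typing import Dict, List


def task_distributer(step: int = 10_000, total: int = 100_000) -> List[Dict[str, int]]:
    # Same list comprehension as the TaskDistributer task in the DAG above.
    return [dict(interval_start=i, interval_end=i + step) for i in range(0, total, step)]


def tg(interval_start: int, interval_end: int) -> str:
    # Hypothetical stand-in for one mapped tg1 instance: build the env that
    # task1/task2 receive and return what `echo` would print.
    env = {"interval_start": str(interval_start), "interval_end": str(interval_end)}
    return f"{env['interval_start']} -- {env['interval_end']}"


# expand_kwargs semantics: one group instance per dict, dict unpacked as kwargs.
outputs = [tg(**kwargs) for kwargs in task_distributer()]
print(len(outputs))  # 10
print(outputs[0])    # 0 -- 10000
print(outputs[-1])   # 90000 -- 100000
```

In the real DAG the group function is called when the DAG file is parsed, so `interval_start` and `interval_end` are placeholders that Airflow resolves per mapped instance at run time, not the plain integers used in this sketch.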
   ### Operating System
   
   macOS 13.4.1
   
   ### Versions of Apache Airflow Providers
   
   No providers needed to reproduce
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Docker-compose
   
   - Airflow image: `apache/airflow:2.6.3-python3.9`
   - Executor: Celery
   - Messaging queue: Redis
   - Metadata DB: MySQL 5.7
   
   ### Anything else
   
   The problem occurs every time.
   
   Here are some of the scheduler logs that may be relevant.
   ```
   [2023-08-16T12:48:32.520+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.TaskDistributer manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2023-08-16T12:48:32.521+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.TaskDistributer manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
   [2023-08-16T12:48:32.521+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.TaskDistributer manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2023-08-16T12:48:32.521+0000] {taskinstance.py:1103} DEBUG - Dependencies all met for dep_context=None ti=<TaskInstance: example_task_group_expansion.TaskDistributer manual__2023-08-16T12:48:31.265119+00:00 [None]>
   [2023-08-16T12:48:32.521+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.tg1.task1 manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2023-08-16T12:48:32.521+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.tg1.task1 manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
   [2023-08-16T12:48:32.521+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.tg1.task1 manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2023-08-16T12:48:32.521+0000] {taskinstance.py:1103} DEBUG - Dependencies all met for dep_context=None ti=<TaskInstance: example_task_group_expansion.tg1.task1 manual__2023-08-16T12:48:31.265119+00:00 [None]>
   [2023-08-16T12:48:32.524+0000] {abstractoperator.py:414} ERROR - Cannot expand <Task(BashOperator): tg1.task1> for run manual__2023-08-16T12:48:31.265119+00:00; missing upstream values: ['expand_kwargs() argument']
   [2023-08-16T12:48:32.535+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.tg1.task2 manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2023-08-16T12:48:32.538+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.tg1.task2 manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=0, skipped=0, failed=0, upstream_failed=0, removed=0, done=0), upstream_task_ids={'tg1.task1'}
   [2023-08-16T12:48:32.538+0000] {taskinstance.py:1093} DEBUG - Dependencies not met for <TaskInstance: example_task_group_expansion.tg1.task2 manual__2023-08-16T12:48:31.265119+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=0, skipped=0, failed=0, upstream_failed=0, removed=0, done=0), upstream_task_ids={'tg1.task1'}
   [2023-08-16T12:48:32.539+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.tg1.task2 manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   ```
   
   As the logs show, no upstream task is in the `done` state (`done=0`), yet the expanded task is set to `upstream_failed`.
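The `Trigger Rule` line for `tg1.task2` carries the upstream counts in a parseable form. A minimal sketch that extracts them and applies a deliberately *simplified* model of `all_success` (the helper names are hypothetical; Airflow's real `TriggerRuleDep` handles more cases) shows that counts like these, with nothing finished upstream, correspond to a task that should keep waiting rather than fail:

```python
import ast
import re
from typing import Dict, Set

# Tail of the 'Trigger Rule' DEBUG line for tg1.task2, copied from the log above.
LOG_TAIL = (
    "found 1 non-success(es). upstream_states=_UpstreamTIStates(success=0, "
    "skipped=0, failed=0, upstream_failed=0, removed=0, done=0), "
    "upstream_task_ids={'tg1.task1'}"
)


def parse_upstream_states(line: str) -> Dict[str, int]:
    """Extract the counters inside _UpstreamTIStates(...) as a dict."""
    match = re.search(r"_UpstreamTIStates\(([^)]*)\)", line)
    if match is None:
        raise ValueError("no _UpstreamTIStates(...) found")
    pairs = (item.split("=") for item in match.group(1).split(", "))
    return {name: int(value) for name, value in pairs}


def parse_upstream_ids(line: str) -> Set[str]:
    """Extract the upstream_task_ids={...} set literal."""
    match = re.search(r"upstream_task_ids=(\{[^}]*\})", line)
    if match is None:
        raise ValueError("no upstream_task_ids found")
    return ast.literal_eval(match.group(1))


def simplified_all_success(states: Dict[str, int], upstream_count: int) -> str:
    """Simplified model of all_success, NOT Airflow's actual implementation."""
    if states["failed"] or states["upstream_failed"]:
        return "upstream_failed"  # a failure upstream propagates
    if states["skipped"]:
        return "skipped"  # a skip upstream propagates
    if states["success"] == upstream_count:
        return "met"  # every upstream task succeeded
    return "waiting"  # some upstream tasks have not finished yet


states = parse_upstream_states(LOG_TAIL)
upstream = parse_upstream_ids(LOG_TAIL)
print(states["done"], len(upstream) - states["success"])  # 0 1
print(simplified_all_success(states, len(upstream)))  # waiting
```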
   
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ephraimbuddy closed issue #33446: Task group gets marked as upstream_failed when dynamically mapped with expand_kwargs even though all upstream tasks were skipped or successfully finished.

Posted by "ephraimbuddy (via GitHub)" <gi...@apache.org>.
ephraimbuddy closed issue #33446: Task group gets marked as upstream_failed when dynamically mapped with expand_kwargs even though all upstream tasks were skipped or successfully finished.
URL: https://github.com/apache/airflow/issues/33446




[GitHub] [airflow] boring-cyborg[bot] commented on issue #33446: Task group gets marked as upstream_failed when dynamically mapped with expand_kwargs even though all upstream tasks were skipped or successfully finished.

Posted by "boring-cyborg[bot] (via GitHub)" <gi...@apache.org>.
boring-cyborg[bot] commented on issue #33446:
URL: https://github.com/apache/airflow/issues/33446#issuecomment-1680818536

   Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
   

