Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/20 20:33:44 UTC

[GitHub] [airflow] collinmcnulty opened a new issue, #25200: DAG Run fails when chaining multiple empty mapped tasks

collinmcnulty opened a new issue, #25200:
URL: https://github.com/apache/airflow/issues/25200

   ### Apache Airflow version
   
   2.3.3 (latest released)
   
   ### What happened
   
   On the Kubernetes Executor and the Local Executor (others not tested), a significant fraction of the DAG Runs of a DAG that has two consecutive mapped tasks, each being passed an empty list, are marked as failed even though all tasks either succeed or are skipped.
   
   ![image](https://user-images.githubusercontent.com/13177948/180075030-705b3a15-c554-49c1-8470-ecd10ee1d2dc.png)
   
   
   ### What you think should happen instead
   
   The DAG Run should be marked success as it is on LocalExecutor.
   
   ### How to reproduce
   
   Run the following DAG on Kubernetes Executor or Local Executor.
   
   The real world version of this DAG has several mapped tasks that all point to the same list, and that list is frequently empty. I have made a minimal reproducible example.
   
   ```py
   from datetime import datetime
   
   from airflow import DAG
   from airflow.decorators import task
   
   
   with DAG(dag_id="break_mapping", start_date=datetime(2022, 3, 4)) as dag:
   
       @task
       def add_one(x: int):
           return x + 1
   
       @task
       def say_hi():
           print("Hi")
   
   
       added_values = add_one.expand(x=[])
       added_more_values = add_one.expand(x=[])
       say_hi() >> added_values
       added_values >> added_more_values
   ```
   
   ### Operating System
   
   Debian Bullseye
   
   ### Versions of Apache Airflow Providers
   
   ```
   apache-airflow-providers-amazon==1!4.0.0
   apache-airflow-providers-cncf-kubernetes==1!4.1.0
   apache-airflow-providers-elasticsearch==1!4.0.0
   apache-airflow-providers-ftp==1!3.0.0
   apache-airflow-providers-google==1!8.1.0
   apache-airflow-providers-http==1!3.0.0
   apache-airflow-providers-imap==1!3.0.0
   apache-airflow-providers-microsoft-azure==1!4.0.0
   apache-airflow-providers-mysql==1!3.0.0
   apache-airflow-providers-postgres==1!5.0.0
   apache-airflow-providers-redis==1!3.0.0
   apache-airflow-providers-slack==1!5.0.0
   apache-airflow-providers-sqlite==1!3.0.0
   apache-airflow-providers-ssh==1!3.0.0
   ```
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   Local was tested on docker compose (from astro-cli)
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1191090548

   Hmm, I wonder if this has something to do with the fact that a task mapped against an empty list produces one single task instance with state REMOVED.




[GitHub] [airflow] uranusjr commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1198871148

   I think it is related to #23361, but I’m not entirely sure whether #25312 would fix it. It would fix the deadlock, but the underlying race condition between the scheduler and the task process still exists. I think #25060 is also likely a consequence of this?




[GitHub] [airflow] ashb commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1228846192

   And in a reverse of how it usually plays out, if you want to hit the deadlock _every_ time, disable the mini scheduler. 😁  
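
For anyone who wants to try this: the "mini scheduler" is controlled by the `schedule_after_task_execution` option in the `[scheduler]` section of the Airflow config. A minimal sketch of switching it off through Airflow's `AIRFLOW__{SECTION}__{KEY}` environment-variable convention follows. The helper name `airflow_env_var` is made up for illustration, and the variable must be present in the environment of the processes that run tasks before they start.

```python
import os


def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name Airflow reads for a config option.

    Hypothetical helper for illustration only; it just applies the
    AIRFLOW__{SECTION}__{KEY} naming convention.
    """
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# Disable the mini scheduler ([scheduler] schedule_after_task_execution).
name = airflow_env_var("scheduler", "schedule_after_task_execution")
os.environ[name] = "False"
print(f"{name}={os.environ[name]}")
```

The same setting can be written directly in `airflow.cfg` under `[scheduler]` instead of going through the environment.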




[GitHub] [airflow] ldacey commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
ldacey commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1227330453

   My issue https://github.com/apache/airflow/issues/25060 about this was closed and I was told that things would be tracked here.




[GitHub] [airflow] uranusjr commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1212708326

   I don’t think this is fixed yet; as mentioned in https://github.com/apache/airflow/issues/25060#issuecomment-1211134827:
   
   > It seems like the merged PR only solved scheduler crashes which is part of this issue but not the real issue.




[GitHub] [airflow] RNHTTR commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1198349831

   Could this be related to #23361 and resolved by #25312?




[GitHub] [airflow] collinmcnulty commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
collinmcnulty commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1218902002

   For anyone else experiencing this, there is a workaround to put a sleep between your two sets of mapped tasks.
   
   ```py
   from datetime import datetime
   from time import sleep
   
   from airflow import DAG
   from airflow.decorators import task
   
   with DAG(dag_id="break_mapping", start_date=datetime(2022, 3, 4)) as dag:
   
       @task
       def add_one(x: int):
           return x + 1
   
       @task
       def say_hi():
           print("Hi")
   
       @task(trigger_rule="all_done")
       def sleep_task():
           sleep(5)
   
       added_values = add_one.expand(x=[])
       added_more_values = add_one.expand(x=[])
       say_hi() >> added_values
       added_values >> sleep_task() >> added_more_values
   ```




[GitHub] [airflow] ashb commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1234073635

   @frankcash My workaround specifically needs _any_ operator downstream of a mapped task (that might get skipped), so in your example: `added_values >> added_more_values >> sleep_task`




[GitHub] [airflow] frankcash commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
frankcash commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1233210071

   I can confirm that an `EmptyOperator` works in between the dynamic tasks, but not at the end of them.




[GitHub] [airflow] ephraimbuddy commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1193089288

   It seems like a race condition between the scheduler calling `dag_run.update_state` and `MappedOperator.expand_mapped_task` updating the task instance state to skipped.




[GitHub] [airflow] ephraimbuddy commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1227870432

   Poked around this issue again and the code below resolved it:
   ```diff
   diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
   index 701618d5c4..55ca28b13e 100644
   --- a/airflow/models/dagrun.py
   +++ b/airflow/models/dagrun.py
   @@ -547,6 +547,9 @@ class DagRun(Base, LoggingMixin):
    
            leaf_task_ids = {t.task_id for t in dag.leaves}
            leaf_tis = [ti for ti in tis if ti.task_id in leaf_task_ids if ti.state != TaskInstanceState.REMOVED]
   +        
   +        if unfinished_tis:
   +            unfinished_tis = [t for t in unfinished_tis if t.state in State.unfinished]
    
            # if all roots finished and at least one failed, the run failed
            if not unfinished_tis and any(leaf_ti.state in State.failed_states for leaf_ti in leaf_tis):
   ```
   I ran the above DAG several times with the change and every run succeeded (174 DAG runs). However, I suspect it might not work on a faster machine, so it would help if someone else tried it. I think the better solution would be to move the mapped task expansion of last resort into the local task job.
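
To make the idea behind the patch concrete, here is a toy model in plain Python (not Airflow code). It assumes, as the diff suggests, that `unfinished_tis` is a list built before mapped-task expansion runs, so a task instance that is marked SKIPPED afterwards stays in the list until it is re-filtered. `FakeTI`, `UNFINISHED` and `is_deadlocked` are invented names, and the deadlock rule is simplified to "something is unfinished but nothing is runnable".

```python
from dataclasses import dataclass
from typing import List, Optional

# Simplified stand-in for State.unfinished (assumption: None means "no state yet").
UNFINISHED = {None, "scheduled", "queued", "running"}


@dataclass
class FakeTI:
    """Toy task instance with only the fields this sketch needs."""
    task_id: str
    state: Optional[str] = None


def is_deadlocked(unfinished: List[FakeTI], runnable: List[FakeTI]) -> bool:
    """Simplified rule: work appears to remain, but nothing can be run."""
    return bool(unfinished) and not runnable


say_hi = FakeTI("say_hi", "success")
add_one = FakeTI("add_one", None)
tis = [say_hi, add_one]

# 1. The snapshot is taken while add_one still has no state.
unfinished_tis = [ti for ti in tis if ti.state in UNFINISHED]

# 2. Expansion finds 0 values and marks the task instance as skipped.
add_one.state = "skipped"
runnable: List[FakeTI] = []

# The stale snapshot still lists add_one, so the run looks deadlocked.
stale_result = is_deadlocked(unfinished_tis, runnable)

# 3. Re-filtering by current state, as in the patch, drops the skipped instance.
unfinished_tis = [ti for ti in unfinished_tis if ti.state in UNFINISHED]
fresh_result = is_deadlocked(unfinished_tis, runnable)

print(stale_result, fresh_result)
```

In this model the first check reports a deadlock and the second does not, which is the behaviour change the three added lines aim for.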




[GitHub] [airflow] ephraimbuddy commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1195248602

   The error in the scheduler log:
   ```
   [2022-07-26 09:26:49,032] {dag.py:3084} INFO - Setting next_dagrun for break_mapping to 2022-05-21T00:00:00+00:00, run_after=2022-05-22T00:00:00+00:00
   [2022-07-26 09:26:49,090] {mappedoperator.py:665} INFO - Marking <TaskInstance: break_mapping.add_one scheduled__2022-05-07T00:00:00+00:00 [None]> as SKIPPED since the map has 0 values to expand
   [2022-07-26 09:26:49,113] {dagrun.py:588} ERROR - Deadlock; marking run <DagRun break_mapping @ 2022-05-07 00:00:00+00:00: scheduled__2022-05-07T00:00:00+00:00, state:running, queued_at: 2022-07-26 09:26:46.876995+00:00. externally triggered: False> failed
   ```
   The task log (last lines) for the successful task in the failed DAG run; the two other tasks were skipped as expected:
   ```
   [2022-07-26, 09:26:48 UTC] {logging_mixin.py:115} INFO - Hi
   [2022-07-26, 09:26:48 UTC] {python.py:173} INFO - Done. Returned value was: None
   [2022-07-26, 09:26:48 UTC] {taskinstance.py:1393} INFO - Marking task as SUCCESS. dag_id=break_mapping, task_id=say_hi, execution_date=20220507T000000, start_date=20220726T092647, end_date=20220726T092648
   [2022-07-26, 09:26:49 UTC] {local_task_job.py:163} INFO - Task exited with return code 0
   [2022-07-26, 09:26:49 UTC] {local_task_job.py:272} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```
   The task log (last lines) for the successful task in a successful DAG run; the two other tasks were skipped as expected:
   ```
   [2022-07-26, 09:26:43 UTC] {logging_mixin.py:115} INFO - Hi
   [2022-07-26, 09:26:43 UTC] {python.py:173} INFO - Done. Returned value was: None
   [2022-07-26, 09:26:43 UTC] {taskinstance.py:1393} INFO - Marking task as SUCCESS. dag_id=break_mapping, task_id=say_hi, execution_date=20220503T000000, start_date=20220726T092641, end_date=20220726T092643
   [2022-07-26, 09:26:43 UTC] {local_task_job.py:163} INFO - Task exited with return code 0
   [2022-07-26, 09:26:43 UTC] {mappedoperator.py:665} INFO - Marking <TaskInstance: break_mapping.add_one scheduled__2022-05-03T00:00:00+00:00 [None]> as SKIPPED since the map has 0 values to expand
   [2022-07-26, 09:26:43 UTC] {local_task_job.py:272} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```
   




[GitHub] [airflow] ldacey commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
ldacey commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1227253277

   I am using PostgreSQL 14. No issues with version 2.3.2, but beyond that the scheduler eventually fails due to primary key constraints. 




[GitHub] [airflow] internetcoffeephone commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
internetcoffeephone commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1227077067

   @collinmcnulty @ldacey What Airflow metadata database are you running?
   I'm asking because MySQL 5.x is known to have [deadlock issues](https://github.com/apache/airflow/issues/16982#issuecomment-1193335348). Not sure if this is related though.
   It's the MySQL metadata database version I'm running, and I'm experiencing the same issues on Airflow 2.3.3.




[GitHub] [airflow] collinmcnulty commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
collinmcnulty commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1212148391

   @uranusjr Does that mean it will be solved by #24432? That's marked as the solution to #25060 but seems distinct from this race condition.




[GitHub] [airflow] park-peter commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
park-peter commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1318967932

   The issue still persists when more than 2 empty mapped tasks are chained with 2.4.3. Example code:
   ```py
   from datetime import datetime
   
   from airflow import DAG
   from airflow.decorators import task
   
   
   with DAG(dag_id="break_mapping", start_date=datetime(2022, 3, 4)) as dag:
   
       @task
       def add_one(x: int):
           return x + 1
   
       @task
       def say_hi():
           print("Hi")
   
       @task
       def say_bye():
           print("Bye")
   
       added_values = add_one.expand(x=[])
       added_more_values = add_one.expand(x=[])
       added_more_more_values = add_one.expand(x=[])
       say_hi() >> say_bye() >> added_values
       added_values >> added_more_values >> added_more_more_values
   ```
   ![image](https://user-images.githubusercontent.com/46539900/202515268-ddd5c797-faf0-4332-ad8f-b6aa5ec087b7.png)
   




[GitHub] [airflow] ashb closed issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
ashb closed issue #25200: DAG Run fails when chaining multiple empty mapped tasks
URL: https://github.com/apache/airflow/issues/25200




[GitHub] [airflow] collinmcnulty commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
collinmcnulty commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1228504049

   I'm running on Postgres, not MySQL.




[GitHub] [airflow] ldacey commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
ldacey commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1226322244

   FYI - ran into the same issue on Airflow 2.3.4. I had to revert to 2.3.2 for a stable experience for now, and I had to delete one row from my task instance table (an UPDATE was trying to change map_index to 0, which apparently already existed, so the primary key constraint was violated and the scheduler crashed).
   
   In my case, none of my dynamic tasks happen to have empty lists though - there are values.




[GitHub] [airflow] ashb commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1228842527

   As a workaround for now @collinmcnulty @ldacey: it appears that the problem only manifests when a mapped task is a "leaf" node in the DAG (one without any downstream tasks). Stick an empty task on the end, `[added_values, added_more_values] >> EmptyOperator(task_id="end")`, and the deadlock goes away.
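
To see what "leaf" means for the reproduction DAG, here is a plain-Python sketch (not the Airflow API) that computes leaf tasks from a map of task to downstream tasks. The `leaves` function and both dictionaries are made up for illustration; tasks are labelled by the variable names used in the example DAG from the issue.

```python
from typing import Dict, Set


def leaves(downstream: Dict[str, Set[str]]) -> Set[str]:
    """Return the tasks that have no downstream tasks."""
    return {task for task, children in downstream.items() if not children}


# say_hi() >> added_values >> added_more_values
original = {
    "say_hi": {"added_values"},
    "added_values": {"added_more_values"},
    "added_more_values": set(),
}

# Same DAG plus: [added_values, added_more_values] >> EmptyOperator(task_id="end")
with_end = {
    "say_hi": {"added_values"},
    "added_values": {"added_more_values", "end"},
    "added_more_values": {"end"},
    "end": set(),
}

print(leaves(original))  # the second mapped task is the only leaf
print(leaves(with_end))  # only the unmapped "end" task is a leaf
```

With the extra task, no mapped task is a leaf any more, which is the condition the workaround relies on.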




[GitHub] [airflow] park-peter commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
park-peter commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1227259198

   > FYI - ran into the same issue on Airflow 2.3.4. I had to revert to 2.3.2 for a stable experience for now, and I had to delete one row from my task instance table (an UPDATE was trying to change map_index to 0, which apparently already existed, so the primary key constraint was violated and the scheduler crashed).
   > 
   > In my case, none of my dynamic tasks happen to have empty lists though - there are values.
   
   @ldacey it seems like the issue you are experiencing is different than the original post with the chaining of empty dynamic tasks. I’d suggest opening a new issue post with a fresh set of reproduction steps.




[GitHub] [airflow] ldacey commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
ldacey commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1229106992

   Thanks - I will try that out!




[GitHub] [airflow] ephraimbuddy commented on issue #25200: DAG Run fails when chaining multiple empty mapped tasks

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on issue #25200:
URL: https://github.com/apache/airflow/issues/25200#issuecomment-1193272894

   Here's a solution:
   ```diff
   diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
   index a883ff2404..1617944013 100644
   --- a/airflow/models/mappedoperator.py
   +++ b/airflow/models/mappedoperator.py
   @@ -665,6 +665,12 @@ class MappedOperator(AbstractOperator):
                        total_length,
                    )
                    unmapped_ti.state = TaskInstanceState.SKIPPED
   +                # Skip the downstream tasks as well since they'd eventually be skipped
   +                session.query(TaskInstance).filter(
   +                    TaskInstance.dag_id == self.dag_id,
   +                    TaskInstance.run_id == run_id,
   +                    TaskInstance.task_id.in_(self.downstream_task_ids)
   +                    ).update({TaskInstance.state: TaskInstanceState.SKIPPED}, synchronize_session='fetch')
                else:
                    # Otherwise convert this into the first mapped index, and create
                    # TaskInstance for other indexes.
   diff --git a/tests/models/test_mappedoperator.py b/tests/models/test_mappedoperator.py
   index 09ab87524b..8c9700ebde 100644
   --- a/tests/models/test_mappedoperator.py
   +++ b/tests/models/test_mappedoperator.py
   @@ -206,6 +206,34 @@ def test_expand_mapped_task_instance_skipped_on_zero(dag_maker, session):
    
        assert indices == [(-1, TaskInstanceState.SKIPPED)]
    
   +def test_downstream_tis_skipped_when_expand_mapped_ti_skipped_on_zero_length(dag_maker, session):
   +    """
   +    Test that when expand_mapped_task skips task instance on having a zero expansion length,
   +    the downstream task instances are skipped as well.
   +    """
   +    with dag_maker(session=session):
   +        task1 = BaseOperator(task_id="op1")
   +        mapped = MockOperator.partial(task_id='task_2').expand(arg2=[])
   +        task2 = BaseOperator(task_id="op2")
   +        task1 >> mapped >> task2
   +
   +    dr = dag_maker.create_dagrun()
   +
   +    expand_mapped_task(mapped, dr.run_id, task1.task_id, length=0, session=session)
   +
   +    indices = (
   +        session.query(TaskInstance.map_index, TaskInstance.state)
   +        .filter_by(task_id=mapped.task_id, dag_id=mapped.dag_id, run_id=dr.run_id)
   +        .order_by(TaskInstance.map_index)
   +        .all()
   +    )
   +
   +    assert indices == [(-1, TaskInstanceState.SKIPPED)]
   +    # assert that task2 is skipped as well
   +    task2 = session.query(TaskInstance).filter_by(task_id=task2.task_id, dag_id=task2.dag_id, run_id=dr.run_id).first()
   +    assert task2.state == TaskInstanceState.SKIPPED
   +    
   +
    
    def test_mapped_task_applies_default_args_classic(dag_maker):
        with dag_maker(default_args={"execution_timeout": timedelta(minutes=30)}) as dag:
   ```
   However, I'm worried that trigger rules are not respected.
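
The concern can be illustrated with a small plain-Python model (not Airflow code). It compares the patch's blanket skip of downstream tasks with a simplified evaluation of two trigger rules, `all_success` and `all_done`. The function names are invented, and real trigger-rule handling in Airflow covers more rules and cases than this sketch.

```python
from typing import List

FINISHED = {"success", "failed", "skipped", "upstream_failed"}


def state_by_trigger_rule(rule: str, upstream_states: List[str]) -> str:
    """Simplified outcome for a downstream task, given its upstream states."""
    if rule == "all_success":
        if all(s == "success" for s in upstream_states):
            return "runnable"
        if any(s in ("failed", "upstream_failed") for s in upstream_states):
            return "upstream_failed"
        if "skipped" in upstream_states:
            return "skipped"
        return "waiting"
    if rule == "all_done":
        # all_done only cares that upstreams finished, not how they finished.
        if all(s in FINISHED for s in upstream_states):
            return "runnable"
        return "waiting"
    raise ValueError(f"rule not covered by this sketch: {rule}")


def state_by_blanket_skip(rule: str, upstream_states: List[str]) -> str:
    """What the patch does: skip every downstream task, whatever its rule."""
    return "skipped"


upstream = ["skipped"]  # the mapped task expanded to 0 values
for rule in ("all_success", "all_done"):
    print(rule, state_by_trigger_rule(rule, upstream), state_by_blanket_skip(rule, upstream))
```

For `all_success` both approaches agree, but a downstream task with `trigger_rule="all_done"` (like `sleep_task` in the sleep workaround) would be skipped by the patch even though its rule says it should run.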

