Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/16 13:10:10 UTC

[GitHub] [airflow] ashb opened a new pull request #21614: Run mapped tasks via the normal Scheduler

ashb opened a new pull request #21614:
URL: https://github.com/apache/airflow/pull/21614


   The last remaining step to get (simple) mapped tasks running in the scheduler was to expand the unmapped TIs when necessary. The interface of `MappedOperator.expand_mapped_task` is not right yet and will be changed shortly, but this is enough to unblock work on the UI and APIs.
   
   Since there are a lot of moving parts in mapped tasks, I have written the tests as "integration"/"end-to-end" tests rather than pure unit tests, and marked them as long_running (even though they currently take under 20s).
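The expansion step described above can be sketched in plain Python. This is a hypothetical model, not Airflow's `MappedOperator.expand_mapped_task`: `FakeTI`, `expand_placeholder` and its `length` argument are illustrative names. It mirrors what the review diffs in this thread assert (the existing unmapped TI, with `map_index` -1, is reused as index 0 and only the remaining TIs are new) and attaches `task` to each new TI inside the expansion, so callers do not have to.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass
class FakeTI:
    """Hypothetical stand-in for a task instance (TI)."""
    task_id: str
    map_index: int = -1  # -1 marks an unmapped (not yet expanded) TI
    task: Any = None


def expand_placeholder(placeholder: FakeTI, length: int) -> List[FakeTI]:
    """Hypothetical expansion: reuse the placeholder as index 0, create the rest.

    Assumes length >= 1; the zero-length case needs separate handling.
    """
    if length < 1:
        raise ValueError("expansion needs at least one item")
    placeholder.map_index = 0
    new_tis = [placeholder]
    for index in range(1, length):
        # Attach the task here so callers do not have to loop over the result.
        new_tis.append(FakeTI(placeholder.task_id, map_index=index, task=placeholder.task))
    return new_tis


placeholder = FakeTI("double", task="mapped-operator")
tis = expand_placeholder(placeholder, 3)
print([ti.map_index for ti in tis])  # [0, 1, 2]
```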
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb merged pull request #21614: Run mapped tasks via the normal Scheduler

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #21614:
URL: https://github.com/apache/airflow/pull/21614


   





[GitHub] [airflow] ashb commented on a change in pull request #21614: Run mapped tasks via the normal Scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21614:
URL: https://github.com/apache/airflow/pull/21614#discussion_r809111104



##########
File path: airflow/models/dagrun.py
##########
@@ -649,19 +651,41 @@ def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) ->
 
     def _get_ready_tis(
         self,
-        scheduleable_tasks: List[TI],
+        scheduleable_tis: List[TI],
         finished_tis: List[TI],
         session: Session,
     ) -> Tuple[List[TI], bool]:
         old_states = {}
         ready_tis: List[TI] = []
         changed_tis = False
 
-        if not scheduleable_tasks:
+        if not scheduleable_tis:
             return ready_tis, changed_tis
 
+        # If we expand TIs, we need a new list so that we iterate over them too. (We can't alter
+        # `scheduleable_tis` in place and have the `for` loop pick them up
+        expanded_tis: List[TI] = []
+
         # Check dependencies
-        for st in scheduleable_tasks:
+        for st in itertools.chain(scheduleable_tis, expanded_tis):

Review comment:
       I changed it to just `schedulable` since https://en.wikipedia.org/wiki/STI
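The `itertools.chain(schedulable_tis, expanded_tis)` loop in the diff works because `chain` only calls `iter()` on its second argument once the first is exhausted, so TIs appended to `expanded_tis` during the loop are still visited while the caller's list is left untouched. A minimal standalone sketch, with strings standing in for TIs and a hypothetical `expansions` mapping standing in for the expansion call:

```python
import itertools
from typing import Dict, List


def visit_with_expansion(schedulable: List[str], expansions: Dict[str, List[str]]) -> List[str]:
    """Visit every item, including items created by expansion during the loop.

    `expansions` is a hypothetical stand-in for expand_mapped_task: it maps an
    item to the extra items its expansion creates.
    """
    expanded: List[str] = []  # new items go here; `schedulable` is never mutated
    visited: List[str] = []
    for item in itertools.chain(schedulable, expanded):
        visited.append(item)
        # chain() calls iter(expanded) only after `schedulable` is exhausted, so
        # everything appended before then is visited; a list iterator also
        # picks up items appended while it is running.
        expanded.extend(expansions.get(item, []))
    return visited


original = ["a", "mapped"]
print(visit_with_expansion(original, {"mapped": ["mapped-1", "mapped-2"]}))
# ['a', 'mapped', 'mapped-1', 'mapped-2']
print(original)  # ['a', 'mapped']
```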







[GitHub] [airflow] ashb commented on a change in pull request #21614: Run mapped tasks via the normal Scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21614:
URL: https://github.com/apache/airflow/pull/21614#discussion_r808980758



##########
File path: airflow/models/dagrun.py
##########
@@ -649,19 +651,41 @@ def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) ->
 
     def _get_ready_tis(
         self,
-        scheduleable_tasks: List[TI],
+        scheduleable_tis: List[TI],

Review comment:
       Yes, you are correct.







[GitHub] [airflow] uranusjr commented on a change in pull request #21614: Run mapped tasks via the normal Scheduler

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21614:
URL: https://github.com/apache/airflow/pull/21614#discussion_r808589255



##########
File path: airflow/models/dagrun.py
##########
@@ -649,19 +651,41 @@ def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) ->
 
     def _get_ready_tis(
         self,
-        scheduleable_tasks: List[TI],
+        scheduleable_tis: List[TI],

Review comment:
       ```suggestion
           schedulable_tis: List[TI],
   ```
   
   I think this is the correct spelling…?







[GitHub] [airflow] uranusjr commented on a change in pull request #21614: Run mapped tasks via the normal Scheduler

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21614:
URL: https://github.com/apache/airflow/pull/21614#discussion_r810682765



##########
File path: airflow/models/dagrun.py
##########
@@ -649,27 +651,46 @@ def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) ->
 
     def _get_ready_tis(
         self,
-        scheduleable_tasks: List[TI],
+        schedulable_tis: List[TI],
         finished_tis: List[TI],
         session: Session,
     ) -> Tuple[List[TI], bool]:
         old_states = {}
         ready_tis: List[TI] = []
         changed_tis = False
 
-        if not scheduleable_tasks:
+        if not schedulable_tis:
             return ready_tis, changed_tis
 
+        # If we expand TIs, we need a new list so that we iterate over them too. (We can't alter
+        # `schedulable_tis` in place and have the `for` loop pick them up
+        expanded_tis: List[TI] = []
+
         # Check dependencies
-        for st in scheduleable_tasks:
-            old_state = st.state
-            if st.are_dependencies_met(
+        for schedulable in itertools.chain(schedulable_tis, expanded_tis):
+
+            # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if
+            # for any reason it wasn't, we need to expand it now
+            if schedulable.map_index < 0 and schedulable.task.is_mapped:

Review comment:
       ```suggestion
               if schedulable.task.is_mapped and schedulable.map_index < 0:
   ```
   
   Perhaps a very minor optimisation? Most of the tasks should be non-mapped and all indexed -1, so checking `is_mapped` first saves some checks.
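Python's `and` short-circuits, so the order of the two conditions decides how often the second one is evaluated. A minimal sketch with hypothetical counting functions and tuples standing in for TIs (not Airflow's `TaskInstance`); the 90/10 split of unmapped vs. mapped TIs is an assumption for illustration:

```python
from collections import Counter
from typing import List, Tuple

calls: Counter = Counter()


def is_mapped(ti: Tuple[bool, int]) -> bool:
    calls["is_mapped"] += 1
    return ti[0]


def map_index_unset(ti: Tuple[bool, int]) -> bool:
    calls["map_index"] += 1
    return ti[1] < 0


# Hypothetical population: 90 unmapped TIs and 10 unexpanded mapped TIs, all at -1.
tis: List[Tuple[bool, int]] = [(False, -1)] * 90 + [(True, -1)] * 10

calls.clear()
index_first = sum(1 for ti in tis if map_index_unset(ti) and is_mapped(ti))
index_first_calls = dict(calls)

calls.clear()
mapped_first = sum(1 for ti in tis if is_mapped(ti) and map_index_unset(ti))
mapped_first_calls = dict(calls)

print(index_first, index_first_calls)  # 10 {'map_index': 100, 'is_mapped': 100}
print(mapped_first, mapped_first_calls)  # 10 {'is_mapped': 100, 'map_index': 10}
```

With this assumed population, checking `is_mapped` first skips 90 of the 100 `map_index` checks. Whether that matters in practice depends on the relative cost of each lookup, which is the point debated in the replies.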

##########
File path: airflow/models/dagrun.py
##########
@@ -649,27 +651,46 @@ def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) ->
 
     def _get_ready_tis(
         self,
-        scheduleable_tasks: List[TI],
+        schedulable_tis: List[TI],
         finished_tis: List[TI],
         session: Session,
     ) -> Tuple[List[TI], bool]:
         old_states = {}
         ready_tis: List[TI] = []
         changed_tis = False
 
-        if not scheduleable_tasks:
+        if not schedulable_tis:
             return ready_tis, changed_tis
 
+        # If we expand TIs, we need a new list so that we iterate over them too. (We can't alter
+        # `schedulable_tis` in place and have the `for` loop pick them up
+        expanded_tis: List[TI] = []
+
         # Check dependencies
-        for st in scheduleable_tasks:
-            old_state = st.state
-            if st.are_dependencies_met(
+        for schedulable in itertools.chain(schedulable_tis, expanded_tis):
+
+            # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if
+            # for any reason it wasn't, we need to expand it now
+            if schedulable.map_index < 0 and schedulable.task.is_mapped:
+                # HACK. This needs a better way, one that copes with multiple upstreams!
+                for ti in finished_tis:
+                    if schedulable.task_id in ti.task.downstream_task_ids:
+                        upstream = ti
+
+                        assert isinstance(schedulable.task, MappedOperator)
+                        new_tis = schedulable.task.expand_mapped_task(upstream, session=session)
+                        assert new_tis[0] is schedulable
+                        expanded_tis.extend(new_tis[1:])

Review comment:
       `expand_mapped_task` has a block for when the unmapped task does not exist. Do we need to account for that case here?







[GitHub] [airflow] github-actions[bot] commented on pull request #21614: Run mapped tasks via the normal Scheduler

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #21614:
URL: https://github.com/apache/airflow/pull/21614#issuecomment-1044688800


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] ashb commented on a change in pull request #21614: Run mapped tasks via the normal Scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21614:
URL: https://github.com/apache/airflow/pull/21614#discussion_r811225561



##########
File path: airflow/models/dagrun.py
##########
@@ -649,27 +651,46 @@ def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) ->
 
     def _get_ready_tis(
         self,
-        scheduleable_tasks: List[TI],
+        schedulable_tis: List[TI],
         finished_tis: List[TI],
         session: Session,
     ) -> Tuple[List[TI], bool]:
         old_states = {}
         ready_tis: List[TI] = []
         changed_tis = False
 
-        if not scheduleable_tasks:
+        if not schedulable_tis:
             return ready_tis, changed_tis
 
+        # If we expand TIs, we need a new list so that we iterate over them too. (We can't alter
+        # `schedulable_tis` in place and have the `for` loop pick them up
+        expanded_tis: List[TI] = []
+
         # Check dependencies
-        for st in scheduleable_tasks:
-            old_state = st.state
-            if st.are_dependencies_met(
+        for schedulable in itertools.chain(schedulable_tis, expanded_tis):
+
+            # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if
+            # for any reason it wasn't, we need to expand it now
+            if schedulable.map_index < 0 and schedulable.task.is_mapped:

Review comment:
       I think we should leave this for now; we can optimize/benchmark later.
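One way to do that benchmarking later is a `timeit` micro-benchmark of the two orderings. This is a minimal sketch using hypothetical stand-in classes (`FakeTask`, `FakeTI`) whose attributes are plain Python attributes; Airflow's real `TaskInstance` is a SQLAlchemy model, so attribute costs there may differ and should be measured on the real objects. The 900/100 mix is an assumption.

```python
import timeit
from typing import List


class FakeTask:
    def __init__(self, is_mapped: bool) -> None:
        self.is_mapped = is_mapped


class FakeTI:
    def __init__(self, task: FakeTask, map_index: int = -1) -> None:
        self.task = task
        self.map_index = map_index


# Assumed mix: mostly unmapped tasks, a few unexpanded mapped ones.
TIS: List[FakeTI] = [FakeTI(FakeTask(False)) for _ in range(900)] + [
    FakeTI(FakeTask(True)) for _ in range(100)
]


def index_first(tis: List[FakeTI]) -> int:
    # Ordering used in the PR: one lookup first, then the two-step lookup.
    return sum(1 for ti in tis if ti.map_index < 0 and ti.task.is_mapped)


def mapped_first(tis: List[FakeTI]) -> int:
    # Suggested ordering: the two-step lookup first, usually short-circuiting.
    return sum(1 for ti in tis if ti.task.is_mapped and ti.map_index < 0)


if __name__ == "__main__":
    for fn in (index_first, mapped_first):
        seconds = timeit.timeit(lambda: fn(TIS), number=1000)
        print(f"{fn.__name__}: {seconds:.3f}s")
```

Both functions count the same TIs; only the timings differ, and those will vary by machine and Python version.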







[GitHub] [airflow] uranusjr commented on a change in pull request #21614: Run mapped tasks via the normal Scheduler

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21614:
URL: https://github.com/apache/airflow/pull/21614#discussion_r811237202



##########
File path: airflow/models/dagrun.py
##########
@@ -649,19 +651,41 @@ def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) ->
 
     def _get_ready_tis(
         self,
-        scheduleable_tasks: List[TI],
+        scheduleable_tis: List[TI],
         finished_tis: List[TI],
         session: Session,
     ) -> Tuple[List[TI], bool]:
         old_states = {}
         ready_tis: List[TI] = []
         changed_tis = False
 
-        if not scheduleable_tasks:
+        if not scheduleable_tis:
             return ready_tis, changed_tis
 
+        # If we expand TIs, we need a new list so that we iterate over them too. (We can't alter
+        # `scheduleable_tis` in place and have the `for` loop pick them up
+        expanded_tis: List[TI] = []
+
         # Check dependencies
-        for st in scheduleable_tasks:
+        for st in itertools.chain(scheduleable_tis, expanded_tis):

Review comment:
       Ah, Shimano Total Integration. Yeah, best to avoid that.







[GitHub] [airflow] ashb commented on a change in pull request #21614: Run mapped tasks via the normal Scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21614:
URL: https://github.com/apache/airflow/pull/21614#discussion_r811176620



##########
File path: airflow/models/dagrun.py
##########
@@ -649,27 +651,46 @@ def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) ->
 
     def _get_ready_tis(
         self,
-        scheduleable_tasks: List[TI],
+        schedulable_tis: List[TI],
         finished_tis: List[TI],
         session: Session,
     ) -> Tuple[List[TI], bool]:
         old_states = {}
         ready_tis: List[TI] = []
         changed_tis = False
 
-        if not scheduleable_tasks:
+        if not schedulable_tis:
             return ready_tis, changed_tis
 
+        # If we expand TIs, we need a new list so that we iterate over them too. (We can't alter
+        # `schedulable_tis` in place and have the `for` loop pick them up
+        expanded_tis: List[TI] = []
+
         # Check dependencies
-        for st in scheduleable_tasks:
-            old_state = st.state
-            if st.are_dependencies_met(
+        for schedulable in itertools.chain(schedulable_tis, expanded_tis):
+
+            # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if
+            # for any reason it wasn't, we need to expand it now
+            if schedulable.map_index < 0 and schedulable.task.is_mapped:

Review comment:
       Hmmmm, not sure. Checking `schedulable.task.is_mapped` involves two lookups (once to get task, then again to look up `is_mapped`), but `schedulable.map_index` just one.







[GitHub] [airflow] ashb commented on a change in pull request #21614: Run mapped tasks via the normal Scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21614:
URL: https://github.com/apache/airflow/pull/21614#discussion_r811193533



##########
File path: airflow/models/dagrun.py
##########
@@ -649,27 +651,46 @@ def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) ->
 
     def _get_ready_tis(
         self,
-        scheduleable_tasks: List[TI],
+        schedulable_tis: List[TI],
         finished_tis: List[TI],
         session: Session,
     ) -> Tuple[List[TI], bool]:
         old_states = {}
         ready_tis: List[TI] = []
         changed_tis = False
 
-        if not scheduleable_tasks:
+        if not schedulable_tis:
             return ready_tis, changed_tis
 
+        # If we expand TIs, we need a new list so that we iterate over them too. (We can't alter
+        # `schedulable_tis` in place and have the `for` loop pick them up
+        expanded_tis: List[TI] = []
+
         # Check dependencies
-        for st in scheduleable_tasks:
-            old_state = st.state
-            if st.are_dependencies_met(
+        for schedulable in itertools.chain(schedulable_tis, expanded_tis):
+
+            # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if
+            # for any reason it wasn't, we need to expand it now
+            if schedulable.map_index < 0 and schedulable.task.is_mapped:
+                # HACK. This needs a better way, one that copes with multiple upstreams!
+                for ti in finished_tis:
+                    if schedulable.task_id in ti.task.downstream_task_ids:
+                        upstream = ti
+
+                        assert isinstance(schedulable.task, MappedOperator)
+                        new_tis = schedulable.task.expand_mapped_task(upstream, session=session)
+                        assert new_tis[0] is schedulable
+                        expanded_tis.extend(new_tis[1:])

Review comment:
       Done and tested in 907cceb57







[GitHub] [airflow] ashb commented on a change in pull request #21614: Run mapped tasks via the normal Scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21614:
URL: https://github.com/apache/airflow/pull/21614#discussion_r808981152



##########
File path: airflow/models/dagrun.py
##########
@@ -649,19 +651,41 @@ def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) ->
 
     def _get_ready_tis(
         self,
-        scheduleable_tasks: List[TI],
+        scheduleable_tis: List[TI],
         finished_tis: List[TI],
         session: Session,
     ) -> Tuple[List[TI], bool]:
         old_states = {}
         ready_tis: List[TI] = []
         changed_tis = False
 
-        if not scheduleable_tasks:
+        if not scheduleable_tis:
             return ready_tis, changed_tis
 
+        # If we expand TIs, we need a new list so that we iterate over them too. (We can't alter
+        # `scheduleable_tis` in place and have the `for` loop pick them up
+        expanded_tis: List[TI] = []
+
         # Check dependencies
-        for st in scheduleable_tasks:
+        for st in itertools.chain(scheduleable_tis, expanded_tis):
+
+            # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if
+            # for any reason it wasn't, we need to expand it now
+            if st.map_index < 0 and st.task.is_mapped:
+                # HACK. This needs a better way, one that copes with multiple upstreams!
+                for ti in finished_tis:
+                    if st.task_id in ti.task.downstream_task_ids:
+                        upstream = ti
+
+                        assert isinstance(st.task, MappedOperator)
+                        new_tis = st.task.expand_mapped_task(upstream, session=session)
+                        assert new_tis[0] is st
+                        # Add the new TIs to the list to be checked
+                        for new_ti in new_tis[1:]:
+                            new_ti.task = st.task

Review comment:
       Good shout.







[GitHub] [airflow] uranusjr commented on a change in pull request #21614: Run mapped tasks via the normal Scheduler

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21614:
URL: https://github.com/apache/airflow/pull/21614#discussion_r808589851



##########
File path: airflow/models/dagrun.py
##########
@@ -649,19 +651,41 @@ def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) ->
 
     def _get_ready_tis(
         self,
-        scheduleable_tasks: List[TI],
+        scheduleable_tis: List[TI],
         finished_tis: List[TI],
         session: Session,
     ) -> Tuple[List[TI], bool]:
         old_states = {}
         ready_tis: List[TI] = []
         changed_tis = False
 
-        if not scheduleable_tasks:
+        if not scheduleable_tis:
             return ready_tis, changed_tis
 
+        # If we expand TIs, we need a new list so that we iterate over them too. (We can't alter
+        # `scheduleable_tis` in place and have the `for` loop pick them up
+        expanded_tis: List[TI] = []
+
         # Check dependencies
-        for st in scheduleable_tasks:
+        for st in itertools.chain(scheduleable_tis, expanded_tis):

Review comment:
       Maybe call this `sti` instead to signify it’s a ti, not a t(ask)? (or `schedulable_ti`)

##########
File path: airflow/models/dagrun.py
##########
@@ -649,19 +651,41 @@ def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) ->
 
     def _get_ready_tis(
         self,
-        scheduleable_tasks: List[TI],
+        scheduleable_tis: List[TI],
         finished_tis: List[TI],
         session: Session,
     ) -> Tuple[List[TI], bool]:
         old_states = {}
         ready_tis: List[TI] = []
         changed_tis = False
 
-        if not scheduleable_tasks:
+        if not scheduleable_tis:
             return ready_tis, changed_tis
 
+        # If we expand TIs, we need a new list so that we iterate over them too. (We can't alter
+        # `scheduleable_tis` in place and have the `for` loop pick them up
+        expanded_tis: List[TI] = []
+
         # Check dependencies
-        for st in scheduleable_tasks:
+        for st in itertools.chain(scheduleable_tis, expanded_tis):
+
+            # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if
+            # for any reason it wasn't, we need to expand it now
+            if st.map_index < 0 and st.task.is_mapped:
+                # HACK. This needs a better way, one that copes with multiple upstreams!
+                for ti in finished_tis:
+                    if st.task_id in ti.task.downstream_task_ids:
+                        upstream = ti
+
+                        assert isinstance(st.task, MappedOperator)
+                        new_tis = st.task.expand_mapped_task(upstream, session=session)
+                        assert new_tis[0] is st
+                        # Add the new TIs to the list to be checked
+                        for new_ti in new_tis[1:]:
+                            new_ti.task = st.task

Review comment:
       Perhaps `expand_mapped_task` should take care of assigning `TI.task`?







[GitHub] [airflow] ashb commented on a change in pull request #21614: Run mapped tasks via the normal Scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21614:
URL: https://github.com/apache/airflow/pull/21614#discussion_r809111365



##########
File path: airflow/models/dagrun.py
##########
@@ -649,19 +651,41 @@ def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) ->
 
     def _get_ready_tis(
         self,
-        scheduleable_tasks: List[TI],
+        scheduleable_tis: List[TI],
         finished_tis: List[TI],
         session: Session,
     ) -> Tuple[List[TI], bool]:
         old_states = {}
         ready_tis: List[TI] = []
         changed_tis = False
 
-        if not scheduleable_tasks:
+        if not scheduleable_tis:
             return ready_tis, changed_tis
 
+        # If we expand TIs, we need a new list so that we iterate over them too. (We can't alter
+        # `scheduleable_tis` in place and have the `for` loop pick them up
+        expanded_tis: List[TI] = []
+
         # Check dependencies
-        for st in scheduleable_tasks:
+        for st in itertools.chain(scheduleable_tis, expanded_tis):
+
+            # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if
+            # for any reason it wasn't, we need to expand it now
+            if st.map_index < 0 and st.task.is_mapped:
+                # HACK. This needs a better way, one that copes with multiple upstreams!
+                for ti in finished_tis:
+                    if st.task_id in ti.task.downstream_task_ids:
+                        upstream = ti
+
+                        assert isinstance(st.task, MappedOperator)
+                        new_tis = st.task.expand_mapped_task(upstream, session=session)
+                        assert new_tis[0] is st
+                        # Add the new TIs to the list to be checked
+                        for new_ti in new_tis[1:]:
+                            new_ti.task = st.task

Review comment:
       Done







[GitHub] [airflow] ashb commented on a change in pull request #21614: Run mapped tasks via the normal Scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21614:
URL: https://github.com/apache/airflow/pull/21614#discussion_r811179367



##########
File path: airflow/models/dagrun.py
##########
@@ -649,27 +651,46 @@ def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) ->
 
     def _get_ready_tis(
         self,
-        scheduleable_tasks: List[TI],
+        schedulable_tis: List[TI],
         finished_tis: List[TI],
         session: Session,
     ) -> Tuple[List[TI], bool]:
         old_states = {}
         ready_tis: List[TI] = []
         changed_tis = False
 
-        if not scheduleable_tasks:
+        if not schedulable_tis:
             return ready_tis, changed_tis
 
+        # If we expand TIs, we need a new list so that we iterate over them too. (We can't alter
+        # `schedulable_tis` in place and have the `for` loop pick them up
+        expanded_tis: List[TI] = []
+
         # Check dependencies
-        for st in scheduleable_tasks:
-            old_state = st.state
-            if st.are_dependencies_met(
+        for schedulable in itertools.chain(schedulable_tis, expanded_tis):
+
+            # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if
+            # for any reason it wasn't, we need to expand it now
+            if schedulable.map_index < 0 and schedulable.task.is_mapped:
+                # HACK. This needs a better way, one that copes with multiple upstreams!
+                for ti in finished_tis:
+                    if schedulable.task_id in ti.task.downstream_task_ids:
+                        upstream = ti
+
+                        assert isinstance(schedulable.task, MappedOperator)
+                        new_tis = schedulable.task.expand_mapped_task(upstream, session=session)
+                        assert new_tis[0] is schedulable
+                        expanded_tis.extend(new_tis[1:])

Review comment:
       Oh yeah, we do: if we return 0 tasks to map, then the downstream should be skipped.
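The zero-length case can be sketched as follows. This is a hypothetical model (`FakeTI`, `expand_or_skip` and the plain-string states are illustrative, not Airflow's API): when the upstream produced nothing to map over, no TIs are created and the unmapped placeholder is marked skipped instead of being scheduled.

```python
from dataclasses import dataclass
from typing import List, Sequence


@dataclass
class FakeTI:
    """Hypothetical stand-in for a task instance."""
    task_id: str
    map_index: int = -1
    state: str = "none"


def expand_or_skip(placeholder: FakeTI, upstream_values: Sequence[object]) -> List[FakeTI]:
    """Expand over upstream_values, or skip the placeholder if there are none."""
    if not upstream_values:
        # Nothing to map over: skip the task rather than leaving it unexpanded.
        placeholder.state = "skipped"
        return []
    placeholder.map_index = 0  # reuse the placeholder as index 0
    return [placeholder] + [
        FakeTI(placeholder.task_id, map_index=i) for i in range(1, len(upstream_values))
    ]


empty = FakeTI("consumer")
print(expand_or_skip(empty, []), empty.state)  # [] skipped
```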






