Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/21 15:08:41 UTC

[GitHub] [airflow] ashb opened a new pull request #21019: 🚧 Function to expand mapped tasks in to multiple TIs

ashb opened a new pull request #21019:
URL: https://github.com/apache/airflow/pull/21019


   Builds upon #20945


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #21019: 🚧 Function to expand mapped tasks in to multiple TIs

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #21019:
URL: https://github.com/apache/airflow/pull/21019#issuecomment-1018830351


   I'm not sold on having put that method on the right class, so opinions are welcome.





[GitHub] [airflow] ashb commented on a change in pull request #21019: Function to expand mapped tasks in to multiple "real" TIs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21019:
URL: https://github.com/apache/airflow/pull/21019#discussion_r791130585



##########
File path: tests/models/test_dagrun.py
##########
@@ -874,3 +875,20 @@ def test_verify_integrity_task_start_date(Stats_incr, session, run_type, expecte
     assert len(tis) == expected_tis
 
     Stats_incr.assert_called_with('task_instance_created-DummyOperator', expected_tis)
+
+
+@pytest.mark.xfail(reason="TODO: Expand mapped literals at verify_integrity time!")

Review comment:
       It won't _only_ be in the mini scheduler run; there will still be an "expansion of last resort" in the scheduler. I guess the question is whether we want to do the expansion eagerly at DagRun creation time, when it could instead be done in another process (the LocalTaskJob).
   
   It's probably going to be quite rare in practice for maps to be literals, so I don't think it's even worth the cost of checking this here, given how unlikely it is to do anything useful.
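A minimal sketch of the trade-off, assuming a hypothetical `XComRef` class as a stand-in for a reference to upstream output (not Airflow's actual type): only a literal mapped value has a length that is knowable at DagRun creation (verify_integrity) time; anything else has to wait for the upstream to run.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class XComRef:
    """Hypothetical stand-in for a reference to an upstream task's output."""

    task_id: str


def eager_map_length(mapped_value: Any) -> Optional[int]:
    """Return the expansion length if it is known without running anything.

    Literal lists, tuples and dicts can be measured when the DagRun is
    created; a reference to upstream output cannot, so None means
    "defer expansion until the upstream has finished".
    """
    if isinstance(mapped_value, (list, tuple, dict)):
        return len(mapped_value)
    return None
```

For example, `eager_map_length([1, 2, 3])` gives `3`, while `eager_map_length(XComRef("make_list"))` gives `None`. If literal maps really are rare, most calls return `None`, which is the argument above for not paying for the check in the scheduler loop.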

##########
File path: airflow/models/baseoperator.py
##########
@@ -1800,6 +1801,53 @@ def wait_for_downstream(self) -> bool:
     def depends_on_past(self) -> bool:
         return self.partial_kwargs.get("depends_on_past") or self.wait_for_downstream
 
+    def expand_mapped_task(self, upstream_ti: "TaskInstance", session: "Session" = NEW_SESSION) -> None:
+        """Create the mapped TaskInstances for mapped task."""
+        # TODO: support having multiuple mapped upstreams?
+        from airflow.models.taskmap import TaskMap
+        from airflow.settings import task_instance_mutation_hook
+
+        task_map_info: TaskMap = (
+            session.query(TaskMap)
+            .filter_by(
+                dag_id=upstream_ti.dag_id,
+                task_id=upstream_ti.task_id,
+                run_id=upstream_ti.run_id,
+                map_index=upstream_ti.map_index,
+            )
+            .one()
+        )
+
+        unmapped_ti: Optional[TaskInstance] = upstream_ti.dag_run.get_task_instance(
+            self.task_id, map_index=-1, session=session
+        )
+
+        maps = range(task_map_info.length)
+
+        if unmapped_ti:
+            # The unmapped TaskInstance still exists -- this means we haven't
+            # tried to run it before.
+            unmapped_ti.map_index = 0
+            maps = range(1, task_map_info.length)

Review comment:
       I think skipping makes sense, yeah.
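The renumbering in this hunk can be modelled without a database. This is a sketch, not the PR's code: `FakeTI` and `expand` are hypothetical names, and real task instances would be created through the session rather than returned in a list.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeTI:
    """Hypothetical, minimal stand-in for a TaskInstance row."""

    task_id: str
    map_index: int = -1


def expand(task_id: str, length: int, unmapped: Optional[FakeTI]) -> List[FakeTI]:
    """Return task instances covering map indexes 0..length-1.

    Mirrors the hunk: an existing unmapped TI (map_index=-1) is reused as
    index 0, so only indexes 1..length-1 need new rows.
    """
    result: List[FakeTI] = []
    indexes = range(length)
    if unmapped is not None:
        unmapped.map_index = 0  # reuse the placeholder as the first mapped TI
        result.append(unmapped)
        indexes = range(1, length)
    result.extend(FakeTI(task_id, map_index=i) for i in indexes)
    return result
```

With `length=3` and an existing unmapped TI, the result has map indexes `[0, 1, 2]` and the first element is the reused placeholder. With `length=0` the sketch, like the hunk, still renumbers the placeholder to index 0, which is the empty-upstream edge case raised elsewhere in this thread.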







[GitHub] [airflow] uranusjr commented on a change in pull request #21019: Function to expand mapped tasks in to multiple "real" TIs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21019:
URL: https://github.com/apache/airflow/pull/21019#discussion_r791456924



##########
File path: airflow/models/baseoperator.py
##########
@@ -1800,6 +1801,53 @@ def wait_for_downstream(self) -> bool:
     def depends_on_past(self) -> bool:
         return self.partial_kwargs.get("depends_on_past") or self.wait_for_downstream
 
+    def expand_mapped_task(self, upstream_ti: "TaskInstance", session: "Session" = NEW_SESSION) -> None:
+        """Create the mapped TaskInstances for mapped task."""
+        # TODO: support having multiuple mapped upstreams?
+        from airflow.models.taskmap import TaskMap
+        from airflow.settings import task_instance_mutation_hook
+
+        task_map_info: TaskMap = (
+            session.query(TaskMap)
+            .filter_by(
+                dag_id=upstream_ti.dag_id,
+                task_id=upstream_ti.task_id,
+                run_id=upstream_ti.run_id,
+                map_index=upstream_ti.map_index,
+            )
+            .one()
+        )
+
+        unmapped_ti: Optional[TaskInstance] = upstream_ti.dag_run.get_task_instance(
+            self.task_id, map_index=-1, session=session
+        )
+
+        maps = range(task_map_info.length)
+
+        if unmapped_ti:
+            # The unmapped TaskInstance still exists -- this means we haven't
+            # tried to run it before.
+            unmapped_ti.map_index = 0
+            maps = range(1, task_map_info.length)

Review comment:
       There’s an edge case I just realised: what should happen if the upstream returns an empty list/dict? Failing the upstream feels wrong, but what should we do to keep the graphs making sense? Keep the unmapped TI and mark it as SKIPPED? Delete it without expanding? Other possibilities?







[GitHub] [airflow] uranusjr commented on a change in pull request #21019: Function to expand mapped tasks in to multiple "real" TIs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21019:
URL: https://github.com/apache/airflow/pull/21019#discussion_r792334796



##########
File path: airflow/models/baseoperator.py
##########
@@ -1800,6 +1801,53 @@ def wait_for_downstream(self) -> bool:
     def depends_on_past(self) -> bool:
         return self.partial_kwargs.get("depends_on_past") or self.wait_for_downstream
 
+    def expand_mapped_task(self, upstream_ti: "TaskInstance", session: "Session" = NEW_SESSION) -> None:
+        """Create the mapped TaskInstances for mapped task."""
+        # TODO: support having multiuple mapped upstreams?
+        from airflow.models.taskmap import TaskMap
+        from airflow.settings import task_instance_mutation_hook
+
+        task_map_info: TaskMap = (
+            session.query(TaskMap)
+            .filter_by(
+                dag_id=upstream_ti.dag_id,
+                task_id=upstream_ti.task_id,
+                run_id=upstream_ti.run_id,
+                map_index=upstream_ti.map_index,
+            )
+            .one()
+        )
+
+        unmapped_ti: Optional[TaskInstance] = upstream_ti.dag_run.get_task_instance(
+            self.task_id, map_index=-1, session=session
+        )
+
+        maps = range(task_map_info.length)
+
+        if unmapped_ti:
+            # The unmapped TaskInstance still exists -- this means we haven't
+            # tried to run it before.
+            unmapped_ti.map_index = 0
+            maps = range(1, task_map_info.length)

Review comment:
       I pushed a change to mark the unmapped ti as SKIPPED in this case.
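A sketch of that behaviour, again using hypothetical stand-ins (`State`, `FakeTI`, `expand_or_skip`) rather than Airflow's classes. In this sketch a zero-length upstream leaves the unmapped TI at `map_index=-1` and marks it SKIPPED, while a non-empty one reuses it as index 0.

```python
import enum
from dataclasses import dataclass
from typing import List, Optional


class State(str, enum.Enum):
    """Hypothetical subset of task-instance states used by this sketch."""

    SCHEDULED = "scheduled"
    SKIPPED = "skipped"


@dataclass
class FakeTI:
    """Hypothetical, minimal stand-in for a TaskInstance row."""

    task_id: str
    map_index: int = -1
    state: State = State.SCHEDULED


def expand_or_skip(task_id: str, length: int, unmapped: Optional[FakeTI]) -> List[FakeTI]:
    """Expand a mapped task, skipping it when the upstream mapped nothing."""
    if length < 1:
        # Nothing to map over: keep the unmapped TI (map_index -1) so the
        # graph still shows the task, and mark it SKIPPED.
        if unmapped is not None:
            unmapped.state = State.SKIPPED
        return []
    result: List[FakeTI] = []
    indexes = range(length)
    if unmapped is not None:
        unmapped.map_index = 0  # reuse the placeholder as the first mapped TI
        result.append(unmapped)
        indexes = range(1, length)
    result.extend(FakeTI(task_id, map_index=i) for i in indexes)
    return result
```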







[GitHub] [airflow] uranusjr commented on a change in pull request #21019: Function to expand mapped tasks in to multiple "real" TIs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21019:
URL: https://github.com/apache/airflow/pull/21019#discussion_r792298894



##########
File path: airflow/models/taskinstance.py
##########
@@ -486,7 +486,7 @@ def __init__(
         self.test_mode = False
 
     @staticmethod
-    def insert_mapping(run_id: str, task: "BaseOperator") -> dict:
+    def insert_mapping(run_id: str, task: "BaseOperator", map_index: int = -1) -> dict:
         """:meta private:"""

Review comment:
       I think we should not have a default `map_index` here. The function is private, so we don’t need to worry about backward compatibility, and not having a default helps prevent subtle bugs from accidentally calling this without passing an appropriate `map_index`.
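A minimal sketch of the signature this suggests, simplified to take a `task_id` string instead of a `BaseOperator` (a hypothetical simplification; the returned dict is illustrative only). Making `map_index` keyword-only is an extra choice on top of the review comment, so a stray positional argument cannot be mistaken for it either.

```python
from typing import Dict, Union


def insert_mapping(run_id: str, task_id: str, *, map_index: int) -> Dict[str, Union[str, int]]:
    """Build column values for a new task-instance row (illustrative only).

    ``map_index`` has no default, so every caller must state explicitly
    which index it is inserting (-1 for an unmapped task).
    """
    return {"run_id": run_id, "task_id": task_id, "map_index": map_index}
```

Omitting `map_index` now raises `TypeError` at the call site instead of silently inserting `-1`.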




[GitHub] [airflow] ashb commented on a change in pull request #21019: 🚧 Function to expand mapped tasks in to multiple TIs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21019:
URL: https://github.com/apache/airflow/pull/21019#discussion_r790704211



##########
File path: tests/models/test_dagrun.py
##########
@@ -874,3 +875,20 @@ def test_verify_integrity_task_start_date(Stats_incr, session, run_type, expecte
     assert len(tis) == expected_tis
 
     Stats_incr.assert_called_with('task_instance_created-DummyOperator', expected_tis)
+
+
+@pytest.mark.xfail(reason="TODO: Expand mapped literals at verify_integrity time!")

Review comment:
       I'm not sure this is actually a good idea -- although we _could_ put it here, that puts more work in the core scheduler loop, so I think we could reasonably defer this to the mini scheduler run by the upstream task.
   
   Thoughts?







[GitHub] [airflow] ashb commented on a change in pull request #21019: Function to expand mapped tasks in to multiple "real" TIs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21019:
URL: https://github.com/apache/airflow/pull/21019#discussion_r792451766



##########
File path: airflow/models/baseoperator.py
##########
@@ -1800,6 +1801,73 @@ def wait_for_downstream(self) -> bool:
     def depends_on_past(self) -> bool:
         return self.partial_kwargs.get("depends_on_past") or self.wait_for_downstream
 
+    def expand_mapped_task(self, upstream_ti: "TaskInstance", session: "Session" = NEW_SESSION) -> None:
+        """Create the mapped TaskInstances for mapped task."""
+        # TODO: support having multiuple mapped upstreams?
+        from airflow.models.taskmap import TaskMap
+        from airflow.settings import task_instance_mutation_hook
+
+        task_map_info_length: Optional[int] = (
+            session.query(TaskMap.length)
+            .filter_by(
+                dag_id=upstream_ti.dag_id,
+                task_id=upstream_ti.task_id,
+                run_id=upstream_ti.run_id,
+                map_index=upstream_ti.map_index,
+            )
+            .scalar()
+        )
+        if task_map_info_length is None:
+            # TODO: What would lead to this? How can this be better handled?
+            raise RuntimeError("mapped operator cannot be expanded; upstream not found")
+        # TODO: Add db constraint to ensure this is never negative.
+
+        unmapped_ti: Optional[TaskInstance] = (
+            session.query(TaskInstance)
+            .filter(
+                TaskInstance.dag_id == upstream_ti.dag_id,
+                TaskInstance.run_id == upstream_ti.run_id,
+                TaskInstance.task_id == self.task_id,
+                TaskInstance.map_index == -1,
+                TaskInstance.state.in_(State.unfinished),
+            )
+            .one_or_none()
+        )
+
+        if unmapped_ti:
+            # The unmapped task instance still exists and is unfinished, i.e. we
+            # haven't tried to run it before.
+            if task_map_info_length < 1:
+                # If the upstream maps this to a zero-length value, simply marked the
+                # unmapped task instance as SKIPPED (if needed).
+                unmapped_ti.state = TaskInstanceState.SKIPPED

Review comment:
       Done.
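For reference, the revised hunk queries only `TaskMap.length` and uses `.scalar()`, which returns `None` when no row matches, where the earlier `.one()` raised. Below is a stdlib-only sketch of the same guard, assuming a hypothetical `task_map` table keyed like the hunk's filter and using `sqlite3` in place of a SQLAlchemy session. The `CHECK` constraint is one way to address the hunk's "never negative" TODO, not something the PR adds.

```python
import sqlite3
from typing import Optional

SCHEMA = """
CREATE TABLE task_map (
    dag_id TEXT NOT NULL,
    task_id TEXT NOT NULL,
    run_id TEXT NOT NULL,
    map_index INTEGER NOT NULL,
    -- the constraint the TODO asks for: lengths are never negative
    length INTEGER NOT NULL CHECK (length >= 0),
    PRIMARY KEY (dag_id, task_id, run_id, map_index)
)
"""


def fetch_map_length(
    conn: sqlite3.Connection, dag_id: str, task_id: str, run_id: str, map_index: int
) -> int:
    """Return the upstream's map length, raising if no TaskMap row exists."""
    row = conn.execute(
        "SELECT length FROM task_map"
        " WHERE dag_id = ? AND task_id = ? AND run_id = ? AND map_index = ?",
        (dag_id, task_id, run_id, map_index),
    ).fetchone()
    # Like Query.scalar(): a missing row becomes None rather than an exception.
    length: Optional[int] = None if row is None else row[0]
    if length is None:
        raise RuntimeError("mapped operator cannot be expanded; upstream not found")
    return length
```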







[GitHub] [airflow] github-actions[bot] commented on pull request #21019: Function to expand mapped tasks in to multiple "real" TIs

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #21019:
URL: https://github.com/apache/airflow/pull/21019#issuecomment-1022026654


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] jedcunningham commented on a change in pull request #21019: Function to expand mapped tasks in to multiple "real" TIs

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #21019:
URL: https://github.com/apache/airflow/pull/21019#discussion_r791045742



##########
File path: airflow/models/baseoperator.py
##########
@@ -1800,6 +1801,53 @@ def wait_for_downstream(self) -> bool:
     def depends_on_past(self) -> bool:
         return self.partial_kwargs.get("depends_on_past") or self.wait_for_downstream
 
+    def expand_mapped_task(self, upstream_ti: "TaskInstance", session: "Session" = NEW_SESSION) -> None:
+        """Create the mapped TaskInstances for mapped task."""
+        # TODO: support having multiuple mapped upstreams?
+        from airflow.models.taskmap import TaskMap
+        from airflow.settings import task_instance_mutation_hook
+
+        task_map_info: TaskMap = (
+            session.query(TaskMap)
+            .filter_by(
+                dag_id=upstream_ti.dag_id,
+                task_id=upstream_ti.task_id,
+                run_id=upstream_ti.run_id,
+                map_index=upstream_ti.map_index,
+            )
+            .one()
+        )
+
+        unmapped_ti: Optional[TaskInstance] = upstream_ti.dag_run.get_task_instance(
+            self.task_id, map_index=-1, session=session
+        )
+
+        maps = range(task_map_info.length)
+
+        if unmapped_ti:
+            # The unmapped TaskInstance still exisxts -- this means we haven't

Review comment:
       ```suggestion
               # The unmapped TaskInstance still exists -- this means we haven't
   ```

##########
File path: tests/models/test_dagrun.py
##########
@@ -874,3 +875,20 @@ def test_verify_integrity_task_start_date(Stats_incr, session, run_type, expecte
     assert len(tis) == expected_tis
 
     Stats_incr.assert_called_with('task_instance_created-DummyOperator', expected_tis)
+
+
+@pytest.mark.xfail(reason="TODO: Expand mapped literals at verify_integrity time!")

Review comment:
       I'm not sure we can _only_ have it in the mini scheduler, as that can be turned off. Maybe toggle where it is run based on the mini scheduler being on or off?
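A sketch of that toggle, assuming the flag would come from the `[scheduler] schedule_after_task_execution` option (to my knowledge, the setting that enables the mini scheduler); `ExpansionSite` and `expansion_site` are hypothetical names.

```python
import enum


class ExpansionSite(enum.Enum):
    """Hypothetical labels for where mapped-task expansion could happen."""

    MINI_SCHEDULER = "mini scheduler in the upstream task's LocalTaskJob"
    SCHEDULER_LOOP = "main scheduler loop"


def expansion_site(mini_scheduler_enabled: bool) -> ExpansionSite:
    """Pick where expansion should normally run.

    With the mini scheduler enabled, the upstream task's own process can
    expand its downstream mapped tasks right after it finishes; with it
    disabled, the scheduler has to do the work itself.
    """
    if mini_scheduler_enabled:
        return ExpansionSite.MINI_SCHEDULER
    return ExpansionSite.SCHEDULER_LOOP
```

ashb's reply in this thread points out that the scheduler keeps an "expansion of last resort" either way, so in practice a flag like this would decide who usually does the expansion, not who is able to.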







[GitHub] [airflow] ashb commented on a change in pull request #21019: Function to expand mapped tasks in to multiple "real" TIs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21019:
URL: https://github.com/apache/airflow/pull/21019#discussion_r792448995



##########
File path: airflow/models/baseoperator.py
##########
@@ -1800,6 +1801,73 @@ def wait_for_downstream(self) -> bool:
     def depends_on_past(self) -> bool:
         return self.partial_kwargs.get("depends_on_past") or self.wait_for_downstream
 
+    def expand_mapped_task(self, upstream_ti: "TaskInstance", session: "Session" = NEW_SESSION) -> None:
+        """Create the mapped TaskInstances for mapped task."""
+        # TODO: support having multiuple mapped upstreams?
+        from airflow.models.taskmap import TaskMap
+        from airflow.settings import task_instance_mutation_hook
+
+        task_map_info_length: Optional[int] = (
+            session.query(TaskMap.length)
+            .filter_by(
+                dag_id=upstream_ti.dag_id,
+                task_id=upstream_ti.task_id,
+                run_id=upstream_ti.run_id,
+                map_index=upstream_ti.map_index,
+            )
+            .scalar()
+        )
+        if task_map_info_length is None:
+            # TODO: What would lead to this? How can this be better handled?
+            raise RuntimeError("mapped operator cannot be expanded; upstream not found")
+        # TODO: Add db constraint to ensure this is never negative.
+
+        unmapped_ti: Optional[TaskInstance] = (
+            session.query(TaskInstance)
+            .filter(
+                TaskInstance.dag_id == upstream_ti.dag_id,
+                TaskInstance.run_id == upstream_ti.run_id,
+                TaskInstance.task_id == self.task_id,
+                TaskInstance.map_index == -1,
+                TaskInstance.state.in_(State.unfinished),
+            )
+            .one_or_none()
+        )
+
+        if unmapped_ti:
+            # The unmapped task instance still exists and is unfinished, i.e. we
+            # haven't tried to run it before.
+            if task_map_info_length < 1:
+                # If the upstream maps this to a zero-length value, simply marked the
+                # unmapped task instance as SKIPPED (if needed).
+                unmapped_ti.state = TaskInstanceState.SKIPPED

Review comment:
       This should probably have an info log so there's _some clue somewhere_ as to why the task was skipped (even if it's only in the scheduler log).
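A sketch of such a log line using the stdlib `logging` module; the logger name, the `skip_empty_expansion` helper and its message are hypothetical, not what the PR ended up logging.

```python
import logging

# Hypothetical logger name, used only for this sketch.
log = logging.getLogger("mapped_expansion_sketch")


def skip_empty_expansion(ti_repr: str, upstream_task_id: str, length: int) -> bool:
    """Return True, and log the reason, if a mapped task must be skipped."""
    if length >= 1:
        return False
    log.info(
        "Marking %s as SKIPPED since upstream %s mapped it to a zero-length value",
        ti_repr,
        upstream_task_id,
    )
    return True
```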







[GitHub] [airflow] ashb commented on a change in pull request #21019: Function to expand mapped tasks in to multiple "real" TIs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21019:
URL: https://github.com/apache/airflow/pull/21019#discussion_r791721987



##########
File path: airflow/models/baseoperator.py
##########
@@ -1800,6 +1801,53 @@ def wait_for_downstream(self) -> bool:
     def depends_on_past(self) -> bool:
         return self.partial_kwargs.get("depends_on_past") or self.wait_for_downstream
 
+    def expand_mapped_task(self, upstream_ti: "TaskInstance", session: "Session" = NEW_SESSION) -> None:
+        """Create the mapped TaskInstances for mapped task."""
+        # TODO: support having multiuple mapped upstreams?
+        from airflow.models.taskmap import TaskMap
+        from airflow.settings import task_instance_mutation_hook
+
+        task_map_info: TaskMap = (
+            session.query(TaskMap)
+            .filter_by(
+                dag_id=upstream_ti.dag_id,
+                task_id=upstream_ti.task_id,
+                run_id=upstream_ti.run_id,
+                map_index=upstream_ti.map_index,
+            )
+            .one()
+        )
+
+        unmapped_ti: Optional[TaskInstance] = upstream_ti.dag_run.get_task_instance(
+            self.task_id, map_index=-1, session=session
+        )
+
+        maps = range(task_map_info.length)
+
+        if unmapped_ti:
+            # The unmapped TaskInstance still exists -- this means we haven't
+            # tried to run it before.
+            unmapped_ti.map_index = 0
+            maps = range(1, task_map_info.length)

Review comment:
       I think skipping makes sense, yeah.







[GitHub] [airflow] ashb merged pull request #21019: Function to expand mapped tasks in to multiple "real" TIs

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #21019:
URL: https://github.com/apache/airflow/pull/21019


   

