Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/11 14:39:40 UTC

[GitHub] [airflow] ashb opened a new pull request, #22909: Fix bug where dynamically mapped tasks got set to REMOVED

ashb opened a new pull request, #22909:
URL: https://github.com/apache/airflow/pull/22909

   This mostly affects backfill/`airflow tasks test`.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847715879


##########
tests/models/test_dagrun.py:
##########
@@ -1036,14 +1036,17 @@ def test_mapped_mixed__literal_not_expanded_at_create(dag_maker, session):
         mapped = MockOperator.partial(task_id='task_2').expand(arg1=literal, arg2=XComArg(task))
 
     dr = dag_maker.create_dagrun()
-    indices = (
-        session.query(TI.map_index)
+    query = (
+        session.query(TI.map_index, TI.state)
         .filter_by(task_id=mapped.task_id, dag_id=mapped.dag_id, run_id=dr.run_id)
         .order_by(TI.map_index)
-        .all()
     )
 
-    assert indices == [(-1,)]
+    assert query.all() == [(-1, None)]
+
+    # Verify_integrity shouldn't change the result now that the TIs exist
+    dr.verify_integrity()
+    assert query.all() == [(-1, None)]

Review Comment:
   It has a flush inside it already https://github.com/apache/airflow/blob/9c9272de3a32d30d831fac1272a07244d5fb8e0b/airflow/models/dagrun.py#L903





[GitHub] [airflow] kaxil commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847410390


##########
tests/models/test_dagrun.py:
##########
@@ -1036,14 +1036,17 @@ def test_mapped_mixed__literal_not_expanded_at_create(dag_maker, session):
         mapped = MockOperator.partial(task_id='task_2').expand(arg1=literal, arg2=XComArg(task))
 
     dr = dag_maker.create_dagrun()
-    indices = (
-        session.query(TI.map_index)
+    query = (
+        session.query(TI.map_index, TI.state)
         .filter_by(task_id=mapped.task_id, dag_id=mapped.dag_id, run_id=dr.run_id)
         .order_by(TI.map_index)
-        .all()
     )
 
-    assert indices == [(-1,)]
+    assert query.all() == [(-1, None)]
+
+    # Verify_integrity shouldn't change the restul now that the TIs exist

Review Comment:
   ```suggestion
       # Verify_integrity shouldn't change the result now that the TIs exist
   ```





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847411155


##########
tests/models/test_dagrun.py:
##########
@@ -1036,14 +1036,17 @@ def test_mapped_mixed__literal_not_expanded_at_create(dag_maker, session):
         mapped = MockOperator.partial(task_id='task_2').expand(arg1=literal, arg2=XComArg(task))
 
     dr = dag_maker.create_dagrun()
-    indices = (
-        session.query(TI.map_index)
+    query = (
+        session.query(TI.map_index, TI.state)
         .filter_by(task_id=mapped.task_id, dag_id=mapped.dag_id, run_id=dr.run_id)
         .order_by(TI.map_index)
-        .all()
     )
 
-    assert indices == [(-1,)]
+    assert query.all() == [(-1, None)]
+
+    # Verify_integrity shouldn't change the restul now that the TIs exist

Review Comment:
   ```suggestion
       # Verify_integrity shouldn't change the result now that the TIs exist
   ```





[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847645333


##########
airflow/serialization/serialized_objects.py:
##########
@@ -1081,12 +1081,12 @@ def deserialize_dag(cls, encoded_dag: Dict[str, Any]) -> 'SerializedDAG':
                 setattr(task.subdag, 'parent_dag', dag)
 
             if isinstance(task, MappedOperator):
-                for d in (task.mapped_kwargs, task.partial_kwargs):
-                    for k, v in d.items():
-                        if not isinstance(v, _XComRef):
-                            continue
+                expansion_kwargs = task._get_expansion_kwargs()

Review Comment:
   This was a bug, and I don't know how we didn't hit it before. As a result, the serialized task had these objects left as `_XComRef`, which, being a named tuple, has a `__len__` of 2 😱 
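
To illustrate the pitfall described above, here is a minimal sketch. `XComRefSketch` and its field names are hypothetical stand-ins, not Airflow's actual `_XComRef`; the only point is that a two-field named tuple reports a length of 2, so any code that sizes a mapping with `len()` could mistake an unresolved placeholder for a two-element literal.

```python
from typing import NamedTuple


class XComRefSketch(NamedTuple):
    """Hypothetical two-field placeholder standing in for an unresolved reference."""

    task_id: str
    key: str


ref = XComRefSketch(task_id="task_1", key="return_value")
literal = [1, 2, 3]

# A named tuple is still a tuple, so len() counts its fields.
print(len(ref))      # 2
print(len(literal))  # 3
```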





[GitHub] [airflow] uranusjr commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847689404


##########
airflow/models/mappedoperator.py:
##########
@@ -559,7 +562,12 @@ def _get_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:
         for task_id, length in xcom_query:
             for mapped_arg_name in mapped_dep_keys[task_id]:
                 map_lengths[mapped_arg_name] += length
+        return map_lengths
 
+    def _resolve_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:
+        """Return dict of argument name to map length, or throw if some are not resolvable"""
+        expansion_kwargs = self._get_expansion_kwargs()
+        map_lengths = self._get_map_lengths(run_id, session=session)

Review Comment:
   We should document `_get_map_lengths` to explain how it’s different from `_resolve_map_lengths`.





[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847420061


##########
airflow/models/dagrun.py:
##########
@@ -838,14 +838,24 @@ def verify_integrity(self, session: Session = NEW_SESSION):
                     ti.state = State.REMOVED
                 continue
 
-            if task.is_mapped:
-                task = cast("MappedOperator", task)
-                num_mapped_tis = task.parse_time_mapped_ti_count
-                # Check if the number of mapped literals has changed and we need to mark this TI as removed
-                if not num_mapped_tis or ti.map_index >= num_mapped_tis:
+            if not task.is_mapped:
+                continue
+            task = cast("MappedOperator", task)
+            num_mapped_tis = task.parse_time_mapped_ti_count
+            # Check if the number of mapped literals has changed and we need to mark this TI as removed
+            if num_mapped_tis is not None:
+                if ti.map_index >= num_mapped_tis:
+                    self.log.debug(

Review Comment:
   This was already covered from before, I've just reworked the conditional https://github.com/apache/airflow/blob/f662b7de8c5e61f640f150d4e68bde21dcdd09b4/tests/models/test_dagrun.py#L953-L986
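
The reworked conditional can be sketched as two plain functions. This is a simplified reading of the diff, not the Airflow code itself: the function names are hypothetical, and it assumes that `parse_time_mapped_ti_count` is `None` when the mapping length is only known at run time.

```python
from typing import Optional


def old_marks_removed(num_mapped_tis: Optional[int], map_index: int) -> bool:
    """Simplified reading of the conditional before the change."""
    # `not None` is True, so an unknown length always took this branch.
    if not num_mapped_tis or map_index >= num_mapped_tis:
        return True
    return map_index < 0


def new_marks_removed(num_mapped_tis: Optional[int], map_index: int) -> bool:
    """Simplified reading of the reworked conditional (first revision in the diff)."""
    if num_mapped_tis is None:
        # Length not known at parse time: this branch leaves the TI alone.
        return False
    return map_index >= num_mapped_tis or map_index < 0


# An unexpanded TI (map_index == -1) whose length is only known at run time.
print(old_marks_removed(None, -1))  # True
print(new_marks_removed(None, -1))  # False
```

For a literal mapping of known length the two versions agree, which is consistent with the existing test continuing to cover that path.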





[GitHub] [airflow] ashb commented on pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #22909:
URL: https://github.com/apache/airflow/pull/22909#issuecomment-1095440612

   This turned out a bit more complex :(




[GitHub] [airflow] kaxil commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847413388


##########
airflow/models/dagrun.py:
##########
@@ -838,14 +838,24 @@ def verify_integrity(self, session: Session = NEW_SESSION):
                     ti.state = State.REMOVED
                 continue
 
-            if task.is_mapped:
-                task = cast("MappedOperator", task)
-                num_mapped_tis = task.parse_time_mapped_ti_count
-                # Check if the number of mapped literals has changed and we need to mark this TI as removed
-                if not num_mapped_tis or ti.map_index >= num_mapped_tis:
+            if not task.is_mapped:
+                continue
+            task = cast("MappedOperator", task)

Review Comment:
   nit: Should we extract L843-858 to a separate function? This might help test that function separately too





[GitHub] [airflow] uranusjr commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847650700


##########
airflow/serialization/serialized_objects.py:
##########
@@ -1081,12 +1081,12 @@ def deserialize_dag(cls, encoded_dag: Dict[str, Any]) -> 'SerializedDAG':
                 setattr(task.subdag, 'parent_dag', dag)
 
             if isinstance(task, MappedOperator):
-                for d in (task.mapped_kwargs, task.partial_kwargs):
-                    for k, v in d.items():
-                        if not isinstance(v, _XComRef):
-                            continue
+                expansion_kwargs = task._get_expansion_kwargs()

Review Comment:
   Because those attributes _shouldn’t_ be accessed by the scheduler anyway? So this likely only caused the task to be rendered incorrectly in UI.





[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847719456


##########
tests/models/test_dagrun.py:
##########
@@ -1036,14 +1036,17 @@ def test_mapped_mixed__literal_not_expanded_at_create(dag_maker, session):
         mapped = MockOperator.partial(task_id='task_2').expand(arg1=literal, arg2=XComArg(task))
 
     dr = dag_maker.create_dagrun()
-    indices = (
-        session.query(TI.map_index)
+    query = (
+        session.query(TI.map_index, TI.state)
         .filter_by(task_id=mapped.task_id, dag_id=mapped.dag_id, run_id=dr.run_id)
         .order_by(TI.map_index)
-        .all()
     )
 
-    assert indices == [(-1,)]
+    assert query.all() == [(-1, None)]
+
+    # Verify_integrity shouldn't change the result now that the TIs exist
+    dr.verify_integrity()
+    assert query.all() == [(-1, None)]

Review Comment:
   Oh yes. Done.





[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847687365


##########
airflow/decorators/base.py:
##########
@@ -406,6 +406,9 @@ class DecoratedMappedOperator(MappedOperator):
     # in partial_kwargs, and MappedOperator prevents duplication.
     mapped_op_kwargs: Dict[str, "Mappable"]
 
+    def __hash__(self):

Review Comment:
   This was needed for the `@cache` decorator.
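
One common reason a cache decorator forces an explicit `__hash__` is sketched below. The class and function names are hypothetical, and the sketch uses `functools.lru_cache(maxsize=None)` as a stand-in for `@cache`; whether this is exactly the situation in `DecoratedMappedOperator` is an assumption. A class that defines `__eq__` without `__hash__` becomes unhashable, and a cached function needs hashable arguments to build its cache key.

```python
import functools


class Operator:
    """Hypothetical class that defines __eq__ only, so Python sets __hash__ to None."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    def __eq__(self, other: object) -> bool:
        return isinstance(other, Operator) and self.task_id == other.task_id


class HashableOperator(Operator):
    """Same class with an explicit __hash__ that is consistent with __eq__."""

    def __hash__(self) -> int:
        return hash(self.task_id)


@functools.lru_cache(maxsize=None)
def expansion_summary(op: Operator) -> str:
    # Stand-in for an expensive computation keyed on the operator instance.
    return f"summary for {op.task_id}"


try:
    expansion_summary(Operator("task_2"))
except TypeError as err:
    print(f"cannot cache: {err}")

print(expansion_summary(HashableOperator("task_2")))
```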





[GitHub] [airflow] ashb commented on pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #22909:
URL: https://github.com/apache/airflow/pull/22909#issuecomment-1095137488

   Fixing a bug, so that we can then go and fix https://github.com/apache/airflow/pull/22904




[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847653426


##########
airflow/serialization/serialized_objects.py:
##########
@@ -1081,12 +1081,12 @@ def deserialize_dag(cls, encoded_dag: Dict[str, Any]) -> 'SerializedDAG':
                 setattr(task.subdag, 'parent_dag', dag)
 
             if isinstance(task, MappedOperator):
-                for d in (task.mapped_kwargs, task.partial_kwargs):
-                    for k, v in d.items():
-                        if not isinstance(v, _XComRef):
-                            continue
+                expansion_kwargs = task._get_expansion_kwargs()

Review Comment:
   Ah, yeah that makes sense. When the LocalTaskJob checks this it has the real DAG.
   
   It's just that my change here now makes it be accessed via `verify_integrity`.





[GitHub] [airflow] ashb merged pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
ashb merged PR #22909:
URL: https://github.com/apache/airflow/pull/22909




[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847446426


##########
airflow/models/dagrun.py:
##########
@@ -838,14 +838,24 @@ def verify_integrity(self, session: Session = NEW_SESSION):
                     ti.state = State.REMOVED
                 continue
 
-            if task.is_mapped:
-                task = cast("MappedOperator", task)
-                num_mapped_tis = task.parse_time_mapped_ti_count
-                # Check if the number of mapped literals has changed and we need to mark this TI as removed
-                if not num_mapped_tis or ti.map_index >= num_mapped_tis:
+            if not task.is_mapped:
+                continue
+            task = cast("MappedOperator", task)
+            num_mapped_tis = task.parse_time_mapped_ti_count
+            # Check if the number of mapped literals has changed and we need to mark this TI as removed
+            if num_mapped_tis is not None:
+                if ti.map_index >= num_mapped_tis:
+                    self.log.debug(
+                        "Removing task '%s' as the map_index is longer than the literal list (%s)",
+                        ti,
+                        num_mapped_tis,
+                    )
                     ti.state = State.REMOVED
                 elif ti.map_index < 0:
+                    self.log.debug("Removing the unmapped TI '%s' as the mapping can now be performed", ti)
                     ti.state = State.REMOVED
+            # TODO: What if it is _now_ None, but wasn't before? How do we detect that? And how do we detect

Review Comment:
   Turns out I need to fix this -- it's already tested.





[GitHub] [airflow] github-actions[bot] commented on pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22909:
URL: https://github.com/apache/airflow/pull/22909#issuecomment-1095147614

   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.




[GitHub] [airflow] jedcunningham commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847411405


##########
tests/models/test_dagrun.py:
##########
@@ -1036,14 +1036,17 @@ def test_mapped_mixed__literal_not_expanded_at_create(dag_maker, session):
         mapped = MockOperator.partial(task_id='task_2').expand(arg1=literal, arg2=XComArg(task))
 
     dr = dag_maker.create_dagrun()
-    indices = (
-        session.query(TI.map_index)
+    query = (
+        session.query(TI.map_index, TI.state)
         .filter_by(task_id=mapped.task_id, dag_id=mapped.dag_id, run_id=dr.run_id)
         .order_by(TI.map_index)
-        .all()
     )
 
-    assert indices == [(-1,)]
+    assert query.all() == [(-1, None)]
+
+    # Verify_integrity shouldn't change the restul now that the TIs exist

Review Comment:
   ```suggestion
       # Verify_integrity shouldn't change the result now that the TIs exist
   ```





[GitHub] [airflow] jedcunningham commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847691219


##########
airflow/models/dagrun.py:
##########
@@ -697,8 +697,9 @@ def _get_ready_tis(
                 old_states[schedulable.key] = old_state
                 continue
 
-            # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if
-            # for any reason it wasn't, we need to expand it now
+            # This is called in two places: First (and ideally) is from the mini scheduler at the end of
+            # LocalTaskJob, and then as an "expansion of last resort" this is also called from the scheduler

Review Comment:
   ```suggestion
               # This is called in two places: First (and ideally) is in the mini scheduler at the end of
               # LocalTaskJob, and then as an "expansion of last resort" in the scheduler
   ```
   
   nit



##########
airflow/models/dagrun.py:
##########
@@ -838,14 +839,47 @@ def verify_integrity(self, session: Session = NEW_SESSION):
                     ti.state = State.REMOVED
                 continue
 
-            if task.is_mapped:
-                task = cast("MappedOperator", task)
-                num_mapped_tis = task.parse_time_mapped_ti_count
-                # Check if the number of mapped literals has changed and we need to mark this TI as removed
-                if not num_mapped_tis or ti.map_index >= num_mapped_tis:
+            if not task.is_mapped:
+                continue
+            task = cast("MappedOperator", task)
+            num_mapped_tis = task.parse_time_mapped_ti_count
+            # Check if the number of mapped literals has changed and we need to mark this TI as removed
+            if num_mapped_tis is not None:
+                if ti.map_index >= num_mapped_tis:
+                    self.log.debug(
+                        "Removing task '%s' as the map_index is longer than the literal mapping list (%s)",
+                        ti,
+                        num_mapped_tis,
+                    )
                     ti.state = State.REMOVED
                 elif ti.map_index < 0:
+                    self.log.debug("Removing the unmapped TI '%s' as the mapping can now be performed", ti)
                     ti.state = State.REMOVED
+                else:
+                    self.log.info("Restoring mapped task '%s'", ti)
+                    Stats.incr(f"task_restored_to_dag.{dag.dag_id}", 1, 1)
+                    ti.state = State.NONE
+            else:
+                #  What if it is _now_ dynamically mapped, but wasn't before?
+                total_length = task.run_time_mapped_ti_count(self.run_id, session=session)
+
+                if total_length is None:
+                    # Not all upstreams finished, so we can't tell what should be here. Remove everying

Review Comment:
   ```suggestion
                       # Not all upstreams finished, so we can't tell what should be here. Remove everything.
   ```





[GitHub] [airflow] kaxil commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847411580


##########
airflow/models/dagrun.py:
##########
@@ -838,14 +838,24 @@ def verify_integrity(self, session: Session = NEW_SESSION):
                     ti.state = State.REMOVED
                 continue
 
-            if task.is_mapped:
-                task = cast("MappedOperator", task)
-                num_mapped_tis = task.parse_time_mapped_ti_count
-                # Check if the number of mapped literals has changed and we need to mark this TI as removed
-                if not num_mapped_tis or ti.map_index >= num_mapped_tis:
+            if not task.is_mapped:
+                continue
+            task = cast("MappedOperator", task)
+            num_mapped_tis = task.parse_time_mapped_ti_count
+            # Check if the number of mapped literals has changed and we need to mark this TI as removed
+            if num_mapped_tis is not None:
+                if ti.map_index >= num_mapped_tis:
+                    self.log.debug(

Review Comment:
   Should we add a test case for this?





[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847700088


##########
airflow/models/mappedoperator.py:
##########
@@ -559,7 +562,12 @@ def _get_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:
         for task_id, length in xcom_query:
             for mapped_arg_name in mapped_dep_keys[task_id]:
                 map_lengths[mapped_arg_name] += length
+        return map_lengths
 
+    def _resolve_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:
+        """Return dict of argument name to map length, or throw if some are not resolvable"""
+        expansion_kwargs = self._get_expansion_kwargs()
+        map_lengths = self._get_map_lengths(run_id, session=session)

Review Comment:
   Done:
   
   >         """Return dict of argument name to map length.
   >
   >        If any arguments are not known right now (upstream task not finished) they will not be present in the
   >        dict.
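
The difference between the two docstrings can be sketched with a pair of standalone functions. These are hypothetical helpers operating on plain dicts, not the `MappedOperator` methods, and the exception type is an assumption: the "get" variant silently omits arguments whose length is not yet known, while the "resolve" variant raises if anything is missing.

```python
from typing import Dict, Iterable


def get_map_lengths(arg_names: Iterable[str], known: Dict[str, int]) -> Dict[str, int]:
    """Return lengths only for the arguments that are known right now."""
    return {name: known[name] for name in arg_names if name in known}


def resolve_map_lengths(arg_names: Iterable[str], known: Dict[str, int]) -> Dict[str, int]:
    """Return lengths for every argument, or raise if any is still unknown."""
    names = list(arg_names)
    lengths = get_map_lengths(names, known)
    missing = [name for name in names if name not in lengths]
    if missing:
        raise RuntimeError(f"Map lengths not yet resolvable for: {missing}")
    return lengths


# arg2 depends on an upstream task that has not finished yet.
known_lengths = {"arg1": 3}
print(get_map_lengths(["arg1", "arg2"], known_lengths))  # {'arg1': 3}
```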





[GitHub] [airflow] ashb commented on pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #22909:
URL: https://github.com/apache/airflow/pull/22909#issuecomment-1095441003

   And I'm not too happy with how I had to structure the conditionals. Suggestions appreciated. 




[GitHub] [airflow] uranusjr commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847713698


##########
tests/models/test_dagrun.py:
##########
@@ -1036,14 +1036,17 @@ def test_mapped_mixed__literal_not_expanded_at_create(dag_maker, session):
         mapped = MockOperator.partial(task_id='task_2').expand(arg1=literal, arg2=XComArg(task))
 
     dr = dag_maker.create_dagrun()
-    indices = (
-        session.query(TI.map_index)
+    query = (
+        session.query(TI.map_index, TI.state)
         .filter_by(task_id=mapped.task_id, dag_id=mapped.dag_id, run_id=dr.run_id)
         .order_by(TI.map_index)
-        .all()
     )
 
-    assert indices == [(-1,)]
+    assert query.all() == [(-1, None)]
+
+    # Verify_integrity shouldn't change the result now that the TIs exist
+    dr.verify_integrity()
+    assert query.all() == [(-1, None)]

Review Comment:
   We probably need to do this?
   
   ```suggestion
       # Verify_integrity shouldn't change the result now that the TIs exist
       dr.verify_integrity(session=session)
       session.flush()
       assert query.all() == [(-1, None)]
   ```
   
   Otherwise `verify_integrity` would use a new session, and the query result won’t change regardless due to transaction isolation.
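
The isolation concern can be shown with a small analogy. This sketch uses the standard-library `sqlite3` module with two connections rather than SQLAlchemy sessions or Airflow's test fixtures, and the table layout is made up for the example: a change made on one connection is not visible to a query on another connection until it is committed, so an assertion run through a different session could pass without exercising the change at all.

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.db")

# First connection: create one row and commit it.
writer = sqlite3.connect(path)
writer.execute("CREATE TABLE ti (map_index INTEGER, state TEXT)")
writer.execute("INSERT INTO ti VALUES (-1, NULL)")
writer.commit()

# Second, independent connection, playing the role of the test's own session.
reader = sqlite3.connect(path)

# The writer changes the row but has not committed yet.
writer.execute("UPDATE ti SET state = 'removed'")
before_commit = reader.execute("SELECT map_index, state FROM ti").fetchall()

# Only after the commit does the reader observe the new state.
writer.commit()
after_commit = reader.execute("SELECT map_index, state FROM ti").fetchall()

print(before_commit)
print(after_commit)

writer.close()
reader.close()
```

Sharing a single session between the code under test and the assertion avoids this, which is the motivation for passing `session=session`.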





[GitHub] [airflow] uranusjr commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847717386


##########
tests/models/test_dagrun.py:
##########
@@ -1036,14 +1036,17 @@ def test_mapped_mixed__literal_not_expanded_at_create(dag_maker, session):
         mapped = MockOperator.partial(task_id='task_2').expand(arg1=literal, arg2=XComArg(task))
 
     dr = dag_maker.create_dagrun()
-    indices = (
-        session.query(TI.map_index)
+    query = (
+        session.query(TI.map_index, TI.state)
         .filter_by(task_id=mapped.task_id, dag_id=mapped.dag_id, run_id=dr.run_id)
         .order_by(TI.map_index)
-        .all()
     )
 
-    assert indices == [(-1,)]
+    assert query.all() == [(-1, None)]
+
+    # Verify_integrity shouldn't change the result now that the TIs exist
+    dr.verify_integrity()
+    assert query.all() == [(-1, None)]

Review Comment:
   Passing in `session` should be enough then


