Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/18 09:17:45 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request, #25788: Properly check the existence of missing indexes

ephraimbuddy opened a new pull request, #25788:
URL: https://github.com/apache/airflow/pull/25788

   For mapped tasks that expand at runtime,
   DagRun.verify_integrity is called even when the mapped indexes have not
   changed. This is because the missing_indexes variable sometimes contains keys
   for mapped tasks whose diff value is an empty list ([]). The "if" check then
   treats the defaultdict as non-empty, as if something were missing.
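
The truthiness problem described above can be reproduced in isolation. This is a minimal sketch, not code from the PR: the string key is a hypothetical stand-in for a MappedOperator object.

```python
from collections import defaultdict

# Hypothetical stand-in for the mapping built by _revise_mapped_task_indexes;
# the real keys are MappedOperator objects, a plain string is used here.
missing_indexes = defaultdict(list)
missing_indexes["mapped_task"] = []  # key present, but no index is missing

# A dict is truthy as soon as it holds a key, whatever the values are.
naive_check = bool(missing_indexes)

# Inspecting the values only reports True when some list is non-empty.
strict_check = any(missing_indexes.values())
```

Here `naive_check` is True although nothing is missing, while `strict_check` is False.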


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ephraimbuddy merged pull request #25788: Properly check the existence of missing mapped TIs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy merged PR #25788:
URL: https://github.com/apache/airflow/pull/25788




[GitHub] [airflow] uranusjr commented on a diff in pull request #25788: Properly check the existence of missing mapped TIs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r954640536


##########
airflow/models/dagrun.py:
##########
@@ -1081,44 +1057,49 @@ def _create_task_instances(
             # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
             session.rollback()
 
-    def _revise_mapped_task_indexes(
-        self,
-        tis: Iterable[TI],
-        *,
-        session: Session,
-    ) -> Dict["MappedOperator", Sequence[int]]:
-        """Check if the length of the mapped task instances changed at runtime and find the missing indexes.
-
-        :param tis: Task instances to check
-        :param session: The session to use
-        """
-        from airflow.models.mappedoperator import MappedOperator
+    def _revise_mapped_task_indexes(self, task, session: Session):
+        """Check if task increased or reduced in length and handle appropriately"""
+        from airflow.models.taskinstance import TaskInstance
+        from airflow.settings import task_instance_mutation_hook
 
-        existing_indexes: Dict[MappedOperator, List[int]] = defaultdict(list)
-        new_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
-        for ti in tis:
-            task = ti.task
-            if not isinstance(task, MappedOperator):
-                continue
-            # skip unexpanded tasks and also tasks that expands with literal arguments
-            if ti.map_index < 0 or task.parse_time_mapped_ti_count:
-                continue
-            existing_indexes[task].append(ti.map_index)
-            task.run_time_mapped_ti_count.cache_clear()  # type: ignore[attr-defined]
-            new_length = task.run_time_mapped_ti_count(self.run_id, session=session) or 0
-
-            if ti.map_index >= new_length:
-                self.log.debug(
-                    "Removing task '%s' as the map_index is longer than the resolved mapping list (%d)",
-                    ti,
-                    new_length,
-                )
-                ti.state = State.REMOVED
-            new_indexes[task] = range(new_length)
-        missing_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
-        for k, v in existing_indexes.items():
-            missing_indexes.update({k: list(set(new_indexes[k]).difference(v))})
-        return missing_indexes
+        task.run_time_mapped_ti_count.cache_clear()
+        total_length = (
+            task.parse_time_mapped_ti_count
+            or task.run_time_mapped_ti_count(self.run_id, session=session)
+            or 0
+        )
+        existing_tis = (
+            session.query(TaskInstance)
+            .filter(
+                TaskInstance.dag_id == self.dag_id,
+                TaskInstance.task_id == task.task_id,
+                TaskInstance.run_id == self.run_id,
+            )
+            .all()
+        )
+        existing_indexes = [i.map_index for i in existing_tis]

Review Comment:
   ```suggestion
           query = (
               session.query(TaskInstance.map_index)
               .filter(
                   TaskInstance.dag_id == self.dag_id,
                   TaskInstance.task_id == task.task_id,
                   TaskInstance.run_id == self.run_id,
               )
           )
           existing_indexes = [i for (i,) in query]
   ```





[GitHub] [airflow] uranusjr commented on a diff in pull request #25788: Properly check the existence of missing mapped TIs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r954640536


##########
airflow/models/dagrun.py:
##########
@@ -1081,44 +1057,49 @@ def _create_task_instances(
             # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
             session.rollback()
 
-    def _revise_mapped_task_indexes(
-        self,
-        tis: Iterable[TI],
-        *,
-        session: Session,
-    ) -> Dict["MappedOperator", Sequence[int]]:
-        """Check if the length of the mapped task instances changed at runtime and find the missing indexes.
-
-        :param tis: Task instances to check
-        :param session: The session to use
-        """
-        from airflow.models.mappedoperator import MappedOperator
+    def _revise_mapped_task_indexes(self, task, session: Session):
+        """Check if task increased or reduced in length and handle appropriately"""
+        from airflow.models.taskinstance import TaskInstance
+        from airflow.settings import task_instance_mutation_hook
 
-        existing_indexes: Dict[MappedOperator, List[int]] = defaultdict(list)
-        new_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
-        for ti in tis:
-            task = ti.task
-            if not isinstance(task, MappedOperator):
-                continue
-            # skip unexpanded tasks and also tasks that expands with literal arguments
-            if ti.map_index < 0 or task.parse_time_mapped_ti_count:
-                continue
-            existing_indexes[task].append(ti.map_index)
-            task.run_time_mapped_ti_count.cache_clear()  # type: ignore[attr-defined]
-            new_length = task.run_time_mapped_ti_count(self.run_id, session=session) or 0
-
-            if ti.map_index >= new_length:
-                self.log.debug(
-                    "Removing task '%s' as the map_index is longer than the resolved mapping list (%d)",
-                    ti,
-                    new_length,
-                )
-                ti.state = State.REMOVED
-            new_indexes[task] = range(new_length)
-        missing_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
-        for k, v in existing_indexes.items():
-            missing_indexes.update({k: list(set(new_indexes[k]).difference(v))})
-        return missing_indexes
+        task.run_time_mapped_ti_count.cache_clear()
+        total_length = (
+            task.parse_time_mapped_ti_count
+            or task.run_time_mapped_ti_count(self.run_id, session=session)
+            or 0
+        )
+        existing_tis = (
+            session.query(TaskInstance)
+            .filter(
+                TaskInstance.dag_id == self.dag_id,
+                TaskInstance.task_id == task.task_id,
+                TaskInstance.run_id == self.run_id,
+            )
+            .all()
+        )
+        existing_indexes = [i.map_index for i in existing_tis]

Review Comment:
   ```suggestion
           query = (
               session.query(TaskInstance.map_index)
               .filter(
                   TaskInstance.dag_id == self.dag_id,
                   TaskInstance.task_id == task.task_id,
                   TaskInstance.run_id == self.run_id,
               )
           )
           existing_indexes = {i for (i,) in query}
   ```
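
The `(i,)` unpacking in the suggestion is needed because a query that selects a single column still yields one-element rows. The sketch below illustrates the same pattern with the standard-library `sqlite3` module as a stand-in for SQLAlchemy; the table layout and task names are simplified assumptions, not Airflow's real schema.

```python
import sqlite3

# In-memory database with a heavily simplified, hypothetical task_instance table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (task_id TEXT, map_index INTEGER)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?)",
    [("mapped_task", 0), ("mapped_task", 1), ("mapped_task", 2), ("other", 0)],
)

# Select only the column that is needed instead of whole rows.
rows = conn.execute(
    "SELECT map_index FROM task_instance WHERE task_id = ?", ("mapped_task",)
)

# Each row is a 1-tuple, so unpack it while building the set.
existing_indexes = {i for (i,) in rows}
conn.close()
```

Building a set rather than a list also makes the later `difference` calls work without an extra conversion.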





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #25788: Properly check the existence of missing mapped TIs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r948958793


##########
airflow/models/dagrun.py:
##########
@@ -1030,7 +1030,7 @@ def expand_mapped_literals(
         tasks_and_map_idxs = map(expand_mapped_literals, filter(task_filter, dag.task_dict.values()))
 
         tasks = itertools.chain.from_iterable(itertools.starmap(task_creator, tasks_and_map_idxs))
-        if missing_indexes:
+        if missing_indexes and any(len(v) for _, v in missing_indexes.items()):

Review Comment:
   It's not the root cause of the issue, but it calls verify_integrity unnecessarily.





[GitHub] [airflow] uranusjr commented on a diff in pull request #25788: Properly check the existence of missing mapped TIs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r954852616


##########
airflow/models/dagrun.py:
##########
@@ -1082,44 +1058,45 @@ def _create_task_instances(
             # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
             session.rollback()
 
-    def _revise_mapped_task_indexes(
-        self,
-        tis: Iterable[TI],
-        *,
-        session: Session,
-    ) -> Dict["MappedOperator", Sequence[int]]:
-        """Check if the length of the mapped task instances changed at runtime and find the missing indexes.
+    def _revise_mapped_task_indexes(self, task, session: Session):
+        """Check if task increased or reduced in length and handle appropriately"""
+        from airflow.models.taskinstance import TaskInstance
+        from airflow.settings import task_instance_mutation_hook
 
-        :param tis: Task instances to check
-        :param session: The session to use
-        """
-        from airflow.models.mappedoperator import MappedOperator
+        task.run_time_mapped_ti_count.cache_clear()
+        total_length = (
+            task.parse_time_mapped_ti_count
+            or task.run_time_mapped_ti_count(self.run_id, session=session)
+            or 0
+        )
+        query = session.query(TaskInstance.map_index).filter(
+            TaskInstance.dag_id == self.dag_id,
+            TaskInstance.task_id == task.task_id,
+            TaskInstance.run_id == self.run_id,
+        )
+        existing_indexes = {i for (i,) in query}
+        missing_indexes = set(range(total_length)).difference(set(existing_indexes))
+        removed_indexes = set(existing_indexes).difference(range(total_length))

Review Comment:
   ```suggestion
           removed_indexes = existing_indexes.difference(range(total_length))
   ```



##########
airflow/models/dagrun.py:
##########
@@ -1082,44 +1058,45 @@ def _create_task_instances(
             # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
             session.rollback()
 
-    def _revise_mapped_task_indexes(
-        self,
-        tis: Iterable[TI],
-        *,
-        session: Session,
-    ) -> Dict["MappedOperator", Sequence[int]]:
-        """Check if the length of the mapped task instances changed at runtime and find the missing indexes.
+    def _revise_mapped_task_indexes(self, task, session: Session):
+        """Check if task increased or reduced in length and handle appropriately"""
+        from airflow.models.taskinstance import TaskInstance
+        from airflow.settings import task_instance_mutation_hook
 
-        :param tis: Task instances to check
-        :param session: The session to use
-        """
-        from airflow.models.mappedoperator import MappedOperator
+        task.run_time_mapped_ti_count.cache_clear()
+        total_length = (
+            task.parse_time_mapped_ti_count
+            or task.run_time_mapped_ti_count(self.run_id, session=session)
+            or 0
+        )
+        query = session.query(TaskInstance.map_index).filter(
+            TaskInstance.dag_id == self.dag_id,
+            TaskInstance.task_id == task.task_id,
+            TaskInstance.run_id == self.run_id,
+        )
+        existing_indexes = {i for (i,) in query}
+        missing_indexes = set(range(total_length)).difference(set(existing_indexes))
+        removed_indexes = set(existing_indexes).difference(range(total_length))
+        created_indexes = []

Review Comment:
   This one should still be `created_tis`; it holds task instances 😄



##########
airflow/models/dagrun.py:
##########
@@ -1082,44 +1058,45 @@ def _create_task_instances(
             # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
             session.rollback()
 
-    def _revise_mapped_task_indexes(
-        self,
-        tis: Iterable[TI],
-        *,
-        session: Session,
-    ) -> Dict["MappedOperator", Sequence[int]]:
-        """Check if the length of the mapped task instances changed at runtime and find the missing indexes.
+    def _revise_mapped_task_indexes(self, task, session: Session):
+        """Check if task increased or reduced in length and handle appropriately"""
+        from airflow.models.taskinstance import TaskInstance
+        from airflow.settings import task_instance_mutation_hook
 
-        :param tis: Task instances to check
-        :param session: The session to use
-        """
-        from airflow.models.mappedoperator import MappedOperator
+        task.run_time_mapped_ti_count.cache_clear()
+        total_length = (
+            task.parse_time_mapped_ti_count
+            or task.run_time_mapped_ti_count(self.run_id, session=session)
+            or 0
+        )
+        query = session.query(TaskInstance.map_index).filter(
+            TaskInstance.dag_id == self.dag_id,
+            TaskInstance.task_id == task.task_id,
+            TaskInstance.run_id == self.run_id,
+        )
+        existing_indexes = {i for (i,) in query}
+        missing_indexes = set(range(total_length)).difference(set(existing_indexes))

Review Comment:
   ```suggestion
           missing_indexes = set(range(total_length)).difference(existing_indexes)
   ```
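
Both suggestions on this hunk rely on the fact that `set.difference` accepts any iterable, so wrapping the argument in `set(...)` is redundant once `existing_indexes` is already a set. A minimal sketch with made-up values:

```python
# Made-up values for illustration: the task now maps to 4 items,
# while TIs exist for indexes 0, 1 and a stale index 5.
total_length = 4
existing_indexes = {0, 1, 5}

# difference() takes any iterable, so no extra set() call is needed on the argument.
missing_indexes = set(range(total_length)).difference(existing_indexes)
removed_indexes = existing_indexes.difference(range(total_length))
```

With these values, `missing_indexes` is `{2, 3}` and `removed_indexes` is `{5}`.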





[GitHub] [airflow] uranusjr commented on a diff in pull request #25788: Properly check the existence of missing mapped TIs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r954647739


##########
airflow/models/dagrun.py:
##########
@@ -1081,44 +1057,49 @@ def _create_task_instances(
             # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
             session.rollback()
 
-    def _revise_mapped_task_indexes(
-        self,
-        tis: Iterable[TI],
-        *,
-        session: Session,
-    ) -> Dict["MappedOperator", Sequence[int]]:
-        """Check if the length of the mapped task instances changed at runtime and find the missing indexes.
-
-        :param tis: Task instances to check
-        :param session: The session to use
-        """
-        from airflow.models.mappedoperator import MappedOperator
+    def _revise_mapped_task_indexes(self, task, session: Session):
+        """Check if task increased or reduced in length and handle appropriately"""
+        from airflow.models.taskinstance import TaskInstance
+        from airflow.settings import task_instance_mutation_hook
 
-        existing_indexes: Dict[MappedOperator, List[int]] = defaultdict(list)
-        new_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
-        for ti in tis:
-            task = ti.task
-            if not isinstance(task, MappedOperator):
-                continue
-            # skip unexpanded tasks and also tasks that expands with literal arguments
-            if ti.map_index < 0 or task.parse_time_mapped_ti_count:
-                continue
-            existing_indexes[task].append(ti.map_index)
-            task.run_time_mapped_ti_count.cache_clear()  # type: ignore[attr-defined]
-            new_length = task.run_time_mapped_ti_count(self.run_id, session=session) or 0
-
-            if ti.map_index >= new_length:
-                self.log.debug(
-                    "Removing task '%s' as the map_index is longer than the resolved mapping list (%d)",
-                    ti,
-                    new_length,
-                )
-                ti.state = State.REMOVED
-            new_indexes[task] = range(new_length)
-        missing_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
-        for k, v in existing_indexes.items():
-            missing_indexes.update({k: list(set(new_indexes[k]).difference(v))})
-        return missing_indexes
+        task.run_time_mapped_ti_count.cache_clear()
+        total_length = (
+            task.parse_time_mapped_ti_count
+            or task.run_time_mapped_ti_count(self.run_id, session=session)
+            or 0
+        )
+        existing_tis = (
+            session.query(TaskInstance)
+            .filter(
+                TaskInstance.dag_id == self.dag_id,
+                TaskInstance.task_id == task.task_id,
+                TaskInstance.run_id == self.run_id,
+            )
+            .all()
+        )
+        existing_indexes = [i.map_index for i in existing_tis]
+        missing_tis = set(range(total_length)).difference(set(existing_indexes))
+        removed_tis = set(existing_indexes).difference(range(total_length))

Review Comment:
   These should be named `*_indexes` instead of `*_tis`.





[GitHub] [airflow] uranusjr commented on a diff in pull request #25788: Properly check the existence of missing mapped TIs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r954646675


##########
airflow/models/dagrun.py:
##########
@@ -1081,44 +1057,49 @@ def _create_task_instances(
             # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
             session.rollback()
 
-    def _revise_mapped_task_indexes(
-        self,
-        tis: Iterable[TI],
-        *,
-        session: Session,
-    ) -> Dict["MappedOperator", Sequence[int]]:
-        """Check if the length of the mapped task instances changed at runtime and find the missing indexes.
-
-        :param tis: Task instances to check
-        :param session: The session to use
-        """
-        from airflow.models.mappedoperator import MappedOperator
+    def _revise_mapped_task_indexes(self, task, session: Session):
+        """Check if task increased or reduced in length and handle appropriately"""
+        from airflow.models.taskinstance import TaskInstance
+        from airflow.settings import task_instance_mutation_hook
 
-        existing_indexes: Dict[MappedOperator, List[int]] = defaultdict(list)
-        new_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
-        for ti in tis:
-            task = ti.task
-            if not isinstance(task, MappedOperator):
-                continue
-            # skip unexpanded tasks and also tasks that expands with literal arguments
-            if ti.map_index < 0 or task.parse_time_mapped_ti_count:
-                continue
-            existing_indexes[task].append(ti.map_index)
-            task.run_time_mapped_ti_count.cache_clear()  # type: ignore[attr-defined]
-            new_length = task.run_time_mapped_ti_count(self.run_id, session=session) or 0
-
-            if ti.map_index >= new_length:
-                self.log.debug(
-                    "Removing task '%s' as the map_index is longer than the resolved mapping list (%d)",
-                    ti,
-                    new_length,
-                )
-                ti.state = State.REMOVED
-            new_indexes[task] = range(new_length)
-        missing_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
-        for k, v in existing_indexes.items():
-            missing_indexes.update({k: list(set(new_indexes[k]).difference(v))})
-        return missing_indexes
+        task.run_time_mapped_ti_count.cache_clear()
+        total_length = (
+            task.parse_time_mapped_ti_count
+            or task.run_time_mapped_ti_count(self.run_id, session=session)
+            or 0
+        )

Review Comment:
   This induces an unnecessary query if `parse_time_mapped_ti_count` is 0. Reading how `run_time_mapped_ti_count` is used, some refactoring might be a good idea. I'll do it after this PR is done.
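
The extra query comes from how `or` chains treat 0 as falsy. The sketch below uses hypothetical stand-in functions (none of them are Airflow APIs) and assumes that `None` means "not known at parse time"; the `is not None` variant is only one possible way to avoid the call, not the refactoring that was later done.

```python
calls = []  # records every simulated database query


def run_time_count():
    """Hypothetical stand-in for the run-time lookup, which hits the database."""
    calls.append("query")
    return 0


def total_with_or(parse_time_count):
    # Mirrors the chain in the diff: a parse-time count of 0 is falsy,
    # so evaluation falls through to the run-time lookup.
    return parse_time_count or run_time_count() or 0


def total_with_none_check(parse_time_count):
    # Possible alternative: only fall back when the parse-time count is unknown.
    if parse_time_count is not None:
        return parse_time_count
    return run_time_count() or 0
```

Calling `total_with_or(0)` triggers one simulated query, whereas `total_with_none_check(0)` returns 0 without touching `run_time_count`.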





[GitHub] [airflow] ashb commented on a diff in pull request #25788: Properly check the existence of missing indexes

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r948885926


##########
airflow/models/dagrun.py:
##########
@@ -656,7 +656,7 @@ def _filter_tis_and_exclude_removed(dag: "DAG", tis: List[TI]) -> Iterable[TI]:
 
         tis = list(_filter_tis_and_exclude_removed(self.get_dag(), tis))
         missing_indexes = self._revise_mapped_task_indexes(tis, session=session)
-        if missing_indexes:
+        if missing_indexes and any(len(v) for _, v in missing_indexes.items()):

Review Comment:
   ```suggestion
           if missing_indexes and any(len(v) for v in missing_indexes.values()):
   ```





[GitHub] [airflow] ashb commented on a diff in pull request #25788: Properly check the existence of missing mapped TIs

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r948920787


##########
airflow/models/dagrun.py:
##########
@@ -1117,7 +1119,9 @@ def _revise_mapped_task_indexes(
             new_indexes[task] = range(new_length)
         missing_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
         for k, v in existing_indexes.items():
-            missing_indexes.update({k: list(set(new_indexes[k]).difference(v))})
+            indexes = list(set(new_indexes[k]).difference(v))
+            if indexes:
+                missing_indexes.update({k: indexes})

Review Comment:
   ```suggestion
                   missing_indexes[k] = indexes
   ```
   I think
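
Storing only non-empty diffs, as in this hunk, is what makes a plain `if missing_indexes:` reliable again, and direct item assignment does the same job as `update({k: indexes})`. A minimal sketch, assuming string keys in place of MappedOperator objects and a hypothetical helper name:

```python
def find_missing(new_indexes, existing_indexes):
    """Return only the tasks that really have missing map indexes.

    Hypothetical helper for illustration; keys are task names, not operators.
    """
    missing_indexes = {}
    for k, v in existing_indexes.items():
        indexes = sorted(set(new_indexes[k]).difference(v))
        if indexes:
            # Direct assignment; equivalent to missing_indexes.update({k: indexes}).
            missing_indexes[k] = indexes
    return missing_indexes


some_missing = find_missing(
    {"task_a": range(3), "task_b": range(2)},
    {"task_a": [0, 1, 2], "task_b": [0]},
)
none_missing = find_missing({"task_a": range(2)}, {"task_a": [0, 1]})
```

`some_missing` only contains `task_b`, and `none_missing` is an empty dict, so it is falsy.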





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #25788: Properly check the existence of missing mapped TIs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r949911201


##########
airflow/models/dagrun.py:
##########
@@ -1081,44 +1058,49 @@ def _create_task_instances(
             # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
             session.rollback()
 
-    def _revise_mapped_task_indexes(
-        self,
-        tis: Iterable[TI],
-        *,
-        session: Session,
-    ) -> Dict["MappedOperator", Sequence[int]]:
-        """Check if the length of the mapped task instances changed at runtime and find the missing indexes.
-
-        :param tis: Task instances to check
-        :param session: The session to use
-        """
-        from airflow.models.mappedoperator import MappedOperator
+    def _revise_mapped_task_indexes(self, task, session: Session):
+        """Check if task increased or reduced in length and handle appropriately"""
+        from airflow.models.taskinstance import TaskInstance
+        from airflow.settings import task_instance_mutation_hook
 
-        existing_indexes: Dict[MappedOperator, List[int]] = defaultdict(list)
-        new_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
-        for ti in tis:
-            task = ti.task
-            if not isinstance(task, MappedOperator):
-                continue
-            # skip unexpanded tasks and also tasks that expands with literal arguments
-            if ti.map_index < 0 or task.parse_time_mapped_ti_count:
-                continue
-            existing_indexes[task].append(ti.map_index)
-            task.run_time_mapped_ti_count.cache_clear()  # type: ignore[attr-defined]
-            new_length = task.run_time_mapped_ti_count(self.run_id, session=session) or 0
-
-            if ti.map_index >= new_length:
-                self.log.debug(
-                    "Removing task '%s' as the map_index is longer than the resolved mapping list (%d)",
-                    ti,
-                    new_length,
-                )
-                ti.state = State.REMOVED
-            new_indexes[task] = range(new_length)
-        missing_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
-        for k, v in existing_indexes.items():
-            missing_indexes.update({k: list(set(new_indexes[k]).difference(v))})
-        return missing_indexes
+        task.run_time_mapped_ti_count.cache_clear()
+        total_length = (
+            task.parse_time_mapped_ti_count
+            or task.run_time_mapped_ti_count(self.run_id, session=session)
+            or 0
+        )
+        existing_tis = (
+            session.query(TaskInstance)
+            .filter(
+                TaskInstance.dag_id == self.dag_id,
+                TaskInstance.task_id == task.task_id,
+                TaskInstance.run_id == self.run_id,
+            )
+            .all()
+        )
+        existing_indexes = [i.map_index for i in existing_tis]
+        missing_tis = set(range(total_length)).difference(set(existing_indexes))
+        removed_tis = set(existing_indexes).difference(range(total_length))
+        created_tis = []
+
+        if missing_tis:
+            for index in missing_tis:
+                ti = TaskInstance(task, run_id=self.run_id, map_index=index, state=None)
+                self.log.debug("Expanding TIs upserted %s", ti)
+                task_instance_mutation_hook(ti)
+                ti = session.merge(ti)
+                ti.refresh_from_task(task)
+                session.flush()
+                created_tis.append(ti)
+        elif removed_tis:
+            session.query(TaskInstance).filter(
+                TaskInstance.dag_id == self.dag_id,
+                TaskInstance.task_id == task.task_id,
+                TaskInstance.run_id == self.run_id,
+                TaskInstance.map_index.in_(removed_tis),
+            ).update({TaskInstance.state: TaskInstanceState.REMOVED})
+            session.flush()

Review Comment:
   If we have missing TIs, then no TIs were removed, so the `elif` covers it.
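
The reasoning behind the `elif` can be checked with plain sets. This sketch assumes the existing map indexes form a contiguous run starting at 0; the helper name is hypothetical.

```python
def diff_indexes(existing_indexes, total_length):
    """Return (missing, removed) index sets for a mapped task (illustrative helper)."""
    expected = range(total_length)
    missing = set(expected).difference(existing_indexes)
    removed = set(existing_indexes).difference(expected)
    return missing, removed


# The mapping grew from 2 to 4 items: only missing indexes.
grown = diff_indexes({0, 1}, 4)

# The mapping shrank from 4 to 2 items: only removed indexes.
shrunk = diff_indexes({0, 1, 2, 3}, 2)

# Non-contiguous existing indexes: both sets are non-empty.
gapped = diff_indexes({0, 3}, 2)
```

Under the contiguity assumption at most one of the two sets is non-empty, which is what the `elif` relies on. If the existing indexes had gaps, as in `gapped`, both sets would be non-empty and an `elif` would skip the removal branch.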





[GitHub] [airflow] uranusjr commented on a diff in pull request #25788: Properly check the existence of missing mapped TIs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r949899183


##########
airflow/models/dagrun.py:
##########
@@ -864,29 +861,27 @@ def task_filter(task: "Operator") -> bool:
         task_creator = self._get_task_creator(created_counts, task_instance_mutation_hook, hook_is_noop)
 
         # Create the missing tasks, including mapped tasks
-        tasks = self._create_missing_tasks(dag, task_creator, task_filter, missing_indexes, session=session)
+        tasks = self._create_tasks(dag, task_creator, task_filter, session=session)
 
         self._create_task_instances(dag.dag_id, tasks, created_counts, hook_is_noop, session=session)
 
     def _check_for_removed_or_restored_tasks(
         self, dag: "DAG", ti_mutation_hook, *, session: Session
-    ) -> Tuple[Set[str], Dict["MappedOperator", Sequence[int]]]:
+    ) -> Set[str]:
         """
         Check for removed tasks/restored/missing tasks.
 
         :param dag: DAG object corresponding to the dagrun
         :param ti_mutation_hook: task_instance_mutation_hook function
         :param session: Sqlalchemy ORM Session
 
-        :return: List of task_ids in the dagrun and missing task indexes
+        :return: List of task_ids in the dagrun

Review Comment:
   ```suggestion
           :return: Task IDs in the DAG run
   ```
   
   This is no longer a list 😛





[GitHub] [airflow] ashb commented on a diff in pull request #25788: Properly check the existence of missing indexes

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r948891233


##########
airflow/models/dagrun.py:
##########
@@ -1030,7 +1030,7 @@ def expand_mapped_literals(
         tasks_and_map_idxs = map(expand_mapped_literals, filter(task_filter, dag.task_dict.values()))
 
         tasks = itertools.chain.from_iterable(itertools.starmap(task_creator, tasks_and_map_idxs))
-        if missing_indexes:
+        if missing_indexes and any(len(v) for _, v in missing_indexes.items()):

Review Comment:
   Though surely this change isn't needed, as `missing_indexes` is only passed in one case, from `task_instance_scheduling_decisions`, where we've already performed this check.





[GitHub] [airflow] ashb commented on a diff in pull request #25788: Properly check the existence of missing indexes

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r948886286


##########
airflow/models/dagrun.py:
##########
@@ -1030,7 +1030,7 @@ def expand_mapped_literals(
         tasks_and_map_idxs = map(expand_mapped_literals, filter(task_filter, dag.task_dict.values()))
 
         tasks = itertools.chain.from_iterable(itertools.starmap(task_creator, tasks_and_map_idxs))
-        if missing_indexes:
+        if missing_indexes and any(len(v) for _, v in missing_indexes.items()):

Review Comment:
   ```suggestion
           if missing_indexes and any(len(v) for v in missing_indexes.values()):
   ```
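The reason the bare `if missing_indexes:` check misfires can be shown with a minimal sketch. The string key below is a hypothetical stand-in for the `MappedOperator` key used in the diff; only standard-library behaviour is relied on.

```python
from collections import defaultdict

# Hypothetical stand-in for the mapping built by the old helper:
# the key is a plain string here, not a real MappedOperator.
missing_indexes = defaultdict(list)
missing_indexes.update({"task_2": []})  # key present, but the diff is empty

# A dict with at least one key is truthy, even when every value is empty.
naive_check = bool(missing_indexes)

# Looking at the values asks whether any index is actually missing.
value_check = any(len(v) for v in missing_indexes.values())

print(naive_check, value_check)
```

Iterating over `.values()` avoids unpacking a key that is never used, which is the point of the suggestion above.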





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #25788: Properly check the existence of missing mapped TIs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r949911201


##########
airflow/models/dagrun.py:
##########
@@ -1081,44 +1058,49 @@ def _create_task_instances(
             # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
             session.rollback()
 
-    def _revise_mapped_task_indexes(
-        self,
-        tis: Iterable[TI],
-        *,
-        session: Session,
-    ) -> Dict["MappedOperator", Sequence[int]]:
-        """Check if the length of the mapped task instances changed at runtime and find the missing indexes.
-
-        :param tis: Task instances to check
-        :param session: The session to use
-        """
-        from airflow.models.mappedoperator import MappedOperator
+    def _revise_mapped_task_indexes(self, task, session: Session):
+        """Check if task increased or reduced in length and handle appropriately"""
+        from airflow.models.taskinstance import TaskInstance
+        from airflow.settings import task_instance_mutation_hook
 
-        existing_indexes: Dict[MappedOperator, List[int]] = defaultdict(list)
-        new_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
-        for ti in tis:
-            task = ti.task
-            if not isinstance(task, MappedOperator):
-                continue
-            # skip unexpanded tasks and also tasks that expands with literal arguments
-            if ti.map_index < 0 or task.parse_time_mapped_ti_count:
-                continue
-            existing_indexes[task].append(ti.map_index)
-            task.run_time_mapped_ti_count.cache_clear()  # type: ignore[attr-defined]
-            new_length = task.run_time_mapped_ti_count(self.run_id, session=session) or 0
-
-            if ti.map_index >= new_length:
-                self.log.debug(
-                    "Removing task '%s' as the map_index is longer than the resolved mapping list (%d)",
-                    ti,
-                    new_length,
-                )
-                ti.state = State.REMOVED
-            new_indexes[task] = range(new_length)
-        missing_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
-        for k, v in existing_indexes.items():
-            missing_indexes.update({k: list(set(new_indexes[k]).difference(v))})
-        return missing_indexes
+        task.run_time_mapped_ti_count.cache_clear()
+        total_length = (
+            task.parse_time_mapped_ti_count
+            or task.run_time_mapped_ti_count(self.run_id, session=session)
+            or 0
+        )
+        existing_tis = (
+            session.query(TaskInstance)
+            .filter(
+                TaskInstance.dag_id == self.dag_id,
+                TaskInstance.task_id == task.task_id,
+                TaskInstance.run_id == self.run_id,
+            )
+            .all()
+        )
+        existing_indexes = [i.map_index for i in existing_tis]
+        missing_tis = set(range(total_length)).difference(set(existing_indexes))
+        removed_tis = set(existing_indexes).difference(range(total_length))
+        created_tis = []
+
+        if missing_tis:
+            for index in missing_tis:
+                ti = TaskInstance(task, run_id=self.run_id, map_index=index, state=None)
+                self.log.debug("Expanding TIs upserted %s", ti)
+                task_instance_mutation_hook(ti)
+                ti = session.merge(ti)
+                ti.refresh_from_task(task)
+                session.flush()
+                created_tis.append(ti)
+        elif removed_tis:
+            session.query(TaskInstance).filter(
+                TaskInstance.dag_id == self.dag_id,
+                TaskInstance.task_id == task.task_id,
+                TaskInstance.run_id == self.run_id,
+                TaskInstance.map_index.in_(removed_tis),
+            ).update({TaskInstance.state: TaskInstanceState.REMOVED})
+            session.flush()

Review Comment:
   If we have missing TIs, then no TIs were removed, so the `elif` covers it.
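A small sketch of the set arithmetic behind this claim, using a hypothetical helper `diff_indexes` that mirrors the two `difference` calls in the diff. It suggests the claim holds when the existing map indexes are contiguous from 0; whether that always holds in practice is an assumption this thread does not settle.

```python
def diff_indexes(existing_indexes, total_length):
    """Return (missing, removed) map indexes for one mapped task (hypothetical helper)."""
    expected = set(range(total_length))
    existing = set(existing_indexes)
    missing = expected - existing  # should exist but have no TI yet
    removed = existing - expected  # have a TI but fall outside the new length
    return missing, removed


# Contiguous existing indexes: at most one of the two sets is non-empty.
grown = diff_indexes([0, 1, 2], 5)
shrunk = diff_indexes([0, 1, 2], 2)

# A gap in the existing indexes (made-up input) makes both non-empty,
# which is the case where if/elif and two separate ifs would differ.
gappy = diff_indexes([0, 2, 5], 4)

print(grown, shrunk, gappy)
```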





[GitHub] [airflow] uranusjr commented on a diff in pull request #25788: Properly check the existence of missing mapped TIs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r949904849


##########
airflow/models/dagrun.py:
##########
@@ -1081,44 +1058,49 @@ def _create_task_instances(
             # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
             session.rollback()
 
-    def _revise_mapped_task_indexes(
-        self,
-        tis: Iterable[TI],
-        *,
-        session: Session,
-    ) -> Dict["MappedOperator", Sequence[int]]:
-        """Check if the length of the mapped task instances changed at runtime and find the missing indexes.
-
-        :param tis: Task instances to check
-        :param session: The session to use
-        """
-        from airflow.models.mappedoperator import MappedOperator
+    def _revise_mapped_task_indexes(self, task, session: Session):
+        """Check if task increased or reduced in length and handle appropriately"""
+        from airflow.models.taskinstance import TaskInstance
+        from airflow.settings import task_instance_mutation_hook
 
-        existing_indexes: Dict[MappedOperator, List[int]] = defaultdict(list)
-        new_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
-        for ti in tis:
-            task = ti.task
-            if not isinstance(task, MappedOperator):
-                continue
-            # skip unexpanded tasks and also tasks that expands with literal arguments
-            if ti.map_index < 0 or task.parse_time_mapped_ti_count:
-                continue
-            existing_indexes[task].append(ti.map_index)
-            task.run_time_mapped_ti_count.cache_clear()  # type: ignore[attr-defined]
-            new_length = task.run_time_mapped_ti_count(self.run_id, session=session) or 0
-
-            if ti.map_index >= new_length:
-                self.log.debug(
-                    "Removing task '%s' as the map_index is longer than the resolved mapping list (%d)",
-                    ti,
-                    new_length,
-                )
-                ti.state = State.REMOVED
-            new_indexes[task] = range(new_length)
-        missing_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
-        for k, v in existing_indexes.items():
-            missing_indexes.update({k: list(set(new_indexes[k]).difference(v))})
-        return missing_indexes
+        task.run_time_mapped_ti_count.cache_clear()
+        total_length = (
+            task.parse_time_mapped_ti_count
+            or task.run_time_mapped_ti_count(self.run_id, session=session)
+            or 0
+        )
+        existing_tis = (
+            session.query(TaskInstance)
+            .filter(
+                TaskInstance.dag_id == self.dag_id,
+                TaskInstance.task_id == task.task_id,
+                TaskInstance.run_id == self.run_id,
+            )
+            .all()
+        )
+        existing_indexes = [i.map_index for i in existing_tis]
+        missing_tis = set(range(total_length)).difference(set(existing_indexes))
+        removed_tis = set(existing_indexes).difference(range(total_length))
+        created_tis = []
+
+        if missing_tis:
+            for index in missing_tis:
+                ti = TaskInstance(task, run_id=self.run_id, map_index=index, state=None)
+                self.log.debug("Expanding TIs upserted %s", ti)
+                task_instance_mutation_hook(ti)
+                ti = session.merge(ti)
+                ti.refresh_from_task(task)
+                session.flush()
+                created_tis.append(ti)
+        elif removed_tis:
+            session.query(TaskInstance).filter(
+                TaskInstance.dag_id == self.dag_id,
+                TaskInstance.task_id == task.task_id,
+                TaskInstance.run_id == self.run_id,
+                TaskInstance.map_index.in_(removed_tis),
+            ).update({TaskInstance.state: TaskInstanceState.REMOVED})
+            session.flush()

Review Comment:
   Should this be two separate `if`s instead of `if-elif`?





[GitHub] [airflow] ashb commented on a diff in pull request #25788: Properly check the existence of missing indexes

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r948892480


##########
airflow/models/dagrun.py:
##########
@@ -656,7 +656,7 @@ def _filter_tis_and_exclude_removed(dag: "DAG", tis: List[TI]) -> Iterable[TI]:
 
         tis = list(_filter_tis_and_exclude_removed(self.get_dag(), tis))
         missing_indexes = self._revise_mapped_task_indexes(tis, session=session)
-        if missing_indexes:
+        if missing_indexes and any(len(v) for v in missing_indexes.values()):

Review Comment:
   Rather than doing this check here it feels better if we do it inside `_revise_mapped_task_indexes` in some way.





[GitHub] [airflow] ashb commented on a diff in pull request #25788: Properly check the existence of missing mapped TIs

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r949001650


##########
tests/models/test_dagrun.py:
##########
@@ -1279,6 +1279,65 @@ def task_2(arg2):
     ]
 
 
+def test_mapped_literal_length_with_no_change_at_runtime_doesnt_call_verify_integrity(dag_maker, session):
+    """
+    Test that when there's no change to mapped task indexes at runtime, the dagrun.verify_integrity
+    is not called
+    """
+    from airflow.models import Variable
+
+    Variable.set(key='arg1', value=[1, 2, 3])
+
+    @task
+    def task_1():
+        return Variable.get('arg1', deserialize_json=True)
+
+    with dag_maker(session=session) as dag:
+
+        @task
+        def task_2(arg2):
+            ...
+
+        task_2.expand(arg2=task_1())
+
+    dr = dag_maker.create_dagrun()
+    ti = dr.get_task_instance(task_id='task_1')
+    ti.run()
+    dr.task_instance_scheduling_decisions()
+    tis = dr.get_task_instances()
+    indices = [(ti.map_index, ti.state) for ti in tis if ti.map_index >= 0]
+    assert sorted(indices) == [
+        (0, State.NONE),
+        (1, State.NONE),
+        (2, State.NONE),
+    ]
+
+    # Now "clear" and "reduce" the length of literal
+    dag.clear()
+    Variable.set(key='arg1', value=[1, 2, 3])

Review Comment:
   This is neither a literal nor a reduction in length: it's 3 before and after.





[GitHub] [airflow] ashb commented on a diff in pull request #25788: Properly check the existence of missing mapped TIs

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r948921825


##########
airflow/models/dagrun.py:
##########
@@ -1117,7 +1119,9 @@ def _revise_mapped_task_indexes(
             new_indexes[task] = range(new_length)
         missing_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
         for k, v in existing_indexes.items():
-            missing_indexes.update({k: list(set(new_indexes[k]).difference(v))})
+            indexes = list(set(new_indexes[k]).difference(v))
+            if indexes:
+                missing_indexes.update({k: indexes})

Review Comment:
   ```suggestion
               indexes = set(new_indexes[k]).difference(v)
               if indexes:
                   missing_indexes[k] = list(indexes)
   ```
   
   I think
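As a rough illustration of the suggestion, the sketch below only records a task when its diff is non-empty, so the caller's plain truthiness check works again. The function name and string keys are hypothetical, and `sorted` is used instead of `list` purely to make the printed output deterministic.

```python
from collections import defaultdict


def find_missing_indexes(existing_indexes, new_indexes):
    """Sketch: keep only tasks that really have missing map indexes."""
    missing = defaultdict(list)
    for task, existing in existing_indexes.items():
        indexes = set(new_indexes[task]).difference(existing)
        if indexes:  # skip tasks whose diff is empty
            missing[task] = sorted(indexes)
    return missing


unchanged = find_missing_indexes({"task_2": [0, 1, 2]}, {"task_2": range(3)})
grown = find_missing_indexes({"task_2": [0, 1, 2]}, {"task_2": range(5)})

print(bool(unchanged), dict(grown))
```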





[GitHub] [airflow] uranusjr commented on a diff in pull request #25788: Properly check the existence of missing mapped TIs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r949779317


##########
airflow/models/dagrun.py:
##########
@@ -729,6 +726,10 @@ def _get_ready_tis(
                     additional_tis.extend(expanded_tis[1:])
                 expansion_happened = True
             if schedulable.state in SCHEDULEABLE_STATES:
+                if schedulable.task.is_mapped:
+                    task = cast("MappedOperator", schedulable.task)

Review Comment:
   ```suggestion
                   task = schedulable.task
                   if isinstance(task, MappedOperator):
   ```
   
   Since we are dealing with MappedOperator explicitly, using `isinstance` is more correct here.
   
   Why are we putting this method on MappedOperator though? The logic does not really need much from the operator, and the previous approach of putting the logic on DagRun makes more sense to me.
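To illustrate the `isinstance` point: `typing.cast` only informs the type checker and returns its argument unchanged at runtime, whereas `isinstance` actually checks the object. The classes below are hypothetical stand-ins, not Airflow's operators.

```python
from typing import cast


class FakeOperator:  # hypothetical stand-in for a plain operator
    is_mapped = False


class FakeMappedOperator(FakeOperator):  # hypothetical stand-in for a mapped operator
    is_mapped = True

    def run_time_mapped_ti_count(self):
        return 3


def count_with_cast(task):
    # cast() performs no runtime check; `mapped` is the very same object as `task`.
    mapped = cast("FakeMappedOperator", task)
    return mapped.run_time_mapped_ti_count()


def count_with_isinstance(task):
    # isinstance() checks at runtime and lets type checkers narrow the type too.
    if isinstance(task, FakeMappedOperator):
        return task.run_time_mapped_ti_count()
    return None


print(count_with_isinstance(FakeMappedOperator()), count_with_isinstance(FakeOperator()))
```

Passing a `FakeOperator` to `count_with_cast` would raise `AttributeError`, because the cast never verified the type.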





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #25788: Properly check the existence of missing mapped TIs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r949873532


##########
airflow/models/dagrun.py:
##########
@@ -729,6 +726,10 @@ def _get_ready_tis(
                     additional_tis.extend(expanded_tis[1:])
                 expansion_happened = True
             if schedulable.state in SCHEDULEABLE_STATES:
+                if schedulable.task.is_mapped:
+                    task = cast("MappedOperator", schedulable.task)

Review Comment:
   Well, not much reason. It just gave me a clear head to think about what was going on. Afterwards I felt it was better there because of the cache on `task.run_time_mapped_ti_count`, but I'm moving it back to dagrun.
   





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #25788: Properly check the existence of missing mapped TIs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #25788:
URL: https://github.com/apache/airflow/pull/25788#discussion_r949180248


##########
tests/models/test_dagrun.py:
##########
@@ -1279,6 +1279,65 @@ def task_2(arg2):
     ]
 
 
+def test_mapped_literal_length_with_no_change_at_runtime_doesnt_call_verify_integrity(dag_maker, session):
+    """
+    Test that when there's no change to mapped task indexes at runtime, the dagrun.verify_integrity
+    is not called
+    """
+    from airflow.models import Variable
+
+    Variable.set(key='arg1', value=[1, 2, 3])
+
+    @task
+    def task_1():
+        return Variable.get('arg1', deserialize_json=True)
+
+    with dag_maker(session=session) as dag:
+
+        @task
+        def task_2(arg2):
+            ...
+
+        task_2.expand(arg2=task_1())
+
+    dr = dag_maker.create_dagrun()
+    ti = dr.get_task_instance(task_id='task_1')
+    ti.run()
+    dr.task_instance_scheduling_decisions()
+    tis = dr.get_task_instances()
+    indices = [(ti.map_index, ti.state) for ti in tis if ti.map_index >= 0]
+    assert sorted(indices) == [
+        (0, State.NONE),
+        (1, State.NONE),
+        (2, State.NONE),
+    ]
+
+    # Now "clear" and "reduce" the length of literal
+    dag.clear()
+    Variable.set(key='arg1', value=[1, 2, 3])

Review Comment:
   Copy-paste issue. I'll go through it and check the others.


