Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/04 13:15:22 UTC

[GitHub] [airflow] uranusjr commented on a diff in pull request #22679: Expand mapped tasks at DagRun.verify_integrity

uranusjr commented on code in PR #22679:
URL: https://github.com/apache/airflow/pull/22679#discussion_r841724519


##########
airflow/models/mappedoperator.py:
##########
@@ -516,6 +515,34 @@ def _get_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:
 
         return map_lengths
 
+    def get_mapped_tis_to_create(self, run_id: str, *, mappings=True) -> Any:
+        """Creates a list of TaskInstances to create for the mapped task"""
+        from airflow.models.taskinstance import TaskInstance
+        from airflow.models.xcom_arg import XComArg
+
+        mapped_kwargs = self._get_expansion_kwargs()
+        map_lengths = [len(v) for _, v in mapped_kwargs.items() if not isinstance(v, XComArg)]

Review Comment:
   I’m thinking we should probably only expand here if _none of the mapped kwargs is an XComArg_, because if any of them is, we very likely need to expand again later anyway.
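
A minimal sketch of that all-or-nothing check, for illustration only. The `XComArg` class below is a stand-in so the snippet runs without Airflow, and `eager_map_lengths` is a hypothetical helper name, not something in this PR or in Airflow:

```python
from typing import Any, Dict, Optional


class XComArg:
    """Stand-in for airflow.models.xcom_arg.XComArg (assumption: only the type check matters here)."""


def eager_map_lengths(mapped_kwargs: Dict[str, Any]) -> Optional[Dict[str, int]]:
    """Return the length of every mapped kwarg, or None if expansion should be deferred."""
    if any(isinstance(v, XComArg) for v in mapped_kwargs.values()):
        # At least one length is unknown until the upstream task has run,
        # so skip expansion now instead of expanding a second time later.
        return None
    # Every value is a literal collection, so all lengths are known up front.
    return {k: len(v) for k, v in mapped_kwargs.items()}
```

Compared with the list comprehension in the hunk, which silently drops the `XComArg` entries and keeps the rest, this version would either return every length or none of them.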


