You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/06/07 12:44:14 UTC

[GitHub] [airflow] ashb commented on a diff in pull request #24101: Add a new 'item' field to TaskMap

ashb commented on code in PR #24101:
URL: https://github.com/apache/airflow/pull/24101#discussion_r891174401


##########
airflow/models/taskinstance.py:
##########
@@ -2272,19 +2271,20 @@ def _record_task_map_for_downstreams(self, task: "Operator", value: Any, *, sess
         # currently possible for a downstream to depend on one individual mapped
         # task instance, only a task as a whole. This will change in AIP-42
         # Phase 2, and we'll need to further analyze the mapped task case.
-        if next(task.iter_mapped_dependants(), None) is None:
-            return
-        if value is None:
-            raise XComForMappingNotPushed()
         if task.is_mapped:
+            # But we still need to check the mapped task did push something for
+            # its downstream tasks...
+            if next(task.iter_mapped_dependants(), None) is not None and value is None:
+                raise XComForMappingNotPushed()
+            return
+
+        it1, it2 = itertools.tee(task.iter_mapped_dependants())
+        if next(it1, None) is None:  # No children need to expand, don't bother recording.
             return
-        if not isinstance(value, collections.abc.Collection) or isinstance(value, (bytes, str)):
-            raise UnmappableXComTypePushed(value)
-        task_map = TaskMap.from_task_instance_xcom(self, value)
-        max_map_length = conf.getint("core", "max_map_length", fallback=1024)
-        if task_map.length > max_map_length:
-            raise UnmappableXComLengthPushed(value, max_map_length)
-        session.merge(task_map)
+        session.merge(TaskMap.from_task_instance_xcom(self, value, item=""))
+        if isinstance(value, dict) and any(node.should_unpack_mapped_kwargs() for node in it2):
+            for task_map in TaskMap.from_unpacking_task_instance_xcom(self, value):
+                session.merge(task_map)

Review Comment:
   Why do we create both `item=""` and item set to something here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org