You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/01 08:39:34 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #22679: Expand mapped tasks at DagRun.Veriy_integrity

ephraimbuddy opened a new pull request #22679:
URL: https://github.com/apache/airflow/pull/22679


   Create the necessary task instances for a mapped task at dagrun.verify_integrity
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on pull request #22679: Expand mapped tasks at DagRun.Veriy_integrity

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #22679:
URL: https://github.com/apache/airflow/pull/22679#issuecomment-1086614224


   The failing tests are due to mapped task flow not getting mapped kwargs(Looks related to https://github.com/apache/airflow/pull/22683)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy closed pull request #22679: Expand mapped tasks at DagRun.Veriy_integrity

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #22679:
URL: https://github.com/apache/airflow/pull/22679


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #22679: Expand mapped tasks at DagRun.Veriy_integrity

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #22679:
URL: https://github.com/apache/airflow/pull/22679#discussion_r840504427



##########
File path: airflow/models/mappedoperator.py
##########
@@ -511,6 +510,16 @@ def _get_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:
 
         return map_lengths
 
+    def get_indexes_to_map(self, run_id: str, *, session: Session) -> range:
+        total = 0
+        try:
+            values = self._get_map_lengths(run_id, session=session).values()

Review comment:
       We can only ever be able to expand tasks with literal-only kwargs at this point, so we probably don’t need to call `_get_map_lengths` (which is expensive and accesses db), but instead rely solely on the task kwargs to get this length.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #22679: Expand mapped tasks at DagRun.Veriy_integrity

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #22679:
URL: https://github.com/apache/airflow/pull/22679#discussion_r840504740



##########
File path: airflow/models/dagrun.py
##########
@@ -839,25 +839,59 @@ def task_filter(task: "Operator") -> bool:
 
         if hook_is_noop:
 
-            def create_ti_mapping(task: "Operator") -> dict:
-                created_counts[task.task_type] += 1
-                return TI.insert_mapping(self.run_id, task, map_index=-1)
+            def create_ti_mapping(task: "Operator", session: Session = session) -> List[dict]:
+
+                if isinstance(task, MappedOperator):
+                    indexes_to_map = task.get_indexes_to_map(self.run_id, session=session)

Review comment:
       We can only ever be able to expand tasks with literal-only kwargs at this point, so we probably don’t need to call `_get_map_lengths` (which is expensive and accesses db), but instead rely solely on the task kwargs to get this length.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org