You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/31 17:01:32 UTC

[GitHub] [airflow] uranusjr commented on a change in pull request #21210: Make `airflow dags test` be able to execute Mapped Tasks

uranusjr commented on a change in pull request #21210:
URL: https://github.com/apache/airflow/pull/21210#discussion_r795859822



##########
File path: airflow/executors/debug_executor.py
##########
@@ -76,6 +79,8 @@ def _run_task(self, ti: TaskInstance) -> bool:
         key = ti.key
         try:
             params = self.tasks_params.pop(ti.key, {})
+            if ti.task.is_mapped:
+                ti.task = cast("MappedOperator", ti.task).unmap()

Review comment:
       I wonder if it’d be nicer to also implement `unmap()` on `BaseOperator` and have it simply `return self`.

##########
File path: airflow/jobs/backfill_job.py
##########
@@ -623,15 +650,25 @@ def tabulate_ti_keys_set(set_ti_keys: Set[TaskInstanceKey]) -> str:
             # Sorting by execution date first
             sorted_ti_keys = sorted(
                 set_ti_keys,
-                key=lambda ti_key: (ti_key.run_id, ti_key.dag_id, ti_key.task_id, ti_key.try_number),
+                key=lambda ti_key: (
+                    ti_key.run_id,
+                    ti_key.dag_id,
+                    ti_key.task_id,
+                    ti_key.map_index,
+                    ti_key.try_number,
+                ),
+            )
+            return tabulate(
+                sorted_ti_keys, headers=["DAG ID", "Task ID", "Run ID", "Map Index", "Try number"]
             )
-            return tabulate(sorted_ti_keys, headers=["DAG ID", "Task ID", "Run ID", "Try number"])
 
         def tabulate_tis_set(set_tis: Set[TaskInstance]) -> str:
             # Sorting by execution date first
             sorted_tis = sorted(set_tis, key=lambda ti: (ti.run_id, ti.dag_id, ti.task_id, ti.try_number))
-            tis_values = ((ti.dag_id, ti.task_id, ti.run_id, ti.try_number) for ti in sorted_tis)
-            return tabulate(tis_values, headers=["DAG ID", "Task ID", "Run ID", "Try number"])
+            tis_values = (
+                (ti.dag_id, ti.task_id, ti.run_id, ti.map_index, ti.try_number) for ti in sorted_tis
+            )
+            return tabulate(tis_values, headers=["DAG ID", "Task ID", "Run ID", "Map Index", "Try number"])

Review comment:
       This should probably be discussed in more detail in the UX phase, but I wonder if we should hide the map index column entirely when no task instances are mapped, and display an empty cell (instead of -1) for non-mapped task instances.

##########
File path: airflow/models/baseoperator.py
##########
@@ -1649,6 +1616,7 @@ def _trigger_rule_from_kwargs(self) -> int:
     @classmethod
     def from_operator(cls, operator: BaseOperator, mapped_kwargs: Dict[str, Any]) -> "MappedOperator":
         dag: Optional["DAG"] = getattr(operator, '_dag', None)
+        tg: Optional["TaskGroup"] = getattr(operator, 'task_group', None)

Review comment:
       When are those attributes not available? We can probably make both of these always present on the class (defaulting to None) instead.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org