You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/06/07 16:12:33 UTC

[GitHub] [airflow] bbovenzi commented on a diff in pull request #24284: Speed up grid_data endpoint by 10x

bbovenzi commented on code in PR #24284:
URL: https://github.com/apache/airflow/pull/24284#discussion_r891430623


##########
airflow/www/views.py:
##########
@@ -250,62 +251,102 @@ def _safe_parse_datetime(v):
         abort(400, f"Invalid datetime: {v!r}")
 
 
-def task_group_to_grid(task_item_or_group, dag, dag_runs, session):
+def dag_to_grid(dag, dag_runs, session):
     """
-    Create a nested dict representation of this TaskGroup and its children used to construct
-    the Graph.
+    Create a nested dict representation of the DAG's TaskGroup and its children
+    used to construct the Graph and Grid views.
     """
-    if isinstance(task_item_or_group, AbstractOperator):
-        return {
-            'id': task_item_or_group.task_id,
-            'instances': wwwutils.get_task_summaries(task_item_or_group, dag_runs, session),
-            'label': task_item_or_group.label,
-            'extra_links': task_item_or_group.extra_links,
-            'is_mapped': task_item_or_group.is_mapped,
-        }
+    query = (
+        session.query(
+            TaskInstance.task_id,
+            TaskInstance.run_id,
+            TaskInstance.map_index,
+            TaskInstance.state,
+            TaskInstance.start_date,
+            TaskInstance.end_date,
+            TaskInstance._try_number,
+        )
+        .filter(
+            TaskInstance.dag_id == dag.dag_id,
+            TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]),
+            # Only get normal task instances or the first mapped task
+            TaskInstance.map_index <= 0,
+        )
+        .order_by(TaskInstance.task_id)
+    )
 
-    # Task Group
-    task_group = task_item_or_group
+    grouped_tis = {task_id: list(tis) for task_id, tis in itertools.groupby(query, key=lambda ti: ti.task_id)}
 
-    children = [task_group_to_grid(child, dag, dag_runs, session) for child in task_group.topological_sort()]
+    def task_group_to_grid(item, dag_runs, grouped_tis):
+        if isinstance(item, AbstractOperator):
 
-    def get_summary(dag_run, children):
-        child_instances = [child['instances'] for child in children if 'instances' in child]
-        child_instances = [item for sublist in child_instances for item in sublist]
+            def _get_summary(task_instance):
+                try_count = (
+                    task_instance._try_number
+                    if task_instance._try_number != 0 or task_instance.state in State.running
+                    else task_instance._try_number + 1
+                )
 
-        children_start_dates = [item['start_date'] for item in child_instances if item]
-        children_end_dates = [item['end_date'] for item in child_instances if item]
-        children_states = [item['state'] for item in child_instances if item]
+                return {
+                    'task_id': task_instance.task_id,
+                    'run_id': task_instance.run_id,
+                    'map_index': task_instance.map_index,
+                    'state': task_instance.state,
+                    'start_date': task_instance.start_date,
+                    'end_date': task_instance.end_date,
+                    'try_number': try_count,

Review Comment:
   This no longer returns `mapped_states`, which gave a status breakdown of mapped tasks. Do we still want that? If not, I need to change one bit of React code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org