You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/04/16 08:03:04 UTC

[GitHub] [airflow] alexandrecaze commented on a change in pull request #8277: [AIRFLOW-8187] Extend elastic DAG with a binary tree, grid, star

alexandrecaze commented on a change in pull request #8277: [AIRFLOW-8187] Extend elastic DAG with a binary tree, grid, star
URL: https://github.com/apache/airflow/pull/8277#discussion_r409359371
 
 

 ##########
 File path: scripts/perf/dags/elastic_dag.py
 ##########
 @@ -74,14 +74,79 @@ def safe_dag_id(s: str) -> str:
     return re.sub('[^0-9a-zA-Z_]+', '_', s)
 
 
-# TODO: We should add more shape types e.g. binary tree
def chain_as_binary_tree(*tasks: BashOperator):
    r'''
    Chain tasks as a binary tree where task i is child of task (i - 1) // 2.

    Every task ``p`` is set upstream of its children ``2p + 1`` and ``2p + 2``,
    so the root ``t0`` runs first. For ten tasks the shape is::

        t0 -> t1 -> t3 -> t7
          |    |     \
          |    |       -> t8
          |     \
          |       -> t4 -> t9
          |
           -> t2 -> t5
                \
                  -> t6

    :param tasks: tasks to chain, given in breadth-first (heap) order.
    '''
    for child in range(1, len(tasks)):
        # The parent must run before its child (parent >> child), as drawn above.
        # The previous form ``tasks[i].set_downstream(parent)`` built the tree
        # upside down, with leaves running first and the root last.
        tasks[(child - 1) // 2].set_downstream(tasks[child])
+
+
def chain_as_grid(*tasks: BashOperator):
    '''
    Chain tasks as a triangular grid, filled row by row:

     t0 -> t1 -> t2 -> t3
      |     |     |
      v     v     v
     t4 -> t5 -> t6
      |     |
      v     v
     t7 -> t8
      |
      v
     t9

    The side length ``n`` is the smallest value whose triangular number
    ``n * (n + 1) / 2`` covers all tasks; row ``i`` holds ``n - i`` cells.
    Each cell is set upstream of its right neighbour and of the cell below it.

    :param tasks: tasks to chain, given in row-major order.
    :raises ValueError: if more than 4950 (= 99 * 100 / 2) tasks are given.
    '''
    if len(tasks) > 100 * 99 // 2:
        raise ValueError('Cannot generate grid DAGs with lateral size larger than 100 tasks.')
    # Smallest side length whose triangle holds every task. The guard above
    # guarantees n = 99 always qualifies, so the generator is never exhausted.
    grid_size = next(n for n in range(100) if n * (n + 1) // 2 >= len(tasks))

    def index(i, j):
        '''
        Return the index of node (i, j) on the grid.

        Rows 0..i-1 contain grid_size + (grid_size - 1) + ... cells, i.e.
        grid_size * i - i * (i - 1) / 2. That value is always an integer,
        so exact integer division is used.
        '''
        return grid_size * i - i * (i - 1) // 2 + j

    for i in range(grid_size - 1):
        for j in range(grid_size - i - 1):
            below, right = index(i + 1, j), index(i, j + 1)
            # (i, j) runs before its neighbours, as drawn above. The previous
            # form set the neighbours upstream of (i, j), reversing the grid.
            # tasks[index(i, j)] is only touched when a neighbour exists, because
            # the last row may be partially filled.
            if below < len(tasks):
                tasks[index(i, j)].set_downstream(tasks[below])
            if right < len(tasks):
                tasks[index(i, j)].set_downstream(tasks[right])
+
+
def chain_as_star(*tasks: BashOperator):
    '''
    Chain tasks as a star: task 0 is upstream of every other task.

     t0 -> t1
      | -> t2
      | -> t3
      | -> t4
      | -> t5

    :param tasks: tasks to chain; the first one is the centre of the star.
    '''
    # t0 runs first and fans out to every leaf, as drawn above. The previous
    # form made each leaf upstream of t0 (fan-in) instead.
    # ``tasks[1:]`` is empty for zero or one task, so tasks[0] is never read then.
    for leaf in tasks[1:]:
        tasks[0].set_downstream(leaf)
 
 Review comment:
   oooh nice

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services