Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/04/08 10:49:17 UTC

[GitHub] [airflow] zhongjiajie commented on a change in pull request #4779: [WIP][AIRFLOW-3958] Support list tasks as upstream in chain

zhongjiajie commented on a change in pull request #4779: [WIP][AIRFLOW-3958] Support list tasks as upstream in chain
URL: https://github.com/apache/airflow/pull/4779#discussion_r272986625
 
 

 ##########
 File path: airflow/utils/helpers.py
 ##########
 @@ -154,19 +156,46 @@ def as_flattened_list(iterable):
 
 
 def chain(*tasks):
-    """
-    Given a number of tasks, builds a dependency chain.
+    r"""
+    Given a number of tasks or list of tasks, builds a dependency chain.
 
-    chain(task_1, task_2, task_3, task_4)
+    chain(t1, [t2, t3], [t4, t5], t6)
 
     is equivalent to
 
-    task_1.set_downstream(task_2)
-    task_2.set_downstream(task_3)
-    task_3.set_downstream(task_4)
+      / -> t2 -> t4 \
+    t1               -> t6
+      \ -> t3 -> t5 /
+
+    t1.set_downstream(t2)
+    t1.set_downstream(t3)
+    t2.set_downstream(t4)
+    t3.set_downstream(t5)
+    t4.set_downstream(t6)
+    t5.set_downstream(t6)
+
+    :param tasks: tasks or list[tasks] to set dependencies
+    :type tasks: airflow.models.BaseOperator, List[airflow.models.BaseOperator]
     """
-    for up_task, down_task in zip(tasks[:-1], tasks[1:]):
-        up_task.set_downstream(down_task)
+    for index, task in enumerate(tasks[:-1]):
+        down_task = tasks[index + 1]
+        if isinstance(task, BaseOperator):
+            task.set_downstream(down_task)
+        elif isinstance(down_task, BaseOperator):
+            down_task.set_upstream(task)
+        elif isinstance(task, list) and isinstance(down_task, list):
+            # Parallel pipelines
+            if len(task) == len(down_task):
+                for a, b in zip(task, down_task):
+                    a.set_downstream(b)
+            else:
+                raise AirflowException(
 
 Review comment:
   Not really. That is what `cross` means, and we already have `cross_downstream` in `airflow.utils.helpers`. I was inspired by https://apache-airflow.slack.com/archives/CCPRP7943/p1554633832061000?thread_ts=1554374078.016800&cid=CCPRP7943 and think we should have a function that makes parallel pipelines work:
   
   ```
        /  -> t1 -> t3
   t0
        \  -> t2 -> t4
   ```
   If we do that, `chain(t0, [t1, t2, t3], [t4, t5, t6])` would produce
   
   ```
        /  -> t1 -> t4
   t0    --> t2 -> t5
        \  -> t3 -> t6
   ```
   and `cross_downstream([t1, t2, t3], [t4, t5, t6])` would produce
   
   ```
   t1   \     /  -> t4
           \ /
   t2 -> X   -> t5
           / \
   t3   /      \ -> t6
   ```
   
   This gives us more possibilities for declaring dependencies.
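
To make the difference between the two wirings concrete, here is a minimal sketch that runs without Airflow. `Task` is a hypothetical stand-in for `BaseOperator` that only records downstream task ids, and `ValueError` is used in place of `AirflowException`; neither is taken from the PR. The `chain` and `cross_downstream` bodies illustrate the behaviour described above and are not the code under review.

```python
class Task:
    """Hypothetical stand-in for BaseOperator; it only records edges."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, others):
        # Accept a single task or a list of tasks.
        if not isinstance(others, list):
            others = [others]
        for other in others:
            self.downstream.append(other.task_id)


def chain(*tasks):
    """Link neighbours; two adjacent lists are paired element by element."""
    for up, down in zip(tasks[:-1], tasks[1:]):
        if isinstance(up, Task):
            # task -> task, or task -> list (fan out)
            up.set_downstream(down)
        elif isinstance(down, Task):
            # list -> task (fan in)
            for t in up:
                t.set_downstream(down)
        elif len(up) == len(down):
            # list -> list of equal size: parallel pipelines
            for a, b in zip(up, down):
                a.set_downstream(b)
        else:
            raise ValueError("adjacent lists must have the same length")


def cross_downstream(from_tasks, to_tasks):
    """Link every task in from_tasks to every task in to_tasks."""
    for t in from_tasks:
        t.set_downstream(to_tasks)


t0, t1, t2, t3, t4, t5, t6 = (Task("t%d" % i) for i in range(7))
chain(t0, [t1, t2, t3], [t4, t5, t6])
print(t0.downstream, t1.downstream)
```

With `chain`, `t1` ends up with a single downstream task (`t4`); calling `cross_downstream([t1, t2, t3], [t4, t5, t6])` on fresh tasks would instead give each of `t1`, `t2`, `t3` all three of `t4`, `t5`, `t6` as downstream tasks.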
