Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/01 14:08:48 UTC

[GitHub] [airflow] josh-fell commented on a change in pull request #16732: Update `chain()` and `cross_downstream()` to support XComArgs

josh-fell commented on a change in pull request #16732:
URL: https://github.com/apache/airflow/pull/16732#discussion_r662322916



##########
File path: airflow/models/baseoperator.py
##########
@@ -1566,36 +1567,41 @@ def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]):
         t4.set_downstream(t6)
         t5.set_downstream(t6)
 
-    :param tasks: List of tasks or List[airflow.models.BaseOperator] to set dependencies
-    :type tasks: List[airflow.models.BaseOperator] or airflow.models.BaseOperator
+    :param tasks: List of tasks, List[airflow.models.BaseOperator], XComArg, or List[airflow.models.XComArg]
+        to set dependencies
+    :type tasks: List[airflow.models.BaseOperator], airflow.models.BaseOperator, List[airflow.models.XComArg],
+        or XComArg
     """
+    from airflow.models.xcom_arg import XComArg
+
     for index, up_task in enumerate(tasks[:-1]):
         down_task = tasks[index + 1]
-        if isinstance(up_task, BaseOperator):
+        if isinstance(up_task, (BaseOperator, XComArg)):
             up_task.set_downstream(down_task)
             continue
-        if isinstance(down_task, BaseOperator):
+        if isinstance(down_task, (BaseOperator, XComArg)):
             down_task.set_upstream(up_task)
             continue
         if not isinstance(up_task, Sequence) or not isinstance(down_task, Sequence):
             raise TypeError(
-                'Chain not supported between instances of {up_type} and {down_type}'.format(
+                "Chain not supported between instances of {up_type} and {down_type}".format(
                     up_type=type(up_task), down_type=type(down_task)
                 )
             )
         up_task_list = up_task
         down_task_list = down_task
         if len(up_task_list) != len(down_task_list):
             raise AirflowException(
-                f'Chain not supported different length Iterable '
-                f'but get {len(up_task_list)} and {len(down_task_list)}'
+                f"Chain not supported different length Iterable "
+                f"but get {len(up_task_list)} and {len(down_task_list)}"

Review comment:
       Removed the stylistic changes. Black applied them automatically when the file was saved.
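
For context, the dispatch logic in the hunk above can be sketched without Airflow installed. This is a minimal illustration, not the code in the PR: `Node` is a hypothetical stand-in for both `BaseOperator` and `XComArg`, `ValueError` replaces `AirflowException`, and the final pairwise loop is an assumption, since the hunk is cut off before that point.

```python
from collections.abc import Sequence


class Node:
    """Hypothetical stand-in for BaseOperator / XComArg (not Airflow code)."""

    def __init__(self, name):
        self.name = name
        self.downstream = []  # names of tasks that run after this one

    def set_downstream(self, other):
        # Accept either a single node or a sequence of nodes.
        targets = other if isinstance(other, Sequence) else [other]
        for target in targets:
            self.downstream.append(target.name)

    def set_upstream(self, other):
        # Mirror image of set_downstream: record self on each upstream node.
        sources = other if isinstance(other, Sequence) else [other]
        for source in sources:
            source.downstream.append(self.name)


def chain(*tasks):
    """Link each element of `tasks` to the next one, as in the hunk above."""
    for index, up_task in enumerate(tasks[:-1]):
        down_task = tasks[index + 1]
        # Single node on the left: fan out to whatever is on the right.
        if isinstance(up_task, Node):
            up_task.set_downstream(down_task)
            continue
        # Single node on the right: fan in from the sequence on the left.
        if isinstance(down_task, Node):
            down_task.set_upstream(up_task)
            continue
        # Otherwise both sides must be sequences of equal length.
        if not isinstance(up_task, Sequence) or not isinstance(down_task, Sequence):
            raise TypeError(
                f"Chain not supported between instances of "
                f"{type(up_task)} and {type(down_task)}"
            )
        if len(up_task) != len(down_task):
            raise ValueError("sequences of different lengths cannot be chained")
        # Assumed continuation (not visible in the hunk): pair elements up.
        for up, down in zip(up_task, down_task):
            up.set_downstream(down)


t1, t2, t3, t4, t5, t6 = (Node(f"t{i}") for i in range(1, 7))
chain(t1, [t2, t3], [t4, t5], t6)
```

With this input, `t4` and `t5` each end up with `t6` downstream, which matches the `t4.set_downstream(t6)` and `t5.set_downstream(t6)` lines in the docstring context of the hunk.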



