Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/01 14:08:48 UTC
[GitHub] [airflow] josh-fell commented on a change in pull request #16732: Update `chain()` and `cross_downstream()` to support XComArgs
josh-fell commented on a change in pull request #16732:
URL: https://github.com/apache/airflow/pull/16732#discussion_r662322916
##########
File path: airflow/models/baseoperator.py
##########
@@ -1566,36 +1567,41 @@ def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]):
         t4.set_downstream(t6)
         t5.set_downstream(t6)
-    :param tasks: List of tasks or List[airflow.models.BaseOperator] to set dependencies
-    :type tasks: List[airflow.models.BaseOperator] or airflow.models.BaseOperator
+    :param tasks: List of tasks, List[airflow.models.BaseOperator], XComArg, or List[airflow.models.XComArg]
+        to set dependencies
+    :type tasks: List[airflow.models.BaseOperator], airflow.models.BaseOperator, List[airflow.models.XComArg],
+        or XComArg
     """
+    from airflow.models.xcom_arg import XComArg
+
     for index, up_task in enumerate(tasks[:-1]):
         down_task = tasks[index + 1]
-        if isinstance(up_task, BaseOperator):
+        if isinstance(up_task, (BaseOperator, XComArg)):
             up_task.set_downstream(down_task)
             continue
-        if isinstance(down_task, BaseOperator):
+        if isinstance(down_task, (BaseOperator, XComArg)):
             down_task.set_upstream(up_task)
             continue
         if not isinstance(up_task, Sequence) or not isinstance(down_task, Sequence):
             raise TypeError(
-                'Chain not supported between instances of {up_type} and {down_type}'.format(
+                "Chain not supported between instances of {up_type} and {down_type}".format(
                     up_type=type(up_task), down_type=type(down_task)
                 )
             )
         up_task_list = up_task
         down_task_list = down_task
         if len(up_task_list) != len(down_task_list):
             raise AirflowException(
-                f'Chain not supported different length Iterable '
-                f'but get {len(up_task_list)} and {len(down_task_list)}'
+                f"Chain not supported different length Iterable "
+                f"but get {len(up_task_list)} and {len(down_task_list)}"
Review comment:
Removed the stylistic (quote-style) changes. Black applied them automatically when I saved the file.
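
For context, the pairing logic in the hunk above can be sketched without Airflow installed. This is only an illustration under stated assumptions: `FakeTask` and `chain_sketch` are hypothetical stand-ins for `BaseOperator`/`XComArg` and `chain()`, `ValueError` stands in for `AirflowException`, and the final element-wise loop for two equal-length lists is an assumption about the part of the function that falls after the quoted hunk.

```python
from typing import List, Sequence, Union


class FakeTask:
    """Hypothetical stand-in for BaseOperator / XComArg (not Airflow's API)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.downstream: List[str] = []  # names of tasks that run after this one

    def set_downstream(self, other: Union["FakeTask", Sequence["FakeTask"]]) -> None:
        # Accept either a single task or a sequence of tasks.
        targets = [other] if isinstance(other, FakeTask) else list(other)
        self.downstream.extend(t.name for t in targets)

    def set_upstream(self, other: Union["FakeTask", Sequence["FakeTask"]]) -> None:
        # Setting X upstream of self is the same as setting self downstream of X.
        sources = [other] if isinstance(other, FakeTask) else list(other)
        for source in sources:
            source.set_downstream(self)


def chain_sketch(*tasks: Union[FakeTask, Sequence[FakeTask]]) -> None:
    """Wire each argument to the next one, mirroring the branches in the diff."""
    for index, up_task in enumerate(tasks[:-1]):
        down_task = tasks[index + 1]
        if isinstance(up_task, FakeTask):
            up_task.set_downstream(down_task)
            continue
        if isinstance(down_task, FakeTask):
            down_task.set_upstream(up_task)
            continue
        if not isinstance(up_task, Sequence) or not isinstance(down_task, Sequence):
            raise TypeError(
                f"Chain not supported between instances of "
                f"{type(up_task)} and {type(down_task)}"
            )
        if len(up_task) != len(down_task):
            # The real code raises AirflowException here.
            raise ValueError(
                f"Chain not supported for sequences of different length: "
                f"{len(up_task)} and {len(down_task)}"
            )
        # Assumed behaviour for two equal-length lists: pair them element-wise.
        for up, down in zip(up_task, down_task):
            up.set_downstream(down)


# Usage: the same shape as the docstring example, chain(t1, [t2, t3], [t4, t5], t6).
t1, t2, t3, t4, t5, t6 = (FakeTask(f"t{i}") for i in range(1, 7))
chain_sketch(t1, [t2, t3], [t4, t5], t6)
```

The change in this pull request only widens the two `isinstance` checks so that an `XComArg` is accepted wherever a `BaseOperator` was; the pairing rules themselves stay the same.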