You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/30 15:04:30 UTC

[GitHub] [airflow] josh-fell opened a new pull request #16732: Update `chain()` and `cross_downstream()` to support XComArgs

josh-fell opened a new pull request #16732:
URL: https://github.com/apache/airflow/pull/16732


   - Added support for `XComArgs` in `chain()` helper function
   - Updated type annotations for `XComArg` in `cross_downstream()` helper function
   - Added test DAG
   
   Closes: #16635
   
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/main/UPDATING.md).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on pull request #16732: Update `chain()` and `cross_downstream()` to support XComArgs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #16732:
URL: https://github.com/apache/airflow/pull/16732#issuecomment-872649952


   Should an examples be added to the documentation/docstring as well?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16732: Update `chain()` and `cross_downstream()` to support XComArgs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16732:
URL: https://github.com/apache/airflow/pull/16732#discussion_r664797224



##########
File path: airflow/models/baseoperator.py
##########
@@ -1595,11 +1652,14 @@ def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]):
 
 
 def cross_downstream(
-    from_tasks: Sequence[BaseOperator], to_tasks: Union[BaseOperator, Sequence[BaseOperator]]
+    from_tasks: Union[Sequence[BaseOperator], Sequence["XComArg"]],
+    to_tasks: Union[Sequence[BaseOperator], Sequence["XComArg"]],

Review comment:
       Shouldn't this be
   
   ```suggestion
       from_tasks: Sequence[Union[BaseOperator, "XComArg"]],
       to_tasks: Sequence[Union[BaseOperator Sequence["XComArg"]],
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #16732: Update `chain()` and `cross_downstream()` to support XComArgs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16732:
URL: https://github.com/apache/airflow/pull/16732#discussion_r664974960



##########
File path: airflow/models/baseoperator.py
##########
@@ -1595,11 +1652,14 @@ def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]):
 
 
 def cross_downstream(
-    from_tasks: Sequence[BaseOperator], to_tasks: Union[BaseOperator, Sequence[BaseOperator]]
+    from_tasks: Union[Sequence[BaseOperator], Sequence["XComArg"]],
+    to_tasks: Union[Sequence[BaseOperator], Sequence["XComArg"]],

Review comment:
       Good catch, you’re right, these are heterogeneous lists, not two possible homogeneous lists.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #16732: Update `chain()` and `cross_downstream()` to support XComArgs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16732:
URL: https://github.com/apache/airflow/pull/16732#discussion_r663572809



##########
File path: airflow/models/baseoperator.py
##########
@@ -1540,22 +1541,24 @@ def inherits_from_dummy_operator(self):
         return getattr(self, '_is_dummy', False)
 
 
-def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]):
+def chain(*tasks: Union[BaseOperator, "XComArg", Sequence[BaseOperator], Sequence["XComArg"]]):
     r"""
     Given a number of tasks, builds a dependency chain.
-    Support mix airflow.models.BaseOperator and List[airflow.models.BaseOperator].
-    If you want to chain between two List[airflow.models.BaseOperator], have to
-    make sure they have same length.
+    Support mix airflow.models.BaseOperator, List[airflow.models.BaseOperator], XComArg, and
+    List[airflow.models.XComArg]. If you want to chain between two List[airflow.models.BaseOperator]
+    or List[airflow.models.XComArg], you have to make sure they have same length.

Review comment:
       ```suggestion
       or List[airflow.models.XComArg], you have to make sure they have the same length.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #16732: Update `chain()` and `cross_downstream()` to support XComArgs

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #16732:
URL: https://github.com/apache/airflow/pull/16732#issuecomment-876254073


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16732: Update `chain()` and `cross_downstream()` to support XComArgs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16732:
URL: https://github.com/apache/airflow/pull/16732#discussion_r664797875



##########
File path: airflow/models/baseoperator.py
##########
@@ -1595,11 +1652,14 @@ def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]):
 
 
 def cross_downstream(
-    from_tasks: Sequence[BaseOperator], to_tasks: Union[BaseOperator, Sequence[BaseOperator]]
+    from_tasks: Union[Sequence[BaseOperator], Sequence["XComArg"]],
+    to_tasks: Union[Sequence[BaseOperator], Sequence["XComArg"]],

Review comment:
       Actually this:
   
   ```suggestion
       from_tasks: Sequence[Union[BaseOperator, "XComArg"]],
       to_tasks: Union[BaseOperator, "XComArg", Sequence[Union[BaseOperator, Sequence["XComArg"]]],
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] josh-fell commented on a change in pull request #16732: Update `chain()` and `cross_downstream()` to support XComArgs

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #16732:
URL: https://github.com/apache/airflow/pull/16732#discussion_r662322916



##########
File path: airflow/models/baseoperator.py
##########
@@ -1566,36 +1567,41 @@ def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]):
         t4.set_downstream(t6)
         t5.set_downstream(t6)
 
-    :param tasks: List of tasks or List[airflow.models.BaseOperator] to set dependencies
-    :type tasks: List[airflow.models.BaseOperator] or airflow.models.BaseOperator
+    :param tasks: List of tasks, List[airflow.models.BaseOperator], XComArg, or List[airflow.models.XComArg]
+        to set dependencies
+    :type tasks: List[airflow.models.BaseOperator], airflow.models.BaseOperator, List[airflow.models.XComArg],
+        or XComArg
     """
+    from airflow.models.xcom_arg import XComArg
+
     for index, up_task in enumerate(tasks[:-1]):
         down_task = tasks[index + 1]
-        if isinstance(up_task, BaseOperator):
+        if isinstance(up_task, (BaseOperator, XComArg)):
             up_task.set_downstream(down_task)
             continue
-        if isinstance(down_task, BaseOperator):
+        if isinstance(down_task, (BaseOperator, XComArg)):
             down_task.set_upstream(up_task)
             continue
         if not isinstance(up_task, Sequence) or not isinstance(down_task, Sequence):
             raise TypeError(
-                'Chain not supported between instances of {up_type} and {down_type}'.format(
+                "Chain not supported between instances of {up_type} and {down_type}".format(
                     up_type=type(up_task), down_type=type(down_task)
                 )
             )
         up_task_list = up_task
         down_task_list = down_task
         if len(up_task_list) != len(down_task_list):
             raise AirflowException(
-                f'Chain not supported different length Iterable '
-                f'but get {len(up_task_list)} and {len(down_task_list)}'
+                f"Chain not supported different length Iterable "
+                f"but get {len(up_task_list)} and {len(down_task_list)}"

Review comment:
       Removed stylistic changes. They were applied via black when saving the file.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16732: Update `chain()` and `cross_downstream()` to support XComArgs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16732:
URL: https://github.com/apache/airflow/pull/16732#discussion_r662211480



##########
File path: airflow/models/baseoperator.py
##########
@@ -1566,36 +1567,41 @@ def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]):
         t4.set_downstream(t6)
         t5.set_downstream(t6)
 
-    :param tasks: List of tasks or List[airflow.models.BaseOperator] to set dependencies
-    :type tasks: List[airflow.models.BaseOperator] or airflow.models.BaseOperator
+    :param tasks: List of tasks, List[airflow.models.BaseOperator], XComArg, or List[airflow.models.XComArg]
+        to set dependencies

Review comment:
       As we are changing this line anyway, we don't need to duplicate the types here -- prose is better
   
   ```suggestion
       :param tasks: List of tasks or XComArgs to set dependencies
   ```

##########
File path: airflow/models/baseoperator.py
##########
@@ -1566,36 +1567,41 @@ def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]):
         t4.set_downstream(t6)
         t5.set_downstream(t6)
 
-    :param tasks: List of tasks or List[airflow.models.BaseOperator] to set dependencies
-    :type tasks: List[airflow.models.BaseOperator] or airflow.models.BaseOperator
+    :param tasks: List of tasks, List[airflow.models.BaseOperator], XComArg, or List[airflow.models.XComArg]
+        to set dependencies
+    :type tasks: List[airflow.models.BaseOperator], airflow.models.BaseOperator, List[airflow.models.XComArg],
+        or XComArg
     """
+    from airflow.models.xcom_arg import XComArg
+
     for index, up_task in enumerate(tasks[:-1]):
         down_task = tasks[index + 1]
-        if isinstance(up_task, BaseOperator):
+        if isinstance(up_task, (BaseOperator, XComArg)):
             up_task.set_downstream(down_task)
             continue
-        if isinstance(down_task, BaseOperator):
+        if isinstance(down_task, (BaseOperator, XComArg)):
             down_task.set_upstream(up_task)
             continue
         if not isinstance(up_task, Sequence) or not isinstance(down_task, Sequence):
             raise TypeError(
-                'Chain not supported between instances of {up_type} and {down_type}'.format(
+                "Chain not supported between instances of {up_type} and {down_type}".format(

Review comment:
       ```suggestion
                   'Chain not supported between instances of {up_type} and {down_type}'.format(
   ```
   
   Please avoid stylistic only changes 

##########
File path: tests/dags/test_chain_xcomargs.py
##########
@@ -0,0 +1,66 @@
+#

Review comment:
       This shouldn't be a new file, but some extra cases with asserts here https://github.com/apache/airflow/blob/fa811057a6ae0fc6c5e4bff1e18971c262a42a4c/tests/models/test_baseoperator.py#L381-L401

##########
File path: airflow/models/baseoperator.py
##########
@@ -1566,36 +1567,41 @@ def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]):
         t4.set_downstream(t6)
         t5.set_downstream(t6)
 
-    :param tasks: List of tasks or List[airflow.models.BaseOperator] to set dependencies
-    :type tasks: List[airflow.models.BaseOperator] or airflow.models.BaseOperator
+    :param tasks: List of tasks, List[airflow.models.BaseOperator], XComArg, or List[airflow.models.XComArg]
+        to set dependencies
+    :type tasks: List[airflow.models.BaseOperator], airflow.models.BaseOperator, List[airflow.models.XComArg],
+        or XComArg
     """
+    from airflow.models.xcom_arg import XComArg
+
     for index, up_task in enumerate(tasks[:-1]):
         down_task = tasks[index + 1]
-        if isinstance(up_task, BaseOperator):
+        if isinstance(up_task, (BaseOperator, XComArg)):
             up_task.set_downstream(down_task)
             continue
-        if isinstance(down_task, BaseOperator):
+        if isinstance(down_task, (BaseOperator, XComArg)):
             down_task.set_upstream(up_task)
             continue
         if not isinstance(up_task, Sequence) or not isinstance(down_task, Sequence):
             raise TypeError(
-                'Chain not supported between instances of {up_type} and {down_type}'.format(
+                "Chain not supported between instances of {up_type} and {down_type}".format(
                     up_type=type(up_task), down_type=type(down_task)
                 )
             )
         up_task_list = up_task
         down_task_list = down_task
         if len(up_task_list) != len(down_task_list):
             raise AirflowException(
-                f'Chain not supported different length Iterable '
-                f'but get {len(up_task_list)} and {len(down_task_list)}'
+                f"Chain not supported different length Iterable "
+                f"but get {len(up_task_list)} and {len(down_task_list)}"

Review comment:
       ```suggestion
                   f'Chain not supported different length Iterable '
                   f'but get {len(up_task_list)} and {len(down_task_list)}'
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] josh-fell commented on a change in pull request #16732: Update `chain()` and `cross_downstream()` to support XComArgs

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #16732:
URL: https://github.com/apache/airflow/pull/16732#discussion_r664187180



##########
File path: airflow/models/baseoperator.py
##########
@@ -1540,22 +1541,24 @@ def inherits_from_dummy_operator(self):
         return getattr(self, '_is_dummy', False)
 
 
-def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]):
+def chain(*tasks: Union[BaseOperator, "XComArg", Sequence[BaseOperator], Sequence["XComArg"]]):
     r"""
     Given a number of tasks, builds a dependency chain.
-    Support mix airflow.models.BaseOperator and List[airflow.models.BaseOperator].
-    If you want to chain between two List[airflow.models.BaseOperator], have to
-    make sure they have same length.
+    Support mix airflow.models.BaseOperator, List[airflow.models.BaseOperator], XComArg, and
+    List[airflow.models.XComArg]. If you want to chain between two List[airflow.models.BaseOperator]
+    or List[airflow.models.XComArg], you have to make sure they have same length.

Review comment:
       πŸ‘ Nice catch!

##########
File path: airflow/models/baseoperator.py
##########
@@ -1540,22 +1541,24 @@ def inherits_from_dummy_operator(self):
         return getattr(self, '_is_dummy', False)
 
 
-def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]):
+def chain(*tasks: Union[BaseOperator, "XComArg", Sequence[BaseOperator], Sequence["XComArg"]]):
     r"""
     Given a number of tasks, builds a dependency chain.
-    Support mix airflow.models.BaseOperator and List[airflow.models.BaseOperator].
-    If you want to chain between two List[airflow.models.BaseOperator], have to
-    make sure they have same length.
+    Support mix airflow.models.BaseOperator, List[airflow.models.BaseOperator], XComArg, and
+    List[airflow.models.XComArg]. If you want to chain between two List[airflow.models.BaseOperator]
+    or List[airflow.models.XComArg], you have to make sure they have same length.

Review comment:
       πŸ‘ Nice catch! Updated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] josh-fell commented on a change in pull request #16732: Update `chain()` and `cross_downstream()` to support XComArgs

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #16732:
URL: https://github.com/apache/airflow/pull/16732#discussion_r662323440



##########
File path: tests/dags/test_chain_xcomargs.py
##########
@@ -0,0 +1,66 @@
+#

Review comment:
       Added similar cases for `XComArgs`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr merged pull request #16732: Update chain() and cross_downstream() to support XComArgs

Posted by GitBox <gi...@apache.org>.
uranusjr merged pull request #16732:
URL: https://github.com/apache/airflow/pull/16732


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] josh-fell commented on a change in pull request #16732: Update `chain()` and `cross_downstream()` to support XComArgs

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #16732:
URL: https://github.com/apache/airflow/pull/16732#discussion_r664187180



##########
File path: airflow/models/baseoperator.py
##########
@@ -1540,22 +1541,24 @@ def inherits_from_dummy_operator(self):
         return getattr(self, '_is_dummy', False)
 
 
-def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]):
+def chain(*tasks: Union[BaseOperator, "XComArg", Sequence[BaseOperator], Sequence["XComArg"]]):
     r"""
     Given a number of tasks, builds a dependency chain.
-    Support mix airflow.models.BaseOperator and List[airflow.models.BaseOperator].
-    If you want to chain between two List[airflow.models.BaseOperator], have to
-    make sure they have same length.
+    Support mix airflow.models.BaseOperator, List[airflow.models.BaseOperator], XComArg, and
+    List[airflow.models.XComArg]. If you want to chain between two List[airflow.models.BaseOperator]
+    or List[airflow.models.XComArg], you have to make sure they have same length.

Review comment:
       πŸ‘ Nice catch!

##########
File path: airflow/models/baseoperator.py
##########
@@ -1540,22 +1541,24 @@ def inherits_from_dummy_operator(self):
         return getattr(self, '_is_dummy', False)
 
 
-def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]):
+def chain(*tasks: Union[BaseOperator, "XComArg", Sequence[BaseOperator], Sequence["XComArg"]]):
     r"""
     Given a number of tasks, builds a dependency chain.
-    Support mix airflow.models.BaseOperator and List[airflow.models.BaseOperator].
-    If you want to chain between two List[airflow.models.BaseOperator], have to
-    make sure they have same length.
+    Support mix airflow.models.BaseOperator, List[airflow.models.BaseOperator], XComArg, and
+    List[airflow.models.XComArg]. If you want to chain between two List[airflow.models.BaseOperator]
+    or List[airflow.models.XComArg], you have to make sure they have same length.

Review comment:
       πŸ‘ Nice catch! Updated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16732: Update `chain()` and `cross_downstream()` to support XComArgs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16732:
URL: https://github.com/apache/airflow/pull/16732#discussion_r665992076



##########
File path: airflow/models/baseoperator.py
##########
@@ -1540,22 +1541,24 @@ def inherits_from_dummy_operator(self):
         return getattr(self, '_is_dummy', False)
 
 
-def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]):
+def chain(*tasks: Union[BaseOperator, "XComArg", Sequence[BaseOperator], Sequence["XComArg"]]):
     r"""
     Given a number of tasks, builds a dependency chain.
-    Support mix airflow.models.BaseOperator and List[airflow.models.BaseOperator].
-    If you want to chain between two List[airflow.models.BaseOperator], have to
-    make sure they have same length.
+    Support mix airflow.models.BaseOperator, List[airflow.models.BaseOperator], XComArg, and
+    List[airflow.models.XComArg]. If you want to chain between two List[airflow.models.BaseOperator]
+    or List[airflow.models.XComArg], you have to make sure they have the same length.

Review comment:
       ```suggestion
   
       This function accepts values of BaseOperator (aka tasks), XComArg, or lists containing
       either type (or a mix of both in the same list). If you want to chain between two lists you must
       ensure they have the same length.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] josh-fell commented on a change in pull request #16732: Update `chain()` and `cross_downstream()` to support XComArgs

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #16732:
URL: https://github.com/apache/airflow/pull/16732#discussion_r665482055



##########
File path: airflow/models/baseoperator.py
##########
@@ -1595,11 +1652,14 @@ def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]):
 
 
 def cross_downstream(
-    from_tasks: Sequence[BaseOperator], to_tasks: Union[BaseOperator, Sequence[BaseOperator]]
+    from_tasks: Union[Sequence[BaseOperator], Sequence["XComArg"]],
+    to_tasks: Union[Sequence[BaseOperator], Sequence["XComArg"]],

Review comment:
       Agreed, you're right πŸ‘ . Updated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] josh-fell commented on pull request #16732: Update `chain()` and `cross_downstream()` to support XComArgs

Posted by GitBox <gi...@apache.org>.
josh-fell commented on pull request #16732:
URL: https://github.com/apache/airflow/pull/16732#issuecomment-873258264


   > Should an examples be added to the documentation/docstring as well?
   
   Great idea. I'll expand the examples in the docstrings for `chain()` and `cross_downstream()`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16732: Update `chain()` and `cross_downstream()` to support XComArgs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16732:
URL: https://github.com/apache/airflow/pull/16732#discussion_r665988385



##########
File path: airflow/models/baseoperator.py
##########
@@ -1540,22 +1541,24 @@ def inherits_from_dummy_operator(self):
         return getattr(self, '_is_dummy', False)
 
 
-def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]):
+def chain(*tasks: Union[BaseOperator, "XComArg", Sequence[BaseOperator], Sequence["XComArg"]]):

Review comment:
       ```suggestion
   def chain(*tasks: Union[BaseOperator, "XComArg", Sequence[Union[BaseOperator, "XComArg"]]]):
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org