You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/13 21:52:16 UTC

[GitHub] [airflow] subkanthi opened a new pull request #20860: Branch python operator decorator

subkanthi opened a new pull request #20860:
URL: https://github.com/apache/airflow/pull/20860


   Branch python decorator very similar to python virtualenv decorator.
   
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/main/UPDATING.md).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] subkanthi closed pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
subkanthi closed pull request #20860:
URL: https://github.com/apache/airflow/pull/20860


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk closed pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
potiuk closed pull request #20860:
URL: https://github.com/apache/airflow/pull/20860


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] subkanthi commented on pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
subkanthi commented on pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#issuecomment-1015740779


   will wait for https://github.com/apache/airflow/pull/20933 to be merged and will rebase again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#discussion_r784510220



##########
File path: tests/decorators/test_branch_python.py
##########
@@ -0,0 +1,116 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from airflow.decorators import task
+from airflow.utils.state import State
+
+
+class Test_BranchPythonDecoratedOperator:
+    def test_branch_one(self, dag_maker):
+        @task
+        def dummy_f():
+            pass
+
+        #

Review comment:
       ```suggestion
   ```
   
   Accident?

##########
File path: tests/decorators/test_branch_python.py
##########
@@ -0,0 +1,116 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from airflow.decorators import task
+from airflow.utils.state import State
+
+
+class Test_BranchPythonDecoratedOperator:
+    def test_branch_one(self, dag_maker):
+        @task
+        def dummy_f():
+            pass
+
+        #
+        @task
+        def task_1():
+            pass
+
+        @task
+        def task_2():
+            pass
+
+        @task.branchpython(task_id="branching")
+        def branch_operator():
+            return "task_1"
+
+        with dag_maker():
+            branchoperator = branch_operator()
+            df = dummy_f()
+            task_1 = task_1()
+            task_2 = task_2()
+
+            df.set_downstream(branchoperator)
+            branchoperator.set_downstream(task_1)
+            branchoperator.set_downstream(task_2)
+
+        dr = dag_maker.create_dagrun()
+        df.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+        branchoperator.operator.run(
+            start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True
+        )
+        task_1.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+        task_2.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+        tis = dr.get_task_instances()
+
+        for ti in tis:
+            if ti.task_id == "dummy_f":
+                assert ti.state == State.SUCCESS
+            if ti.task_id == "branching":
+                assert ti.state == State.SUCCESS
+            if ti.task_id == "task_1":
+                assert ti.state == State.SUCCESS
+            if ti.task_id == "task_2":
+                assert ti.state == State.SKIPPED
+
+    def test_branch_two(self, dag_maker):

Review comment:
       These two tests should be merged into one with `@pytest.mark.parametrize`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] subkanthi commented on a change in pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
subkanthi commented on a change in pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#discussion_r787243847



##########
File path: airflow/decorators/__init__.pyi
##########
@@ -269,6 +287,8 @@ class TaskDecoratorFactory:
     @overload
     def python(self, python_callable: F) -> F: ...
     @overload
+    def branch(self, python_callable: F) -> F: ...
+    @overload

Review comment:
       Changed, thanks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] subkanthi commented on a change in pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
subkanthi commented on a change in pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#discussion_r786109099



##########
File path: airflow/decorators/branch_python.py
##########
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+from textwrap import dedent
+from typing import Callable, Optional, Sequence, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.operators.python import BranchPythonOperator
+from airflow.utils.python_virtualenv import remove_task_decorator

Review comment:
       Thanks @uranusjr , is it OK if I do that in a separate PR since it affects the docker decorator too.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] subkanthi commented on pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
subkanthi commented on pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#issuecomment-1038389898






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] subkanthi closed pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
subkanthi closed pull request #20860:
URL: https://github.com/apache/airflow/pull/20860


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] subkanthi commented on pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
subkanthi commented on pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#issuecomment-1038534686


   > I think it is "triggered" by some of the changes you've done in `cncf.kubrmetes`. MyPy is "stable" (i.e. same files generate same errors) but the stability is a bit "fragile" - i.e. changes in one area might generate new errors discovered "around" - even if they are seemingly unrelated.
   > 
   > In this case it seems that adding new package (the `__init__.py` in decorators) triggered it.
   > 
   > I think you do not need that change actually so likely removing it will fix it.
   
   Thanks removed it, but looks like it didnt help with the mypy error.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] subkanthi commented on a change in pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
subkanthi commented on a change in pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#discussion_r785334998



##########
File path: airflow/decorators/branch_python.py
##########
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+from textwrap import dedent
+from typing import Callable, Optional, Sequence, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.operators.python import BranchPythonOperator
+from airflow.utils.python_virtualenv import remove_task_decorator
+
+
+class _BranchPythonDecoratedOperator(DecoratedOperator, BranchPythonOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields: Sequence[str] = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs: Sequence[str] = ('python_callable',)
+
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        kwargs_to_upstream = {
+            "python_callable": kwargs["python_callable"],
+            "op_args": kwargs["op_args"],
+            "op_kwargs": kwargs["op_kwargs"],
+        }
+        super().__init__(kwargs_to_upstream=kwargs_to_upstream, **kwargs)
+
+    def get_python_source(self):
+        raw_source = inspect.getsource(self.python_callable)
+        res = dedent(raw_source)
+        res = remove_task_decorator(res, "@task.branch")
+        return res
+
+
+T = TypeVar("T", bound=Callable)
+
+
+class BranchPythonDecoratorMixin:
+    """
+    Mixin class for adding branch python decorator
+    to the provider managers.
+    """
+
+    def branch(
+        self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
+    ):
+        """
+        Wraps a python function into an Airflow operator to run via a Python virtual environment.

Review comment:
       Fixed.

##########
File path: airflow/decorators/branch_python.py
##########
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+from textwrap import dedent
+from typing import Callable, Optional, Sequence, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.operators.python import BranchPythonOperator
+from airflow.utils.python_virtualenv import remove_task_decorator
+
+
+class _BranchPythonDecoratedOperator(DecoratedOperator, BranchPythonOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields: Sequence[str] = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs: Sequence[str] = ('python_callable',)
+
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        kwargs_to_upstream = {
+            "python_callable": kwargs["python_callable"],
+            "op_args": kwargs["op_args"],
+            "op_kwargs": kwargs["op_kwargs"],
+        }
+        super().__init__(kwargs_to_upstream=kwargs_to_upstream, **kwargs)
+
+    def get_python_source(self):
+        raw_source = inspect.getsource(self.python_callable)
+        res = dedent(raw_source)
+        res = remove_task_decorator(res, "@task.branch")
+        return res
+
+
+T = TypeVar("T", bound=Callable)
+
+
+class BranchPythonDecoratorMixin:
+    """
+    Mixin class for adding branch python decorator
+    to the provider managers.
+    """
+
+    def branch(
+        self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
+    ):
+        """
+        Wraps a python function into an Airflow operator to run via a Python virtual environment.
+
+        Accepts kwargs for operator kwarg. Can be reused in a single DAG.
+
+        :param python_callable: Function to decorate
+        :type python_callable: Optional[Callable]
+        :param multiple_outputs: if set, function return value will be
+            unrolled to multiple XCom values. List/Tuples will unroll to xcom values

Review comment:
       Removed, thanks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk merged pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #20860:
URL: https://github.com/apache/airflow/pull/20860


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk edited a comment on pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#issuecomment-1038394132


   I think it is "triggered" by some of the changes you've done in `cncf.kubrmetes`. MyPy is "stable" (i.e. same files generate same errors) but the stability is a bit "fragile" - i.e. changes in one area might generate new errors discovered "around" - even if they are seemingly unrelated.
   
   In this case it seems that adding new package (the `__init__.py` in decorators) triggered it. 
   
   I think you do not need that change actually so likely removing it will fix it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] subkanthi closed pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
subkanthi closed pull request #20860:
URL: https://github.com/apache/airflow/pull/20860


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] subkanthi commented on a change in pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
subkanthi commented on a change in pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#discussion_r784367273



##########
File path: airflow/decorators/branch_python.py
##########
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+from textwrap import dedent
+from typing import Callable, Optional, Sequence, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.operators.python import BranchPythonOperator
+from airflow.utils.python_virtualenv import remove_task_decorator
+
+
+class _BranchPythonDecoratedOperator(DecoratedOperator, BranchPythonOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields: Sequence[str] = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs: Sequence[str] = ('python_callable',)
+
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        kwargs_to_upstream = {
+            "python_callable": kwargs["python_callable"],
+            "op_args": kwargs["op_args"],
+            "op_kwargs": kwargs["op_kwargs"],
+        }
+        super().__init__(kwargs_to_upstream=kwargs_to_upstream, **kwargs)
+
+    def get_python_source(self):
+        raw_source = inspect.getsource(self.python_callable)
+        res = dedent(raw_source)
+        res = remove_task_decorator(res, "@task.branchpython")
+        return res
+
+
+T = TypeVar("T", bound=Callable)
+
+
+class BranchPythonDecoratorMixin:
+    """
+    Mixin class for adding branch python decorator
+    to the provider managers.
+    """
+
+    def branchpython(

Review comment:
       Maybe branchpython is not a good name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] subkanthi commented on pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
subkanthi commented on pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#issuecomment-1039293532


   Thanks @uranusjr and @potiuk , all green now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#issuecomment-1038943518


   Urgh, I know what’s going on, Mypy doesn’t like it when you reassign a variable to a different type. For this particular case, the erroring file has
   
   https://github.com/apache/airflow/blob/28378d867afaac497529bd2e1d2c878edf66f460/airflow/example_dags/example_kubernetes_executor.py#L61
   
   which assigns a task to a variable types for the task _function_. Mypy somehow allows that (it probably shouldn’t) but errors later when you actually try to use the variable.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk edited a comment on pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#issuecomment-1038394132


   I think it is "triggered" by some of the changes you've done in `cncf.kubrmetes`. MyPy is "stable" (i.e. same files generate same errors) but the stability is a bit "fragile" - i.e. changes in one area might generate new errors discovered "around" - even if they are seemingly unrelate.
   
   In this case it seems that adding new package (the `__init__.py` in decorators) triggered it. 
   
   I think you do not need that change actually so likely removing it will fix it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#discussion_r785631253



##########
File path: airflow/decorators/branch_python.py
##########
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+from textwrap import dedent
+from typing import Callable, Optional, Sequence, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.operators.python import BranchPythonOperator
+from airflow.utils.python_virtualenv import remove_task_decorator

Review comment:
       `code_utils` feels like a natural home for the function.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] subkanthi commented on a change in pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
subkanthi commented on a change in pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#discussion_r790090990



##########
File path: airflow/decorators/__init__.pyi
##########
@@ -41,7 +41,6 @@ class TaskDecoratorFactory:
         **kwargs,
     ) -> TaskDecorator:
         """Create a decorator to convert the decorated callable to a task.
-

Review comment:
       Restored blank lines.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#issuecomment-1022842622


   Static checks are failing for unrelated errors. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] subkanthi commented on pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
subkanthi commented on pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#issuecomment-1039293532


   Thanks @uranusjr and @potiuk , all green now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#issuecomment-1038394132


   I think it is "triggered" by some of the changes you've done in `cncf.kubrmetes`. MyPy is "stable" (i.e. same files generate same errors) but the stability is a bit "fragile" - i.e. changes in one area might generate new errors discovered "around" - even if they are seemingly unrelate.
   
   In this case it seems that adding new package (the __init_.py in decorators) triggered it. 
   
   I think you do not need that change actually so likely removing it will fix it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#discussion_r784471104



##########
File path: airflow/decorators/branch_python.py
##########
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+from textwrap import dedent
+from typing import Callable, Optional, Sequence, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.operators.python import BranchPythonOperator
+from airflow.utils.python_virtualenv import remove_task_decorator
+
+
+class _BranchPythonDecoratedOperator(DecoratedOperator, BranchPythonOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields: Sequence[str] = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs: Sequence[str] = ('python_callable',)
+
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        kwargs_to_upstream = {
+            "python_callable": kwargs["python_callable"],
+            "op_args": kwargs["op_args"],
+            "op_kwargs": kwargs["op_kwargs"],
+        }
+        super().__init__(kwargs_to_upstream=kwargs_to_upstream, **kwargs)
+
+    def get_python_source(self):
+        raw_source = inspect.getsource(self.python_callable)
+        res = dedent(raw_source)
+        res = remove_task_decorator(res, "@task.branchpython")
+        return res
+
+
+T = TypeVar("T", bound=Callable)
+
+
+class BranchPythonDecoratorMixin:
+    """
+    Mixin class for adding branch python decorator
+    to the provider managers.
+    """
+
+    def branchpython(

Review comment:
       I’d say just call it `branch`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] subkanthi commented on a change in pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
subkanthi commented on a change in pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#discussion_r785252124



##########
File path: tests/decorators/test_branch_python.py
##########
@@ -0,0 +1,116 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from airflow.decorators import task
+from airflow.utils.state import State
+
+
+class Test_BranchPythonDecoratedOperator:
+    def test_branch_one(self, dag_maker):
+        @task
+        def dummy_f():
+            pass
+
+        #
+        @task
+        def task_1():
+            pass
+
+        @task
+        def task_2():
+            pass
+
+        @task.branchpython(task_id="branching")
+        def branch_operator():
+            return "task_1"
+
+        with dag_maker():
+            branchoperator = branch_operator()
+            df = dummy_f()
+            task_1 = task_1()
+            task_2 = task_2()
+
+            df.set_downstream(branchoperator)
+            branchoperator.set_downstream(task_1)
+            branchoperator.set_downstream(task_2)
+
+        dr = dag_maker.create_dagrun()
+        df.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+        branchoperator.operator.run(
+            start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True
+        )
+        task_1.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+        task_2.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+        tis = dr.get_task_instances()
+
+        for ti in tis:
+            if ti.task_id == "dummy_f":
+                assert ti.state == State.SUCCESS
+            if ti.task_id == "branching":
+                assert ti.state == State.SUCCESS
+            if ti.task_id == "task_1":
+                assert ti.state == State.SUCCESS
+            if ti.task_id == "task_2":
+                assert ti.state == State.SKIPPED
+
+    def test_branch_two(self, dag_maker):

Review comment:
       Thanks merged.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] subkanthi commented on a change in pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
subkanthi commented on a change in pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#discussion_r785250956



##########
File path: tests/decorators/test_branch_python.py
##########
@@ -0,0 +1,116 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from airflow.decorators import task
+from airflow.utils.state import State
+
+
+class Test_BranchPythonDecoratedOperator:
+    def test_branch_one(self, dag_maker):
+        @task
+        def dummy_f():
+            pass
+
+        #

Review comment:
       Removed, thanks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] subkanthi commented on pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
subkanthi commented on pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#issuecomment-1038389898


   @potiuk , @uranusjr  can u please check this, the mypy error is not related to the change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#issuecomment-1038394132


   I think it is "triggered" by some of the changes you've done in `cncf.kubrmetes`. MyPy is "stable" (i.e. same files generate same errors) but the stability is a bit "fragile" - i.e. changes in one area might generate new errors discovered "around" - even if they are seemingly unrelate.
   
   In this case it seems that adding new package (the __init_.py in decorators) triggered it. 
   
   I think you do not need that change actually so likely removing it will fix it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#issuecomment-1038957581


   I pushed a commit to your branch that should fix the error.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] subkanthi commented on a change in pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
subkanthi commented on a change in pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#discussion_r785250871



##########
File path: airflow/decorators/branch_python.py
##########
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+from textwrap import dedent
+from typing import Callable, Optional, Sequence, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.operators.python import BranchPythonOperator
+from airflow.utils.python_virtualenv import remove_task_decorator
+
+
+class _BranchPythonDecoratedOperator(DecoratedOperator, BranchPythonOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields: Sequence[str] = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs: Sequence[str] = ('python_callable',)
+
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        kwargs_to_upstream = {
+            "python_callable": kwargs["python_callable"],
+            "op_args": kwargs["op_args"],
+            "op_kwargs": kwargs["op_kwargs"],
+        }
+        super().__init__(kwargs_to_upstream=kwargs_to_upstream, **kwargs)
+
+    def get_python_source(self):
+        raw_source = inspect.getsource(self.python_callable)
+        res = dedent(raw_source)
+        res = remove_task_decorator(res, "@task.branchpython")
+        return res
+
+
+T = TypeVar("T", bound=Callable)
+
+
+class BranchPythonDecoratorMixin:
+    """
+    Mixin class for adding branch python decorator
+    to the provider managers.
+    """
+
+    def branchpython(

Review comment:
       Thanks, changed to branch.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#discussion_r787173872



##########
File path: airflow/decorators/__init__.pyi
##########
@@ -269,6 +287,8 @@ class TaskDecoratorFactory:
     @overload
     def python(self, python_callable: F) -> F: ...
     @overload
+    def branch(self, python_callable: F) -> F: ...
+    @overload

Review comment:
       @subkanthi My PR is merged now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#issuecomment-1015778533


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] subkanthi commented on a change in pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
subkanthi commented on a change in pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#discussion_r785335051



##########
File path: airflow/decorators/branch_python.py
##########
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+from textwrap import dedent
+from typing import Callable, Optional, Sequence, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.operators.python import BranchPythonOperator
+from airflow.utils.python_virtualenv import remove_task_decorator

Review comment:
       This is still referred from python_virtualenv, maybe we can move it a common place for separation of concerns, also used in docker decorator.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] subkanthi commented on a change in pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
subkanthi commented on a change in pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#discussion_r785335051



##########
File path: airflow/decorators/branch_python.py
##########
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+from textwrap import dedent
+from typing import Callable, Optional, Sequence, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.operators.python import BranchPythonOperator
+from airflow.utils.python_virtualenv import remove_task_decorator

Review comment:
       This is still referred from python_virtualenv, maybe we can move it a common place for separation of concerns.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk edited a comment on pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#issuecomment-1038394132






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk merged pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #20860:
URL: https://github.com/apache/airflow/pull/20860


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#discussion_r787359834



##########
File path: airflow/decorators/__init__.pyi
##########
@@ -41,7 +41,6 @@ class TaskDecoratorFactory:
         **kwargs,
     ) -> TaskDecorator:
         """Create a decorator to convert the decorated callable to a task.
-

Review comment:
       Please don’t remove the blank line (and please add one for `branch`). Those are intended to separate the “summary” and other descriptions.
   
   https://www.python.org/dev/peps/pep-0257/#multi-line-docstrings




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#issuecomment-1038933027


   Then I guess you need to fix the error.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] josh-fell commented on a change in pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#discussion_r785304987



##########
File path: airflow/decorators/branch_python.py
##########
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+from textwrap import dedent
+from typing import Callable, Optional, Sequence, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.operators.python import BranchPythonOperator
+from airflow.utils.python_virtualenv import remove_task_decorator
+
+
+class _BranchPythonDecoratedOperator(DecoratedOperator, BranchPythonOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields: Sequence[str] = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs: Sequence[str] = ('python_callable',)
+
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        kwargs_to_upstream = {
+            "python_callable": kwargs["python_callable"],
+            "op_args": kwargs["op_args"],
+            "op_kwargs": kwargs["op_kwargs"],
+        }
+        super().__init__(kwargs_to_upstream=kwargs_to_upstream, **kwargs)
+
+    def get_python_source(self):
+        raw_source = inspect.getsource(self.python_callable)
+        res = dedent(raw_source)
+        res = remove_task_decorator(res, "@task.branch")
+        return res
+
+
+T = TypeVar("T", bound=Callable)
+
+
+class BranchPythonDecoratorMixin:
+    """
+    Mixin class for adding branch python decorator
+    to the provider managers.
+    """
+
+    def branch(
+        self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
+    ):
+        """
+        Wraps a python function into an Airflow operator to run via a Python virtual environment.
+
+        Accepts kwargs for operator kwarg. Can be reused in a single DAG.
+
+        :param python_callable: Function to decorate
+        :type python_callable: Optional[Callable]
+        :param multiple_outputs: if set, function return value will be
+            unrolled to multiple XCom values. List/Tuples will unroll to xcom values

Review comment:
       List/Tuples aren't handled by `multiple_outputs`.

##########
File path: airflow/decorators/branch_python.py
##########
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+from textwrap import dedent
+from typing import Callable, Optional, Sequence, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.operators.python import BranchPythonOperator
+from airflow.utils.python_virtualenv import remove_task_decorator
+
+
+class _BranchPythonDecoratedOperator(DecoratedOperator, BranchPythonOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields: Sequence[str] = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs: Sequence[str] = ('python_callable',)
+
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        kwargs_to_upstream = {
+            "python_callable": kwargs["python_callable"],
+            "op_args": kwargs["op_args"],
+            "op_kwargs": kwargs["op_kwargs"],
+        }
+        super().__init__(kwargs_to_upstream=kwargs_to_upstream, **kwargs)
+
+    def get_python_source(self):
+        raw_source = inspect.getsource(self.python_callable)
+        res = dedent(raw_source)
+        res = remove_task_decorator(res, "@task.branch")
+        return res
+
+
+T = TypeVar("T", bound=Callable)
+
+
+class BranchPythonDecoratorMixin:
+    """
+    Mixin class for adding branch python decorator
+    to the provider managers.
+    """
+
+    def branch(
+        self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
+    ):
+        """
+        Wraps a python function into an Airflow operator to run via a Python virtual environment.

Review comment:
       This should be updated to describe the new decorator.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #20860: Branch python operator decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20860:
URL: https://github.com/apache/airflow/pull/20860#discussion_r787074554



##########
File path: airflow/decorators/__init__.pyi
##########
@@ -269,6 +287,8 @@ class TaskDecoratorFactory:
     @overload
     def python(self, python_callable: F) -> F: ...
     @overload
+    def branch(self, python_callable: F) -> F: ...
+    @overload

Review comment:
       Ah good -- you spotted my change, this will need moving to be next to the other branch.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org