You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/03/11 00:48:33 UTC

[GitHub] [airflow] dimberman opened a new pull request #14709: Refactor Taskflow decorator for extensibility

dimberman opened a new pull request #14709:
URL: https://github.com/apache/airflow/pull/14709


   The Taskflow API is a very clean way to turn python functions into
   Airflow tasks, but it is currently limited to only running in the
   virtualenv of the parent worker. We eventually want to offer the ability
   to use decorators that tie to docker images, kubernetes pods, etc.
   
   This commit seperates out the python specific code and creates a
   decorators directory, which will allow us to both build in more "core"
   decorators as well as decorators in providers.
   
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#issuecomment-796358823


   [The Workflow run](https://github.com/apache/airflow/actions/runs/641274017) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dimberman commented on a change in pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#discussion_r593275548



##########
File path: airflow/decorators/base.py
##########
@@ -0,0 +1,194 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import functools
+import inspect
+import re
+from inspect import signature
+from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, cast
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.models.dag import DAG, DagContext
+from airflow.models.xcom_arg import XComArg
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.task_group import TaskGroup, TaskGroupContext
+
+
+class BaseDecoratedOperator(BaseOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)

Review comment:
       So the Base decorator handles a lot of the necessary things like verifying the function before the execution step.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dimberman merged pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
dimberman merged pull request #14709:
URL: https://github.com/apache/airflow/pull/14709


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#discussion_r593263065



##########
File path: airflow/operators/python.py
##########
@@ -15,32 +15,59 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import functools
 import inspect
 import os
 import pickle
-import re
 import sys
 import types
 import warnings
-from inspect import signature
 from tempfile import TemporaryDirectory
 from textwrap import dedent
-from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar, Union, cast
+from typing import Any, Callable, Dict, Iterable, List, Optional, Union
 
 import dill
 
+# To maintain backwards compatibility, we import the task object into this file
+# This prevents breakages in dags that use `from airflow.operators.python import task`
+from airflow.decorators.python import (  # noqa # pylint: disable=unused-import
+    PYTHON_OPERATOR_UI_COLOR,
+    python_task,
+)
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
-from airflow.models.dag import DAG, DagContext
 from airflow.models.skipmixin import SkipMixin
 from airflow.models.taskinstance import _CURRENT_CONTEXT
-from airflow.models.xcom_arg import XComArg
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.operator_helpers import determine_kwargs
 from airflow.utils.process_utils import execute_in_subprocess
 from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script
-from airflow.utils.task_group import TaskGroup, TaskGroupContext
+
+
+def task(python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs):
+    """
+    Deprecated function that calls @task.python and allows users to turn a python function into
+    an Airflow task. Please use the following instead:
+
+    from airflow.decorators import task
+
+    @task
+    def my_task()
+
+    @param python_callable:
+    @param multiple_outputs:
+    @param kwargs:
+    @return:

Review comment:
       needs `:param` format not `@param`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#discussion_r593215214



##########
File path: airflow/decorators/__init__.py
##########
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import warnings
+from typing import Callable, Optional
+
+from airflow.decorators.python import python_task
+from airflow.models.dag import dag  # noqa # pylint: disable=unused-import
+
+
+class _TaskDecorator:
+    def __call__(
+        self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
+    ):
+        """
+        Deprecated function that calls @task.python and allows users to turn a python function into
+        an Airflow task.
+        @param python_callable:
+        @param multiple_outputs:
+        @param kwargs:
+        @return:
+        """
+        warnings.warn(
+            """Calling airflow.decorators.task for python tasks is deprecated.
+            Please use @task.python instead.

Review comment:
       I don't like this really -- having it `@task.python` looses some of the "elegance" of the TaskFlow API.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#discussion_r593216615



##########
File path: airflow/decorators/__init__.py
##########
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import warnings
+from typing import Callable, Optional
+
+from airflow.decorators.python import python_task
+from airflow.models.dag import dag  # noqa # pylint: disable=unused-import
+
+
+class _TaskDecorator:
+    def __call__(
+        self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
+    ):
+        """
+        Deprecated function that calls @task.python and allows users to turn a python function into
+        an Airflow task.
+        @param python_callable:
+        @param multiple_outputs:
+        @param kwargs:
+        @return:
+        """
+        warnings.warn(
+            """Calling airflow.decorators.task for python tasks is deprecated.
+            Please use @task.python instead.
+            Example:
+
+            from airflow.decorators import task
+            @task.python
+            def my_task()""",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        return task.python(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs)
+
+    @staticmethod
+    def python(python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs):
+        """
+        Python operator decorator. Wraps a function into an Airflow operator.
+        Accepts kwargs for operator kwarg. Can be reused in a single DAG.

Review comment:
       ```suggestion
           Accepts kwargs for operator kwarg. Can be reused in a single DAG.
           
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#issuecomment-796359233


   [The Workflow run](https://github.com/apache/airflow/actions/runs/641280452) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#discussion_r593263170



##########
File path: airflow/operators/python.py
##########
@@ -15,32 +15,59 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import functools
 import inspect
 import os
 import pickle
-import re
 import sys
 import types
 import warnings
-from inspect import signature
 from tempfile import TemporaryDirectory
 from textwrap import dedent
-from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar, Union, cast
+from typing import Any, Callable, Dict, Iterable, List, Optional, Union
 
 import dill
 
+# To maintain backwards compatibility, we import the task object into this file
+# This prevents breakages in dags that use `from airflow.operators.python import task`
+from airflow.decorators.python import (  # noqa # pylint: disable=unused-import
+    PYTHON_OPERATOR_UI_COLOR,
+    python_task,
+)
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
-from airflow.models.dag import DAG, DagContext
 from airflow.models.skipmixin import SkipMixin
 from airflow.models.taskinstance import _CURRENT_CONTEXT
-from airflow.models.xcom_arg import XComArg
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.operator_helpers import determine_kwargs
 from airflow.utils.process_utils import execute_in_subprocess
 from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script
-from airflow.utils.task_group import TaskGroup, TaskGroupContext
+
+
+def task(python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs):
+    """
+    Deprecated function that calls @task.python and allows users to turn a python function into
+    an Airflow task. Please use the following instead:
+
+    from airflow.decorators import task
+
+    @task
+    def my_task()
+
+    @param python_callable:
+    @param multiple_outputs:
+    @param kwargs:
+    @return:

Review comment:
       and please add description to them




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#discussion_r593215545



##########
File path: airflow/decorators/__init__.py
##########
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import warnings
+from typing import Callable, Optional
+
+from airflow.decorators.python import python_task
+from airflow.models.dag import dag  # noqa # pylint: disable=unused-import
+
+
+class _TaskDecorator:
+    def __call__(
+        self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
+    ):
+        """
+        Deprecated function that calls @task.python and allows users to turn a python function into
+        an Airflow task.
+        @param python_callable:
+        @param multiple_outputs:
+        @param kwargs:
+        @return:
+        """
+        warnings.warn(
+            """Calling airflow.decorators.task for python tasks is deprecated.
+            Please use @task.python instead.

Review comment:
       Also what about `@dag.task`? (I don't know how that's implemented, it may already delegate to this?)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#issuecomment-797554905


   [The Workflow run](https://github.com/apache/airflow/actions/runs/646601663) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#discussion_r593261669



##########
File path: airflow/decorators/base.py
##########
@@ -0,0 +1,194 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import functools
+import inspect
+import re
+from inspect import signature
+from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, cast
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.models.dag import DAG, DagContext
+from airflow.models.xcom_arg import XComArg
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.task_group import TaskGroup, TaskGroupContext
+
+
+class BaseDecoratedOperator(BaseOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)

Review comment:
       Do we need this in base decorator?
   
   `op_args`, `op_kwargs` and `python_callable` are just fields of a `PythonOperator` for now. If we want to extend it to `@task.docker`, we might either want to add this to the docker / KPO or do we have a different plan




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#issuecomment-796801386


   Ah I missed the addition of `impl_class`. Cool.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#issuecomment-797555500


   [The Workflow run](https://github.com/apache/airflow/actions/runs/646567419) is cancelling this PR. Building image for the PR has been cancelled


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dimberman commented on a change in pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#discussion_r593218496



##########
File path: airflow/decorators/__init__.py
##########
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import warnings
+from typing import Callable, Optional
+
+from airflow.decorators.python import python_task
+from airflow.models.dag import dag  # noqa # pylint: disable=unused-import
+
+
+class _TaskDecorator:
+    def __call__(
+        self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
+    ):
+        """
+        Deprecated function that calls @task.python and allows users to turn a python function into
+        an Airflow task.
+        @param python_callable:
+        @param multiple_outputs:
+        @param kwargs:
+        @return:
+        """
+        warnings.warn(
+            """Calling airflow.decorators.task for python tasks is deprecated.
+            Please use @task.python instead.
+            Example:
+
+            from airflow.decorators import task
+            @task.python
+            def my_task()""",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        return task.python(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs)
+
+    @staticmethod
+    def python(python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs):
+        """
+        Python operator decorator. Wraps a function into an Airflow operator.
+        Accepts kwargs for operator kwarg. Can be reused in a single DAG.

Review comment:
       Oh nvm newline




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#discussion_r593363124



##########
File path: airflow/decorators/python.py
##########
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Callable, Dict, Optional, TypeVar
+
+from airflow.decorators.base import BaseDecoratedOperator, task_decorator_factory
+from airflow.exceptions import AirflowException
+from airflow.utils.decorators import apply_defaults
+
+PYTHON_OPERATOR_UI_COLOR = '#ffefeb'
+
+
+class _PythonDecoratedOperator(BaseDecoratedOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    ui_color = PYTHON_OPERATOR_UI_COLOR
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+    def execute(self, context: Dict):
+        return_value = self.python_callable(*self.op_args, **self.op_kwargs)
+        self.log.debug("Done. Returned value was: %s", return_value)
+        if not self.multiple_outputs:
+            return return_value
+        if isinstance(return_value, dict):
+            for key in return_value.keys():
+                if not isinstance(key, str):
+                    raise AirflowException(
+                        'Returned dictionary keys must be strings when using '
+                        f'multiple_outputs, found {key} ({type(key)}) instead'
+                    )
+            for key, value in return_value.items():
+                self.xcom_push(context, key, value)
+        else:
+            raise AirflowException(
+                f'Returned output was type {type(return_value)} expected dictionary ' 'for multiple_outputs'
+            )
+        return return_value
+
+
+T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
+
+
+def python_task(
+    python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
+):
+    """
+    Python operator decorator. Wraps a function into an Airflow operator.
+    Accepts kwargs for operator kwarg. Can be reused in a single DAG.
+    :param python_callable: Function to decorate

Review comment:
       ```suggestion
       Accepts kwargs for operator kwarg. Can be reused in a single DAG.
   
       :param python_callable: Function to decorate
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dimberman commented on a change in pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#discussion_r593217987



##########
File path: airflow/decorators/__init__.py
##########
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import warnings
+from typing import Callable, Optional
+
+from airflow.decorators.python import python_task
+from airflow.models.dag import dag  # noqa # pylint: disable=unused-import
+
+
+class _TaskDecorator:
+    def __call__(
+        self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
+    ):
+        """
+        Deprecated function that calls @task.python and allows users to turn a python function into
+        an Airflow task.
+        @param python_callable:
+        @param multiple_outputs:
+        @param kwargs:
+        @return:
+        """
+        warnings.warn(
+            """Calling airflow.decorators.task for python tasks is deprecated.
+            Please use @task.python instead.
+            Example:
+
+            from airflow.decorators import task
+            @task.python
+            def my_task()""",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        return task.python(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs)
+
+    @staticmethod
+    def python(python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs):
+        """
+        Python operator decorator. Wraps a function into an Airflow operator.
+        Accepts kwargs for operator kwarg. Can be reused in a single DAG.

Review comment:
       What's the difference?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#discussion_r593215789



##########
File path: airflow/decorators/__init__.py
##########
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import warnings
+from typing import Callable, Optional
+
+from airflow.decorators.python import python_task
+from airflow.models.dag import dag  # noqa # pylint: disable=unused-import
+
+
+class _TaskDecorator:
+    def __call__(
+        self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
+    ):
+        """
+        Deprecated function that calls @task.python and allows users to turn a python function into
+        an Airflow task.
+        @param python_callable:
+        @param multiple_outputs:
+        @param kwargs:
+        @return:
+        """
+        warnings.warn(
+            """Calling airflow.decorators.task for python tasks is deprecated.
+            Please use @task.python instead.
+            Example:
+
+            from airflow.decorators import task
+            @task.python
+            def my_task()""",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        return task.python(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs)

Review comment:
       ```suggestion
           return self.python(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs)
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#issuecomment-797685864


   [The Workflow run](https://github.com/apache/airflow/actions/runs/647095446) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#issuecomment-797663681


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#discussion_r592413860



##########
File path: airflow/decorators/base.py
##########
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import functools
+import inspect
+import re
+from inspect import signature
+from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, cast
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.models.dag import DAG, DagContext
+from airflow.models.xcom_arg import XComArg
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.task_group import TaskGroup, TaskGroupContext
+
+
+class BaseDecoratedOperator(BaseOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        python_callable: Callable,
+        task_id: str,
+        op_args: Tuple[Any],
+        op_kwargs: Dict[str, Any],
+        multiple_outputs: bool = False,
+        **kwargs,
+    ) -> None:
+        kwargs['task_id'] = self._get_unique_task_id(task_id, kwargs.get('dag'), kwargs.get('task_group'))
+        super().__init__(**kwargs)
+        self.python_callable = python_callable
+
+        # Check that arguments can be binded
+        signature(python_callable).bind(*op_args, **op_kwargs)
+        self.multiple_outputs = multiple_outputs
+        self.op_args = op_args
+        self.op_kwargs = op_kwargs
+
+    @staticmethod
+    def _get_unique_task_id(
+        task_id: str, dag: Optional[DAG] = None, task_group: Optional[TaskGroup] = None
+    ) -> str:
+        """
+        Generate unique task id given a DAG (or if run in a DAG context)
+        Ids are generated by appending a unique number to the end of
+        the original task id.
+
+        Example:
+          task_id
+          task_id__1
+          task_id__2
+          ...
+          task_id__20
+        """
+        dag = dag or DagContext.get_current_dag()
+        if not dag:
+            return task_id
+
+        # We need to check if we are in the context of TaskGroup as the task_id may
+        # already be altered
+        task_group = task_group or TaskGroupContext.get_current_task_group(dag)
+        tg_task_id = task_group.child_id(task_id) if task_group else task_id
+
+        if tg_task_id not in dag.task_ids:
+            return task_id
+        core = re.split(r'__\d+$', task_id)[0]
+        suffixes = sorted(
+            [
+                int(re.split(r'^.+__', task_id)[1])
+                for task_id in dag.task_ids
+                if re.match(rf'^{core}__\d+$', task_id)
+            ]
+        )
+        if not suffixes:
+            return f'{core}__1'
+        return f'{core}__{suffixes[-1] + 1}'
+
+    @staticmethod
+    def validate_python_callable(python_callable):
+        """
+        Validate that python callable can be wrapped by operator.
+        Raises exception if invalid.
+
+        :param python_callable: Python object to be validated
+        :raises: TypeError, AirflowException
+        """
+        if not callable(python_callable):
+            raise TypeError('`python_callable` param must be callable')
+        if 'self' in signature(python_callable).parameters.keys():
+            raise AirflowException('@task does not support methods')
+
+    def execute(self, context: Dict):
+        raise NotImplementedError()
+
+
+T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
+
+
+def base_task(
+    python_callable: Optional[Callable] = None,
+    multiple_outputs: Optional[bool] = None,
+    impl_class=None,
+    **kwargs,
+) -> Callable[[T], T]:
+    """
+    Python operator decorator. Wraps a function into an Airflow operator.

Review comment:
       This doc string should be updated -- it's still written as if it's for a DAG author, but this is now intended to be used by developers/internal.

##########
File path: airflow/decorators/python.py
##########
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import functools
+from typing import Callable, Dict, Optional, TypeVar
+
+from airflow.decorators.base import BaseDecoratedOperator, base_task
+from airflow.exceptions import AirflowException
+from airflow.utils.decorators import apply_defaults
+
+PYTHON_OPERATOR_UI_COLOR = '#ffefeb'
+
+
+class _PythonDecoratedOperator(BaseDecoratedOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    ui_color = PYTHON_OPERATOR_UI_COLOR
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+    def execute(self, context: Dict):
+        return_value = self.python_callable(*self.op_args, **self.op_kwargs)
+        self.log.debug("Done. Returned value was: %s", return_value)
+        if not self.multiple_outputs:
+            return return_value
+        if isinstance(return_value, dict):
+            for key in return_value.keys():
+                if not isinstance(key, str):
+                    raise AirflowException(
+                        'Returned dictionary keys must be strings when using '
+                        f'multiple_outputs, found {key} ({type(key)}) instead'
+                    )
+            for key, value in return_value.items():
+                self.xcom_push(context, key, value)
+        else:
+            raise AirflowException(
+                f'Returned output was type {type(return_value)} expected dictionary ' 'for multiple_outputs'
+            )
+        return return_value
+
+
+T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
+
+
+def task(python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs):
+    """
+    Python operator decorator. Wraps a function into an Airflow operator.
+    Accepts kwargs for operator kwarg. Can be reused in a single DAG.
+    :param python_callable: Function to decorate
+    :type python_callable: Optional[Callable]
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+        with index as key. Dict will unroll to xcom values with keys as XCom keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    @functools.wraps(python_callable)
+    def wrapper(f):
+        return base_task(
+            python_callable=f,
+            multiple_outputs=multiple_outputs,
+            impl_class=_PythonDecoratedOperator,
+            **kwargs,
+        )
+
+    if callable(python_callable):
+        return wrapper(python_callable)
+    elif python_callable is not None:
+        raise AirflowException('No args allowed while using @task, use kwargs instead')
+    return wrapper

Review comment:
       Hmm, I wonder if we could avoid the extra wrapper here? (Not critical, just having an extra pointless wrapper would be nice)

##########
File path: airflow/operators/python.py
##########
@@ -15,32 +15,32 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import functools
 import inspect
 import os
 import pickle
-import re
 import sys
 import types
 import warnings
-from inspect import signature
 from tempfile import TemporaryDirectory
 from textwrap import dedent
-from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar, Union, cast
+from typing import Any, Callable, Dict, Iterable, List, Optional, Union
 
 import dill
 
+# To maintain backwards compatibility, we import the task object into this file
+# This prevents breakages in dags that use `from airflow.operators.python import task`
+# pylint: disable=unused-import

Review comment:
       This disables it for the whole file, not just the next line.

##########
File path: airflow/decorators/python.py
##########
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import functools
+from typing import Callable, Dict, Optional, TypeVar
+
+from airflow.decorators.base import BaseDecoratedOperator, base_task
+from airflow.exceptions import AirflowException
+from airflow.utils.decorators import apply_defaults
+
+PYTHON_OPERATOR_UI_COLOR = '#ffefeb'
+
+
+class _PythonDecoratedOperator(BaseDecoratedOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    ui_color = PYTHON_OPERATOR_UI_COLOR
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+    def execute(self, context: Dict):
+        return_value = self.python_callable(*self.op_args, **self.op_kwargs)
+        self.log.debug("Done. Returned value was: %s", return_value)
+        if not self.multiple_outputs:
+            return return_value
+        if isinstance(return_value, dict):
+            for key in return_value.keys():
+                if not isinstance(key, str):
+                    raise AirflowException(
+                        'Returned dictionary keys must be strings when using '
+                        f'multiple_outputs, found {key} ({type(key)}) instead'
+                    )
+            for key, value in return_value.items():
+                self.xcom_push(context, key, value)
+        else:
+            raise AirflowException(
+                f'Returned output was type {type(return_value)} expected dictionary ' 'for multiple_outputs'
+            )
+        return return_value
+
+
+T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
+
+
+def task(python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs):
+    """
+    Python operator decorator. Wraps a function into an Airflow operator.
+    Accepts kwargs for operator kwarg. Can be reused in a single DAG.
+    :param python_callable: Function to decorate
+    :type python_callable: Optional[Callable]
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+        with index as key. Dict will unroll to xcom values with keys as XCom keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    @functools.wraps(python_callable)
+    def wrapper(f):
+        return base_task(
+            python_callable=f,
+            multiple_outputs=multiple_outputs,
+            impl_class=_PythonDecoratedOperator,
+            **kwargs,
+        )
+
+    if callable(python_callable):
+        return wrapper(python_callable)
+    elif python_callable is not None:
+        raise AirflowException('No args allowed while using @task, use kwargs instead')
+    return wrapper

Review comment:
       I think this can just be:
   
   ```
    def task(python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs):
           return base_task(
               python_callable=f,
               multiple_outputs=multiple_outputs,
               impl_class=_PythonDecoratedOperator,
               **kwargs,
           )
   ```

##########
File path: airflow/operators/python.py
##########
@@ -15,32 +15,32 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import functools
 import inspect
 import os
 import pickle
-import re
 import sys
 import types
 import warnings
-from inspect import signature
 from tempfile import TemporaryDirectory
 from textwrap import dedent
-from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar, Union, cast
+from typing import Any, Callable, Dict, Iterable, List, Optional, Union
 
 import dill
 
+# To maintain backwards compatibility, we import the task object into this file
+# This prevents breakages in dags that use `from airflow.operators.python import task`
+# pylint: disable=unused-import
+from airflow.decorators.python import task  # noqa
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
-from airflow.models.dag import DAG, DagContext
 from airflow.models.skipmixin import SkipMixin
 from airflow.models.taskinstance import _CURRENT_CONTEXT
-from airflow.models.xcom_arg import XComArg
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.operator_helpers import determine_kwargs
 from airflow.utils.process_utils import execute_in_subprocess
 from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script
-from airflow.utils.task_group import TaskGroup, TaskGroupContext
+
+PYTHON_OPERATOR_UI_COLOR = '#ffefeb'

Review comment:
       ?

##########
File path: airflow/decorators/base.py
##########
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import functools
+import inspect
+import re
+from inspect import signature
+from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, cast
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.models.dag import DAG, DagContext
+from airflow.models.xcom_arg import XComArg
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.task_group import TaskGroup, TaskGroupContext
+
+
+class BaseDecoratedOperator(BaseOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        python_callable: Callable,
+        task_id: str,
+        op_args: Tuple[Any],
+        op_kwargs: Dict[str, Any],
+        multiple_outputs: bool = False,
+        **kwargs,
+    ) -> None:
+        kwargs['task_id'] = self._get_unique_task_id(task_id, kwargs.get('dag'), kwargs.get('task_group'))
+        super().__init__(**kwargs)
+        self.python_callable = python_callable
+
+        # Check that arguments can be binded
+        signature(python_callable).bind(*op_args, **op_kwargs)
+        self.multiple_outputs = multiple_outputs
+        self.op_args = op_args
+        self.op_kwargs = op_kwargs
+
+    @staticmethod
+    def _get_unique_task_id(
+        task_id: str, dag: Optional[DAG] = None, task_group: Optional[TaskGroup] = None
+    ) -> str:
+        """
+        Generate unique task id given a DAG (or if run in a DAG context)
+        Ids are generated by appending a unique number to the end of
+        the original task id.
+
+        Example:
+          task_id
+          task_id__1
+          task_id__2
+          ...
+          task_id__20
+        """
+        dag = dag or DagContext.get_current_dag()
+        if not dag:
+            return task_id
+
+        # We need to check if we are in the context of TaskGroup as the task_id may
+        # already be altered
+        task_group = task_group or TaskGroupContext.get_current_task_group(dag)
+        tg_task_id = task_group.child_id(task_id) if task_group else task_id
+
+        if tg_task_id not in dag.task_ids:
+            return task_id
+        core = re.split(r'__\d+$', task_id)[0]
+        suffixes = sorted(
+            [
+                int(re.split(r'^.+__', task_id)[1])
+                for task_id in dag.task_ids
+                if re.match(rf'^{core}__\d+$', task_id)
+            ]
+        )
+        if not suffixes:
+            return f'{core}__1'
+        return f'{core}__{suffixes[-1] + 1}'
+
+    @staticmethod
+    def validate_python_callable(python_callable):
+        """
+        Validate that python callable can be wrapped by operator.
+        Raises exception if invalid.
+
+        :param python_callable: Python object to be validated
+        :raises: TypeError, AirflowException
+        """
+        if not callable(python_callable):
+            raise TypeError('`python_callable` param must be callable')
+        if 'self' in signature(python_callable).parameters.keys():
+            raise AirflowException('@task does not support methods')
+
+    def execute(self, context: Dict):
+        raise NotImplementedError()
+
+
+T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
+
+
+def base_task(
+    python_callable: Optional[Callable] = None,
+    multiple_outputs: Optional[bool] = None,
+    impl_class=None,
+    **kwargs,
+) -> Callable[[T], T]:
+    """
+    Python operator decorator. Wraps a function into an Airflow operator.

Review comment:
       How about calling this `task_decorator_factory`? I think that make it clearer that this what this fn is used for.

##########
File path: airflow/decorators/python.py
##########
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import functools
+from typing import Callable, Dict, Optional, TypeVar
+
+from airflow.decorators.base import BaseDecoratedOperator, base_task
+from airflow.exceptions import AirflowException
+from airflow.utils.decorators import apply_defaults
+
+PYTHON_OPERATOR_UI_COLOR = '#ffefeb'

Review comment:
       Why a constant if it's only used once?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#issuecomment-797635417


   [The Workflow run](https://github.com/apache/airflow/actions/runs/646865541) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] jhtimmins commented on a change in pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
jhtimmins commented on a change in pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#discussion_r594810779



##########
File path: airflow/decorators/base.py
##########
@@ -0,0 +1,194 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import functools
+import inspect
+import re
+from inspect import signature
+from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, cast
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.models.dag import DAG, DagContext
+from airflow.models.xcom_arg import XComArg
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.task_group import TaskGroup, TaskGroupContext
+
+
+class BaseDecoratedOperator(BaseOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        python_callable: Callable,
+        task_id: str,
+        op_args: Tuple[Any],
+        op_kwargs: Dict[str, Any],
+        multiple_outputs: bool = False,
+        **kwargs,
+    ) -> None:
+        kwargs['task_id'] = self._get_unique_task_id(task_id, kwargs.get('dag'), kwargs.get('task_group'))
+        super().__init__(**kwargs)
+        self.python_callable = python_callable
+
+        # Check that arguments can be binded
+        signature(python_callable).bind(*op_args, **op_kwargs)
+        self.multiple_outputs = multiple_outputs
+        self.op_args = op_args
+        self.op_kwargs = op_kwargs
+
+    @staticmethod
+    def _get_unique_task_id(
+        task_id: str, dag: Optional[DAG] = None, task_group: Optional[TaskGroup] = None
+    ) -> str:
+        """
+        Generate unique task id given a DAG (or if run in a DAG context)
+        Ids are generated by appending a unique number to the end of
+        the original task id.
+
+        Example:
+          task_id
+          task_id__1
+          task_id__2
+          ...
+          task_id__20
+        """
+        dag = dag or DagContext.get_current_dag()
+        if not dag:
+            return task_id
+
+        # We need to check if we are in the context of TaskGroup as the task_id may
+        # already be altered
+        task_group = task_group or TaskGroupContext.get_current_task_group(dag)
+        tg_task_id = task_group.child_id(task_id) if task_group else task_id
+
+        if tg_task_id not in dag.task_ids:
+            return task_id
+        core = re.split(r'__\d+$', task_id)[0]
+        suffixes = sorted(
+            [
+                int(re.split(r'^.+__', task_id)[1])
+                for task_id in dag.task_ids
+                if re.match(rf'^{core}__\d+$', task_id)
+            ]
+        )
+        if not suffixes:
+            return f'{core}__1'
+        return f'{core}__{suffixes[-1] + 1}'
+
+    @staticmethod
+    def validate_python_callable(python_callable):
+        """
+        Validate that python callable can be wrapped by operator.
+        Raises exception if invalid.
+
+        :param python_callable: Python object to be validated
+        :raises: TypeError, AirflowException
+        """
+        if not callable(python_callable):
+            raise TypeError('`python_callable` param must be callable')
+        if 'self' in signature(python_callable).parameters.keys():
+            raise AirflowException('@task does not support methods')
+
+    def execute(self, context: Dict):
+        raise NotImplementedError()
+
+
+T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
+
+
+def task_decorator_factory(
+    python_callable: Optional[Callable] = None,
+    multiple_outputs: Optional[bool] = None,
+    decorated_operator_class: BaseDecoratedOperator = None,
+    **kwargs,
+) -> Callable[[T], T]:
+    """
+    A factory that generates a wrapper that raps a function into an Airflow operator.
+    Accepts kwargs for operator kwarg. Can be reused in a single DAG.
+
+    :param python_callable: Function to decorate
+    :type python_callable: Optional[Callable]
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+        with index as key. Dict will unroll to xcom values with keys as XCom keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    :param decorated_operator_class: The operator that executes the logic needed to run the python function in
+        the correct environment
+    :type decorated_operator_class: BaseDecoratedOperator
+
+    """
+    # try to infer from  type annotation
+    if python_callable and multiple_outputs is None:
+        sig = signature(python_callable).return_annotation
+        ttype = getattr(sig, "__origin__", None)
+
+        multiple_outputs = sig != inspect.Signature.empty and ttype in (dict, Dict)
+
+    def wrapper(f: T):
+        """
+        Python wrapper to generate PythonDecoratedOperator out of simple python functions.
+        Used for Airflow Decorated interface
+        """
+        BaseDecoratedOperator.validate_python_callable(f)
+        kwargs.setdefault('task_id', f.__name__)
+
+        @functools.wraps(f)
+        def factory(*args, **f_kwargs):
+            op = decorated_operator_class(
+                python_callable=f,
+                op_args=args,
+                op_kwargs=f_kwargs,
+                multiple_outputs=multiple_outputs,
+                **kwargs,
+            )
+            if f.__doc__:
+                op.doc_md = f.__doc__
+            return XComArg(op)
+
+        return cast(T, factory)
+
+    if callable(python_callable):
+        return wrapper(python_callable)
+    elif python_callable is not None:
+        raise AirflowException('No args allowed while using @task, use kwargs instead')
+    return wrapper

Review comment:
       How is it possible to get to this final `return wrapper`? What's the scenario where `python_callable` is valid but not a callable?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dimberman commented on a change in pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#discussion_r593274298



##########
File path: airflow/decorators/base.py
##########
@@ -0,0 +1,194 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import functools
+import inspect
+import re
+from inspect import signature
+from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, cast
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.models.dag import DAG, DagContext
+from airflow.models.xcom_arg import XComArg
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.task_group import TaskGroup, TaskGroupContext
+
+
+class BaseDecoratedOperator(BaseOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)

Review comment:
       @kaxil yes because any subdecorator (e.g. docker or kubernetes) will require taking in a python callable. I sadly don't think it would be simple merge these with the existing DockerOperator or KPO.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#discussion_r595143246



##########
File path: airflow/decorators/base.py
##########
@@ -0,0 +1,194 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import functools
+import inspect
+import re
+from inspect import signature
+from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, cast
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.models.dag import DAG, DagContext
+from airflow.models.xcom_arg import XComArg
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.task_group import TaskGroup, TaskGroupContext
+
+
+class BaseDecoratedOperator(BaseOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        python_callable: Callable,
+        task_id: str,
+        op_args: Tuple[Any],
+        op_kwargs: Dict[str, Any],
+        multiple_outputs: bool = False,
+        **kwargs,
+    ) -> None:
+        kwargs['task_id'] = self._get_unique_task_id(task_id, kwargs.get('dag'), kwargs.get('task_group'))
+        super().__init__(**kwargs)
+        self.python_callable = python_callable
+
+        # Check that arguments can be binded
+        signature(python_callable).bind(*op_args, **op_kwargs)
+        self.multiple_outputs = multiple_outputs
+        self.op_args = op_args
+        self.op_kwargs = op_kwargs
+
+    @staticmethod
+    def _get_unique_task_id(
+        task_id: str, dag: Optional[DAG] = None, task_group: Optional[TaskGroup] = None
+    ) -> str:
+        """
+        Generate unique task id given a DAG (or if run in a DAG context)
+        Ids are generated by appending a unique number to the end of
+        the original task id.
+
+        Example:
+          task_id
+          task_id__1
+          task_id__2
+          ...
+          task_id__20
+        """
+        dag = dag or DagContext.get_current_dag()
+        if not dag:
+            return task_id
+
+        # We need to check if we are in the context of TaskGroup as the task_id may
+        # already be altered
+        task_group = task_group or TaskGroupContext.get_current_task_group(dag)
+        tg_task_id = task_group.child_id(task_id) if task_group else task_id
+
+        if tg_task_id not in dag.task_ids:
+            return task_id
+        core = re.split(r'__\d+$', task_id)[0]
+        suffixes = sorted(
+            [
+                int(re.split(r'^.+__', task_id)[1])
+                for task_id in dag.task_ids
+                if re.match(rf'^{core}__\d+$', task_id)
+            ]
+        )
+        if not suffixes:
+            return f'{core}__1'
+        return f'{core}__{suffixes[-1] + 1}'
+
+    @staticmethod
+    def validate_python_callable(python_callable):
+        """
+        Validate that python callable can be wrapped by operator.
+        Raises exception if invalid.
+
+        :param python_callable: Python object to be validated
+        :raises: TypeError, AirflowException
+        """
+        if not callable(python_callable):
+            raise TypeError('`python_callable` param must be callable')
+        if 'self' in signature(python_callable).parameters.keys():
+            raise AirflowException('@task does not support methods')
+
+    def execute(self, context: Dict):
+        raise NotImplementedError()
+
+
+T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
+
+
+def task_decorator_factory(
+    python_callable: Optional[Callable] = None,
+    multiple_outputs: Optional[bool] = None,
+    decorated_operator_class: BaseDecoratedOperator = None,
+    **kwargs,
+) -> Callable[[T], T]:
+    """
+    A factory that generates a wrapper that raps a function into an Airflow operator.
+    Accepts kwargs for operator kwarg. Can be reused in a single DAG.
+
+    :param python_callable: Function to decorate
+    :type python_callable: Optional[Callable]
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+        with index as key. Dict will unroll to xcom values with keys as XCom keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    :param decorated_operator_class: The operator that executes the logic needed to run the python function in
+        the correct environment
+    :type decorated_operator_class: BaseDecoratedOperator
+
+    """
+    # try to infer from  type annotation
+    if python_callable and multiple_outputs is None:
+        sig = signature(python_callable).return_annotation
+        ttype = getattr(sig, "__origin__", None)
+
+        multiple_outputs = sig != inspect.Signature.empty and ttype in (dict, Dict)
+
+    def wrapper(f: T):
+        """
+        Python wrapper to generate PythonDecoratedOperator out of simple python functions.
+        Used for Airflow Decorated interface
+        """
+        BaseDecoratedOperator.validate_python_callable(f)
+        kwargs.setdefault('task_id', f.__name__)
+
+        @functools.wraps(f)
+        def factory(*args, **f_kwargs):
+            op = decorated_operator_class(
+                python_callable=f,
+                op_args=args,
+                op_kwargs=f_kwargs,
+                multiple_outputs=multiple_outputs,
+                **kwargs,
+            )
+            if f.__doc__:
+                op.doc_md = f.__doc__
+            return XComArg(op)
+
+        return cast(T, factory)
+
+    if callable(python_callable):
+        return wrapper(python_callable)
+    elif python_callable is not None:
+        raise AirflowException('No args allowed while using @task, use kwargs instead')
+    return wrapper

Review comment:
       I think this is the difference between `@task` (python_callable will be passed already) and `@task()` -- python_callable won't yet be passed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dimberman commented on pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
dimberman commented on pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#issuecomment-796778509


   @ashb the next step when this is merged will be to create a @docker_task decorator. The way that that the @task decorator is written right now, it's HEAVILY tied into only running a python function on the current virtual environment. Extending it is basically impossible and a lot of boilerplate would need to be rewritten to write new ones. Ultimately I want to create three extensions: 1. create a virtual environment for this task, 2. run this python function in the specified docker container, 3. launch this task in a kubernetes pod using the KPO.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#discussion_r593358170



##########
File path: airflow/decorators/base.py
##########
@@ -0,0 +1,194 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import functools
+import inspect
+import re
+from inspect import signature
+from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, cast
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.models.dag import DAG, DagContext
+from airflow.models.xcom_arg import XComArg
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.task_group import TaskGroup, TaskGroupContext
+
+
+class BaseDecoratedOperator(BaseOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)

Review comment:
       SG




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #14709: Refactor Taskflow decorator for extensibility

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #14709:
URL: https://github.com/apache/airflow/pull/14709#discussion_r592203963



##########
File path: airflow/decorators/base.py
##########
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import functools
+import inspect
+import re
+from inspect import signature
+from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, cast
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.models.dag import DAG, DagContext
+from airflow.models.xcom_arg import XComArg
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.task_group import TaskGroup, TaskGroupContext
+
+
+class _BaseDecoratedOperator(BaseOperator):

Review comment:
       If this is intended to be used by others it should be `BaseDeocratedOperator`

##########
File path: airflow/decorators/__init__.py
##########
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

Review comment:
       There is an `airflow/decorators.py` that needs to be removed and the content placed in here.
   
   Having both these files is going to not behave right.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org