Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/12 15:06:13 UTC

[GitHub] [airflow] dimberman opened a new pull request #15330: Add a Docker Taskflow decorator

dimberman opened a new pull request #15330:
URL: https://github.com/apache/airflow/pull/15330


   Add the ability to decorate a Python function with @task.docker, turning it into a DockerOperator that runs that function remotely in a Docker container.
   ```python
   from airflow.decorators import task


   @task.docker(
       image="quay.io/bitnami/python:3.8.8",
       force_pull=True,
       docker_url="unix://var/run/docker.sock",
       network_mode="bridge",
       api_version='auto',
   )
   def f():
       # Imports live inside the function because its body runs in the container.
       import random

       return [random.random() for _ in range(10000000)]
   ```
   
   One notable aspect of this architecture is that it makes as few assumptions about user setups as possible. We could not share a volume between the worker and the container, as that would break if the user runs the Airflow worker inside a Docker container. We also could not assume that users have any specialized system libraries on their images (this implementation only requires Python 3 and bash).
   
   To work within these requirements, we base64-encode a Jinja-generated Python script and its inputs (which are serialized with the same functions used by the PythonVirtualenvOperator) and pass them to the container as environment variables. Once the container starts, it decodes these environment variables back into files, runs the function, and writes the result to /tmp/script.out.
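A minimal sketch of the encode/decode round trip described above, assuming the script and the pickled inputs each travel in one environment variable. The variable names (`__PYTHON_SCRIPT`, `__PYTHON_INPUT`) and the helper functions are hypothetical, not the PR's actual identifiers. Base64 matters here because environment variables must be text, while pickled inputs are arbitrary bytes.

```python
import base64
import os
import pickle
import tempfile

# Hypothetical environment-variable names, used only for this sketch.
SCRIPT_VAR = "__PYTHON_SCRIPT"
INPUT_VAR = "__PYTHON_INPUT"


def encode_payload(script_source: str, args: tuple, kwargs: dict) -> dict:
    """Worker side: build the (text-only) environment passed to the container."""
    pickled_inputs = pickle.dumps({"args": args, "kwargs": kwargs})
    return {
        SCRIPT_VAR: base64.b64encode(script_source.encode()).decode("ascii"),
        INPUT_VAR: base64.b64encode(pickled_inputs).decode("ascii"),
    }


def decode_payload(env: dict, workdir: str) -> tuple:
    """Container side: write the decoded script and inputs back to files."""
    script_path = os.path.join(workdir, "script.py")
    input_path = os.path.join(workdir, "script.in")
    with open(script_path, "wb") as f:
        f.write(base64.b64decode(env[SCRIPT_VAR]))
    with open(input_path, "wb") as f:
        f.write(base64.b64decode(env[INPUT_VAR]))
    return script_path, input_path


if __name__ == "__main__":
    env = encode_payload("print('hello')\n", (100,), {})
    with tempfile.TemporaryDirectory() as tmp:
        _script, input_file = decode_payload(env, tmp)
        with open(input_file, "rb") as f:
            print(pickle.load(f))  # {'args': (100,), 'kwargs': {}}
```

In the real container the decode step has to work with only Python 3 and bash available, which is why a standard-library-only approach like this fits the constraints listed earlier.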
   
   Once the function completes, the container sleeps in a loop until the DockerOperator retrieves the result via Docker's get_archive API. The result is then deserialized with pickle and pushed to XCom in the same way as a python or python_virtualenv task result.
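A sketch of the retrieval step, assuming the docker-py SDK, whose `Container.get_archive(path)` returns a `(chunks, stat)` pair where `chunks` yields the bytes of a tar archive. The helper name `read_pickled_result`, and the assumption that the archive member is named after the file's basename (`script.out`), belong to this sketch rather than the PR. `pickle.loads` should only ever be fed output from a container you trust.

```python
import io
import pickle
import tarfile
from typing import Any, Iterable


def read_pickled_result(chunks: Iterable[bytes], member_name: str = "script.out") -> Any:
    """Unpickle one file out of a tar stream such as the one get_archive yields."""
    buffer = io.BytesIO(b"".join(chunks))
    # mode="r" transparently handles uncompressed or compressed tar data.
    with tarfile.open(fileobj=buffer, mode="r") as archive:
        extracted = archive.extractfile(archive.getmember(member_name))
        if extracted is None:
            raise ValueError(f"{member_name} is not a regular file")
        return pickle.loads(extracted.read())


# Usage against a running container (docker-py), not executed here:
#   chunks, _stat = container.get_archive("/tmp/script.out")
#   result = read_pickled_result(chunks)
```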
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#issuecomment-900074401


   Hey @dimberman, as discussed in #17625, the providers_manager has been simplified as part of that change, so it should now be a bit easier (less duplicated code) to add task_decorators. You can also look at that PR to see which tests should be added for the new providers/CLI entries; they are very small tests/CLI commands.
   
   Also, I am updating the documentation as a follow-up to the discussion https://lists.apache.org/x/thread.html/rf3e3beba4dac92adcd2e2f11088f992c23baa1aa43518e9efbb625f8@%3Cdev.airflow.apache.org%3E to improve discoverability in the Airflow code, so the documentation update will be a separate PR (which might be the case here as well).





[GitHub] [airflow] dimberman commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r614359893



##########
File path: airflow/decorators/__init__.py
##########
@@ -139,5 +139,169 @@ def virtualenv(
             **kwargs,
         )
 
+    @staticmethod
+    def docker(  # pylint: disable=too-many-arguments,too-many-locals
+        python_callable: Optional[Callable] = None,

Review comment:
       I am all for that if we can do so in a way that still allows for IDE completion.







[GitHub] [airflow] dimberman commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r614337640



##########
File path: airflow/decorators/docker.py
##########
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Callable, Dict, Optional, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator

Review comment:
       @ashb this might be more reason to make the decorator a part of the docker provider? We can then have the task object import as needed?







[GitHub] [airflow] isaulv commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
isaulv commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r614318172



##########
File path: tests/decorators/test_docker.py
##########
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+import unittest.mock
+from datetime import timedelta
+
+from airflow.decorators import task
+from airflow.models import DAG, DagRun, TaskInstance as TI
+from airflow.utils import timezone
+from airflow.utils.session import create_session
+from airflow.utils.state import State
+from airflow.utils.types import DagRunType
+
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+END_DATE = timezone.datetime(2016, 1, 2)
+INTERVAL = timedelta(hours=12)
+FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1)
+
+TI_CONTEXT_ENV_VARS = [
+    'AIRFLOW_CTX_DAG_ID',
+    'AIRFLOW_CTX_TASK_ID',
+    'AIRFLOW_CTX_EXECUTION_DATE',
+    'AIRFLOW_CTX_DAG_RUN_ID',
+]
+
+PYTHON_VERSION = sys.version_info[0]
+
+
+class TestDockerDecorator(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+
+        with create_session() as session:
+            session.query(DagRun).delete()
+            session.query(TI).delete()
+
+    def setUp(self):
+        super().setUp()
+        self.dag = DAG('test_dag', default_args={'owner': 'airflow', 'start_date': DEFAULT_DATE})
+        self.addCleanup(self.dag.clear)
+        self.clear_run()
+        self.addCleanup(self.clear_run)
+
+    def clear_run(self):
+        self.run = False
+
+    def tearDown(self):
+        super().tearDown()
+
+        with create_session() as session:
+            session.query(DagRun).delete()
+            session.query(TI).delete()
+
+    def test_basic_docker_operator(self):
+        @task.docker(
+            image="quay.io/bitnami/python:3.8.8",
+            force_pull=True,
+            docker_url="unix://var/run/docker.sock",
+            network_mode="bridge",
+            api_version='auto',
+        )
+        def f():
+            import random
+
+            return [random.random() for i in range(100)]
+
+        with self.dag:
+            ret = f()
+
+        dr = self.dag.create_dagrun(
+            run_id=DagRunType.MANUAL.value,
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)  # pylint: disable=no-member
+        ti = dr.get_task_instances()[0]
+        assert len(ti.xcom_pull()) == 100
+
+    def test_basic_docker_operator_with_param(self):
+        @task.docker(
+            image="quay.io/bitnami/python:3.8.8",
+            force_pull=True,
+            docker_url="unix://var/run/docker.sock",
+            network_mode="bridge",
+            api_version='auto',
+        )
+        def f(num_results):
+            import random
+
+            return [random.random() for i in range(num_results)]
+
+        with self.dag:
+            ret = f(100)
+
+        dr = self.dag.create_dagrun(
+            run_id=DagRunType.MANUAL.value,
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)  # pylint: disable=no-member
+        ti = dr.get_task_instances()[0]
+        assert len(ti.xcom_pull()) == 100
+
+    def test_basic_docker_operator_multiple_output(self):
+        @task.docker(
+            image="quay.io/bitnami/python:3.8.8",
+            force_pull=True,
+            docker_url="unix://var/run/docker.sock",
+            network_mode="bridge",
+            api_version='auto',
+            multiple_outputs=True,
+        )
+        def return_dict(number: int):
+            return {'number': number + 1, '43': 43}
+
+        test_number = 10
+        with self.dag:
+            ret = return_dict(test_number)
+
+        dr = self.dag.create_dagrun(
+            run_id=DagRunType.MANUAL,
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)  # pylint: disable=maybe-no-member
+
+        ti = dr.get_task_instances()[0]
+        assert ti.xcom_pull(key='number') == test_number + 1
+        assert ti.xcom_pull(key='43') == 43
+        assert ti.xcom_pull() == {'number': test_number + 1, '43': 43}
+
+    def test_call_20(self):
+        """Test calling decorated function 21 times in a DAG"""

Review comment:
       Function name and docstring don't agree (20 vs. 21 calls).







[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r614349308



##########
File path: airflow/decorators/docker.py
##########
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Callable, Dict, Optional, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator

Review comment:
       We talked about making the "sub-decorators" (i.e. `task.x`) pluggable.
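One way to make `task.x` pluggable, as a minimal sketch: a namespace object that resolves unknown attributes from a registry via `__getattr__`. The class and method names are hypothetical. The trade-off is the one raised earlier about IDE completion: attributes resolved at runtime are invisible to static analysis, so a type stub (`.pyi`) would likely be needed to restore completion.

```python
from typing import Callable, Dict


class TaskDecoratorRegistry:
    """Hypothetical `task` namespace whose sub-decorators (task.<name>) are pluggable."""

    def __init__(self) -> None:
        self._decorators: Dict[str, Callable] = {}

    def register(self, name: str, decorator: Callable) -> None:
        if name in self._decorators:
            raise ValueError(f"task decorator {name!r} is already registered")
        self._decorators[name] = decorator

    def __getattr__(self, name: str) -> Callable:
        # Only called when normal lookup fails, i.e. for task.<name>.
        # Reading __dict__ directly avoids recursion before __init__ has run.
        decorators = self.__dict__.get("_decorators", {})
        if name in decorators:
            return decorators[name]
        raise AttributeError(f"no task decorator named {name!r}")
```

A provider would then call something like `task.register("docker", docker_task)` at load time, and DAG authors keep writing `@task.docker(...)`.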







[GitHub] [airflow] github-actions[bot] commented on pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#issuecomment-818970079


   [The Workflow run](https://github.com/apache/airflow/actions/runs/745710812) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.





[GitHub] [airflow] github-actions[bot] commented on pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#issuecomment-826946788


   [The Workflow run](https://github.com/apache/airflow/actions/runs/786137653) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] kaxil commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r688945978



##########
File path: airflow/provider.yaml.schema.json
##########
@@ -210,7 +210,20 @@
     "additional-extras": {
       "type": "object",
       "description": "Additional extras that the provider should have"
+    },
+    "task-decorators": {
+        "type": "array",
+        "description": "Apply custom decorators to the TaskFlow API. Can be accessed by users via '@task.<name>'",

Review comment:
       Reword to say something along the lines of "Decorators to use with TaskFlow API"







[GitHub] [airflow] kaxil commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r712206749



##########
File path: airflow/providers/docker/decorators/docker.py
##########
@@ -0,0 +1,298 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Callable, Dict, Iterable, List, Optional, TypeVar, Union
+
+import dill
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script
+
+
+def _generate_decode_command(env_var, file):
+    return (
+        f'python -c "import os; import base64;'
+        f' x = base64.b64decode(os.environ[\\"{env_var}\\"]);'
+        f' f = open(\\"{file}\\", \\"wb\\"); f.write(x);'
+        f' f.close()"'
+    )
+
+
+def _b64_encode_file(filename):
+    with open(filename, "rb") as file_to_encode:
+        return base64.b64encode(file_to_encode.read())
+
+
+class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    def __init__(
+        self,
+        use_dill=False,
+        **kwargs,
+    ) -> None:
+        self.string_args = [1, 2, 1]

Review comment:
       What's this?







[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r708269466



##########
File path: airflow/providers_manager.py
##########
@@ -531,6 +555,52 @@ def _get_attr(obj: Any, attr_name: str):
             return None
         return getattr(obj, attr_name)
 
+    def _add_taskflow_decorator(
+        self, decorator_name, decorator_class_name: str, provider_package: str
+    ) -> None:
+        if provider_package.startswith("apache-airflow"):
+            provider_path = provider_package[len("apache-") :].replace("-", ".")
+            if not decorator_class_name.startswith(provider_path):
+                log.warning(
+                    "Sanity check failed when importing '%s' from '%s' package. It should start with '%s'",
+                    decorator_class_name,
+                    provider_package,
+                    provider_path,
+                )
+                return
+        if decorator_name in self._taskflow_decorator_dict:
+            log.warning(
+                "The taskflow decoraotr '%s' has been already registered (by %s).",

Review comment:
       ```suggestion
                   "The taskflow decorator '%s' has been already registered (by %s).",
   ```
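The sanity check in `_add_taskflow_decorator` maps a distribution name to a module prefix (`apache-airflow-providers-docker` becomes `airflow.providers.docker`). Pulled out as a standalone function for illustration (the name `provider_path_matches` is hypothetical, and it returns a bool where the quoted code logs a warning and returns), the logic reduces to:

```python
def provider_path_matches(decorator_class_name: str, provider_package: str) -> bool:
    """Hypothetical extraction of the quoted sanity check."""
    if not provider_package.startswith("apache-airflow"):
        # Packages outside the apache-airflow namespace are not checked.
        return True
    # "apache-airflow-providers-docker" -> "airflow.providers.docker"
    provider_path = provider_package[len("apache-"):].replace("-", ".")
    return decorator_class_name.startswith(provider_path)
```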







[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r614321936



##########
File path: airflow/decorators/__init__.py
##########
@@ -139,5 +140,167 @@ def virtualenv(
             **kwargs,
         )
 
+    @staticmethod
+    def docker(  # pylint: disable=too-many-arguments,too-many-locals
+        python_callable: Optional[Callable] = None,
+        multiple_outputs: Optional[bool] = None,
+        image: str = "",
+        api_version: Optional[str] = None,
+        container_name: Optional[str] = None,
+        cpus: float = 1.0,
+        docker_url: str = 'unix://var/run/docker.sock',
+        environment: Optional[Dict] = None,
+        private_environment: Optional[Dict] = None,
+        force_pull: bool = False,
+        mem_limit: Optional[Union[float, str]] = None,
+        host_tmp_dir: Optional[str] = None,
+        network_mode: Optional[str] = None,
+        tls_ca_cert: Optional[str] = None,
+        tls_client_cert: Optional[str] = None,
+        tls_client_key: Optional[str] = None,
+        tls_hostname: Optional[Union[str, bool]] = None,
+        tls_ssl_version: Optional[str] = None,
+        tmp_dir: str = '/tmp/airflow',
+        user: Optional[Union[str, int]] = None,
+        volumes: Optional[List[str]] = None,
+        working_dir: Optional[str] = None,
+        xcom_all: bool = False,
+        docker_conn_id: Optional[str] = None,
+        dns: Optional[List[str]] = None,
+        dns_search: Optional[List[str]] = None,
+        auto_remove: bool = False,
+        shm_size: Optional[int] = None,
+        tty: bool = False,
+        privileged: bool = False,
+        cap_add: Optional[Iterable[str]] = None,
+        extra_hosts: Optional[Dict[str, str]] = None,
+        **kwargs,
+    ):
+        """
+        :param python_callable: A python function with no references to outside variables,
+            defined with def, which will be run in a virtualenv
+        :type python_callable: function
+        :param multiple_outputs: if set, function return value will be
+            unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+            with index as key. Dict will unroll to xcom values with keys as XCom keys.
+            Defaults to False.
+        :type multiple_outputs: bool
+        :param image: Docker image from which to create the container.
+            If image tag is omitted, "latest" will be used.
+        :type image: str
+        :param api_version: Remote API version. Set to ``auto`` to automatically
+            detect the server's version.
+        :type api_version: str
+        :param container_name: Name of the container. Optional (templated)
+        :type container_name: str or None
+        :param cpus: Number of CPUs to assign to the container.
+            This value gets multiplied with 1024. See
+            https://docs.docker.com/engine/reference/run/#cpu-share-constraint
+        :type cpus: float
+        :param docker_url: URL of the host running the docker daemon.
+            Default is unix://var/run/docker.sock
+        :type docker_url: str
+        :param environment: Environment variables to set in the container. (templated)
+        :type environment: dict
+        :param private_environment: Private environment variables to set in the container.
+            These are not templated, and hidden from the website.
+        :type private_environment: dict
+        :param force_pull: Pull the docker image on every run. Default is False.
+        :type force_pull: bool
+        :param mem_limit: Maximum amount of memory the container can use.
+            Either a float value, which represents the limit in bytes,
+            or a string like ``128m`` or ``1g``.
+        :type mem_limit: float or str
+        :param host_tmp_dir: Specify the location of the temporary directory on the host which will
+            be mapped to tmp_dir. If not provided defaults to using the standard system temp directory.
+        :type host_tmp_dir: str
+        :param network_mode: Network mode for the container.
+        :type network_mode: str
+        :param tls_ca_cert: Path to a PEM-encoded certificate authority
+            to secure the docker connection.
+        :type tls_ca_cert: str
+        :param tls_client_cert: Path to the PEM-encoded certificate
+            used to authenticate docker client.
+        :type tls_client_cert: str
+        :param tls_client_key: Path to the PEM-encoded key used to authenticate docker client.
+        :type tls_client_key: str
+        :param tls_hostname: Hostname to match against
+            the docker server certificate or False to disable the check.
+        :type tls_hostname: str or bool
+        :param tls_ssl_version: Version of SSL to use when communicating with docker daemon.
+        :type tls_ssl_version: str
+        :param tmp_dir: Mount point inside the container to
+            a temporary directory created on the host by the operator.
+            The path is also made available via the environment variable
+            ``AIRFLOW_TMP_DIR`` inside the container.
+        :type tmp_dir: str
+        :param user: Default user inside the docker container.
+        :type user: int or str
+        :param volumes: List of volumes to mount into the container, e.g.
+            ``['/host/path:/container/path', '/host/path2:/container/path2:ro']``.
+        :type volumes: list
+        :param working_dir: Working directory to
+            set on the container (equivalent to the -w switch the docker client)
+        :type working_dir: str
+        :param xcom_all: Push all the stdout or just the last line.
+            The default is False (last line).
+        :type xcom_all: bool
+        :param docker_conn_id: ID of the Airflow connection to use
+        :type docker_conn_id: str
+        :param dns: Docker custom DNS servers
+        :type dns: list[str]
+        :param dns_search: Docker custom DNS search domain
+        :type dns_search: list[str]
+        :param auto_remove: Auto-removal of the container on daemon side when the
+            container's process exits.
+            The default is False.
+        :type auto_remove: bool
+        :param shm_size: Size of ``/dev/shm`` in bytes. The size must be
+            greater than 0. If omitted uses system default.
+        :type shm_size: int
+        :param tty: Allocate pseudo-TTY to the container
+            This needs to be set see logs of the Docker container.
+        :type tty: bool
+        :param privileged: Give extended privileges to this container.
+        :type privileged: bool
+        :param cap_add: Include container capabilities
+        :type cap_add: list[str]
+        """
+        return _docker_task(
+            python_callable=python_callable,
+            multiple_outputs=multiple_outputs,
+            image=image,
+            api_version=api_version,
+            container_name=container_name,
+            cpus=cpus,
+            docker_url=docker_url,
+            environment=environment,
+            private_environment=private_environment,
+            force_pull=force_pull,
+            mem_limit=mem_limit,
+            host_tmp_dir=host_tmp_dir,
+            network_mode=network_mode,
+            tls_ca_cert=tls_ca_cert,
+            tls_client_cert=tls_client_cert,
+            tls_client_key=tls_client_key,
+            tls_hostname=tls_hostname,
+            tls_ssl_version=tls_ssl_version,
+            tmp_dir=tmp_dir,
+            user=user,
+            volumes=volumes,
+            working_dir=working_dir,
+            xcom_all=xcom_all,
+            docker_conn_id=docker_conn_id,
+            dns=dns,
+            dns_search=dns_search,
+            auto_remove=auto_remove,
+            shm_size=shm_size,
+            tty=tty,
+            privileged=privileged,
+            cap_add=cap_add,
+            extra_hosts=extra_hosts,
+            **kwargs,
+        )

Review comment:
       Couldn't this all just be:
   
   
   ```python
           docker = _docker_task
   ```
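A caveat on the one-line alias, assuming `task` is used through an instance of the decorator class: a plain function stored as a class attribute becomes a bound method on instance access, so the instance itself would be passed as `python_callable`. Wrapping the alias in `staticmethod` keeps the behaviour of the quoted `@staticmethod` version. `_docker_task` below is a hypothetical stand-in that just echoes its arguments.

```python
from typing import Callable, Optional


def _docker_task(python_callable: Optional[Callable] = None, **kwargs):
    """Hypothetical stand-in for the real factory: echoes what it receives."""
    return python_callable, kwargs


class PlainAlias:
    # Read through an instance, this becomes a bound method.
    docker = _docker_task


class StaticAlias:
    # staticmethod suppresses binding, like the @staticmethod in the diff.
    docker = staticmethod(_docker_task)


plain, static = PlainAlias(), StaticAlias()
print(static.docker(image="python:3.8"))  # (None, {'image': 'python:3.8'})
print(plain.docker(image="python:3.8")[0] is plain)  # True: the instance leaked in
```

Because the alias is the same function object, static analysers can probably still follow it to the full signature, which bears on the IDE-completion concern above.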







[GitHub] [airflow] potiuk commented on pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#issuecomment-898908874


   I have not looked at all the details yet, but this one looks really great :)
   
   One small nit: it would be great to add a "task-decorators" command to the "airflow providers" CLI.





[GitHub] [airflow] github-actions[bot] commented on pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#issuecomment-817928550


   [The Workflow run](https://github.com/apache/airflow/actions/runs/741704940) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] xinbinhuang commented on pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#issuecomment-817915740


   Love this! I wonder if it would be better to use something like `@task.container` to abstract away the specific operator being used, so that DAG authors don't need to care about it and data engineers can choose either `DockerOperator` or `KubernetesPodOperator` depending on their infrastructure.
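The suggestion above can be sketched as a decorator that looks up the operator class in a backend table. The platform side owns that table, and the DAG author only supplies the function. All names here are hypothetical stubs, not Airflow classes. The sketch also creates the operator immediately, whereas real TaskFlow decorators defer that until the function is called inside a DAG:

```python
from typing import Callable, Dict, Type


class _ContainerOperatorStub:
    """Hypothetical stand-in for DockerOperator or KubernetesPodOperator."""

    def __init__(self, python_callable: Callable, **kwargs):
        self.python_callable = python_callable
        self.kwargs = kwargs


class DockerLike(_ContainerOperatorStub):
    pass


class KubernetesLike(_ContainerOperatorStub):
    pass


# Owned by the platform/data engineering side, e.g. filled from deployment config.
CONTAINER_BACKENDS: Dict[str, Type[_ContainerOperatorStub]] = {
    "docker": DockerLike,
    "kubernetes": KubernetesLike,
}


def container(backend: str = "docker", **operator_kwargs):
    """Sketch of a backend-agnostic decorator; the DAG author only supplies the function."""
    operator_cls = CONTAINER_BACKENDS[backend]

    def wrap(fn: Callable) -> _ContainerOperatorStub:
        return operator_cls(python_callable=fn, **operator_kwargs)

    return wrap


@container(backend="kubernetes", image="python:3.8")
def make_numbers():
    return [1, 2, 3]
```

In a real setup the `backend` default would more likely come from configuration than from the DAG file itself.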





[GitHub] [airflow] potiuk commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r688943999



##########
File path: airflow/decorators/__init__.pyi
##########
@@ -0,0 +1,105 @@
+from typing import Optional, Callable, Iterable, Dict, Union, List
+
+try:
+    from airflow.providers.docker.decorators.docker import DockerDecoratorMixin
+except ImportError:
+    DockerDecoratorMixin = None
+
+class _TaskDecorator:
+    def __call__(self, *args, **kwargs):
+        try:
+            from airflow.providers.docker.decorators import docker
+
+        except ImportError:
+            pass
+    def python(

Review comment:
       Makes perfect sense!







[GitHub] [airflow] dimberman commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r707646679



##########
File path: airflow/providers_manager.py
##########
@@ -278,6 +309,51 @@ def _get_attr(obj: Any, attr_name: str):
             return None
         return getattr(obj, attr_name)
 
+    def _add_taskflow_decorator(
+        self, decorator_name, decorator_class_name: str, provider_package: str
+    ) -> None:
+        if provider_package.startswith("apache-airflow"):
+            provider_path = provider_package[len("apache-") :].replace("-", ".")
+            if not decorator_class_name.startswith(provider_path):
+                log.warning(
+                    "Sanity check failed when importing '%s' from '%s' package. It should start with '%s'",
+                    decorator_class_name,
+                    provider_package,
+                    provider_path,
+                )
+                return
+        if decorator_name in self._taskflow_decorator_dict:
+            log.warning(
+                "The hook_class '%s' has been already registered.",
+                decorator_class_name,
+            )
+            return
+        try:
+            module, class_name = decorator_class_name.rsplit('.', maxsplit=1)
+            decorator_class = getattr(importlib.import_module(module), class_name)
+            self._taskflow_decorator_dict[decorator_name] = decorator_class

Review comment:
       @ashb I think we can kick the can on this. For now there will be all of zero decorators that will be loaded in this case. I think we can reasonably assume that we're not going to see a flood of custom decorators in the near future. This can be a 2.2.1 feature etc.
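For reference, the sanity check at the top of `_add_taskflow_decorator` turns a package name into the module prefix that a decorator class must start with. The sketch below pulls it out into standalone functions with hypothetical names and keeps the same logic as the diff:

```python
def provider_path_for(provider_package: str) -> str:
    """Map a distribution name to the module prefix its classes must live under."""
    # "apache-airflow-providers-docker" -> "airflow.providers.docker"
    return provider_package[len("apache-"):].replace("-", ".")


def passes_sanity_check(decorator_class_name: str, provider_package: str) -> bool:
    # Mirrors the check in the diff: only apache-airflow* packages are checked.
    if not provider_package.startswith("apache-airflow"):
        return True
    return decorator_class_name.startswith(provider_path_for(provider_package))


print(provider_path_for("apache-airflow-providers-cncf-kubernetes"))  # airflow.providers.cncf.kubernetes
```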







[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r690242757



##########
File path: airflow/providers/docker/decorators/docker.py
##########
@@ -0,0 +1,459 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Callable, Dict, Iterable, List, Optional, TypeVar, Union
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script
+
+
+def _generate_decode_command(env_var, file):
+    return (
+        f'python -c "import os; import base64;'
+        f' x = base64.b64decode(os.environ[\\"{env_var}\\"]);'
+        f' f = open(\\"{file}\\", \\"wb\\"); f.write(x);'
+        f' f.close()"'
+    )
+
+
+def _b64_encode_file(filename):
+    with open(filename, "rb") as file_to_encode:
+        return base64.b64encode(file_to_encode.read())
+
+
+class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        self.string_args = [1, 2, 1]
+        self._output_filename = ""
+        command = "dummy command"
+        self.pickling_library = pickle
+        super().__init__(
+            command=command, retrieve_output=True, retrieve_output_path="/tmp/script.out", **kwargs
+        )
+
+    def execute(self, context: Dict):
+        with TemporaryDirectory(prefix='venv') as tmp_dir:
+            input_filename = os.path.join(tmp_dir, 'script.in')
+            self._output_filename = os.path.join(tmp_dir, 'script.out')
+            string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
+            script_filename = os.path.join(tmp_dir, 'script.py')
+            self._write_args(input_filename)
+            self._write_string_args(string_args_filename)
+            py_source = self._get_python_source()
+            write_python_script(
+                jinja_context=dict(
+                    op_args=self.op_args,
+                    op_kwargs=self.op_kwargs,
+                    pickling_library=self.pickling_library.__name__,
+                    python_callable=self.python_callable.__name__,
+                    python_callable_source=py_source,
+                ),
+                filename=script_filename,
+            )
+
+            self.environment["PYTHON_SCRIPT"] = _b64_encode_file(script_filename)
+            if self.op_args or self.op_kwargs:
+                self.environment["PYTHON_INPUT"] = _b64_encode_file(input_filename)
+            else:
+                self.environment["PYTHON_INPUT"] = ""
+
+            self.command = (
+                f'bash -cx  \'{_generate_decode_command("PYTHON_SCRIPT", "/tmp/script.py")} &&'
+                f'touch /tmp/string_args &&'
+                f'touch /tmp/script.in &&'
+                f'{_generate_decode_command("PYTHON_INPUT", "/tmp/script.in")} &&'
+                f'python /tmp/script.py /tmp/script.in /tmp/script.out /tmp/string_args\''
+            )
+            return super().execute(context)
+
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _write_string_args(self, filename):
+        with open(filename, 'w') as file:
+            file.write('\n'.join(map(str, self.string_args)))
+
+    def _get_python_source(self):
+        raw_source = inspect.getsource(self.python_callable)
+        res = dedent(raw_source)
+        res = remove_task_decorator(res, "@task.docker")
+        return res
+
+
+T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
+
+
+def _docker_task(
+    python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
+):
+    """
+    Python operator decorator. Wraps a function into an Airflow operator.
+    Accepts kwargs for operator kwarg. Can be reused in a single DAG.

Review comment:
       ```suggestion
       Also accepts any argument that DockerOperator will via ``kwargs``. Can be reused in a single DAG.
   ```
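The `execute` method in the diff above moves data across the container boundary in both directions. Inputs are pickled, base64-encoded and put into `PYTHON_INPUT`, and the result is pickled back into `/tmp/script.out`. The sketch below runs that round trip in-process without Docker. The function names are hypothetical, and the real container-side script is generated by `write_python_script` from a Jinja template that is not shown here:

```python
import base64
import pickle
from typing import Any, Callable, Dict, List


def encode_inputs(op_args: List[Any], op_kwargs: Dict[str, Any]) -> str:
    """Pickle the call arguments and base64 them so they fit in an env var."""
    if not (op_args or op_kwargs):
        return ""  # mirrors PYTHON_INPUT being set to "" when there are no inputs
    payload = pickle.dumps({"args": op_args, "kwargs": op_kwargs})
    return base64.b64encode(payload).decode("ascii")


def run_like_container(fn: Callable, encoded_input: str) -> bytes:
    """Stand-in for the generated script: decode inputs, call fn, pickle result."""
    if encoded_input:
        inputs = pickle.loads(base64.b64decode(encoded_input))
    else:
        inputs = {"args": [], "kwargs": {}}
    result = fn(*inputs["args"], **inputs["kwargs"])
    return pickle.dumps(result)  # the bytes that would be written to /tmp/script.out


def add(a, b=0):
    return a + b


print(pickle.loads(run_like_container(add, encode_inputs([2], {"b": 3}))))  # 5
```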







[GitHub] [airflow] github-actions[bot] commented on pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#issuecomment-820750777


   [The Workflow run](https://github.com/apache/airflow/actions/runs/753441606) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r614322337



##########
File path: airflow/decorators/docker.py
##########
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Callable, Dict, Optional, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator

Review comment:
       This import can't be at the top level -- it makes the docker provider a required dependency of core as a result.
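One way to honour this is to defer the import until the decorator is actually used. A minimal sketch with hypothetical helper names follows. Later revisions in this thread take a different route: they move the module under `airflow/providers/docker/decorators/`, so the decorator ships with the provider itself.

```python
import importlib
from typing import Any


def import_optional(module_name: str, attr: str, pip_name: str) -> Any:
    """Import module_name.attr on demand, with a clear error if it is missing."""
    try:
        module = importlib.import_module(module_name)
    except ImportError as err:
        raise RuntimeError(f"{module_name} is not installed; install {pip_name} to use this feature") from err
    return getattr(module, attr)


def docker_operator_class():
    # Called only when @task.docker is actually used, so importing the core
    # decorators module never requires the docker provider.
    return import_optional(
        "airflow.providers.docker.operators.docker",
        "DockerOperator",
        "apache-airflow-providers-docker",
    )
```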







[GitHub] [airflow] kaxil commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r712207206



##########
File path: airflow/providers/docker/decorators/docker.py
##########
@@ -0,0 +1,298 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Callable, Dict, Iterable, List, Optional, TypeVar, Union
+
+import dill
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script
+
+
+def _generate_decode_command(env_var, file):
+    return (
+        f'python -c "import os; import base64;'
+        f' x = base64.b64decode(os.environ[\\"{env_var}\\"]);'
+        f' f = open(\\"{file}\\", \\"wb\\"); f.write(x);'
+        f' f.close()"'
+    )
+
+
+def _b64_encode_file(filename):
+    with open(filename, "rb") as file_to_encode:
+        return base64.b64encode(file_to_encode.read())
+
+
+class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    def __init__(
+        self,
+        use_dill=False,
+        **kwargs,
+    ) -> None:
+        self.string_args = [1, 2, 1]

Review comment:
       Why `[1, 2, 1]`?
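For context, `_write_string_args` in the diff writes each value with `str()` on its own line, so whatever the list holds reaches the container as text. A minimal round-trip sketch follows. The reader function is an assumption about the container side, not code from the PR:

```python
import os
import tempfile
from typing import Iterable, List


def write_string_args(filename: str, string_args: Iterable) -> None:
    # Same format as _write_string_args in the diff: one str() value per line.
    with open(filename, "w") as file:
        file.write("\n".join(map(str, string_args)))


def read_string_args(filename: str) -> List[str]:
    # Assumed container-side reader: one stripped string per line.
    with open(filename) as file:
        return [line.strip() for line in file]


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "string_args.txt")
    write_string_args(path, [1, 2, 1])
    print(read_string_args(path))  # ['1', '2', '1']
```

The integers come back as strings, which is one more reason a placeholder list like `[1, 2, 1]` deserves either a comment or removal.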







[GitHub] [airflow] kaxil commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r712203364



##########
File path: airflow/decorators/__init__.py
##########
@@ -15,129 +15,36 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Callable, Dict, Iterable, List, Optional, Union
+from typing import TYPE_CHECKING
 
-from airflow.decorators.python import python_task
-from airflow.decorators.python_virtualenv import _virtualenv_task
+from airflow.decorators.python import PythonDecoratorMixin, python_task  # noqa
+from airflow.decorators.python_virtualenv import PythonVirtualenvDecoratorMixin
 from airflow.decorators.task_group import task_group  # noqa
 from airflow.models.dag import dag  # noqa
+from airflow.providers_manager import ProvidersManager
 
 
-class _TaskDecorator:
-    def __call__(
-        self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
-    ):
-        """
-        Python operator decorator. Wraps a function into an Airflow operator.
-        Accepts kwargs for operator kwarg. This decorator can be reused in a single DAG.
+class _TaskDecorator(PythonDecoratorMixin, PythonVirtualenvDecoratorMixin):
+    def __getattr__(self, name):
+        if name.startswith("__"):
+            raise AttributeError(f'{type(self).__name__} has not attriubte {name!r}')

Review comment:
       ```suggestion
               raise AttributeError(f'{type(self).__name__} has no attribute {name!r}')
   ```
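For context on the line being fixed: `__getattr__` runs only when normal lookup fails. Rejecting dunder names first means protocol probes never reach the decorator lookup. For example, `copy` asks for `__deepcopy__` on the instance. Below is a sketch with a hypothetical dict-based registry standing in for the provider lookup:

```python
import copy
from typing import Callable, Dict


class TaskDecoratorCollection:
    """Hypothetical sketch: resolve task.<name> from a registry of decorators."""

    def __init__(self, registry: Dict[str, Callable]):
        self._registry = registry

    def __getattr__(self, name: str) -> Callable:
        # Dunder probes (copy's __deepcopy__, pickle's __getstate__, ...) are
        # answered with AttributeError before any registry/provider lookup.
        if name.startswith("__"):
            raise AttributeError(f"{type(self).__name__} has no attribute {name!r}")
        registry = self.__dict__.get("_registry", {})  # avoid recursion if unset
        if name not in registry:
            raise AttributeError(f"{type(self).__name__} has no attribute {name!r}")
        return registry[name]


task = TaskDecoratorCollection({"docker": lambda: "docker decorator"})
print(task.docker())  # docker decorator
print(copy.deepcopy(task).docker())  # docker decorator
```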







[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r614353734



##########
File path: airflow/decorators/__init__.py
##########
@@ -139,5 +139,169 @@ def virtualenv(
             **kwargs,
         )
 
+    @staticmethod
+    def docker(  # pylint: disable=too-many-arguments,too-many-locals
+        python_callable: Optional[Callable] = None,

Review comment:
       Yeah, it would be _super_ good if we didn't have to duplicate the args, defaults and docstring from DockerOperator here.
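One partial way to avoid copying the arguments by hand is to build the decorator's runtime signature and docstring from the operator's `__init__`. The sketch below uses a hypothetical `FakeDockerOperator` stand-in. This only helps runtime introspection such as `help()` and `inspect.signature`; static type checkers do not evaluate `__signature__`, which is presumably why a `.pyi` stub is still needed.

```python
import inspect
from typing import Callable, Optional


class FakeDockerOperator:
    """Hypothetical stand-in for DockerOperator with a few of its arguments."""

    def __init__(self, *, image: str, api_version: Optional[str] = None, auto_remove: bool = False, **kwargs):
        """:param image: Docker image from which to create the container."""


def docker_task(python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs):
    """Wrap a function into a DockerOperator task."""


def borrow_signature(func: Callable, operator_cls: type) -> None:
    """Expose operator_cls.__init__ keyword-only args in func's runtime signature and docs."""
    own = inspect.signature(func).parameters.values()
    theirs = inspect.signature(operator_cls.__init__).parameters.values()
    positional = [p for p in own if p.kind is inspect.Parameter.POSITIONAL_OR_KEYWORD]
    keyword_only = [p for p in theirs if p.kind is inspect.Parameter.KEYWORD_ONLY]
    var_kw = [p for p in own if p.kind is inspect.Parameter.VAR_KEYWORD]
    # Parameter order must follow Python's rules: positional, keyword-only, **kwargs.
    func.__signature__ = inspect.Signature(positional + keyword_only + var_kw)
    func.__doc__ = f"{func.__doc__}\n\n{inspect.getdoc(operator_cls.__init__)}"


borrow_signature(docker_task, FakeDockerOperator)
print(inspect.signature(docker_task))
```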







[GitHub] [airflow] github-actions[bot] commented on pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#issuecomment-817928013


   [The Workflow run](https://github.com/apache/airflow/actions/runs/741701994) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r712206077



##########
File path: airflow/decorators/__init__.py
##########
@@ -15,129 +15,36 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Callable, Dict, Iterable, List, Optional, Union
+from typing import TYPE_CHECKING
 
-from airflow.decorators.python import python_task
-from airflow.decorators.python_virtualenv import _virtualenv_task
+from airflow.decorators.python import PythonDecoratorMixin, python_task  # noqa
+from airflow.decorators.python_virtualenv import PythonVirtualenvDecoratorMixin
 from airflow.decorators.task_group import task_group  # noqa
 from airflow.models.dag import dag  # noqa
+from airflow.providers_manager import ProvidersManager
 
 
-class _TaskDecorator:
-    def __call__(
-        self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
-    ):
-        """
-        Python operator decorator. Wraps a function into an Airflow operator.
-        Accepts kwargs for operator kwarg. This decorator can be reused in a single DAG.
+class _TaskDecorator(PythonDecoratorMixin, PythonVirtualenvDecoratorMixin):
+    def __getattr__(self, name):
+        if name.startswith("__"):
+            raise AttributeError(f'{type(self).__name__} has not attriubte {name!r}')

Review comment:
       What a mishmash of typing there.







[GitHub] [airflow] dimberman commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r706998645



##########
File path: airflow/providers_manager.py
##########
@@ -278,6 +309,51 @@ def _get_attr(obj: Any, attr_name: str):
             return None
         return getattr(obj, attr_name)
 
+    def _add_taskflow_decorator(
+        self, decorator_name, decorator_class_name: str, provider_package: str
+    ) -> None:
+        if provider_package.startswith("apache-airflow"):
+            provider_path = provider_package[len("apache-") :].replace("-", ".")
+            if not decorator_class_name.startswith(provider_path):
+                log.warning(
+                    "Sanity check failed when importing '%s' from '%s' package. It should start with '%s'",
+                    decorator_class_name,
+                    provider_package,
+                    provider_path,
+                )
+                return
+        if decorator_name in self._taskflow_decorator_dict:
+            log.warning(
+                "The hook_class '%s' has been already registered.",
+                decorator_class_name,
+            )
+            return
+        try:
+            module, class_name = decorator_class_name.rsplit('.', maxsplit=1)
+            decorator_class = getattr(importlib.import_module(module), class_name)
+            self._taskflow_decorator_dict[decorator_name] = decorator_class

Review comment:
       @potiuk did you ever make a refactor for this? If not I'd like to figure out a way we can defer this issue so we can get this merged.







[GitHub] [airflow] mik-laj commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r710995098



##########
File path: docs/spelling_wordlist.txt
##########
@@ -141,6 +141,7 @@ Firehose
 Firestore
 Flink
 FluentD
+FooDecoratedOperator

Review comment:
       ```suggestion
   ```
   Code blocks are excluded from spell checking, so this entry isn't needed.







[GitHub] [airflow] kaxil commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r688946158



##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -141,6 +144,12 @@ class DockerOperator(BaseOperator):
     :type privileged: bool
     :param cap_add: Include container capabilities
     :type cap_add: list[str]
+    :param retrieve_output: Should this docker image consistently attempt to pull from and output

Review comment:
       Should be "pull from XCom": the word "XCom" is missing.
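According to the PR description, `retrieve_output` relies on Docker's `get_archive` API, which returns the requested path packed as a tar stream. The sketch below shows the unpacking side. It assumes the stream arrives as an iterable of byte chunks, and a locally built fake archive stands in for the Docker call. Both helper names are hypothetical.

```python
import io
import pickle
import tarfile
from typing import Any, Iterable


def read_result_from_archive(chunks: Iterable[bytes]) -> Any:
    """Unpickle the single file packed in a tar stream delivered as byte chunks."""
    buffer = io.BytesIO(b"".join(chunks))
    with tarfile.open(fileobj=buffer) as tar:
        member = next(m for m in tar.getmembers() if m.isfile())
        return pickle.load(tar.extractfile(member))


def make_fake_archive(name: str, payload: bytes) -> bytes:
    """Build the kind of tar stream the Docker call is assumed to return."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w") as tar:
        info = tarfile.TarInfo(name=name)
        info.size = len(payload)
        tar.addfile(info, io.BytesIO(payload))
    return buffer.getvalue()


archive = make_fake_archive("script.out", pickle.dumps([1, 2, 3]))
# Split into chunks to mimic a streamed response.
print(read_result_from_archive([archive[:512], archive[512:]]))  # [1, 2, 3]
```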







[GitHub] [airflow] potiuk commented on pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#issuecomment-898909679


   Another small nit: we need to add a paragraph about task decorators to the "Custom providers" docs: https://github.com/apache/airflow/blob/main/docs/apache-airflow-providers/index.rst#custom-provider-packages





[GitHub] [airflow] uranusjr commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r677957000



##########
File path: airflow/providers/docker/decorators/docker.py
##########
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import inspect
+import os
+import pickle
+import shlex
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Callable, Dict, Optional, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script
+
+
+def _generate_decode_command(env_var, file):
+    return shlex.quote(
+        f'python -c "import os; import base64;'
+        f' x = base64.b64decode(os.environ["{env_var}"]);'
+        f' f = open("{file}", "wb"); f.write(x);'

Review comment:
       Theoretically, `env_var` and `file` could contain the quote character (`"`), so it’s better to use `!r` instead.
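Here is a sketch of the `!r` approach. `repr()` chooses the quoting and escaping for each string literal. Passing the snippet to `python -c` as one entry of an argument list, with no shell in between, removes the second layer of quoting. The decoder runs in a local subprocess rather than in a container, and the helper names are hypothetical:

```python
import base64
import os
import subprocess
import sys
import tempfile


def generate_decode_snippet(env_var: str, file: str) -> str:
    """Python source that base64-decodes os.environ[env_var] into file.

    {...!r} emits valid string literals even if a name contains quotes.
    """
    return (
        "import os, base64; "
        f"data = base64.b64decode(os.environ[{env_var!r}]); "
        f"f = open({file!r}, 'wb'); f.write(data); f.close()"
    )


def decode_in_subprocess(env_var: str, payload: bytes, file: str) -> None:
    # argv list, no shell: the snippet reaches Python unchanged.
    env = dict(os.environ)
    env[env_var] = base64.b64encode(payload).decode("ascii")
    cmd = [sys.executable, "-c", generate_decode_snippet(env_var, file)]
    subprocess.run(cmd, env=env, check=True)


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "it's.in")  # a quote in the path is now harmless
    decode_in_subprocess("PYTHON_INPUT", b"\x80pickled bytes", target)
    with open(target, "rb") as fh:
        print(fh.read())  # b'\x80pickled bytes'
```

Inside the container the snippet still travels through `bash -c`, so the shell layer needs its own quoting. The diff uses `shlex.quote` for that.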







[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r700358693



##########
File path: airflow/decorators/__init__.py
##########
@@ -15,34 +15,58 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Callable, Optional
-
-from airflow.decorators.python import python_task
-from airflow.decorators.python_virtualenv import _virtualenv_task
+from airflow.decorators.python import PythonDecoratorMixin
+from airflow.decorators.python_virtualenv import PythonVirtualenvDecoratorMixin
 from airflow.decorators.task_group import task_group  # noqa
 from airflow.exceptions import AirflowException
 from airflow.models.dag import dag  # noqa
 from airflow.providers_manager import ProvidersManager
 
+# [START import_docker]
+try:
+    from airflow.providers.docker.decorators.docker import DockerDecoratorMixin
+except ImportError:
+    DockerDecoratorMixin = None
+# [END import_docker]
+
 
 class _TaskDecorator:

Review comment:
       We can mix these ones in directly:
   
   ```python
   class _TaskDecorator(PythonDecoratorMixin, DockerDecoratorMixin):
   ```







[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r690246906



##########
File path: docs/apache-airflow/docker_decorator.py
##########
@@ -0,0 +1,460 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       I don't think you meant to have this file here.

##########
File path: airflow/providers_manager.py
##########
@@ -278,6 +309,51 @@ def _get_attr(obj: Any, attr_name: str):
             return None
         return getattr(obj, attr_name)
 
+    def _add_taskflow_decorator(
+        self, decorator_name, decorator_class_name: str, provider_package: str
+    ) -> None:
+        if provider_package.startswith("apache-airflow"):
+            provider_path = provider_package[len("apache-") :].replace("-", ".")
+            if not decorator_class_name.startswith(provider_path):
+                log.warning(
+                    "Sanity check failed when importing '%s' from '%s' package. It should start with '%s'",
+                    decorator_class_name,
+                    provider_package,
+                    provider_path,
+                )
+                return
+        if decorator_name in self._taskflow_decorator_dict:
+            log.warning(
+                "The hook_class '%s' has been already registered.",
+                decorator_class_name,
+            )
+            return
+        try:
+            module, class_name = decorator_class_name.rsplit('.', maxsplit=1)
+            decorator_class = getattr(importlib.import_module(module), class_name)
+            self._taskflow_decorator_dict[decorator_name] = decorator_class

Review comment:
       This causes _all_ decorators to be imported up front, which was something I wanted us to avoid -- i.e. I'd like `airflow.providers.docker.decorators.docker.docker_decorator` to only be imported on the first access (per process) of `@task.docker`.
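A minimal sketch of the lazy behaviour described above, assuming the providers manager only records each decorator's dotted path at discovery time and resolves it on first attribute access (once per process, then cached). `_LazyDecoratorRegistry` is hypothetical and is not part of the real `ProvidersManager`; `collections.OrderedDict` is just a stdlib stand-in for a provider decorator path.

```python
import importlib
from typing import Any, Dict


class _LazyDecoratorRegistry:
    """Hypothetical registry: store dotted paths, import them only on first use."""

    def __init__(self, paths: Dict[str, str]) -> None:
        self._paths = dict(paths)         # name -> "package.module.attr"
        self._cache: Dict[str, Any] = {}  # filled lazily, once per process

    def __getattr__(self, name: str) -> Any:
        # Only called when normal attribute lookup fails, i.e. for decorator names.
        paths = self.__dict__.get("_paths", {})
        if name not in paths:
            raise AttributeError(name)
        cache = self.__dict__["_cache"]
        if name not in cache:
            module_name, attr = paths[name].rsplit(".", maxsplit=1)
            cache[name] = getattr(importlib.import_module(module_name), attr)
        return cache[name]


registry = _LazyDecoratorRegistry({"ordered": "collections.OrderedDict"})
```

Nothing is imported when the registry is built; the first `registry.ordered` access triggers the import, and later accesses hit the cache.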

##########
File path: docs/apache-airflow/tutorial_taskflow_api.rst
##########
@@ -180,16 +180,155 @@ and even a different python version to run your function.
 
 This option should allow for far greater flexibility for users who wish to keep their workflows more simple
 and pythonic.
+Using the Taskflow API with Docker or Virtual Environments
+----------------------------------------------------------
+
+As of Airflow <Airflow version>, you will have the ability to use the Taskflow API with either a

Review comment:
       ```suggestion
   As of Airflow 2.2, you will have the ability to use the Taskflow API with either a
   ```

##########
File path: docs/apache-airflow/tutorial_taskflow_api.rst
##########
@@ -180,16 +180,155 @@ and even a different python version to run your function.
 
 This option should allow for far greater flexibility for users who wish to keep their workflows more simple
 and pythonic.
+Using the Taskflow API with Docker or Virtual Environments

Review comment:
       ```suggestion
   
   Using the Taskflow API with Docker or Virtual Environments
   ```

##########
File path: docs/apache-airflow/tutorial_taskflow_api.rst
##########
@@ -180,16 +180,155 @@ and even a different python version to run your function.
 
 This option should allow for far greater flexibility for users who wish to keep their workflows more simple
 and pythonic.
+Using the Taskflow API with Docker or Virtual Environments
+----------------------------------------------------------
+
+As of Airflow <Airflow version>, you will have the ability to use the Taskflow API with either a
+docker container or python virtual environment. This added functionality will allow a much more
+comprehensive range of use-cases for the Taskflow API, as you will not be limited to the
+packages and system libraries of the Airflow worker.
+
+To use a docker image with the Taskflow API, change the decorator to ``@task.docker``
+and add any needed arguments to correctly run the task. Please note that the docker
+image must have a python library and take in a bash command as the ``command`` argument.
+
+Below is an example of using the ``@task.docker`` decorator to run a python task.
+
+.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py
+    :language: python
+    :dedent: 4
+    :start-after: [START transform_docker]
+    :end-before: [END transform_docker]
+
+If you don't want to run your image on a docker environment, and instead want to create a separate virtual

Review comment:
       ```suggestion
   If you don't want to run your image on a Docker environment, and instead want to create a separate virtual
   ```

##########
File path: docs/apache-airflow/tutorial_taskflow_api.rst
##########
@@ -180,16 +180,155 @@ and even a different python version to run your function.
 
 This option should allow for far greater flexibility for users who wish to keep their workflows more simple
 and pythonic.
+Using the Taskflow API with Docker or Virtual Environments
+----------------------------------------------------------
+
+As of Airflow <Airflow version>, you will have the ability to use the Taskflow API with either a
+docker container or python virtual environment. This added functionality will allow a much more

Review comment:
       ```suggestion
   Docker container or Python virtual environment. This added functionality will allow a much more
   ```

##########
File path: docs/apache-airflow/tutorial_taskflow_api.rst
##########
@@ -180,16 +180,155 @@ and even a different python version to run your function.
 
 This option should allow for far greater flexibility for users who wish to keep their workflows more simple
 and pythonic.
+Using the Taskflow API with Docker or Virtual Environments
+----------------------------------------------------------
+
+As of Airflow <Airflow version>, you will have the ability to use the Taskflow API with either a
+docker container or python virtual environment. This added functionality will allow a much more
+comprehensive range of use-cases for the Taskflow API, as you will not be limited to the
+packages and system libraries of the Airflow worker.
+
+To use a docker image with the Taskflow API, change the decorator to ``@task.docker``
+and add any needed arguments to correctly run the task. Please note that the docker
+image must have a python library and take in a bash command as the ``command`` argument.
+
+Below is an example of using the ``@task.docker`` decorator to run a python task.
+
+.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py
+    :language: python
+    :dedent: 4
+    :start-after: [START transform_docker]
+    :end-before: [END transform_docker]
+
+If you don't want to run your image on a docker environment, and instead want to create a separate virtual
+environment on the same machine, you can use the ``@task.virtualenv`` decorator instead. The ``@task.virtualenv``
+decorator will allow you to create a new virtualenv with custom libraries and even a different
+python version to run your function.
+
+.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py
+    :language: python
+    :dedent: 4
+    :start-after: [START extract_virtualenv]
+    :end-before: [END extract_virtualenv]
+
+These two options should allow for far greater flexibility for users who wish to keep their workflows more simple
+and pythonic.

Review comment:
       Could you add some info here about any limitations on input or output (size limits, or which output types are supported)?
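Since the PR description says inputs travel to the container as base64-encoded environment variables and the result is unpickled on the way back, both picklability and payload size matter. A hedged sketch of a pre-flight check follows; the helper names are hypothetical and `max_bytes` is a placeholder, because the actual limit the transport tolerates is not stated in this thread.

```python
import base64
import pickle
from typing import Any


def serialized_payload_size(value: Any, protocol: int = pickle.HIGHEST_PROTOCOL) -> int:
    """Size in bytes of ``value`` after pickling and base64-encoding.

    Unpicklable values raise (e.g. TypeError, AttributeError or
    pickle.PicklingError, depending on the object).
    """
    return len(base64.b64encode(pickle.dumps(value, protocol=protocol)))


def check_payload(value: Any, max_bytes: int) -> None:
    """Hypothetical guard: fail early if the encoded payload exceeds max_bytes."""
    size = serialized_payload_size(value)
    if size > max_bytes:
        raise ValueError(f"serialized payload is {size} bytes, limit is {max_bytes}")
```

Base64 inflates data by roughly a third, so the encoded size, not the raw pickle size, is what counts against any environment-variable limit.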

##########
File path: docs/apache-airflow/tutorial_taskflow_api.rst
##########
@@ -180,16 +180,155 @@ and even a different python version to run your function.
 
 This option should allow for far greater flexibility for users who wish to keep their workflows more simple
 and pythonic.
+Using the Taskflow API with Docker or Virtual Environments
+----------------------------------------------------------
+
+As of Airflow <Airflow version>, you will have the ability to use the Taskflow API with either a
+docker container or python virtual environment. This added functionality will allow a much more
+comprehensive range of use-cases for the Taskflow API, as you will not be limited to the
+packages and system libraries of the Airflow worker.
+
+To use a docker image with the Taskflow API, change the decorator to ``@task.docker``
+and add any needed arguments to correctly run the task. Please note that the docker
+image must have a python library and take in a bash command as the ``command`` argument.

Review comment:
       ```suggestion
   image must have a working Python installed and take in a bash command as the ``command`` argument.
   ```

##########
File path: docs/apache-airflow/tutorial_taskflow_api.rst
##########
@@ -180,16 +180,155 @@ and even a different python version to run your function.
 
 This option should allow for far greater flexibility for users who wish to keep their workflows more simple
 and pythonic.
+Using the Taskflow API with Docker or Virtual Environments
+----------------------------------------------------------
+
+As of Airflow <Airflow version>, you will have the ability to use the Taskflow API with either a
+docker container or python virtual environment. This added functionality will allow a much more
+comprehensive range of use-cases for the Taskflow API, as you will not be limited to the
+packages and system libraries of the Airflow worker.
+
+To use a docker image with the Taskflow API, change the decorator to ``@task.docker``
+and add any needed arguments to correctly run the task. Please note that the docker
+image must have a python library and take in a bash command as the ``command`` argument.
+
+Below is an example of using the ``@task.docker`` decorator to run a python task.
+
+.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py
+    :language: python
+    :dedent: 4
+    :start-after: [START transform_docker]
+    :end-before: [END transform_docker]
+
+If you don't want to run your image on a docker environment, and instead want to create a separate virtual
+environment on the same machine, you can use the ``@task.virtualenv`` decorator instead. The ``@task.virtualenv``
+decorator will allow you to create a new virtualenv with custom libraries and even a different
+python version to run your function.

Review comment:
       ```suggestion
   Python version to run your function.
   ```

##########
File path: docs/apache-airflow/tutorial_taskflow_api.rst
##########
@@ -180,16 +180,155 @@ and even a different python version to run your function.
 
 This option should allow for far greater flexibility for users who wish to keep their workflows more simple
 and pythonic.
+Using the Taskflow API with Docker or Virtual Environments
+----------------------------------------------------------
+
+As of Airflow <Airflow version>, you will have the ability to use the Taskflow API with either a
+docker container or python virtual environment. This added functionality will allow a much more
+comprehensive range of use-cases for the Taskflow API, as you will not be limited to the
+packages and system libraries of the Airflow worker.
+
+To use a docker image with the Taskflow API, change the decorator to ``@task.docker``
+and add any needed arguments to correctly run the task. Please note that the docker
+image must have a python library and take in a bash command as the ``command`` argument.
+
+Below is an example of using the ``@task.docker`` decorator to run a python task.
+
+.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py
+    :language: python
+    :dedent: 4
+    :start-after: [START transform_docker]
+    :end-before: [END transform_docker]
+
+If you don't want to run your image on a docker environment, and instead want to create a separate virtual
+environment on the same machine, you can use the ``@task.virtualenv`` decorator instead. The ``@task.virtualenv``
+decorator will allow you to create a new virtualenv with custom libraries and even a different
+python version to run your function.
+
+.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py
+    :language: python
+    :dedent: 4
+    :start-after: [START extract_virtualenv]
+    :end-before: [END extract_virtualenv]
+
+These two options should allow for far greater flexibility for users who wish to keep their workflows more simple
+and pythonic.
+
+Creating Custom TaskFlow Decorators

Review comment:
       This section should be in a separate doc in howto, I think.




[GitHub] [airflow] dimberman commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r706998932



##########
File path: airflow/decorators/__init__.py
##########
@@ -15,34 +15,58 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Callable, Optional
-
-from airflow.decorators.python import python_task
-from airflow.decorators.python_virtualenv import _virtualenv_task
+from airflow.decorators.python import PythonDecoratorMixin
+from airflow.decorators.python_virtualenv import PythonVirtualenvDecoratorMixin
 from airflow.decorators.task_group import task_group  # noqa
 from airflow.exceptions import AirflowException
 from airflow.models.dag import dag  # noqa
 from airflow.providers_manager import ProvidersManager
 
+# [START import_docker]
+try:
+    from airflow.providers.docker.decorators.docker import DockerDecoratorMixin
+except ImportError:
+    DockerDecoratorMixin = None
+# [END import_docker]
+
 
 class _TaskDecorator:

Review comment:
       @ashb I don't think you can inherit from None:
   
   ```
   Python 3.7.8 (default, Oct  8 2020, 09:28:20)
   [Clang 12.0.0 (clang-1200.0.32.2)] on darwin
   Type "help", "copyright", "credits" or "license" for more information.
   >>> class Foo(None):
   ...     def bar():
   ...             pass
   ...
   Traceback (most recent call last):
     File "<stdin>", line 1, in <module>
   TypeError: NoneType takes no arguments
   >>>
   ```




[GitHub] [airflow] uranusjr commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r677956474



##########
File path: airflow/decorators/__init__.py
##########
@@ -139,5 +139,169 @@ def virtualenv(
             **kwargs,
         )
 
+    @staticmethod
+    def docker(  # pylint: disable=too-many-arguments,too-many-locals
+        python_callable: Optional[Callable] = None,

Review comment:
       I wonder if `functools.update_wrapper()` or `functools.wraps()` would work here.




[GitHub] [airflow] dimberman commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r698946198



##########
File path: airflow/providers/docker/decorators/docker.py
##########
@@ -0,0 +1,459 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Callable, Dict, Iterable, List, Optional, TypeVar, Union
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script
+
+
+def _generate_decode_command(env_var, file):
+    return (
+        f'python -c "import os; import base64;'
+        f' x = base64.b64decode(os.environ[\\"{env_var}\\"]);'
+        f' f = open(\\"{file}\\", \\"wb\\"); f.write(x);'
+        f' f.close()"'
+    )
+
+
+def _b64_encode_file(filename):
+    with open(filename, "rb") as file_to_encode:
+        return base64.b64encode(file_to_encode.read())
+
+
+class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        self.string_args = [1, 2, 1]
+        self._output_filename = ""
+        command = "dummy command"
+        self.pickling_library = pickle

Review comment:
       I'm going to do pickle and dill for now, as those are the two options that our virtualenv operator supports. Not sure what other options there are.
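A hedged sketch of how the choice between the two could be wired, modelled on the `use_dill` flag that `PythonVirtualenvOperator` exposes. The helper names are hypothetical, and `dill` is treated as an optional third-party dependency imported only when requested.

```python
import importlib
import pickle
from types import ModuleType
from typing import Any


def select_pickling_library(use_dill: bool = False) -> ModuleType:
    """Return the serializer module: dill if requested, else the stdlib pickle."""
    if use_dill:
        # Import lazily so dill is only required when a task actually asks for it.
        return importlib.import_module("dill")
    return pickle


def roundtrip(value: Any, use_dill: bool = False) -> Any:
    """Serialize and deserialize ``value`` with the selected library."""
    lib = select_pickling_library(use_dill)
    return lib.loads(lib.dumps(value))
```

Both modules expose `dumps`/`loads`, so the operator can hold a reference to either one (as `self.pickling_library` does in the diff) without branching elsewhere.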




[GitHub] [airflow] dimberman commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r688773634



##########
File path: airflow/decorators/__init__.pyi
##########
@@ -0,0 +1,105 @@
+from typing import Optional, Callable, Iterable, Dict, Union, List
+
+try:
+    from airflow.providers.docker.decorators.docker import DockerDecoratorMixin
+except ImportError:
+    DockerDecoratorMixin = None
+
+class _TaskDecorator:
+    def __call__(self, *args, **kwargs):
+        try:
+            from airflow.providers.docker.decorators import docker
+
+        except ImportError:
+            pass
+    def python(

Review comment:
       @ashb and I talked it over, and for now we're just going to keep python and virtualenv in the `_TaskDecorator` class. It doesn't change anything functionally, and unlike external providers, python and virtualenv will ONLY be updated along with Airflow itself.




[GitHub] [airflow] dimberman commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r694154586



##########
File path: airflow/providers_manager.py
##########
@@ -278,6 +309,51 @@ def _get_attr(obj: Any, attr_name: str):
             return None
         return getattr(obj, attr_name)
 
+    def _add_taskflow_decorator(
+        self, decorator_name, decorator_class_name: str, provider_package: str
+    ) -> None:
+        if provider_package.startswith("apache-airflow"):
+            provider_path = provider_package[len("apache-") :].replace("-", ".")
+            if not decorator_class_name.startswith(provider_path):
+                log.warning(
+                    "Sanity check failed when importing '%s' from '%s' package. It should start with '%s'",
+                    decorator_class_name,
+                    provider_package,
+                    provider_path,
+                )
+                return
+        if decorator_name in self._taskflow_decorator_dict:
+            log.warning(
+                "The hook_class '%s' has been already registered.",
+                decorator_class_name,
+            )
+            return
+        try:
+            module, class_name = decorator_class_name.rsplit('.', maxsplit=1)
+            decorator_class = getattr(importlib.import_module(module), class_name)
+            self._taskflow_decorator_dict[decorator_name] = decorator_class

Review comment:
       @ashb so is this something that should be addressed in this PR or another? As it stands there won't be many (if any) decorators to import, so maybe we can defer?




[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r690240731



##########
File path: airflow/providers/docker/decorators/docker.py
##########
@@ -0,0 +1,459 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Callable, Dict, Iterable, List, Optional, TypeVar, Union
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script
+
+
+def _generate_decode_command(env_var, file):
+    return (
+        f'python -c "import os; import base64;'
+        f' x = base64.b64decode(os.environ[\\"{env_var}\\"]);'
+        f' f = open(\\"{file}\\", \\"wb\\"); f.write(x);'
+        f' f.close()"'
+    )
+
+
+def _b64_encode_file(filename):
+    with open(filename, "rb") as file_to_encode:
+        return base64.b64encode(file_to_encode.read())
+
+
+class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}

Review comment:
       Given this operates on a function, I don't think we want to render files here.
   
   ```suggestion
   ```




[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r614327894



##########
File path: airflow/decorators/docker.py
##########
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Callable, Dict, Optional, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script
+
+
+def _generate_decode_command(env_var, file):
+    return (
+        f'python -c "import os; import base64;'
+        f' x = base64.b64decode(os.environ[\\"{env_var}\\"]);'
+        f' f = open(\\"{file}\\", \\"wb\\"); f.write(x);'
+        f' f.close()"'
+    )
+
+
+def _b64_encode_file(filename):
+    data = open(filename, "rb").read()
+    encoded = base64.b64encode(data)
+    return encoded
+
+
+class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        self.string_args = [1, 2, 1]
+        self._output_filename = ""
+        command = "dummy command"
+        self.pickling_library = pickle
+        super().__init__(
+            command=command, retrieve_output=True, retrieve_output_path="/tmp/script.out", **kwargs
+        )
+
+    def execute(self, context: Dict):
+        with TemporaryDirectory(prefix='venv') as tmp_dir:
+            input_filename = os.path.join(tmp_dir, 'script.in')
+            self._output_filename = os.path.join(tmp_dir, 'script.out')
+            string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
+            script_filename = os.path.join(tmp_dir, 'script.py')
+            self._write_args(input_filename)
+            self._write_string_args(string_args_filename)
+            py_source = self._get_python_source()
+            write_python_script(
+                jinja_context=dict(
+                    op_args=self.op_args,
+                    op_kwargs=self.op_kwargs,
+                    pickling_library=self.pickling_library.__name__,
+                    python_callable=self.python_callable.__name__,
+                    python_callable_source=py_source,
+                ),
+                filename=script_filename,
+            )
+
+            self.environment["PYTHON_SCRIPT"] = _b64_encode_file(script_filename)
+            if self.op_args or self.op_kwargs:
+                self.environment["PYTHON_INPUT"] = _b64_encode_file(input_filename)
+            else:
+                self.environment["PYTHON_INPUT"] = ""
+
+            self.command = (
+                f'bash -cx \'{_generate_decode_command("PYTHON_SCRIPT", "/tmp/script.py")} &&'
+                f'touch /tmp/string_args &&'
+                f'touch /tmp/script.in &&'
+                f'{_generate_decode_command("PYTHON_INPUT", "/tmp/script.in")} &&'
+                f'python /tmp/script.py /tmp/script.in /tmp/script.out /tmp/string_args \''
+            )
+            return super().execute(context)
+
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _write_string_args(self, filename):
+        with open(filename, 'w') as file:
+            file.write('\n'.join(map(str, self.string_args)))

Review comment:
       Two things here:
   
   1. This is duplicating code from VenvOperator; we should find a way to avoid needing that (make a mixin, perhaps?)
   2. The whole `self.string_args` is a misfeature of Venv we shouldn't copy over anyway.

##########
File path: airflow/decorators/docker.py
##########
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Callable, Dict, Optional, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script
+
+
+def _generate_decode_command(env_var, file):
+    return (
+        f'python -c "import os; import base64;'
+        f' x = base64.b64decode(os.environ[\\"{env_var}\\"]);'
+        f' f = open(\\"{file}\\", \\"wb\\"); f.write(x);'
+        f' f.close()"'
+    )

Review comment:
       ```suggestion
       return shlex.quote(
           f'python -c "import os; import base64;'
           f' x = base64.b64decode(os.environ["{env_var}"]);'
           f' f = open("{file}", "wb"); f.write(x);'
           f' f.close()"'
       )
   ```

##########
File path: airflow/decorators/docker.py
##########
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Callable, Dict, Optional, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script
+
+
+def _generate_decode_command(env_var, file):
+    return (
+        f'python -c "import os; import base64;'
+        f' x = base64.b64decode(os.environ[\\"{env_var}\\"]);'
+        f' f = open(\\"{file}\\", \\"wb\\"); f.write(x);'
+        f' f.close()"'
+    )
+
+
+def _b64_encode_file(filename):
+    data = open(filename, "rb").read()
+    encoded = base64.b64encode(data)
+    return encoded

Review comment:
       "Leaking" a FH here (not strictly, as it'll be closed when GC'd)
   
   ```suggestion
       with open(filename, "rb") as fh:
           return base64.b64encode(fh.read())
   ```

##########
File path: airflow/decorators/docker.py
##########
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Callable, Dict, Optional, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script
+
+
+def _generate_decode_command(env_var, file):
+    return (
+        f'python -c "import os; import base64;'
+        f' x = base64.b64decode(os.environ[\\"{env_var}\\"]);'
+        f' f = open(\\"{file}\\", \\"wb\\"); f.write(x);'
+        f' f.close()"'
+    )
+
+
+def _b64_encode_file(filename):
+    data = open(filename, "rb").read()
+    encoded = base64.b64encode(data)
+    return encoded
+
+
+class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        self.string_args = [1, 2, 1]
+        self._output_filename = ""
+        command = "dummy command"
+        self.pickling_library = pickle
+        super().__init__(
+            command=command, retrieve_output=True, retrieve_output_path="/tmp/script.out", **kwargs
+        )
+
+    def execute(self, context: Dict):
+        with TemporaryDirectory(prefix='venv') as tmp_dir:
+            input_filename = os.path.join(tmp_dir, 'script.in')
+            self._output_filename = os.path.join(tmp_dir, 'script.out')
+            string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
+            script_filename = os.path.join(tmp_dir, 'script.py')
+            self._write_args(input_filename)
+            self._write_string_args(string_args_filename)
+            py_source = self._get_python_source()
+            write_python_script(
+                jinja_context=dict(
+                    op_args=self.op_args,
+                    op_kwargs=self.op_kwargs,
+                    pickling_library=self.pickling_library.__name__,
+                    python_callable=self.python_callable.__name__,
+                    python_callable_source=py_source,
+                ),
+                filename=script_filename,
+            )
+
+            self.environment["PYTHON_SCRIPT"] = _b64_encode_file(script_filename)
+            if self.op_args or self.op_kwargs:
+                self.environment["PYTHON_INPUT"] = _b64_encode_file(input_filename)
+            else:
+                self.environment["PYTHON_INPUT"] = ""
+
+            self.command = (
+                f'bash -cx \'{_generate_decode_command("PYTHON_SCRIPT", "/tmp/script.py")} &&'
+                f'touch /tmp/string_args &&'
+                f'touch /tmp/script.in &&'
+                f'{_generate_decode_command("PYTHON_INPUT", "/tmp/script.in")} &&'
+                f'python /tmp/script.py /tmp/script.in /tmp/script.out /tmp/string_args \''

Review comment:
       Why bash _and_ python -- why not just do this all in the one generated python command?
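One possible answer to this question, sketched under assumptions: a single `python -c` bootstrap decodes both environment variables and then runs the generated script with `runpy.run_path`, so the container needs only a Python interpreter and no bash. `build_bootstrap_command` and its `workdir` parameter are hypothetical. The `script.py` / `script.in` / `script.out` / `string_args` layout and argument order follow the diff. Returning an argv-style list assumes the operator passes a list `command` to Docker without a shell in between.

```python
from typing import List

# Source executed inside the container; single quotes avoid clashing with the
# triple-quoted literal. sys.argv[1] is the working directory.
_BOOTSTRAP = """\
import base64, os, runpy, sys
workdir = sys.argv[1]
script = os.path.join(workdir, 'script.py')
inp = os.path.join(workdir, 'script.in')
out = os.path.join(workdir, 'script.out')
string_args = os.path.join(workdir, 'string_args')
with open(script, 'wb') as f:
    f.write(base64.b64decode(os.environ['PYTHON_SCRIPT']))
with open(inp, 'wb') as f:
    # An empty PYTHON_INPUT decodes to an empty input file.
    f.write(base64.b64decode(os.environ.get('PYTHON_INPUT', '')))
open(string_args, 'a').close()  # same effect as `touch`
sys.argv = [script, inp, out, string_args]
runpy.run_path(script, run_name='__main__')
"""


def build_bootstrap_command(python: str = "python", workdir: str = "/tmp") -> List[str]:
    """Return an argv-style command; no shell is involved, so no quoting is needed."""
    return [python, "-c", _BOOTSTRAP, workdir]
```

Passing `workdir` as an argument keeps the sketch testable outside a container. With the diff's layout, it would be `/tmp`.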

##########
File path: airflow/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=missing-function-docstring
+
+# [START tutorial]
+# [START import_module]
+import json
+
+from airflow.decorators import dag, task
+from airflow.utils.dates import days_ago
+
+# [END import_module]
+
+# [START default_args]
+# These args will get passed on to each operator
+# You can override them on a per-task basis during operator initialization
+default_args = {
+    'owner': 'airflow',
+}
+# [END default_args]
+
+
+# [START instantiate_dag]
+@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
+def tutorial_taskflow_api_etl():
+    """
+    ### TaskFlow API Tutorial Documentation
+    This is a simple ETL data pipeline example which demonstrates the use of
+    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
+    Documentation that goes along with the Airflow TaskFlow API tutorial is
+    located
+    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
+    """
+    # [END instantiate_dag]
+
+    # [START extract_virtualenv]
+    @task.virtualenv(
+        use_dill=True,
+        system_site_packages=False,
+        requirements=['funcsigs'],
+    )
+    def extract():
+        """
+        #### Extract task
+        A simple Extract task to get data ready for the rest of the data
+        pipeline. In this case, getting data is simulated by reading from a
+        hardcoded JSON string.
+        """
+        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
+
+        order_data_dict = json.loads(data_string)
+        return order_data_dict
+
+    # [END extract_virtualenv]
+
+    # [START transform_docker]
+    @task.docker(
+        image="python:3.8.8-slim-buster",
+        force_pull=True,
+        docker_url="unix://var/run/docker.sock",
+        network_mode="bridge",
+        api_version='auto',

Review comment:
       ```suggestion
   ```
   
   Those are all the defaults, so we shouldn't need to include them in the example.







[GitHub] [airflow] potiuk commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r690364029



##########
File path: airflow/providers_manager.py
##########
@@ -278,6 +309,51 @@ def _get_attr(obj: Any, attr_name: str):
             return None
         return getattr(obj, attr_name)
 
+    def _add_taskflow_decorator(
+        self, decorator_name, decorator_class_name: str, provider_package: str
+    ) -> None:
+        if provider_package.startswith("apache-airflow"):
+            provider_path = provider_package[len("apache-") :].replace("-", ".")
+            if not decorator_class_name.startswith(provider_path):
+                log.warning(
+                    "Sanity check failed when importing '%s' from '%s' package. It should start with '%s'",
+                    decorator_class_name,
+                    provider_package,
+                    provider_path,
+                )
+                return
+        if decorator_name in self._taskflow_decorator_dict:
+            log.warning(
+                "The hook_class '%s' has been already registered.",
+                decorator_class_name,
+            )
+            return
+        try:
+            module, class_name = decorator_class_name.rsplit('.', maxsplit=1)
+            decorator_class = getattr(importlib.import_module(module), class_name)
+            self._taskflow_decorator_dict[decorator_name] = decorator_class

Review comment:
       As discussed before, I will make another change/refactor to the "providers_manager" to accomplish that in a "generic" way.
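Here is a standalone sketch of the two steps the `_add_taskflow_decorator` hunk performs: deriving the expected module prefix from the package name, and importing a class from its dotted path. The function names are hypothetical, and this is not the generic refactor mentioned in the comment.

```python
import importlib
from typing import Any


def provider_module_prefix(provider_package: str) -> str:
    """Map e.g. 'apache-airflow-providers-docker' to 'airflow.providers.docker'."""
    return provider_package[len("apache-"):].replace("-", ".")


def import_by_dotted_path(dotted_path: str) -> Any:
    """Import 'package.module.Name' and return the attribute `Name`."""
    module_name, attr_name = dotted_path.rsplit(".", maxsplit=1)
    return getattr(importlib.import_module(module_name), attr_name)
```

In the hunk, the prefix check only runs when the package name starts with `apache-airflow`, so third-party providers are not checked.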







[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r690245914



##########
File path: airflow/providers_manager.py
##########
@@ -278,6 +309,51 @@ def _get_attr(obj: Any, attr_name: str):
             return None
         return getattr(obj, attr_name)
 
+    def _add_taskflow_decorator(
+        self, decorator_name, decorator_class_name: str, provider_package: str
+    ) -> None:
+        if provider_package.startswith("apache-airflow"):
+            provider_path = provider_package[len("apache-") :].replace("-", ".")
+            if not decorator_class_name.startswith(provider_path):
+                log.warning(
+                    "Sanity check failed when importing '%s' from '%s' package. It should start with '%s'",
+                    decorator_class_name,
+                    provider_package,
+                    provider_path,
+                )
+                return
+        if decorator_name in self._taskflow_decorator_dict:
+            log.warning(
+                "The hook_class '%s' has been already registered.",
+                decorator_class_name,

Review comment:
       ```suggestion
               "The taskflow decorator '%s' has already been registered (by %s).",
               decorator_name, self._taskflow_decorator_dict[decorator_name].__name__
   ```
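Here is a minimal sketch of the duplicate check using the suggested message, which names both the decorator and whatever already owns it. The `register_decorator` helper and the plain-dict registry are illustrative assumptions, not the `ProvidersManager` code.

```python
import logging
from typing import Any, Dict

log = logging.getLogger(__name__)


def register_decorator(registry: Dict[str, Any], name: str, obj: Any) -> bool:
    """Register `obj` under `name`; refuse and warn if the name is already taken."""
    if name in registry:
        # Report the existing owner so the conflict is easy to trace.
        log.warning(
            "The taskflow decorator '%s' has already been registered (by %s).",
            name,
            registry[name].__name__,
        )
        return False
    registry[name] = obj
    return True
```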







[GitHub] [airflow] github-actions[bot] commented on pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#issuecomment-820695864


   [The Workflow run](https://github.com/apache/airflow/actions/runs/753278990) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r712206077



##########
File path: airflow/decorators/__init__.py
##########
@@ -15,129 +15,36 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Callable, Dict, Iterable, List, Optional, Union
+from typing import TYPE_CHECKING
 
-from airflow.decorators.python import python_task
-from airflow.decorators.python_virtualenv import _virtualenv_task
+from airflow.decorators.python import PythonDecoratorMixin, python_task  # noqa
+from airflow.decorators.python_virtualenv import PythonVirtualenvDecoratorMixin
 from airflow.decorators.task_group import task_group  # noqa
 from airflow.models.dag import dag  # noqa
+from airflow.providers_manager import ProvidersManager
 
 
-class _TaskDecorator:
-    def __call__(
-        self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
-    ):
-        """
-        Python operator decorator. Wraps a function into an Airflow operator.
-        Accepts kwargs for operator kwarg. This decorator can be reused in a single DAG.
+class _TaskDecorator(PythonDecoratorMixin, PythonVirtualenvDecoratorMixin):
+    def __getattr__(self, name):
+        if name.startswith("__"):
+            raise AttributeError(f'{type(self).__name__} has no attribute {name!r}')

Review comment:
       What a mishmash of typing there. Sheesh, Ash 🙂







[GitHub] [airflow] kaxil merged pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #15330:
URL: https://github.com/apache/airflow/pull/15330


   





[GitHub] [airflow] github-actions[bot] commented on pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#issuecomment-818305808


   [The Workflow run](https://github.com/apache/airflow/actions/runs/742835368) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] kaxil commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r688946055



##########
File path: airflow/provider.yaml.schema.json
##########
@@ -210,7 +210,20 @@
     "additional-extras": {
       "type": "object",
       "description": "Additional extras that the provider should have"
+    },
+    "task-decorators": {
+        "type": "array",
+        "description": "Apply custom decorators to the TaskFlow API. Can be accessed by users via '@task.<name>'",

Review comment:
       ```suggestion
           "description": "Decorators to use with the TaskFlow API. Can be accessed by users via '@task.<name>'",
   ```







[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r708162445



##########
File path: airflow/decorators/__init__.py
##########
@@ -15,34 +15,58 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Callable, Optional
-
-from airflow.decorators.python import python_task
-from airflow.decorators.python_virtualenv import _virtualenv_task
+from airflow.decorators.python import PythonDecoratorMixin
+from airflow.decorators.python_virtualenv import PythonVirtualenvDecoratorMixin
 from airflow.decorators.task_group import task_group  # noqa
 from airflow.exceptions import AirflowException
 from airflow.models.dag import dag  # noqa
 from airflow.providers_manager import ProvidersManager
 
+# [START import_docker]
+try:
+    from airflow.providers.docker.decorators.docker import DockerDecoratorMixin
+except ImportError:
+    DockerDecoratorMixin = None
+# [END import_docker]
+
 
 class _TaskDecorator:

Review comment:
       Typo, I meant to say
   
   ```python
   class _TaskDecorator(PythonDecoratorMixin, PythonVirtualenvDecoratorMixin):
   ```
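The hunk above imports `DockerDecoratorMixin` inside `try`/`except ImportError` with a `None` fallback, while the class line lists only the two core mixins. The following is a minimal sketch of one way an optional mixin could join the bases only when its import succeeds. Every class and module name below is a hypothetical stand-in, and this thread does not show whether the merged code composes its bases this way.

```python
from typing import Tuple


class PythonMixin:
    """Hypothetical stand-in for PythonDecoratorMixin."""

    def python(self) -> str:
        return "python"


class VirtualenvMixin:
    """Hypothetical stand-in for PythonVirtualenvDecoratorMixin."""

    def virtualenv(self) -> str:
        return "virtualenv"


try:
    # Deliberately nonexistent module, standing in for an optional provider package.
    from hypothetical_docker_provider import DockerMixin  # type: ignore[import]
except ImportError:
    DockerMixin = None  # provider not installed

# Keep only the mixins that were importable.
_BASES: Tuple[type, ...] = tuple(
    base for base in (PythonMixin, VirtualenvMixin, DockerMixin) if base is not None
)


class TaskDecorator(*_BASES):
    """Exposes one method per available mixin."""
```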







[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r708265092



##########
File path: airflow/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py
##########
@@ -0,0 +1,117 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=missing-function-docstring
+
+# [START tutorial]
+# [START import_module]
+import json
+
+from airflow.decorators import dag, task
+from airflow.utils.dates import days_ago
+
+# [END import_module]
+
+# [START default_args]
+# These args will get passed on to each operator
+# You can override them on a per-task basis during operator initialization
+default_args = {
+    'owner': 'airflow',
+}
+# [END default_args]
+
+
+# [START instantiate_dag]
+@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
+def tutorial_taskflow_api_etl():
+    """
+    ### TaskFlow API Tutorial Documentation
+    This is a simple ETL data pipeline example which demonstrates the use of
+    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
+    Documentation that goes along with the Airflow TaskFlow API tutorial is
+    located
+    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
+    """
+    # [END instantiate_dag]
+
+    # [START extract_virtualenv]
+    @task.virtualenv(
+        use_dill=True,
+        system_site_packages=False,
+        requirements=['funcsigs'],

Review comment:
       @dimberman Is `funcsigs` needed for this PR to work, or is this just meant to be an example?







[GitHub] [airflow] dimberman commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r685647871



##########
File path: airflow/decorators/docker.py
##########
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Callable, Dict, Optional, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script
+
+
+def _generate_decode_command(env_var, file):
+    return (
+        f'python -c "import os; import base64;'
+        f' x = base64.b64decode(os.environ[\\"{env_var}\\"]);'
+        f' f = open(\\"{file}\\", \\"wb\\"); f.write(x);'
+        f' f.close()"'
+    )

Review comment:
       I had to undo this change as it broke a bunch of tests and I couldn't figure out how to get it to work. Glad to discuss if you have ideas.
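For context on why this helper is easy to break: it builds a shell string whose inner double quotes are backslash-escaped, and it splices the variable name and file path directly into Python source that runs under `sh -c`. Below is a minimal round-trip sketch that copies the quoted helper's string format. It assumes a POSIX shell. The `python` parameter and the `PAYLOAD_B64` variable name are hypothetical additions for local testing and are not part of the PR.

```python
import base64
import os
import shlex
import subprocess
import sys
import tempfile


def generate_decode_command(env_var: str, file: str, python: str = "python") -> str:
    # Same string format as the quoted _generate_decode_command; `python` is a
    # hypothetical extra parameter so the sketch can use the current interpreter.
    return (
        f'{python} -c "import os; import base64;'
        f' x = base64.b64decode(os.environ[\\"{env_var}\\"]);'
        f' f = open(\\"{file}\\", \\"wb\\"); f.write(x);'
        f' f.close()"'
    )


def round_trip(payload: bytes) -> bytes:
    """Ship `payload` through an env var and the decode command, then read it back."""
    with tempfile.TemporaryDirectory() as tmp:
        out_path = os.path.join(tmp, "script.py")
        # Hypothetical variable name; a container would receive it via its environment.
        env = dict(os.environ, PAYLOAD_B64=base64.b64encode(payload).decode("ascii"))
        cmd = generate_decode_command("PAYLOAD_B64", out_path, shlex.quote(sys.executable))
        # shell=True runs the string through /bin/sh, which turns each \" back into ".
        subprocess.run(cmd, shell=True, env=env, check=True)
        with open(out_path, "rb") as f:
            return f.read()


print(round_trip(b"print('hello from the container')\n"))
```

A file path or variable name containing `"` or `$` would break this quoting. That fragility is one reason edits to the helper can ripple into test failures.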







[GitHub] [airflow] github-actions[bot] commented on pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#issuecomment-820750969


   [The Workflow run](https://github.com/apache/airflow/actions/runs/753451831) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r614349308



##########
File path: airflow/decorators/docker.py
##########
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Callable, Dict, Optional, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator

Review comment:
       We talked about making the "sub-decorators" (i.e. `@task.x`) pluggable.
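One way "pluggable" could work is a `task` object that resolves unknown attributes from a registry of dotted import paths, for example paths collected from provider metadata. The sketch below assumes that design. `TaskDecoratorCollection` and `import_string` here are hypothetical names written for illustration, not Airflow's implementation. `functools.lru_cache` stands in for a real decorator factory so the sketch runs without any provider installed.

```python
import functools
import importlib
from typing import Any, Callable, Dict


def import_string(dotted_path: str) -> Any:
    """Import "package.module.attr" and return attr (hypothetical helper)."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)


class TaskDecoratorCollection:
    """Hypothetical `task` namespace whose sub-decorators come from a registry."""

    def __init__(self, registry: Dict[str, str]) -> None:
        # Maps decorator name -> dotted path, e.g. as read from provider metadata.
        self._registry = dict(registry)
        self._cache: Dict[str, Callable] = {}

    def __getattr__(self, name: str) -> Callable:
        # Only called when normal attribute lookup fails, i.e. for `task.<name>`.
        if name.startswith("_"):
            raise AttributeError(name)
        if name not in self._cache:
            try:
                path = self._registry[name]
            except KeyError:
                raise AttributeError(f"task decorator {name!r} not found") from None
            # Import lazily so unused providers are never loaded.
            self._cache[name] = import_string(path)
        return self._cache[name]


task = TaskDecoratorCollection({"cached": "functools.lru_cache"})


@task.cached(maxsize=None)
def double(x: int) -> int:
    return x * 2
```

Resolving lazily in `__getattr__` keeps the core `task` object free of hard imports on any provider, which is the property the pluggability discussion is after.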







[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r708282079



##########
File path: docs/apache-airflow/howto/create-custom-decorator.rst
##########
@@ -0,0 +1,118 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Creating Custom TaskFlow Decorators
+===================================
+
+As of Airflow 2.2, users can now integrate custom decorators into their provider packages and have those decorators
+appear natively as part of the ``@task.____`` design.
+
+For an example. Let's say you were trying to create a "foo" decorator. To create ``@task.foo``, follow the following
+steps:
+
+1. Create a ``FooDecoratedOperator``
+
+In this case, we are assuming that you have a ``FooOperator`` that takes a python function as an argument.
+By creating a ``FooDecoratedOperator`` that inherits from ``FooOperator`` and
+``airflow.decorators.base.DecoratedOperator``, Airflow will supply much of the needed functionality required to treat
+your new class as a taskflow native class.
+
+2. Create a ``foo_task`` function
+
+Once you have your decorated class, create a function that takes arguments ``python_callable``\, ``multiple_outputs``\,
+and ``kwargs``\. This function will use the ``airflow.decorators.base.task_decorator_factory`` function to convert

Review comment:
       ```suggestion
   Once you have your decorated class, create a function that takes arguments ``python_callable``, ``multiple_outputs``,
   and ``kwargs``. This function will use the ``airflow.decorators.base.task_decorator_factory`` function to convert
   ```
   
   I don't think the `\` does anything here, does it?

##########
File path: docs/apache-airflow/howto/create-custom-decorator.rst
##########
@@ -0,0 +1,118 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Creating Custom TaskFlow Decorators
+===================================
+
+As of Airflow 2.2, users can now integrate custom decorators into their provider packages and have those decorators
+appear natively as part of the ``@task.____`` design.
+
+For an example. Let's say you were trying to create a "foo" decorator. To create ``@task.foo``, follow the following
+steps:
+
+1. Create a ``FooDecoratedOperator``
+
+In this case, we are assuming that you have a ``FooOperator`` that takes a python function as an argument.
+By creating a ``FooDecoratedOperator`` that inherits from ``FooOperator`` and
+``airflow.decorators.base.DecoratedOperator``, Airflow will supply much of the needed functionality required to treat
+your new class as a taskflow native class.
+
+2. Create a ``foo_task`` function
+
+Once you have your decorated class, create a function that takes arguments ``python_callable``\, ``multiple_outputs``\,
+and ``kwargs``\. This function will use the ``airflow.decorators.base.task_decorator_factory`` function to convert
+the new ``FooDecoratedOperator`` into a TaskFlow function decorator!
+
+.. code-block:: python
+
+   def foo_task(
+       python_callable: Optional[Callable] = None,
+       multiple_outputs: Optional[bool] = None,
+       **kwargs
+   ):
+       return task_decorator_factory(
+           python_callable=python_callable,
+           multiple_outputs=multiple_outputs,
+           decorated_operator_class=FooDecoratedOperator,
+           **kwargs,
+       )
+
+3. Register your new decorator in the provider.yaml of your provider
+
+Finally, add a key-value of ``decorator-name``:``path-to-function`` to your provider.yaml. When Airflow starts, the
+``ProviderManager`` class will automatically import this value and ``task.decorator-name`` will work as a new
+decorator!
+
+.. code-block:: yaml
+
+   package-name: apache-airflow-providers-docker
+   name: Docker
+   description: |
+       `Docker <https://docs.docker.com/install/>`__
+
+   task-decorators:
+       docker: airflow.providers.docker.operators.docker.docker_decorator
+
+
+4. (Optional) Create a Mixin class so that your decorator will show up in your IDE's autocomplete
+
+For better or worse, Python IDEs can not autocomplete dynamically
+generated methods (see `here <https://intellij-support.jetbrains.com/hc/en-us/community/posts/115000665110-auto-completion-for-dynamic-module-attributes-in-python>`_).
+
+To get around this, we had to find a solution that was "best possible." IDEs will only allow typing
+through stub files, but we wanted to avoid any situation where a user would update their provider and the autocomplete
+would be out of sync with the provider's actual parameters.
+
+To hack around this problem, we found that you could extend the ``_TaskDecorator`` class in the ``__init__.pyi`` file

Review comment:
       We don't do it in the `pyi` file anymore.
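Since the `.pyi` route is gone, the doc's step 4 relies on a mixin whose method signature spells out the decorator's parameters, so IDEs can autocomplete `task.foo(...)` from ordinary Python source. The sketch below assumes that approach. `FooDecoratorMixin`, `TaskDecorators`, `_lookup`, and the `foo_conn_id` argument are all hypothetical, and this is not the PR's code.

```python
from typing import Any, Callable, Dict, Optional


class FooDecoratorMixin:
    """Hypothetical mixin: spells out `task.foo`'s parameters so IDEs can autocomplete them."""

    def foo(
        self,
        python_callable: Optional[Callable] = None,
        multiple_outputs: Optional[bool] = None,
        foo_conn_id: str = "foo_default",  # hypothetical FooOperator argument
        **kwargs: Any,
    ) -> Any:
        # The body only forwards to the registered decorator factory, so the
        # static signature documents the parameters without duplicating logic.
        return self._lookup("foo")(
            python_callable=python_callable,
            multiple_outputs=multiple_outputs,
            foo_conn_id=foo_conn_id,
            **kwargs,
        )


class TaskDecorators(FooDecoratorMixin):
    """Hypothetical `task` object: typed methods come from mixins, implementations from a registry."""

    def __init__(self, registry: Dict[str, Callable]) -> None:
        self._registry = registry

    def _lookup(self, name: str) -> Callable:
        return self._registry[name]


def fake_foo_task(**kwargs: Any) -> dict:
    # Stand-in for a real decorator factory such as foo_task: echoes its arguments.
    return kwargs


task = TaskDecorators({"foo": fake_foo_task})
print(task.foo(multiple_outputs=True))
```

Keeping the mixin signature next to the provider code, rather than in a separate stub file, is what lets the autocomplete stay in sync when the provider is upgraded.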

##########
File path: docs/apache-airflow/howto/create-custom-decorator.rst
##########
@@ -0,0 +1,118 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Creating Custom TaskFlow Decorators
+===================================
+
+As of Airflow 2.2, users can now integrate custom decorators into their provider packages and have those decorators
+appear natively as part of the ``@task.____`` design.
+
+For an example. Let's say you were trying to create a "foo" decorator. To create ``@task.foo``, follow the following
+steps:
+
+1. Create a ``FooDecoratedOperator``
+
+In this case, we are assuming that you have a ``FooOperator`` that takes a python function as an argument.
+By creating a ``FooDecoratedOperator`` that inherits from ``FooOperator`` and
+``airflow.decorators.base.DecoratedOperator``, Airflow will supply much of the needed functionality required to treat
+your new class as a taskflow native class.
+
+2. Create a ``foo_task`` function
+
+Once you have your decorated class, create a function that takes arguments ``python_callable``\, ``multiple_outputs``\,
+and ``kwargs``\. This function will use the ``airflow.decorators.base.task_decorator_factory`` function to convert
+the new ``FooDecoratedOperator`` into a TaskFlow function decorator!
+
+.. code-block:: python
+
+   def foo_task(
+       python_callable: Optional[Callable] = None,
+       multiple_outputs: Optional[bool] = None,
+       **kwargs
+   ):
+       return task_decorator_factory(
+           python_callable=python_callable,
+           multiple_outputs=multiple_outputs,
+           decorated_operator_class=FooDecoratedOperator,
+           **kwargs,
+       )
+
+3. Register your new decorator in the provider.yaml of your provider
+
+Finally, add a key-value of ``decorator-name``:``path-to-function`` to your provider.yaml. When Airflow starts, the
+``ProviderManager`` class will automatically import this value and ``task.decorator-name`` will work as a new
+decorator!

Review comment:
       Please mention that the key here must be a valid Python identifier (and change your example so it is), because `@task.decorator-name` isn't going to work 🙂
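`@task.<key>` is attribute access, so each key under `task-decorators` has to be a Python identifier that is not a reserved word. Here is a minimal validation sketch, assuming the YAML section has already been loaded into a dict. The function name and the dotted paths are hypothetical.

```python
import keyword
from typing import Dict, List


def invalid_decorator_names(task_decorators: Dict[str, str]) -> List[str]:
    """Return the keys of a `task-decorators` mapping that cannot be used as `@task.<key>`."""
    return [
        name
        for name in task_decorators
        # The key must be an identifier and not a keyword such as `class`.
        if not name.isidentifier() or keyword.iskeyword(name)
    ]


# Mapping shaped like the provider.yaml `task-decorators` section (paths are hypothetical).
task_decorators = {
    "foo": "foo_provider.decorators.foo.foo_task",
    "decorator-name": "foo_provider.decorators.foo.foo_task",
}
print(invalid_decorator_names(task_decorators))
```

`decorator-name` fails because Python parses `task.decorator-name` as a subtraction, while `decorator_name` would be accepted.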

##########
File path: docs/apache-airflow/howto/create-custom-decorator.rst
##########
@@ -0,0 +1,118 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Creating Custom TaskFlow Decorators
+===================================
+
+As of Airflow 2.2, users can now integrate custom decorators into their provider packages and have those decorators
+appear natively as part of the ``@task.____`` design.
+
+For an example. Let's say you were trying to create a "foo" decorator. To create ``@task.foo``, follow the following
+steps:
+
+1. Create a ``FooDecoratedOperator``
+
+In this case, we are assuming that you have a ``FooOperator`` that takes a python function as an argument.
+By creating a ``FooDecoratedOperator`` that inherits from ``FooOperator`` and
+``airflow.decorators.base.DecoratedOperator``, Airflow will supply much of the needed functionality required to treat
+your new class as a taskflow native class.
+
+2. Create a ``foo_task`` function
+
+Once you have your decorated class, create a function that takes arguments ``python_callable``\, ``multiple_outputs``\,
+and ``kwargs``\. This function will use the ``airflow.decorators.base.task_decorator_factory`` function to convert
+the new ``FooDecoratedOperator`` into a TaskFlow function decorator!
+
+.. code-block:: python
+
+   def foo_task(
+       python_callable: Optional[Callable] = None,
+       multiple_outputs: Optional[bool] = None,
+       **kwargs
+   ):
+       return task_decorator_factory(
+           python_callable=python_callable,
+           multiple_outputs=multiple_outputs,
+           decorated_operator_class=FooDecoratedOperator,
+           **kwargs,
+       )
+
+3. Register your new decorator in the provider.yaml of your provider
+
+Finally, add a key-value of ``decorator-name``:``path-to-function`` to your provider.yaml. When Airflow starts, the
+``ProviderManager`` class will automatically import this value and ``task.decorator-name`` will work as a new
+decorator!
+
+.. code-block:: yaml
+
+   package-name: apache-airflow-providers-docker
+   name: Docker
+   description: |
+       `Docker <https://docs.docker.com/install/>`__
+
+   task-decorators:
+       docker: airflow.providers.docker.operators.docker.docker_decorator
+
+
+4. (Optional) Create a Mixin class so that your decorator will show up in your IDE's autocomplete
+
+For better or worse, Python IDEs can not autocomplete dynamically
+generated methods (see `here <https://intellij-support.jetbrains.com/hc/en-us/community/posts/115000665110-auto-completion-for-dynamic-module-attributes-in-python>`_).
+
+To get around this, we had to find a solution that was "best possible." IDEs will only allow typing
+through stub files, but we wanted to avoid any situation where a user would update their provider and the autocomplete
+would be out of sync with the provider's actual parameters.
+
+To hack around this problem, we found that you could extend the ``_TaskDecorator`` class in the ``__init__.pyi`` file
+and the correct autocomplete will show up in the IDE.
+
+To correctly implement this, please take the following steps:
+
+Create a ``Mixin`` class for your decorator
+
+Mixin classes are classes in python that tell the python interpreter that python can import them at any time.
+Because they are not dependent on other classes, Mixin classes are great for multiple inheritance.
+
+In the DockerDecorator we created a Mixin class that looks like this
+
+.. exampleinclude:: ../howto/docker_decorator.py
+    :language: python
+    :start-after: [START decoratormixin]
+    :end-before: [END decoratormixin]

Review comment:
       Rather than including a whole copy of docker_decorator.py here (which will get out of date), can we instead point at the actual airflow/providers/docker/decorators/docker.py?

##########
File path: docs/apache-airflow/howto/create-custom-decorator.rst
##########
@@ -0,0 +1,118 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Creating Custom TaskFlow Decorators
+===================================
+
+As of Airflow 2.2, users can now integrate custom decorators into their provider packages and have those decorators
+appear natively as part of the ``@task.____`` design.
+
+For an example. Let's say you were trying to create a "foo" decorator. To create ``@task.foo``, follow the following
+steps:
+
+1. Create a ``FooDecoratedOperator``
+
+In this case, we are assuming that you have a ``FooOperator`` that takes a python function as an argument.
+By creating a ``FooDecoratedOperator`` that inherits from ``FooOperator`` and
+``airflow.decorators.base.DecoratedOperator``, Airflow will supply much of the needed functionality required to treat
+your new class as a taskflow native class.
+
+2. Create a ``foo_task`` function
+
+Once you have your decorated class, create a function that takes arguments ``python_callable``\, ``multiple_outputs``\,
+and ``kwargs``\. This function will use the ``airflow.decorators.base.task_decorator_factory`` function to convert
+the new ``FooDecoratedOperator`` into a TaskFlow function decorator!
+
+.. code-block:: python
+
+   def foo_task(
+       python_callable: Optional[Callable] = None,
+       multiple_outputs: Optional[bool] = None,
+       **kwargs
+   ):
+       return task_decorator_factory(
+           python_callable=python_callable,
+           multiple_outputs=multiple_outputs,
+           decorated_operator_class=FooDecoratedOperator,
+           **kwargs,
+       )
+
+3. Register your new decorator in the provider.yaml of your provider
+
+Finally, add a key-value of ``decorator-name``:``path-to-function`` to your provider.yaml. When Airflow starts, the
+``ProviderManager`` class will automatically import this value and ``task.decorator-name`` will work as a new
+decorator!
+
+.. code-block:: yaml
+
+   package-name: apache-airflow-providers-docker
+   name: Docker
+   description: |
+       `Docker <https://docs.docker.com/install/>`__
+
+   task-decorators:
+       docker: airflow.providers.docker.operators.docker.docker_decorator

Review comment:
       Use the same "Foo" example here rather than Docker, for consistency.







[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r700995980



##########
File path: airflow/decorators/__init__.py
##########
@@ -15,129 +15,58 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Callable, Dict, Iterable, List, Optional, Union
-
-from airflow.decorators.python import python_task
-from airflow.decorators.python_virtualenv import _virtualenv_task
+from airflow.decorators.python import PythonDecoratorMixin

Review comment:
       ```suggestion
   from airflow.decorators.python import PythonDecoratorMixin, python_task
   ```
   
   We probably have to keep this for back-compat reasons.







[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r690241648



##########
File path: airflow/providers/docker/decorators/docker.py
##########
@@ -0,0 +1,459 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Callable, Dict, Iterable, List, Optional, TypeVar, Union
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script
+
+
+def _generate_decode_command(env_var, file):
+    return (
+        f'python -c "import os; import base64;'
+        f' x = base64.b64decode(os.environ[\\"{env_var}\\"]);'
+        f' f = open(\\"{file}\\", \\"wb\\"); f.write(x);'
+        f' f.close()"'
+    )
+
+
+def _b64_encode_file(filename):
+    with open(filename, "rb") as file_to_encode:
+        return base64.b64encode(file_to_encode.read())
+
+
+class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        self.string_args = [1, 2, 1]
+        self._output_filename = ""
+        command = "dummy command"
+        self.pickling_library = pickle

Review comment:
       This should support dill (and any other serialization option) the same way the virtualenv operator does, please.
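The virtualenv operator exposes a `use_dill` switch, and a later revision of this file adds a `use_dill=False` argument. The sketch below shows the idea under two assumptions: inputs are pickled and base64-encoded on the worker, then decoded and unpickled inside the container with the same library. All function names are hypothetical, and `dill` must be installed on both sides whenever `use_dill=True`.

```python
import base64
import pickle
from types import ModuleType


def select_pickling_library(use_dill: bool = False) -> ModuleType:
    # Mirrors a use_dill switch: dill is imported lazily so it is only
    # required when actually requested.
    if use_dill:
        import dill  # third-party; must be installed on the worker and in the image

        return dill
    return pickle


def encode_inputs(op_args: list, op_kwargs: dict, use_dill: bool = False) -> str:
    """Serialize call arguments and base64-encode them so they fit in an env var."""
    lib = select_pickling_library(use_dill)
    return base64.b64encode(lib.dumps({"args": op_args, "kwargs": op_kwargs})).decode("ascii")


def decode_inputs(encoded: str, use_dill: bool = False) -> dict:
    """Container side: base64-decode, then unpickle with the same library."""
    lib = select_pickling_library(use_dill)
    return lib.loads(base64.b64decode(encoded))


print(decode_inputs(encode_inputs([1, 2], {"x": "y"})))
```

Whichever library is chosen, both ends must agree on it. That is why the switch belongs on the operator rather than being hard-coded to `pickle`.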







[GitHub] [airflow] kaxil commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r688945812



##########
File path: airflow/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py
##########
@@ -0,0 +1,117 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=missing-function-docstring
+
+# [START tutorial]
+# [START import_module]
+import json
+
+from airflow.decorators import dag, task
+from airflow.utils.dates import days_ago
+
+# [END import_module]
+
+# [START default_args]
+# These args will get passed on to each operator
+# You can override them on a per-task basis during operator initialization
+default_args = {
+    'owner': 'airflow',
+}
+# [END default_args]
+
+
+# [START instantiate_dag]
+@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
+def tutorial_taskflow_api_etl():
+    """
+    ### TaskFlow API Tutorial Documentation
+    This is a simple ETL data pipeline example which demonstrates the use of
+    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
+    Documentation that goes along with the Airflow TaskFlow API tutorial is
+    located
+    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
+    """
+    # [END instantiate_dag]
+
+    # [START extract_virtualenv]
+    @task.virtualenv(
+        use_dill=True,
+        system_site_packages=False,
+        requirements=['funcsigs'],

Review comment:
       Not strictly needed in this PR, but it would be nice to list a package here that is actually used in the function below; that would also make the use case easier to understand.







[GitHub] [airflow] dimberman commented on pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
dimberman commented on pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#issuecomment-817920556


   @xinbinhuang that's a great idea!
   
   re: Kubernetes, once this gets merged the next step will be `@task.kubernetes`, where a user can give a pod spec and launch it using the KPO (KubernetesPodOperator) :)





[GitHub] [airflow] ashb commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r712243970



##########
File path: airflow/providers/docker/decorators/docker.py
##########
@@ -0,0 +1,298 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Callable, Dict, Iterable, List, Optional, TypeVar, Union
+
+import dill
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script
+
+
+def _generate_decode_command(env_var, file):
+    return (
+        f'python -c "import os; import base64;'
+        f' x = base64.b64decode(os.environ[\\"{env_var}\\"]);'
+        f' f = open(\\"{file}\\", \\"wb\\"); f.write(x);'
+        f' f.close()"'
+    )
+
+
+def _b64_encode_file(filename):
+    with open(filename, "rb") as file_to_encode:
+        return base64.b64encode(file_to_encode.read())
+
+
+class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    def __init__(
+        self,
+        use_dill=False,
+        **kwargs,
+    ) -> None:
+        self.string_args = [1, 2, 1]

Review comment:
       Vestigial code. Removed.
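The `_b64_encode_file` and `_generate_decode_command` helpers in the diff above move the rendered script and its pickled inputs into the container through environment variables rather than a shared volume. The following is a minimal sketch of that round trip, not the PR's implementation. The env var names `__PYTHON_SCRIPT` and `__PYTHON_INPUT`, and the helpers `build_environment` and `restore_from_environment`, are hypothetical. It also assumes that the base64 bytes are decoded to ASCII text, because environment variable values must be strings.

```python
import base64
import os
from typing import Dict, Tuple

# Hypothetical env var names used to carry the payload into the container.
SCRIPT_VAR = "__PYTHON_SCRIPT"
INPUT_VAR = "__PYTHON_INPUT"


def b64_encode_file(filename: str) -> str:
    # Like _b64_encode_file in the diff, but returns ASCII text instead of
    # bytes so the value can be placed directly into an environment mapping.
    with open(filename, "rb") as file_to_encode:
        return base64.b64encode(file_to_encode.read()).decode("ascii")


def build_environment(script_path: str, input_path: str) -> Dict[str, str]:
    """Worker side: pack the rendered script and pickled inputs into env vars."""
    return {
        SCRIPT_VAR: b64_encode_file(script_path),
        INPUT_VAR: b64_encode_file(input_path),
    }


def restore_from_environment(env: Dict[str, str], dest_dir: str) -> Tuple[str, str]:
    """Container side: the same steps the generated `python -c` one-liner runs."""
    paths = []
    for var, name in ((SCRIPT_VAR, "script.py"), (INPUT_VAR, "script.in")):
        path = os.path.join(dest_dir, name)
        # Decode the base64 text back to the original bytes and write them out.
        with open(path, "wb") as out:
            out.write(base64.b64decode(env[var]))
        paths.append(path)
    return paths[0], paths[1]
```

Base64 keeps arbitrary bytes, such as a pickle, safe inside an environment variable. It needs only the Python standard library on the image, which matches the PR's goal of assuming nothing beyond Python 3 and bash.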







[GitHub] [airflow] uranusjr commented on a change in pull request #15330: Add a Docker Taskflow decorator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15330:
URL: https://github.com/apache/airflow/pull/15330#discussion_r685734272



##########
File path: airflow/decorators/__init__.py
##########
@@ -41,103 +45,12 @@ def __call__(
         """
         return self.python(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs)
 
-    @staticmethod
-    def python(python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs):
-        """
-        Python operator decorator. Wraps a function into an Airflow operator.
-        Accepts kwargs for operator kwarg. This decorator can be reused in a single DAG.
-
-        :param python_callable: Function to decorate
-        :type python_callable: Optional[Callable]
-        :param multiple_outputs: if set, function return value will be
-            unrolled to multiple XCom values. List/Tuples will unroll to xcom values
-            with index as key. Dict will unroll to xcom values with keys as XCom keys.
-            Defaults to False.
-        :type multiple_outputs: bool
-        """
-        return python_task(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs)
-
-    @staticmethod
-    def virtualenv(
-        python_callable: Optional[Callable] = None,
-        multiple_outputs: Optional[bool] = None,
-        requirements: Optional[Iterable[str]] = None,
-        python_version: Optional[Union[str, int, float]] = None,
-        use_dill: bool = False,
-        system_site_packages: bool = True,
-        string_args: Optional[Iterable[str]] = None,
-        templates_dict: Optional[Dict] = None,
-        templates_exts: Optional[List[str]] = None,
-        **kwargs,
-    ):
-        """
-        Allows one to run a function in a virtualenv that is
-        created and destroyed automatically (with certain caveats).
-
-        The function must be defined using def, and not be
-        part of a class. All imports must happen inside the function
-        and no variables outside of the scope may be referenced. A global scope
-        variable named virtualenv_string_args will be available (populated by
-        string_args). In addition, one can pass stuff through op_args and op_kwargs, and one
-        can use a return value.
-        Note that if your virtualenv runs in a different Python major version than Airflow,
-        you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to
-        Airflow through plugins. You can use string_args though.
-
-        .. seealso::
-            For more information on how to use this operator, take a look at the guide:
-            :ref:`howto/operator:PythonVirtualenvOperator`
-
-        :param python_callable: A python function with no references to outside variables,
-            defined with def, which will be run in a virtualenv
-        :type python_callable: function
-        :param multiple_outputs: if set, function return value will be
-            unrolled to multiple XCom values. List/Tuples will unroll to xcom values
-            with index as key. Dict will unroll to xcom values with keys as XCom keys.
-            Defaults to False.
-        :type multiple_outputs: bool
-        :param requirements: A list of requirements as specified in a pip install command
-        :type requirements: list[str]
-        :param python_version: The Python version to run the virtualenv with. Note that
-            both 2 and 2.7 are acceptable forms.
-        :type python_version: Optional[Union[str, int, float]]
-        :param use_dill: Whether to use dill to serialize
-            the args and result (pickle is default). This allow more complex types
-            but requires you to include dill in your requirements.
-        :type use_dill: bool
-        :param system_site_packages: Whether to include
-            system_site_packages in your virtualenv.
-            See virtualenv documentation for more information.
-        :type system_site_packages: bool
-        :param op_args: A list of positional arguments to pass to python_callable.
-        :type op_args: list
-        :param op_kwargs: A dict of keyword arguments to pass to python_callable.
-        :type op_kwargs: dict
-        :param string_args: Strings that are present in the global var virtualenv_string_args,
-            available to python_callable at runtime as a list[str]. Note that args are split
-            by newline.
-        :type string_args: list[str]
-        :param templates_dict: a dictionary where the values are templates that
-            will get templated by the Airflow engine sometime between
-            ``__init__`` and ``execute`` takes place and are made available
-            in your callable's context after the template has been applied
-        :type templates_dict: dict of str
-        :param templates_exts: a list of file extensions to resolve while
-            processing templated fields, for examples ``['.sql', '.hql']``
-        :type templates_exts: list[str]
-        """
-        return _virtualenv_task(
-            python_callable=python_callable,
-            multiple_outputs=multiple_outputs,
-            requirements=requirements,
-            python_version=python_version,
-            use_dill=use_dill,
-            system_site_packages=system_site_packages,
-            string_args=string_args,
-            templates_dict=templates_dict,
-            templates_exts=templates_exts,
-            **kwargs,
-        )
+    def __getattr__(self, name):
+        if self.store.get(name, None):
+            return self.store[name]
+        decorator = ProvidersManager().taskflow_decorators[name]
+        self.store[name] = decorator
+        return decorator

Review comment:
       Using `__getattr__` like this breaks IDE introspection (e.g. autocomplete) and type hints, so it’d be best to avoid it where possible. Maybe we could use a metaclass or some kind of descriptor to achieve this instead.



