You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/03/23 00:45:59 UTC

[GitHub] [airflow] jhtimmins commented on a change in pull request #14761: Add PythonVirtualenvDecorator and DockerDecorator to Taskflow API

jhtimmins commented on a change in pull request #14761:
URL: https://github.com/apache/airflow/pull/14761#discussion_r599088329



##########
File path: airflow/decorators/__init__.py
##########
@@ -55,5 +57,249 @@ def python(python_callable: Optional[Callable] = None, multiple_outputs: Optiona
         """
         return python_task(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs)
 
+    @staticmethod
+    def virtualenv(
+        python_callable: Optional[Callable] = None,
+        multiple_outputs: Optional[bool] = None,
+        requirements: Optional[Iterable[str]] = None,
+        python_version: Optional[Union[str, int, float]] = None,
+        use_dill: bool = False,
+        system_site_packages: bool = True,
+        string_args: Optional[Iterable[str]] = None,
+        templates_dict: Optional[Dict] = None,
+        templates_exts: Optional[List[str]] = None,
+        **kwargs,
+    ):
+        """
+        Allows one to run a function in a virtualenv that is
+        created and destroyed automatically (with certain caveats).
+
+        The function must be defined using def, and not be
+        part of a class. All imports must happen inside the function
+        and no variables outside of the scope may be referenced. A global scope
+        variable named virtualenv_string_args will be available (populated by
+        string_args). In addition, one can pass stuff through op_args and op_kwargs, and one
+        can use a return value.
+        Note that if your virtualenv runs in a different Python major version than Airflow,
+        you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to
+        Airflow through plugins. You can use string_args though.
+
+        .. seealso::
+            For more information on how to use this operator, take a look at the guide:
+            :ref:`howto/operator:PythonVirtualenvOperator`
+
+        :param python_callable: A python function with no references to outside variables,
+            defined with def, which will be run in a virtualenv
+        :type python_callable: function
+        :param multiple_outputs: if set, function return value will be
+            unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+            with index as key. Dict will unroll to xcom values with keys as XCom keys.
+            Defaults to False.
+        :type multiple_outputs: bool
+        :param requirements: A list of requirements as specified in a pip install command
+        :type requirements: list[str]
+        :param python_version: The Python version to run the virtualenv with. Note that
+            both 2 and 2.7 are acceptable forms.
+        :type python_version: Optional[Union[str, int, float]]
+        :param use_dill: Whether to use dill to serialize
+            the args and result (pickle is default). This allow more complex types
+            but requires you to include dill in your requirements.
+        :type use_dill: bool
+        :param system_site_packages: Whether to include
+            system_site_packages in your virtualenv.
+            See virtualenv documentation for more information.
+        :type system_site_packages: bool
+        :param op_args: A list of positional arguments to pass to python_callable.
+        :type op_args: list
+        :param op_kwargs: A dict of keyword arguments to pass to python_callable.
+        :type op_kwargs: dict
+        :param string_args: Strings that are present in the global var virtualenv_string_args,

Review comment:
       The purpose and usage of `virtualenv_string_args` aren't immediately obvious from this docstring; a short example of reading it inside the callable would help.

##########
File path: airflow/decorators/__init__.py
##########
@@ -55,5 +57,249 @@ def python(python_callable: Optional[Callable] = None, multiple_outputs: Optiona
         """
         return python_task(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs)
 
+    @staticmethod
+    def virtualenv(
+        python_callable: Optional[Callable] = None,
+        multiple_outputs: Optional[bool] = None,
+        requirements: Optional[Iterable[str]] = None,
+        python_version: Optional[Union[str, int, float]] = None,
+        use_dill: bool = False,
+        system_site_packages: bool = True,
+        string_args: Optional[Iterable[str]] = None,
+        templates_dict: Optional[Dict] = None,
+        templates_exts: Optional[List[str]] = None,
+        **kwargs,
+    ):
+        """
+        Allows one to run a function in a virtualenv that is
+        created and destroyed automatically (with certain caveats).
+
+        The function must be defined using def, and not be
+        part of a class. All imports must happen inside the function
+        and no variables outside of the scope may be referenced. A global scope
+        variable named virtualenv_string_args will be available (populated by
+        string_args). In addition, one can pass stuff through op_args and op_kwargs, and one
+        can use a return value.
+        Note that if your virtualenv runs in a different Python major version than Airflow,
+        you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to
+        Airflow through plugins. You can use string_args though.
+
+        .. seealso::
+            For more information on how to use this operator, take a look at the guide:
+            :ref:`howto/operator:PythonVirtualenvOperator`
+
+        :param python_callable: A python function with no references to outside variables,
+            defined with def, which will be run in a virtualenv
+        :type python_callable: function
+        :param multiple_outputs: if set, function return value will be
+            unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+            with index as key. Dict will unroll to xcom values with keys as XCom keys.
+            Defaults to False.
+        :type multiple_outputs: bool
+        :param requirements: A list of requirements as specified in a pip install command
+        :type requirements: list[str]
+        :param python_version: The Python version to run the virtualenv with. Note that
+            both 2 and 2.7 are acceptable forms.
+        :type python_version: Optional[Union[str, int, float]]
+        :param use_dill: Whether to use dill to serialize
+            the args and result (pickle is default). This allow more complex types
+            but requires you to include dill in your requirements.
+        :type use_dill: bool
+        :param system_site_packages: Whether to include
+            system_site_packages in your virtualenv.
+            See virtualenv documentation for more information.
+        :type system_site_packages: bool
+        :param op_args: A list of positional arguments to pass to python_callable.
+        :type op_args: list
+        :param op_kwargs: A dict of keyword arguments to pass to python_callable.
+        :type op_kwargs: dict
+        :param string_args: Strings that are present in the global var virtualenv_string_args,
+            available to python_callable at runtime as a list[str]. Note that args are split
+            by newline.
+        :type string_args: list[str]
+        :param templates_dict: a dictionary where the values are templates that
+            will get templated by the Airflow engine sometime between
+            ``__init__`` and ``execute`` takes place and are made available
+            in your callable's context after the template has been applied
+        :type templates_dict: dict of str
+        :param templates_exts: a list of file extensions to resolve while
+            processing templated fields, for examples ``['.sql', '.hql']``
+        :type templates_exts: list[str]
+        """
+        return _virtualenv_task(
+            python_callable=python_callable,
+            multiple_outputs=multiple_outputs,
+            requirements=requirements,
+            python_version=python_version,
+            use_dill=use_dill,
+            system_site_packages=system_site_packages,
+            string_args=string_args,
+            templates_dict=templates_dict,
+            templates_exts=templates_exts,
+            **kwargs,
+        )
+
+    @staticmethod

Review comment:
       Is there a reason these are static methods instead of standalone functions?

##########
File path: airflow/decorators/base.py
##########
@@ -29,7 +29,60 @@
 from airflow.utils.task_group import TaskGroup, TaskGroupContext
 
 
-class BaseDecoratedOperator(BaseOperator):
+def validate_python_callable(python_callable):
+    """
+    Validate that python callable can be wrapped by operator.
+    Raises exception if invalid.
+
+    :param python_callable: Python object to be validated
+    :raises: TypeError, AirflowException
+    """
+    if not callable(python_callable):
+        raise TypeError('`python_callable` param must be callable')
+    if 'self' in signature(python_callable).parameters.keys():
+        raise AirflowException('@task does not support methods')
+
+
+def get_unique_task_id(
+    task_id: str, dag: Optional[DAG] = None, task_group: Optional[TaskGroup] = None
+) -> str:
+    """
+    Generate unique task id given a DAG (or if run in a DAG context)
+    Ids are generated by appending a unique number to the end of
+    the original task id.
+
+    Example:
+      task_id
+      task_id__1
+      task_id__2
+      ...
+      task_id__20
+    """
+    dag = dag or DagContext.get_current_dag()
+    if not dag:
+        return task_id
+
+    # We need to check if we are in the context of TaskGroup as the task_id may
+    # already be altered
+    task_group = task_group or TaskGroupContext.get_current_task_group(dag)
+    tg_task_id = task_group.child_id(task_id) if task_group else task_id
+
+    if tg_task_id not in dag.task_ids:
+        return task_id
+    core = re.split(r'__\d+$', task_id)[0]
+    suffixes = sorted(
+        [
+            int(re.split(r'^.+__', task_id)[1])
+            for task_id in dag.task_ids
+            if re.match(rf'^{core}__\d+$', task_id)
+        ]
+    )
+    if not suffixes:
+        return f'{core}__1'
+    return f'{core}__{suffixes[-1] + 1}'

Review comment:
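       ```suggestion
       return f'{core}__{max(suffixes, default=0) + 1}'
   ```
   It's not important, but you could consolidate this into one line. Note that it should use `max(..., default=0)` rather than `len(suffixes)`: if the existing suffixes have gaps (e.g. `task_id__1` and `task_id__3`), `len(suffixes) + 1` would produce `task_id__3` again and collide with an existing task.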
   

##########
File path: airflow/decorators/docker.py
##########
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Any, Callable, Dict, Optional, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script
+
+
+class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.

Review comment:
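       `keys as keys` is confusing; consider "Dict will unroll to xcom values with keys as XCom keys", which is the wording the other docstrings in this PR use.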

##########
File path: airflow/decorators/base.py
##########
@@ -63,73 +120,49 @@ def __init__(
         op_args: Tuple[Any],
         op_kwargs: Dict[str, Any],
         multiple_outputs: bool = False,
+        kwargs_to_upstream: dict = None,
         **kwargs,
     ) -> None:
-        kwargs['task_id'] = self._get_unique_task_id(task_id, kwargs.get('dag'), kwargs.get('task_group'))
-        super().__init__(**kwargs)
+        kwargs['task_id'] = get_unique_task_id(task_id, kwargs.get('dag'), kwargs.get('task_group'))
         self.python_callable = python_callable
+        kwargs_to_upstream = kwargs_to_upstream or {}
 
         # Check that arguments can be binded
         signature(python_callable).bind(*op_args, **op_kwargs)
         self.multiple_outputs = multiple_outputs
         self.op_args = op_args
         self.op_kwargs = op_kwargs
+        super().__init__(**kwargs_to_upstream, **kwargs)
 
-    @staticmethod
-    def _get_unique_task_id(
-        task_id: str, dag: Optional[DAG] = None, task_group: Optional[TaskGroup] = None
-    ) -> str:
-        """
-        Generate unique task id given a DAG (or if run in a DAG context)
-        Ids are generated by appending a unique number to the end of
-        the original task id.
-
-        Example:
-          task_id
-          task_id__1
-          task_id__2
-          ...
-          task_id__20
-        """
-        dag = dag or DagContext.get_current_dag()
-        if not dag:
-            return task_id
-
-        # We need to check if we are in the context of TaskGroup as the task_id may
-        # already be altered
-        task_group = task_group or TaskGroupContext.get_current_task_group(dag)
-        tg_task_id = task_group.child_id(task_id) if task_group else task_id
-
-        if tg_task_id not in dag.task_ids:
-            return task_id
-        core = re.split(r'__\d+$', task_id)[0]
-        suffixes = sorted(
-            [
-                int(re.split(r'^.+__', task_id)[1])
-                for task_id in dag.task_ids
-                if re.match(rf'^{core}__\d+$', task_id)
-            ]
-        )
-        if not suffixes:
-            return f'{core}__1'
-        return f'{core}__{suffixes[-1] + 1}'
-
-    @staticmethod
-    def validate_python_callable(python_callable):
-        """
-        Validate that python callable can be wrapped by operator.
-        Raises exception if invalid.
+    def execute(self, context: Dict):
+        return_value = super().execute(context)
+        self._handle_output(return_value=return_value, context=context, xcom_push=self.xcom_push)
+        return return_value
 
-        :param python_callable: Python object to be validated
-        :raises: TypeError, AirflowException
+    def _handle_output(self, return_value: Any, context: Dict, xcom_push: Callable):
         """
-        if not callable(python_callable):
-            raise TypeError('`python_callable` param must be callable')
-        if 'self' in signature(python_callable).parameters.keys():
-            raise AirflowException('@task does not support methods')
+        Handles logic for whether a decorator needs to push a single return value or multiple return values.
 
-    def execute(self, context: Dict):
-        raise NotImplementedError()
+        :param return_value:
+        :param context:
+        :param xcom_push:
+        """
+        if not self.multiple_outputs:
+            return return_value

Review comment:
       ```suggestion
           if self.multiple_outputs:
   ```
   Since the caller (`execute`) doesn't use the return value of `_handle_output`, there's no need to return it here.

##########
File path: airflow/decorators/docker.py
##########
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Any, Callable, Dict, Optional, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script
+
+
+class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        self.pickling_library = pickle
+        command = (
+            "bash -cx \"ls /tmp && ls /tmp/py_script/ && "
+            "chmod -R 777 /tmp/py_script/ && "
+            "python /tmp/py_script/script.py "
+            "/tmp/py_script/script.in "
+            "/tmp/py_script/script.out "
+            "/tmp/py_script/string_args.txt\""
+        )
+        self.string_args = [1, 2, 1]

Review comment:
       `command` and especially `string_args` would benefit from comments explaining their purpose.

##########
File path: airflow/decorators/__init__.py
##########
@@ -55,5 +57,249 @@ def python(python_callable: Optional[Callable] = None, multiple_outputs: Optiona
         """
         return python_task(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs)
 
+    @staticmethod
+    def virtualenv(
+        python_callable: Optional[Callable] = None,
+        multiple_outputs: Optional[bool] = None,
+        requirements: Optional[Iterable[str]] = None,
+        python_version: Optional[Union[str, int, float]] = None,
+        use_dill: bool = False,
+        system_site_packages: bool = True,
+        string_args: Optional[Iterable[str]] = None,
+        templates_dict: Optional[Dict] = None,
+        templates_exts: Optional[List[str]] = None,
+        **kwargs,
+    ):
+        """
+        Allows one to run a function in a virtualenv that is
+        created and destroyed automatically (with certain caveats).
+
+        The function must be defined using def, and not be
+        part of a class. All imports must happen inside the function
+        and no variables outside of the scope may be referenced. A global scope
+        variable named virtualenv_string_args will be available (populated by
+        string_args). In addition, one can pass stuff through op_args and op_kwargs, and one
+        can use a return value.
+        Note that if your virtualenv runs in a different Python major version than Airflow,
+        you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to
+        Airflow through plugins. You can use string_args though.
+
+        .. seealso::
+            For more information on how to use this operator, take a look at the guide:
+            :ref:`howto/operator:PythonVirtualenvOperator`
+
+        :param python_callable: A python function with no references to outside variables,
+            defined with def, which will be run in a virtualenv
+        :type python_callable: function
+        :param multiple_outputs: if set, function return value will be
+            unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+            with index as key. Dict will unroll to xcom values with keys as XCom keys.
+            Defaults to False.
+        :type multiple_outputs: bool
+        :param requirements: A list of requirements as specified in a pip install command
+        :type requirements: list[str]
+        :param python_version: The Python version to run the virtualenv with. Note that
+            both 2 and 2.7 are acceptable forms.
+        :type python_version: Optional[Union[str, int, float]]
+        :param use_dill: Whether to use dill to serialize
+            the args and result (pickle is default). This allow more complex types
+            but requires you to include dill in your requirements.
+        :type use_dill: bool
+        :param system_site_packages: Whether to include
+            system_site_packages in your virtualenv.
+            See virtualenv documentation for more information.
+        :type system_site_packages: bool
+        :param op_args: A list of positional arguments to pass to python_callable.
+        :type op_args: list
+        :param op_kwargs: A dict of keyword arguments to pass to python_callable.
+        :type op_kwargs: dict
+        :param string_args: Strings that are present in the global var virtualenv_string_args,
+            available to python_callable at runtime as a list[str]. Note that args are split
+            by newline.
+        :type string_args: list[str]
+        :param templates_dict: a dictionary where the values are templates that
+            will get templated by the Airflow engine sometime between
+            ``__init__`` and ``execute`` takes place and are made available
+            in your callable's context after the template has been applied
+        :type templates_dict: dict of str
+        :param templates_exts: a list of file extensions to resolve while
+            processing templated fields, for examples ``['.sql', '.hql']``
+        :type templates_exts: list[str]
+        """
+        return _virtualenv_task(
+            python_callable=python_callable,
+            multiple_outputs=multiple_outputs,
+            requirements=requirements,
+            python_version=python_version,
+            use_dill=use_dill,
+            system_site_packages=system_site_packages,
+            string_args=string_args,
+            templates_dict=templates_dict,
+            templates_exts=templates_exts,
+            **kwargs,
+        )
+
+    @staticmethod
+    def docker(  # pylint: disable=too-many-arguments,too-many-locals
+        python_callable: Optional[Callable] = None,
+        multiple_outputs: Optional[bool] = None,
+        image: str = "",
+        api_version: Optional[str] = None,
+        container_name: Optional[str] = None,
+        cpus: float = 1.0,
+        docker_url: str = 'unix://var/run/docker.sock',
+        environment: Optional[Dict] = None,
+        private_environment: Optional[Dict] = None,
+        force_pull: bool = False,
+        mem_limit: Optional[Union[float, str]] = None,
+        host_tmp_dir: Optional[str] = None,
+        network_mode: Optional[str] = None,
+        tls_ca_cert: Optional[str] = None,
+        tls_client_cert: Optional[str] = None,
+        tls_client_key: Optional[str] = None,
+        tls_hostname: Optional[Union[str, bool]] = None,
+        tls_ssl_version: Optional[str] = None,
+        tmp_dir: str = '/tmp/airflow',
+        user: Optional[Union[str, int]] = None,
+        volumes: Optional[List[str]] = None,
+        working_dir: Optional[str] = None,
+        xcom_all: bool = False,
+        docker_conn_id: Optional[str] = None,
+        dns: Optional[List[str]] = None,
+        dns_search: Optional[List[str]] = None,
+        auto_remove: bool = False,
+        shm_size: Optional[int] = None,
+        tty: bool = False,
+        privileged: bool = False,
+        cap_add: Optional[Iterable[str]] = None,
+        extra_hosts: Optional[Dict[str, str]] = None,
+        **kwargs,
+    ):
+        """
+        :param python_callable: A python function with no references to outside variables,
+            defined with def, which will be run in a virtualenv
+        :type python_callable: function
+        :param multiple_outputs: if set, function return value will be
+            unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+            with index as key. Dict will unroll to xcom values with keys as XCom keys.
+            Defaults to False.
+        :type multiple_outputs: bool
+        :param image: Docker image from which to create the container.
+            If image tag is omitted, "latest" will be used.
+        :type image: str
+        :param api_version: Remote API version. Set to ``auto`` to automatically
+            detect the server's version.
+        :type api_version: str
+        :param container_name: Name of the container. Optional (templated)
+        :type container_name: str or None
+        :param cpus: Number of CPUs to assign to the container.
+            This value gets multiplied with 1024. See
+            https://docs.docker.com/engine/reference/run/#cpu-share-constraint
+        :type cpus: float
+        :param docker_url: URL of the host running the docker daemon.
+            Default is unix://var/run/docker.sock
+        :type docker_url: str
+        :param environment: Environment variables to set in the container. (templated)
+        :type environment: dict
+        :param private_environment: Private environment variables to set in the container.
+            These are not templated, and hidden from the website.
+        :type private_environment: dict
+        :param force_pull: Pull the docker image on every run. Default is False.
+        :type force_pull: bool
+        :param mem_limit: Maximum amount of memory the container can use.
+            Either a float value, which represents the limit in bytes,
+            or a string like ``128m`` or ``1g``.
+        :type mem_limit: float or str
+        :param host_tmp_dir: Specify the location of the temporary directory on the host which will
+            be mapped to tmp_dir. If not provided defaults to using the standard system temp directory.
+        :type host_tmp_dir: str
+        :param network_mode: Network mode for the container.
+        :type network_mode: str
+        :param tls_ca_cert: Path to a PEM-encoded certificate authority
+            to secure the docker connection.
+        :type tls_ca_cert: str
+        :param tls_client_cert: Path to the PEM-encoded certificate
+            used to authenticate docker client.
+        :type tls_client_cert: str
+        :param tls_client_key: Path to the PEM-encoded key used to authenticate docker client.
+        :type tls_client_key: str
+        :param tls_hostname: Hostname to match against
+            the docker server certificate or False to disable the check.
+        :type tls_hostname: str or bool
+        :param tls_ssl_version: Version of SSL to use when communicating with docker daemon.
+        :type tls_ssl_version: str
+        :param tmp_dir: Mount point inside the container to
+            a temporary directory created on the host by the operator.
+            The path is also made available via the environment variable
+            ``AIRFLOW_TMP_DIR`` inside the container.
+        :type tmp_dir: str
+        :param user: Default user inside the docker container.
+        :type user: int or str
+        :param volumes: List of volumes to mount into the container, e.g.
+            ``['/host/path:/container/path', '/host/path2:/container/path2:ro']``.
+        :type volumes: list
+        :param working_dir: Working directory to
+            set on the container (equivalent to the -w switch the docker client)
+        :type working_dir: str
+        :param xcom_all: Push all the stdout or just the last line.
+            The default is False (last line).
+        :type xcom_all: bool
+        :param docker_conn_id: ID of the Airflow connection to use
+        :type docker_conn_id: str
+        :param dns: Docker custom DNS servers
+        :type dns: list[str]
+        :param dns_search: Docker custom DNS search domain
+        :type dns_search: list[str]
+        :param auto_remove: Auto-removal of the container on daemon side when the
+            container's process exits.
+            The default is False.
+        :type auto_remove: bool
+        :param shm_size: Size of ``/dev/shm`` in bytes. The size must be
+            greater than 0. If omitted uses system default.
+        :type shm_size: int
+        :param tty: Allocate pseudo-TTY to the container
+            This needs to be set see logs of the Docker container.
+        :type tty: bool
+        :param privileged: Give extended privileges to this container.
+        :type privileged: bool
+        :param cap_add: Include container capabilities
+        :type cap_add: list[str]
+        """
+        return _docker_task(
+            python_callable=python_callable,

Review comment:
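       After `python_callable`, should we enforce keyword-only arguments (e.g. by adding a bare `*` to the signature)? With this many parameters, positional calls are easy to get wrong.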

##########
File path: airflow/decorators/docker.py
##########
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Any, Callable, Dict, Optional, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script
+
+
+class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        self.pickling_library = pickle
+        command = (
+            "bash -cx \"ls /tmp && ls /tmp/py_script/ && "
+            "chmod -R 777 /tmp/py_script/ && "
+            "python /tmp/py_script/script.py "
+            "/tmp/py_script/script.in "
+            "/tmp/py_script/script.out "
+            "/tmp/py_script/string_args.txt\""
+        )
+        self.string_args = [1, 2, 1]
+        self._output_filename = ""
+
+        super().__init__(command=command, **kwargs)
+
+    def execute(self, context: Dict):
+        with TemporaryDirectory(prefix='venv') as tmp_dir:
+            input_filename = os.path.join(tmp_dir, 'script.in')
+            self._output_filename = os.path.join(tmp_dir, 'script.out')
+            string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
+            script_filename = os.path.join(tmp_dir, 'script.py')
+            self._write_args(input_filename)
+            self._write_string_args(string_args_filename)
+            py_source = self._get_python_source()
+            write_python_script(
+                jinja_context=dict(
+                    op_args=self.op_args,
+                    op_kwargs=self.op_kwargs,
+                    pickling_library=self.pickling_library.__name__,
+                    python_callable=self.python_callable.__name__,
+                    python_callable_source=py_source,
+                ),
+                filename=script_filename,
+            )
+            self.volumes.append(f"{tmp_dir}:/tmp/py_script/")
+            super().execute(context)
+            return self._read_result(self._output_filename)
+
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _write_string_args(self, filename):
+        with open(filename, 'w') as file:
+            file.write('\n'.join(map(str, self.string_args)))
+
+    def _get_python_source(self):
+        raw_source = inspect.getsource(self.python_callable)
+        res = dedent(raw_source)
+        res = remove_task_decorator(res, "@task.docker")
+        return res
+
+    def _handle_output(self, return_value: Any, context: Dict, xcom_push: Callable):
+        """
+        The DockerOperator normally returns the logs as it's output, in this case we want to override this

Review comment:
       ```suggestion
           The DockerOperator normally returns the logs as its output, in this case we want to override this
   ```

##########
File path: airflow/decorators/docker.py
##########
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Any, Callable, Dict, Optional, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script
+
+
+class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        self.pickling_library = pickle
+        command = (
+            "bash -cx \"ls /tmp && ls /tmp/py_script/ && "
+            "chmod -R 777 /tmp/py_script/ && "
+            "python /tmp/py_script/script.py "
+            "/tmp/py_script/script.in "
+            "/tmp/py_script/script.out "
+            "/tmp/py_script/string_args.txt\""
+        )
+        self.string_args = [1, 2, 1]
+        self._output_filename = ""
+
+        super().__init__(command=command, **kwargs)
+
+    def execute(self, context: Dict):
+        with TemporaryDirectory(prefix='venv') as tmp_dir:
+            input_filename = os.path.join(tmp_dir, 'script.in')
+            self._output_filename = os.path.join(tmp_dir, 'script.out')
+            string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
+            script_filename = os.path.join(tmp_dir, 'script.py')
+            self._write_args(input_filename)
+            self._write_string_args(string_args_filename)
+            py_source = self._get_python_source()
+            write_python_script(
+                jinja_context=dict(
+                    op_args=self.op_args,
+                    op_kwargs=self.op_kwargs,
+                    pickling_library=self.pickling_library.__name__,
+                    python_callable=self.python_callable.__name__,
+                    python_callable_source=py_source,
+                ),
+                filename=script_filename,
+            )
+            self.volumes.append(f"{tmp_dir}:/tmp/py_script/")
+            super().execute(context)
+            return self._read_result(self._output_filename)
+
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _write_string_args(self, filename):
+        with open(filename, 'w') as file:
+            file.write('\n'.join(map(str, self.string_args)))
+
+    def _get_python_source(self):
+        raw_source = inspect.getsource(self.python_callable)
+        res = dedent(raw_source)
+        res = remove_task_decorator(res, "@task.docker")
+        return res
+
+    def _handle_output(self, return_value: Any, context: Dict, xcom_push: Callable):

Review comment:
       It doesn't look like `return_value` gets used. Is it necessary here?

##########
File path: airflow/utils/python_virtualenv.py
##########
@@ -42,6 +43,36 @@ def _generate_pip_install_cmd(tmp_dir: str, requirements: List[str]) -> Optional
     return cmd + requirements
 
 
+def _balance_parens(after_decorator):
+    num_paren = 1
+    after_decorator = deque(after_decorator)
+    after_decorator.popleft()
+    while num_paren:
+        current = after_decorator.popleft()
+        if current == "(":
+            num_paren = num_paren + 1
+        elif current == ")":
+            num_paren = num_paren - 1
+    return ''.join(after_decorator)

Review comment:
       It may not matter, but this will fail if any of the decorator's arguments contain a parenthesis inside a string literal, such as `:-)`, because the counter does not skip over string contents.

##########
File path: airflow/operators/python.py
##########
@@ -369,13 +367,14 @@ def execute_callable(self):
 
             self._write_args(input_filename)
             self._write_string_args(string_args_filename)
+            py_source = self.get_python_source()
             write_python_script(
                 jinja_context=dict(
                     op_args=self.op_args,
                     op_kwargs=self.op_kwargs,
                     pickling_library=self.pickling_library.__name__,
                     python_callable=self.python_callable.__name__,
-                    python_callable_source=dedent(inspect.getsource(self.python_callable)),
+                    python_callable_source=py_source,

Review comment:
       Does `py_source` need to be assigned to a variable?

##########
File path: airflow/operators/python.py
##########
@@ -107,7 +105,7 @@ class PythonOperator(BaseOperator):
 
     template_fields = ('templates_dict', 'op_args', 'op_kwargs')
     template_fields_renderers = {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}
-    ui_color = PYTHON_OPERATOR_UI_COLOR
+    ui_color = '#ffefeb'

Review comment:
       The color is currently a magic value. I'd assign it to a constant to indicate what color it is.
   ```
   BLUE = '#ffefeb'
   ui_color = BLUE
   ```

##########
File path: airflow/utils/python_virtualenv.py
##########
@@ -42,6 +43,36 @@ def _generate_pip_install_cmd(tmp_dir: str, requirements: List[str]) -> Optional
     return cmd + requirements
 
 
+def _balance_parens(after_decorator):
+    num_paren = 1
+    after_decorator = deque(after_decorator)
+    after_decorator.popleft()
+    while num_paren:
+        current = after_decorator.popleft()
+        if current == "(":
+            num_paren = num_paren + 1
+        elif current == ")":
+            num_paren = num_paren - 1
+    return ''.join(after_decorator)

Review comment:
       Alternative approach:
   ```
   def remove_task_decorator(python_source: str, task_decorator_name: str) -> str:
     """
     Removed @task.virtualenv
     :param python_source:
     """
     func_start = source.find("def ")
     decorators = source[:func_start]
     decorated = "@".join(d for d in decorators.split("@") if not d.startswith(task_decorator_name))
     return decorated + source[func_start:]
   ```
   
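   Honestly, this doesn't matter, but I wanted to see whether there was a clear alternative way to do it, and here it is. (Note: the body should use the `python_source` parameter rather than `source`, and because `split("@")` drops the `@`, the `startswith` check should compare against `task_decorator_name.lstrip("@")`, since callers pass names such as `"@task.docker"`.)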

##########
File path: docs/apache-airflow/tutorial_taskflow_api.rst
##########
@@ -160,6 +160,39 @@ the dependencies as shown below.
     :start-after: [START main_flow]
     :end-before: [END main_flow]
 
+Using the Taskflow API with Docker or Virtual Environments
+----------------------------------------------------------
+
+As of Airflow <Airflow version>, you will have the ability to use the Taskflow API with either a

Review comment:
       TODO: Update the version

##########
File path: airflow/decorators/python.py
##########
@@ -56,27 +52,13 @@ def __init__(
         self,
         **kwargs,
     ) -> None:
-        super().__init__(**kwargs)
+        kwargs_to_upstream = {
+            "python_callable": kwargs["python_callable"],
+            "op_args": kwargs["op_args"],

Review comment:
       Can we guarantee these `kwargs` keys will always be present?

##########
File path: tests/decorators/test_python_virtualenv.py
##########
@@ -0,0 +1,348 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import sys
+from datetime import timedelta
+from subprocess import CalledProcessError
+
+import pytest
+
+from airflow.decorators import task
+from airflow.utils import timezone
+
+from .test_python import TestPythonBase
+
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+END_DATE = timezone.datetime(2016, 1, 2)
+INTERVAL = timedelta(hours=12)
+FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1)
+
+TI_CONTEXT_ENV_VARS = [
+    'AIRFLOW_CTX_DAG_ID',
+    'AIRFLOW_CTX_TASK_ID',
+    'AIRFLOW_CTX_EXECUTION_DATE',
+    'AIRFLOW_CTX_DAG_RUN_ID',
+]
+
+
+PYTHON_VERSION = sys.version_info[0]
+
+
+class TestPythonVirtualenvDecorator(TestPythonBase):
+    def test_add_dill(self):
+        @task.virtualenv(use_dill=True, system_site_packages=False)
+        def f():
+            pass
+
+        with self.dag:
+            ret = f()
+
+        assert 'dill' in ret.operator.requirements
+
+    def test_no_requirements(self):
+        """Tests that the python callable is invoked on task run."""
+
+        @task.virtualenv()
+        def f():
+            pass
+
+        with self.dag:
+            ret = f()
+
+        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+    def test_no_system_site_packages(self):
+        @task.virtualenv(system_site_packages=False, python_version=PYTHON_VERSION, use_dill=True)
+        def f():
+            try:
+                import funcsigs  # noqa: F401  # pylint: disable=redefined-outer-name,reimported,unused-import
+            except ImportError:
+                return True
+            raise Exception
+
+        with self.dag:
+            ret = f()
+
+        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)  # pylint: disable=no-member
+
+    def test_system_site_packages(self):
+        @task.virtualenv(
+            system_site_packages=False,
+            requirements=['funcsigs'],
+            python_version=PYTHON_VERSION,
+            use_dill=True,
+        )
+        def f():
+            import funcsigs  # noqa: F401  # pylint: disable=redefined-outer-name,reimported,unused-import
+
+        with self.dag:
+            ret = f()
+
+        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+    def test_with_requirements_pinned(self):
+        @task.virtualenv(
+            system_site_packages=False,
+            requirements=['funcsigs==0.4'],
+            python_version=PYTHON_VERSION,
+            use_dill=True,
+        )
+        def f():
+            import funcsigs  # noqa: F401  # pylint: disable=redefined-outer-name,reimported
+
+            if funcsigs.__version__ != '0.4':
+                raise Exception
+
+        with self.dag:
+            ret = f()
+
+        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+    def test_unpinned_requirements(self):
+        @task.virtualenv(
+            system_site_packages=False,
+            requirements=['funcsigs', 'dill'],
+            python_version=PYTHON_VERSION,
+            use_dill=True,
+        )
+        def f():
+            import funcsigs  # noqa: F401  # pylint: disable=redefined-outer-name,reimported,unused-import
+
+        with self.dag:
+            ret = f()
+
+        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+    def test_fail(self):
+        @task.virtualenv()
+        def f():
+            raise Exception
+
+        with self.dag:
+            ret = f()
+
+        with pytest.raises(CalledProcessError):
+            ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+    def test_python_2(self):
+        @task.virtualenv(python_version=2, requirements=['dill'])
+        def f():
+            {}.iteritems()  # pylint: disable=no-member
+
+        with self.dag:
+            ret = f()
+
+        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+    # def test_python_2_7(self):

Review comment:
       Is this supposed to be commented out?

##########
File path: airflow/decorators/python_virtualenv.py
##########
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+from textwrap import dedent
+from typing import Callable, Optional, TypeVar
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.operators.python import PythonVirtualenvOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.python_virtualenv import remove_task_decorator
+
+
+class _PythonVirtualenvDecoratedOperator(DecoratedOperator, PythonVirtualenvOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        kwargs_to_upstream = {
+            "python_callable": kwargs["python_callable"],

Review comment:
       Same as before -- are these keys guaranteed to exist?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org