You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/01/21 00:06:55 UTC

[airflow] 03/23: Enhance `multiple_outputs` inference of dict typing (#19608)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d2ae684a09c85fc557b24e2fb4421df7da79a9b0
Author: Josh Fell <48...@users.noreply.github.com>
AuthorDate: Mon Jan 10 11:14:22 2022 -0500

    Enhance `multiple_outputs` inference of dict typing (#19608)
    
    (cherry picked from commit 4198550bba474e7942705a4c6df2ad916fb76561)
---
 .pre-commit-config.yaml                 |  1 +
 airflow/decorators/base.py              | 27 +++++++++++++++++----------
 airflow/decorators/python.py            | 16 ++++++----------
 airflow/decorators/python_virtualenv.py | 10 ++++------
 tests/decorators/test_python.py         | 23 ++++++++++++++++++-----
 5 files changed, 46 insertions(+), 31 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 0003071..08d60ab 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -195,6 +195,7 @@ repos:
           - "4"
         files: ^chart/values\.schema\.json$|^chart/values_schema\.schema\.json$
         pass_filenames: true
+  # TODO: Bump to Python 3.7 when support for Python 3.6 is dropped in Airflow 2.3.
   - repo: https://github.com/asottile/pyupgrade
     rev: v2.29.0
     hooks:
diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
index 229a114..cd76839 100644
--- a/airflow/decorators/base.py
+++ b/airflow/decorators/base.py
@@ -18,6 +18,7 @@
 import functools
 import inspect
 import re
+import sys
 from inspect import signature
 from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, cast
 
@@ -91,9 +92,8 @@ class DecoratedOperator(BaseOperator):
     :param op_args: a list of positional arguments that will get unpacked when
         calling your callable (templated)
     :type op_args: list
-    :param multiple_outputs: if set, function return value will be
-        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
-        Defaults to False.
+    :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+        multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
     :type multiple_outputs: bool
     :param kwargs_to_upstream: For certain operators, we might need to upstream certain arguments
         that would otherwise be absorbed by the DecoratedOperator (for example python_callable for the
@@ -189,10 +189,8 @@ def task_decorator_factory(
 
     :param python_callable: Function to decorate
     :type python_callable: Optional[Callable]
-    :param multiple_outputs: if set, function return value will be
-        unrolled to multiple XCom values. List/Tuples will unroll to xcom values
-        with index as key. Dict will unroll to xcom values with keys as XCom keys.
-        Defaults to False.
+    :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+        multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
     :type multiple_outputs: bool
     :param decorated_operator_class: The operator that executes the logic needed to run the python function in
         the correct environment
@@ -201,10 +199,19 @@ def task_decorator_factory(
     """
     # try to infer from  type annotation
     if python_callable and multiple_outputs is None:
-        sig = signature(python_callable).return_annotation
-        ttype = getattr(sig, "__origin__", None)
+        return_type = signature(python_callable).return_annotation
+
+        # If the return type annotation is already the builtins ``dict`` type, use it for the inference.
+        if return_type == dict:
+            ttype = return_type
+        # Checking if Python 3.6, ``__origin__`` attribute does not exist until 3.7; need to use ``__extra__``
+        # TODO: Remove check when support for Python 3.6 is dropped in Airflow 2.3.
+        elif sys.version_info < (3, 7):
+            ttype = getattr(return_type, "__extra__", None)
+        else:
+            ttype = getattr(return_type, "__origin__", None)
 
-        multiple_outputs = sig != inspect.Signature.empty and ttype in (dict, Dict)
+        multiple_outputs = return_type != inspect.Signature.empty and ttype in (dict, Dict)
 
     def wrapper(f: T):
         """
diff --git a/airflow/decorators/python.py b/airflow/decorators/python.py
index 7dc6c1b..2411761 100644
--- a/airflow/decorators/python.py
+++ b/airflow/decorators/python.py
@@ -33,9 +33,8 @@ class _PythonDecoratedOperator(DecoratedOperator, PythonOperator):
     :param op_args: a list of positional arguments that will get unpacked when
         calling your callable (templated)
     :type op_args: list
-    :param multiple_outputs: if set, function return value will be
-        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
-        Defaults to False.
+    :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+        multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
     :type multiple_outputs: bool
     """
 
@@ -85,9 +84,8 @@ class PythonDecoratorMixin:
 
         :param python_callable: Function to decorate
         :type python_callable: Optional[Callable]
-        :param multiple_outputs: if set, function return value will be
-            unrolled to multiple XCom values. List/Tuples will unroll to xcom values
-            with index as key. Dict will unroll to xcom values with keys as XCom keys.
+        :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+            multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys.
             Defaults to False.
         :type multiple_outputs: bool
         """
@@ -109,10 +107,8 @@ def python_task(
 
     :param python_callable: Function to decorate
     :type python_callable: Optional[Callable]
-    :param multiple_outputs: if set, function return value will be
-        unrolled to multiple XCom values. List/Tuples will unroll to xcom values
-        with index as key. Dict will unroll to xcom values with keys as XCom keys.
-        Defaults to False.
+    :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+        multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
     :type multiple_outputs: bool
     """
     return task_decorator_factory(
diff --git a/airflow/decorators/python_virtualenv.py b/airflow/decorators/python_virtualenv.py
index 8024e5a..d412344 100644
--- a/airflow/decorators/python_virtualenv.py
+++ b/airflow/decorators/python_virtualenv.py
@@ -36,9 +36,8 @@ class _PythonVirtualenvDecoratedOperator(DecoratedOperator, PythonVirtualenvOper
     :param op_args: a list of positional arguments that will get unpacked when
         calling your callable (templated)
     :type op_args: list
-    :param multiple_outputs: if set, function return value will be
-        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
-        Defaults to False.
+    :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+        multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
     :type multiple_outputs: bool
     """
 
@@ -88,9 +87,8 @@ class PythonVirtualenvDecoratorMixin:
 
         :param python_callable: Function to decorate
         :type python_callable: Optional[Callable]
-        :param multiple_outputs: if set, function return value will be
-            unrolled to multiple XCom values. List/Tuples will unroll to xcom values
-            with index as key. Dict will unroll to xcom values with keys as XCom keys.
+        :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+            multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys.
             Defaults to False.
         :type multiple_outputs: bool
         """
diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py
index 8782999..798d877 100644
--- a/tests/decorators/test_python.py
+++ b/tests/decorators/test_python.py
@@ -15,12 +15,14 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import sys
 import unittest.mock
 from collections import namedtuple
 from datetime import date, timedelta
 from typing import Dict, Tuple
 
 import pytest
+from parameterized import parameterized
 
 from airflow.decorators import task as task_decorator
 from airflow.exceptions import AirflowException
@@ -112,13 +114,24 @@ class TestAirflowTaskDecorator(TestPythonBase):
         with pytest.raises(AirflowException):
             task_decorator(not_callable, dag=self.dag)
 
-    def test_infer_multiple_outputs_using_typing(self):
-        @task_decorator
-        def identity_dict(x: int, y: int) -> Dict[str, int]:
-            return {"x": x, "y": y}
+    @parameterized.expand([["dict"], ["dict[str, int]"], ["Dict"], ["Dict[str, int]"]])
+    def test_infer_multiple_outputs_using_dict_typing(self, test_return_annotation):
+        if sys.version_info < (3, 9) and test_return_annotation == "dict[str, int]":
+            self.skipTest("dict[...] not a supported typing prior to Python 3.9")
+
+            @task_decorator
+            def identity_dict(x: int, y: int) -> eval(test_return_annotation):
+                return {"x": x, "y": y}
+
+            assert identity_dict(5, 5).operator.multiple_outputs is True
+
+            @task_decorator
+            def identity_dict_stringified(x: int, y: int) -> test_return_annotation:
+                return {"x": x, "y": y}
 
-        assert identity_dict(5, 5).operator.multiple_outputs is True
+            assert identity_dict_stringified(5, 5).operator.multiple_outputs is True
 
+    def test_infer_multiple_outputs_using_other_typing(self):
         @task_decorator
         def identity_tuple(x: int, y: int) -> Tuple[int, int]:
             return x, y