You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/01/21 00:06:55 UTC
[airflow] 03/23: Enhance `multiple_outputs` inference of dict typing (#19608)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit d2ae684a09c85fc557b24e2fb4421df7da79a9b0
Author: Josh Fell <48...@users.noreply.github.com>
AuthorDate: Mon Jan 10 11:14:22 2022 -0500
Enhance `multiple_outputs` inference of dict typing (#19608)
(cherry picked from commit 4198550bba474e7942705a4c6df2ad916fb76561)
---
.pre-commit-config.yaml | 1 +
airflow/decorators/base.py | 27 +++++++++++++++++----------
airflow/decorators/python.py | 16 ++++++----------
airflow/decorators/python_virtualenv.py | 10 ++++------
tests/decorators/test_python.py | 23 ++++++++++++++++++-----
5 files changed, 46 insertions(+), 31 deletions(-)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 0003071..08d60ab 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -195,6 +195,7 @@ repos:
- "4"
files: ^chart/values\.schema\.json$|^chart/values_schema\.schema\.json$
pass_filenames: true
+ # TODO: Bump to Python 3.7 when support for Python 3.6 is dropped in Airflow 2.3.
- repo: https://github.com/asottile/pyupgrade
rev: v2.29.0
hooks:
diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
index 229a114..cd76839 100644
--- a/airflow/decorators/base.py
+++ b/airflow/decorators/base.py
@@ -18,6 +18,7 @@
import functools
import inspect
import re
+import sys
from inspect import signature
from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, cast
@@ -91,9 +92,8 @@ class DecoratedOperator(BaseOperator):
:param op_args: a list of positional arguments that will get unpacked when
calling your callable (templated)
:type op_args: list
- :param multiple_outputs: if set, function return value will be
- unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
- Defaults to False.
+ :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+ multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
:type multiple_outputs: bool
:param kwargs_to_upstream: For certain operators, we might need to upstream certain arguments
that would otherwise be absorbed by the DecoratedOperator (for example python_callable for the
@@ -189,10 +189,8 @@ def task_decorator_factory(
:param python_callable: Function to decorate
:type python_callable: Optional[Callable]
- :param multiple_outputs: if set, function return value will be
- unrolled to multiple XCom values. List/Tuples will unroll to xcom values
- with index as key. Dict will unroll to xcom values with keys as XCom keys.
- Defaults to False.
+ :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+ multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
:type multiple_outputs: bool
:param decorated_operator_class: The operator that executes the logic needed to run the python function in
the correct environment
@@ -201,10 +199,19 @@ def task_decorator_factory(
"""
# try to infer from type annotation
if python_callable and multiple_outputs is None:
- sig = signature(python_callable).return_annotation
- ttype = getattr(sig, "__origin__", None)
+ return_type = signature(python_callable).return_annotation
+
+ # If the return type annotation is already the builtins ``dict`` type, use it for the inference.
+ if return_type == dict:
+ ttype = return_type
+ # Checking if Python 3.6, ``__origin__`` attribute does not exist until 3.7; need to use ``__extra__``
+ # TODO: Remove check when support for Python 3.6 is dropped in Airflow 2.3.
+ elif sys.version_info < (3, 7):
+ ttype = getattr(return_type, "__extra__", None)
+ else:
+ ttype = getattr(return_type, "__origin__", None)
- multiple_outputs = sig != inspect.Signature.empty and ttype in (dict, Dict)
+ multiple_outputs = return_type != inspect.Signature.empty and ttype in (dict, Dict)
def wrapper(f: T):
"""
diff --git a/airflow/decorators/python.py b/airflow/decorators/python.py
index 7dc6c1b..2411761 100644
--- a/airflow/decorators/python.py
+++ b/airflow/decorators/python.py
@@ -33,9 +33,8 @@ class _PythonDecoratedOperator(DecoratedOperator, PythonOperator):
:param op_args: a list of positional arguments that will get unpacked when
calling your callable (templated)
:type op_args: list
- :param multiple_outputs: if set, function return value will be
- unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
- Defaults to False.
+ :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+ multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
:type multiple_outputs: bool
"""
@@ -85,9 +84,8 @@ class PythonDecoratorMixin:
:param python_callable: Function to decorate
:type python_callable: Optional[Callable]
- :param multiple_outputs: if set, function return value will be
- unrolled to multiple XCom values. List/Tuples will unroll to xcom values
- with index as key. Dict will unroll to xcom values with keys as XCom keys.
+ :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+ multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys.
Defaults to False.
:type multiple_outputs: bool
"""
@@ -109,10 +107,8 @@ def python_task(
:param python_callable: Function to decorate
:type python_callable: Optional[Callable]
- :param multiple_outputs: if set, function return value will be
- unrolled to multiple XCom values. List/Tuples will unroll to xcom values
- with index as key. Dict will unroll to xcom values with keys as XCom keys.
- Defaults to False.
+ :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+ multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
:type multiple_outputs: bool
"""
return task_decorator_factory(
diff --git a/airflow/decorators/python_virtualenv.py b/airflow/decorators/python_virtualenv.py
index 8024e5a..d412344 100644
--- a/airflow/decorators/python_virtualenv.py
+++ b/airflow/decorators/python_virtualenv.py
@@ -36,9 +36,8 @@ class _PythonVirtualenvDecoratedOperator(DecoratedOperator, PythonVirtualenvOper
:param op_args: a list of positional arguments that will get unpacked when
calling your callable (templated)
:type op_args: list
- :param multiple_outputs: if set, function return value will be
- unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
- Defaults to False.
+ :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+ multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
:type multiple_outputs: bool
"""
@@ -88,9 +87,8 @@ class PythonVirtualenvDecoratorMixin:
:param python_callable: Function to decorate
:type python_callable: Optional[Callable]
- :param multiple_outputs: if set, function return value will be
- unrolled to multiple XCom values. List/Tuples will unroll to xcom values
- with index as key. Dict will unroll to xcom values with keys as XCom keys.
+ :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+ multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys.
Defaults to False.
:type multiple_outputs: bool
"""
diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py
index 8782999..798d877 100644
--- a/tests/decorators/test_python.py
+++ b/tests/decorators/test_python.py
@@ -15,12 +15,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import sys
import unittest.mock
from collections import namedtuple
from datetime import date, timedelta
from typing import Dict, Tuple
import pytest
+from parameterized import parameterized
from airflow.decorators import task as task_decorator
from airflow.exceptions import AirflowException
@@ -112,13 +114,24 @@ class TestAirflowTaskDecorator(TestPythonBase):
with pytest.raises(AirflowException):
task_decorator(not_callable, dag=self.dag)
- def test_infer_multiple_outputs_using_typing(self):
- @task_decorator
- def identity_dict(x: int, y: int) -> Dict[str, int]:
- return {"x": x, "y": y}
+ @parameterized.expand([["dict"], ["dict[str, int]"], ["Dict"], ["Dict[str, int]"]])
+ def test_infer_multiple_outputs_using_dict_typing(self, test_return_annotation):
+ if sys.version_info < (3, 9) and test_return_annotation == "dict[str, int]":
+ self.skipTest("dict[...] not a supported typing prior to Python 3.9")
+
+ @task_decorator
+ def identity_dict(x: int, y: int) -> eval(test_return_annotation):
+ return {"x": x, "y": y}
+
+ assert identity_dict(5, 5).operator.multiple_outputs is True
+
+ @task_decorator
+ def identity_dict_stringified(x: int, y: int) -> test_return_annotation:
+ return {"x": x, "y": y}
- assert identity_dict(5, 5).operator.multiple_outputs is True
+ assert identity_dict_stringified(5, 5).operator.multiple_outputs is True
+ def test_infer_multiple_outputs_using_other_typing(self):
@task_decorator
def identity_tuple(x: int, y: int) -> Tuple[int, int]:
return x, y