You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/08/18 15:48:58 UTC

[GitHub] [airflow] casassg commented on a change in pull request #10349: infer multiple_output from return type annotation

casassg commented on a change in pull request #10349:
URL: https://github.com/apache/airflow/pull/10349#discussion_r472296043



##########
File path: airflow/operators/python.py
##########
@@ -276,6 +280,17 @@ def task(
     :type multiple_outputs: bool
 
     """
+
+    if python_callable:
+        sig = signature(python_callable).return_annotation
+        ttype = getattr(sig, "__origin__", None)
+
+        if (
+            sig != inspect.Signature.empty
+            and is_container(ttype)
+        ):

Review comment:
       Not sure about using these parentheses here. Doesn't this condition fit on a single line?

##########
File path: tests/operators/test_python.py
##########
@@ -327,6 +328,98 @@ def test_python_operator_python_callable_is_callable(self):
         with pytest.raises(AirflowException):
             task_decorator(not_callable, dag=self.dag)
 
+    def test_infer_multiple_outputs_using_typing(self):
+        @task_decorator
+        def identity_dict(x: int, y: int) -> Dict[str, int]:
+            return {"x": x, "y": y}
+
+        assert identity_dict(5, 5).operator.multiple_outputs is True   # pylint: disable=maybe-no-member
+
+        @task_decorator
+        def identity_tuple(x: int, y: int) -> Tuple[int, int]:
+            return x, y
+
+        assert identity_tuple(5, 5).operator.multiple_outputs is True  # pylint: disable=maybe-no-member
+
+        @task_decorator
+        def identity_int(x: int) -> int:
+            return x
+
+        assert identity_int(5).operator.multiple_outputs is False   # pylint: disable=maybe-no-member
+
+        @task_decorator
+        def identity_notyping(x: int):
+            return x
+
+        assert identity_notyping(5).operator.multiple_outputs is False   # pylint: disable=maybe-no-member
+
+    @conf_vars({("core", "enable_xcom_pickling"): "True"})
+    def test_multiple_outputs_set(self):
+        @task_decorator
+        def identity(x: int, y: int) -> Set[int]:
+            return {x, y}
+
+        with self.dag:
+            res = identity(5, 6)
+
+        dr = self.dag.create_dagrun(
+            run_id=DagRunType.MANUAL.value,
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+
+        res.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)  # pylint: disable=maybe-no-member
+
+        ti = dr.get_task_instances()[0]
+
+        assert ti.xcom_pull() == {5, 6}  # pylint: disable=maybe-no-member
+
+    def test_multiple_outputs_list(self):
+        @task_decorator
+        def identity(x: int, y: int) -> List[int]:
+            return [x, y]
+
+        with self.dag:
+            res = identity(5, 6)
+
+        dr = self.dag.create_dagrun(
+            run_id=DagRunType.MANUAL.value,
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+
+        res.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)  # pylint: disable=maybe-no-member
+
+        ti = dr.get_task_instances()[0]
+
+        assert ti.xcom_pull() == [5, 6]  # pylint: disable=maybe-no-member
+        assert ti.xcom_pull(key="return_value_0") == 5
+        assert ti.xcom_pull(key="return_value_1") == 6
+
+    def test_multiple_outputs_tuple(self):
+        @task_decorator
+        def identity_tuple(x: int, y: int) -> Tuple[int, int]:
+            return x, y
+
+        with self.dag:
+            ident = identity_tuple(5, 6)
+
+        dr = self.dag.create_dagrun(
+            run_id=DagRunType.MANUAL.value,
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+
+        ident.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)  # pylint: disable=maybe-no-member
+
+        ti = dr.get_task_instances()[0]
+
+        assert ti.xcom_pull(key="return_value_0") == 5
+        assert ti.xcom_pull(key="return_value_1") == 6
+

Review comment:
       What happens if I manually set `multiple_outputs=False` but the function has a return type annotation?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org