You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/01/12 00:10:15 UTC

[airflow] branch v2-5-test updated: Fix some docs on using sensors with taskflow (#28708)

This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-5-test by this push:
     new 2301e8b831 Fix some docs on using sensors with taskflow (#28708)
2301e8b831 is described below

commit 2301e8b83132f4b49fd23b65af437f4d388414c7
Author: Charles Machalow <cs...@gmail.com>
AuthorDate: Wed Jan 4 03:43:15 2023 -0800

    Fix some docs on using sensors with taskflow (#28708)
    
    Also add in testing to ensure that returning bool from taskflow sensors works as expected
    
    (cherry picked from commit 12a065a38d19f4b5698962db67f5fe9ab50d420a)
---
 airflow/decorators/sensor.py              |  2 +-
 airflow/sensors/python.py                 |  2 +-
 docs/apache-airflow/concepts/taskflow.rst |  9 ++++
 docs/apache-airflow/tutorial/taskflow.rst |  6 ++-
 tests/decorators/test_sensor.py           | 74 +++++++++++++++++++++++++++++++
 5 files changed, 90 insertions(+), 3 deletions(-)

diff --git a/airflow/decorators/sensor.py b/airflow/decorators/sensor.py
index 291c412988..2033968620 100644
--- a/airflow/decorators/sensor.py
+++ b/airflow/decorators/sensor.py
@@ -56,7 +56,7 @@ class DecoratedSensorOperator(PythonSensor):
         kwargs["task_id"] = get_unique_task_id(task_id, kwargs.get("dag"), kwargs.get("task_group"))
         super().__init__(**kwargs)
 
-    def poke(self, context: Context) -> PokeReturnValue:
+    def poke(self, context: Context) -> PokeReturnValue | bool:
         return self.python_callable(*self.op_args, **self.op_kwargs)
 
 
diff --git a/airflow/sensors/python.py b/airflow/sensors/python.py
index 374df243d7..615e4e20ee 100644
--- a/airflow/sensors/python.py
+++ b/airflow/sensors/python.py
@@ -65,7 +65,7 @@ class PythonSensor(BaseSensorOperator):
         self.op_kwargs = op_kwargs or {}
         self.templates_dict = templates_dict
 
-    def poke(self, context: Context) -> PokeReturnValue:
+    def poke(self, context: Context) -> PokeReturnValue | bool:
         context_merge(context, self.op_kwargs, templates_dict=self.templates_dict)
         self.op_kwargs = determine_kwargs(self.python_callable, self.op_args, context)
 
diff --git a/docs/apache-airflow/concepts/taskflow.rst b/docs/apache-airflow/concepts/taskflow.rst
index 11554dabf4..97847455ab 100644
--- a/docs/apache-airflow/concepts/taskflow.rst
+++ b/docs/apache-airflow/concepts/taskflow.rst
@@ -182,6 +182,15 @@ for deserialization ensure that ``deserialize(data: dict, version: int)`` is spe
 
   Note: Typing of ``version`` is required and needs to be ``ClassVar[int]``
 
+
+Sensors and the TaskFlow API
+--------------------------------------
+
+.. versionadded:: 2.5.0
+
+For an example of writing a Sensor using the TaskFlow API, see
+:ref:`Using the TaskFlow API with Sensor operators <taskflow/task_sensor_example>`.
+
 History
 -------
 
diff --git a/docs/apache-airflow/tutorial/taskflow.rst b/docs/apache-airflow/tutorial/taskflow.rst
index 9db581200e..66255936f5 100644
--- a/docs/apache-airflow/tutorial/taskflow.rst
+++ b/docs/apache-airflow/tutorial/taskflow.rst
@@ -365,7 +365,11 @@ You can apply the ``@task.sensor`` decorator to convert a regular Python functio
 BaseSensorOperator class. The Python function implements the poke logic and returns an instance of
 the ``PokeReturnValue`` class as the ``poke()`` method in the BaseSensorOperator does. The ``PokeReturnValue`` is
 a new feature in Airflow 2.3 that allows a sensor operator to push an XCom value as described in
-section "Having sensors return XOM values" of :doc:`apache-airflow-providers:howto/create-update-providers`.
+section "Having sensors return XCOM values" of :doc:`apache-airflow-providers:howto/create-update-providers`.
+
+Alternatively, in cases where the sensor doesn't need to push XCom values, both ``poke()`` and the wrapped
+function can return a boolean-like value, where ``True`` designates the sensor's operation as complete and
+``False`` designates the sensor's operation as incomplete.
 
 .. _taskflow/task_sensor_example:
 
diff --git a/tests/decorators/test_sensor.py b/tests/decorators/test_sensor.py
index d58fb486aa..a6dd9106cf 100644
--- a/tests/decorators/test_sensor.py
+++ b/tests/decorators/test_sensor.py
@@ -63,6 +63,30 @@ class TestSensorDecorator:
         )
         assert actual_xcom_value == sensor_xcom_value
 
+    def test_basic_sensor_success_returns_bool(self, dag_maker):
+        @task.sensor
+        def sensor_f():
+            return True
+
+        @task
+        def dummy_f():
+            pass
+
+        with dag_maker():
+            sf = sensor_f()
+            df = dummy_f()
+            sf >> df
+
+        dr = dag_maker.create_dagrun()
+        sf.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+        tis = dr.get_task_instances()
+        assert len(tis) == 2
+        for ti in tis:
+            if ti.task_id == "sensor_f":
+                assert ti.state == State.SUCCESS
+            if ti.task_id == "dummy_f":
+                assert ti.state == State.NONE
+
     def test_basic_sensor_failure(self, dag_maker):
         @task.sensor(timeout=0)
         def sensor_f():
@@ -89,6 +113,32 @@ class TestSensorDecorator:
             if ti.task_id == "dummy_f":
                 assert ti.state == State.NONE
 
+    def test_basic_sensor_failure_returns_bool(self, dag_maker):
+        @task.sensor(timeout=0)
+        def sensor_f():
+            return False
+
+        @task
+        def dummy_f():
+            pass
+
+        with dag_maker():
+            sf = sensor_f()
+            df = dummy_f()
+            sf >> df
+
+        dr = dag_maker.create_dagrun()
+        with pytest.raises(AirflowSensorTimeout):
+            sf.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+
+        tis = dr.get_task_instances()
+        assert len(tis) == 2
+        for ti in tis:
+            if ti.task_id == "sensor_f":
+                assert ti.state == State.FAILED
+            if ti.task_id == "dummy_f":
+                assert ti.state == State.NONE
+
     def test_basic_sensor_soft_fail(self, dag_maker):
         @task.sensor(timeout=0, soft_fail=True)
         def sensor_f():
@@ -113,6 +163,30 @@ class TestSensorDecorator:
             if ti.task_id == "dummy_f":
                 assert ti.state == State.NONE
 
+    def test_basic_sensor_soft_fail_returns_bool(self, dag_maker):
+        @task.sensor(timeout=0, soft_fail=True)
+        def sensor_f():
+            return False
+
+        @task
+        def dummy_f():
+            pass
+
+        with dag_maker():
+            sf = sensor_f()
+            df = dummy_f()
+            sf >> df
+
+        dr = dag_maker.create_dagrun()
+        sf.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+        tis = dr.get_task_instances()
+        assert len(tis) == 2
+        for ti in tis:
+            if ti.task_id == "sensor_f":
+                assert ti.state == State.SKIPPED
+            if ti.task_id == "dummy_f":
+                assert ti.state == State.NONE
+
     def test_basic_sensor_get_upstream_output(self, dag_maker):
         ret_val = 100
         sensor_xcom_value = "xcom_value"