You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/01/12 00:10:15 UTC
[airflow] branch v2-5-test updated: Fix some docs on using sensors with taskflow (#28708)
This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-5-test by this push:
new 2301e8b831 Fix some docs on using sensors with taskflow (#28708)
2301e8b831 is described below
commit 2301e8b83132f4b49fd23b65af437f4d388414c7
Author: Charles Machalow <cs...@gmail.com>
AuthorDate: Wed Jan 4 03:43:15 2023 -0800
Fix some docs on using sensors with taskflow (#28708)
Also add tests to ensure that returning a bool from TaskFlow sensors works as expected
(cherry picked from commit 12a065a38d19f4b5698962db67f5fe9ab50d420a)
---
airflow/decorators/sensor.py | 2 +-
airflow/sensors/python.py | 2 +-
docs/apache-airflow/concepts/taskflow.rst | 9 ++++
docs/apache-airflow/tutorial/taskflow.rst | 6 ++-
tests/decorators/test_sensor.py | 74 +++++++++++++++++++++++++++++++
5 files changed, 90 insertions(+), 3 deletions(-)
diff --git a/airflow/decorators/sensor.py b/airflow/decorators/sensor.py
index 291c412988..2033968620 100644
--- a/airflow/decorators/sensor.py
+++ b/airflow/decorators/sensor.py
@@ -56,7 +56,7 @@ class DecoratedSensorOperator(PythonSensor):
kwargs["task_id"] = get_unique_task_id(task_id, kwargs.get("dag"), kwargs.get("task_group"))
super().__init__(**kwargs)
- def poke(self, context: Context) -> PokeReturnValue:
+ def poke(self, context: Context) -> PokeReturnValue | bool:
return self.python_callable(*self.op_args, **self.op_kwargs)
diff --git a/airflow/sensors/python.py b/airflow/sensors/python.py
index 374df243d7..615e4e20ee 100644
--- a/airflow/sensors/python.py
+++ b/airflow/sensors/python.py
@@ -65,7 +65,7 @@ class PythonSensor(BaseSensorOperator):
self.op_kwargs = op_kwargs or {}
self.templates_dict = templates_dict
- def poke(self, context: Context) -> PokeReturnValue:
+ def poke(self, context: Context) -> PokeReturnValue | bool:
context_merge(context, self.op_kwargs, templates_dict=self.templates_dict)
self.op_kwargs = determine_kwargs(self.python_callable, self.op_args, context)
diff --git a/docs/apache-airflow/concepts/taskflow.rst b/docs/apache-airflow/concepts/taskflow.rst
index 11554dabf4..97847455ab 100644
--- a/docs/apache-airflow/concepts/taskflow.rst
+++ b/docs/apache-airflow/concepts/taskflow.rst
@@ -182,6 +182,15 @@ for deserialization ensure that ``deserialize(data: dict, version: int)`` is spe
Note: Typing of ``version`` is required and needs to be ``ClassVar[int]``
+
+Sensors and the TaskFlow API
+--------------------------------------
+
+.. versionadded:: 2.5.0
+
+For an example of writing a Sensor using the TaskFlow API, see
+:ref:`Using the TaskFlow API with Sensor operators <taskflow/task_sensor_example>`.
+
History
-------
diff --git a/docs/apache-airflow/tutorial/taskflow.rst b/docs/apache-airflow/tutorial/taskflow.rst
index 9db581200e..66255936f5 100644
--- a/docs/apache-airflow/tutorial/taskflow.rst
+++ b/docs/apache-airflow/tutorial/taskflow.rst
@@ -365,7 +365,11 @@ You can apply the ``@task.sensor`` decorator to convert a regular Python functio
BaseSensorOperator class. The Python function implements the poke logic and returns an instance of
the ``PokeReturnValue`` class as the ``poke()`` method in the BaseSensorOperator does. The ``PokeReturnValue`` is
a new feature in Airflow 2.3 that allows a sensor operator to push an XCom value as described in
-section "Having sensors return XOM values" of :doc:`apache-airflow-providers:howto/create-update-providers`.
+section "Having sensors return XCOM values" of :doc:`apache-airflow-providers:howto/create-update-providers`.
+
+Alternatively, in cases where the sensor doesn't need to push XCom values, both ``poke()`` and the wrapped
+function can return a boolean-like value, where ``True`` designates the sensor's operation as complete and
+``False`` designates the sensor's operation as incomplete.
.. _taskflow/task_sensor_example:
diff --git a/tests/decorators/test_sensor.py b/tests/decorators/test_sensor.py
index d58fb486aa..a6dd9106cf 100644
--- a/tests/decorators/test_sensor.py
+++ b/tests/decorators/test_sensor.py
@@ -63,6 +63,30 @@ class TestSensorDecorator:
)
assert actual_xcom_value == sensor_xcom_value
+ def test_basic_sensor_success_returns_bool(self, dag_maker):
+ @task.sensor
+ def sensor_f():
+ return True
+
+ @task
+ def dummy_f():
+ pass
+
+ with dag_maker():
+ sf = sensor_f()
+ df = dummy_f()
+ sf >> df
+
+ dr = dag_maker.create_dagrun()
+ sf.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+ tis = dr.get_task_instances()
+ assert len(tis) == 2
+ for ti in tis:
+ if ti.task_id == "sensor_f":
+ assert ti.state == State.SUCCESS
+ if ti.task_id == "dummy_f":
+ assert ti.state == State.NONE
+
def test_basic_sensor_failure(self, dag_maker):
@task.sensor(timeout=0)
def sensor_f():
@@ -89,6 +113,32 @@ class TestSensorDecorator:
if ti.task_id == "dummy_f":
assert ti.state == State.NONE
+ def test_basic_sensor_failure_returns_bool(self, dag_maker):
+ @task.sensor(timeout=0)
+ def sensor_f():
+ return False
+
+ @task
+ def dummy_f():
+ pass
+
+ with dag_maker():
+ sf = sensor_f()
+ df = dummy_f()
+ sf >> df
+
+ dr = dag_maker.create_dagrun()
+ with pytest.raises(AirflowSensorTimeout):
+ sf.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+
+ tis = dr.get_task_instances()
+ assert len(tis) == 2
+ for ti in tis:
+ if ti.task_id == "sensor_f":
+ assert ti.state == State.FAILED
+ if ti.task_id == "dummy_f":
+ assert ti.state == State.NONE
+
def test_basic_sensor_soft_fail(self, dag_maker):
@task.sensor(timeout=0, soft_fail=True)
def sensor_f():
@@ -113,6 +163,30 @@ class TestSensorDecorator:
if ti.task_id == "dummy_f":
assert ti.state == State.NONE
+ def test_basic_sensor_soft_fail_returns_bool(self, dag_maker):
+ @task.sensor(timeout=0, soft_fail=True)
+ def sensor_f():
+ return False
+
+ @task
+ def dummy_f():
+ pass
+
+ with dag_maker():
+ sf = sensor_f()
+ df = dummy_f()
+ sf >> df
+
+ dr = dag_maker.create_dagrun()
+ sf.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+ tis = dr.get_task_instances()
+ assert len(tis) == 2
+ for ti in tis:
+ if ti.task_id == "sensor_f":
+ assert ti.state == State.SKIPPED
+ if ti.task_id == "dummy_f":
+ assert ti.state == State.NONE
+
def test_basic_sensor_get_upstream_output(self, dag_maker):
ret_val = 100
sensor_xcom_value = "xcom_value"