You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/03/06 21:47:07 UTC
[airflow] 23/37: Update how PythonSensor returns values from python_callable (#28932)
This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit e9853c779428eb586955ffae9316ce89bfda510d
Author: SoxMax <ir...@gmail.com>
AuthorDate: Wed Jan 18 19:57:31 2023 -0500
Update how PythonSensor returns values from python_callable (#28932)
* Update how PythonSensor returns values from python_callable
* test poke returns the xcom value
* update test to only run poke
* reformat based on changes
* use full if rather than ternary
(cherry picked from commit b0f302e027d09a50493f4cdd808559984e433ee1)
---
airflow/sensors/python.py | 5 ++++-
tests/sensors/test_python.py | 11 +++++++++++
2 files changed, 15 insertions(+), 1 deletion(-)
diff --git a/airflow/sensors/python.py b/airflow/sensors/python.py
index 615e4e20ee..0a91031fd6 100644
--- a/airflow/sensors/python.py
+++ b/airflow/sensors/python.py
@@ -71,4 +71,7 @@ class PythonSensor(BaseSensorOperator):
self.log.info("Poking callable: %s", str(self.python_callable))
return_value = self.python_callable(*self.op_args, **self.op_kwargs)
- return PokeReturnValue(bool(return_value))
+ if isinstance(return_value, PokeReturnValue):
+ return return_value
+ else:
+ return PokeReturnValue(bool(return_value))
diff --git a/tests/sensors/test_python.py b/tests/sensors/test_python.py
index 73a0ddffe4..ec515d8dee 100644
--- a/tests/sensors/test_python.py
+++ b/tests/sensors/test_python.py
@@ -23,6 +23,7 @@ from datetime import date
import pytest
from airflow.exceptions import AirflowSensorTimeout
+from airflow.sensors.base import PokeReturnValue
from airflow.sensors.python import PythonSensor
from tests.operators.test_python import BasePythonTest
@@ -41,6 +42,16 @@ class TestPythonSensor(BasePythonTest):
with pytest.raises(ZeroDivisionError):
self.run_as_task(lambda: 1 / 0)
+ def test_python_sensor_xcom(self):
+ with self.dag:
+ task = self.opcls(
+ task_id=self.task_id,
+ python_callable=lambda: PokeReturnValue(True, "xcom"),
+ **self.default_kwargs(),
+ )
+ poke_result = task.poke({})
+ assert poke_result.xcom_value == "xcom"
+
def test_python_callable_arguments_are_templatized(self):
"""Test PythonSensor op_args are templatized"""
# Create a named tuple and ensure it is still preserved