You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/03/06 21:46:49 UTC

[airflow] 05/37: Log poke info when the external DAG is not None and both task_id and task_ids are None (#28097)

This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 97912c84ddef640bac2061e3838ff1dde3e66a28
Author: Srinivasa Surabathini <40...@users.noreply.github.com>
AuthorDate: Fri Jan 20 18:15:31 2023 +0000

    Log poke info when the external DAG is not None and both task_id and task_ids are None (#28097)
    
    (cherry picked from commit 760c52949ac41ffa7a2357aa1af0cdca163ddac8)
---
 airflow/sensors/external_task.py           | 28 +++++++++++++----
 tests/sensors/test_external_task_sensor.py | 48 ++++++++++++++++++++++++++++--
 2 files changed, 68 insertions(+), 8 deletions(-)

diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py
index 967bb5a276..d1c5443b92 100644
--- a/airflow/sensors/external_task.py
+++ b/airflow/sensors/external_task.py
@@ -143,18 +143,27 @@ class ExternalTaskSensor(BaseSensorOperator):
         if external_task_id is not None and external_task_ids is not None:
             raise ValueError(
                 "Only one of `external_task_id` or `external_task_ids` may "
-                "be provided to ExternalTaskSensor; not both."
+                "be provided to ExternalTaskSensor; "
+                "use external_task_id or external_task_ids or external_task_group_id."
             )
 
-        if external_task_id is not None:
-            external_task_ids = [external_task_id]
+        if external_task_group_id is not None and external_task_id is not None:
+            raise ValueError(
+                "Only one of `external_task_group_id` or `external_task_id` may "
+                "be provided to ExternalTaskSensor; "
+                "use external_task_id or external_task_ids or external_task_group_id."
+            )
 
-        if external_task_group_id and external_task_ids:
+        if external_task_group_id is not None and external_task_ids is not None:
             raise ValueError(
-                "Values for `external_task_group_id` and `external_task_id` or `external_task_ids` "
-                "can't be set at the same time"
+                "Only one of `external_task_group_id` or `external_task_ids` may "
+                "be provided to ExternalTaskSensor; "
+                "use external_task_id or external_task_ids or external_task_group_id."
             )
 
+        if external_task_id is not None:
+            external_task_ids = [external_task_id]
+
         if external_task_ids or external_task_group_id:
             if not total_states <= set(State.task_states):
                 raise ValueError(
@@ -217,6 +226,13 @@ class ExternalTaskSensor(BaseSensorOperator):
                 serialized_dttm_filter,
             )
 
+        if self.external_dag_id and not self.external_task_group_id and not self.external_task_ids:
+            self.log.info(
+                "Poking for DAG '%s' on %s ... ",
+                self.external_dag_id,
+                serialized_dttm_filter,
+            )
+
         # In poke mode this will check dag existence only once
         if self.check_existence and not self._has_checked_existence:
             self._check_for_existence(session=session)
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index b594210b13..1b5a5032cf 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -145,18 +145,49 @@ class TestExternalTaskSensor(unittest.TestCase):
         )
         op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+    def test_raise_with_external_task_sensor_task_id_and_task_ids(self):
+        with pytest.raises(ValueError) as ctx:
+            ExternalTaskSensor(
+                task_id="test_external_task_sensor_task_id_with_task_ids_failed_status",
+                external_dag_id=TEST_DAG_ID,
+                external_task_id=TEST_TASK_ID,
+                external_task_ids=TEST_TASK_ID,
+                dag=self.dag,
+            )
+        assert (
+            str(ctx.value) == "Only one of `external_task_id` or `external_task_ids` may "
+            "be provided to ExternalTaskSensor; "
+            "use external_task_id or external_task_ids or external_task_group_id."
+        )
+
     def test_raise_with_external_task_sensor_task_group_and_task_id(self):
         with pytest.raises(ValueError) as ctx:
             ExternalTaskSensor(
                 task_id="test_external_task_sensor_task_group_with_task_id_failed_status",
                 external_dag_id=TEST_DAG_ID,
+                external_task_id=TEST_TASK_ID,
+                external_task_group_id=TEST_TASK_GROUP_ID,
+                dag=self.dag,
+            )
+        assert (
+            str(ctx.value) == "Only one of `external_task_group_id` or `external_task_id` may "
+            "be provided to ExternalTaskSensor; "
+            "use external_task_id or external_task_ids or external_task_group_id."
+        )
+
+    def test_raise_with_external_task_sensor_task_group_and_task_ids(self):
+        with pytest.raises(ValueError) as ctx:
+            ExternalTaskSensor(
+                task_id="test_external_task_sensor_task_group_with_task_ids_failed_status",
+                external_dag_id=TEST_DAG_ID,
                 external_task_ids=TEST_TASK_ID,
                 external_task_group_id=TEST_TASK_GROUP_ID,
                 dag=self.dag,
             )
         assert (
-            str(ctx.value) == "Values for `external_task_group_id` and `external_task_id` or "
-            "`external_task_ids` can't be set at the same time"
+            str(ctx.value) == "Only one of `external_task_group_id` or `external_task_ids` may "
+            "be provided to ExternalTaskSensor; "
+            "use external_task_id or external_task_ids or external_task_group_id."
         )
 
     # by default i.e. check_existence=False, if task_group doesn't exist, the sensor will run till timeout,
@@ -354,6 +385,19 @@ class TestExternalTaskSensor(unittest.TestCase):
         )
         op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+    def test_external_dag_sensor_log(self, caplog):
+        other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once")
+        other_dag.create_dagrun(
+            run_id="test", start_date=DEFAULT_DATE, execution_date=DEFAULT_DATE, state=State.SUCCESS
+        )
+        op = ExternalTaskSensor(
+            task_id="test_external_dag_sensor_check",
+            external_dag_id="other_dag",
+            dag=self.dag,
+        )
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+        assert (f"Poking for DAG 'other_dag' on {DEFAULT_DATE.isoformat()} ... ") in caplog.messages
+
     def test_external_dag_sensor_soft_fail_as_skipped(self):
         other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once")
         other_dag.create_dagrun(