Posted to commits@airflow.apache.org by "argibbs (via GitHub)" <gi...@apache.org> on 2023/03/05 21:04:21 UTC

[GitHub] [airflow] argibbs opened a new pull request, #29933: Explicit skipped states list for ExternalTaskSensor

argibbs opened a new pull request, #29933:
URL: https://github.com/apache/airflow/pull/29933

   # Context
   
   A while back I made a change to the `ExternalTaskSensor` to properly support soft-fail (see #23647). This was so that I could propagate the skipped state: by setting `failed_states=[SKIPPED]` as well as `soft_fail=True`, my external task sensor would go to the skipped state when the upstream task was skipped. This works beautifully.
   
   Except.
   
   Sometimes my upstream task would fail. I didn't want the failure to propagate instantly, but if it wasn't fixed, retried and successful within a given interval, then I wanted the sensor to fail. But with `soft_fail=True` the timeout would result in the sensor going to skipped, not failed. I couldn't have both.
   
   So I've made the (small) change in this PR to add an explicit `skipped_states` list to the ExternalTaskSensor, alongside the existing `failed_states` and `allowed_states`. It does exactly what you think: if the target task enters a state that's in the `skipped_states` list, then the sensor will go to skipped. By default the list is empty, so behaviour is unchanged from before.
   
   If you are monitoring multiple tasks, I made the behaviour mimic that of the `failed_states` list. If any one of the tasks goes to a skipped state, then the sensor goes to skipped, unless a different task entered a failed state at the same time, in which case it will fail.
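As a rough illustration of the precedence described above, here is a simplified, self-contained model of a single poke plus a timeout. It is a sketch, not the sensor's code: plain lowercase strings stand in for Airflow `State` values, the real sensor works from database counts rather than a list of states, and `evaluate_poke`, `evaluate_timeout`, `SensorFailed` and `SensorSkipped` are hypothetical names standing in for the sensor logic, `AirflowException` and `AirflowSkipException`.

```python
from typing import Iterable, NoReturn


class SensorFailed(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowException."""


class SensorSkipped(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowSkipException."""


def evaluate_poke(
    observed_states: Iterable[str],
    allowed_states: Iterable[str] = ("success",),
    skipped_states: Iterable[str] = (),
    failed_states: Iterable[str] = (),
    soft_fail: bool = False,
) -> bool:
    """Simplified model of one poke: True when done, False to keep poking."""
    states = list(observed_states)
    allowed, skipped, failed = set(allowed_states), set(skipped_states), set(failed_states)

    # 1. Any monitored task in a failed state wins; soft_fail turns this into a skip.
    if any(s in failed for s in states):
        if soft_fail:
            raise SensorSkipped("external task reached a failed state (soft_fail=True)")
        raise SensorFailed("external task reached a failed state")

    # 2. Otherwise any monitored task in a skipped state skips the sensor,
    #    independently of soft_fail.
    if any(s in skipped for s in states):
        raise SensorSkipped("external task reached a skipped state")

    # 3. Only succeed once every monitored task is in an allowed state.
    return bool(states) and all(s in allowed for s in states)


def evaluate_timeout(soft_fail: bool) -> NoReturn:
    """Simplified model of a sensor timeout: soft_fail decides skip versus fail."""
    if soft_fail:
        raise SensorSkipped("timed out (soft_fail=True)")
    raise SensorFailed("timed out")
```

Under the old workaround (`failed_states=["skipped"]`, `soft_fail=True`), an upstream skip and a timeout both end in a skip. With `skipped_states=["skipped"]` and `soft_fail=False`, an upstream skip still skips the sensor, but `evaluate_timeout(soft_fail=False)` fails it, which is the combination described above.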
   
   # Changes
   
   1. New `skipped_states` list on the ExternalTaskSensor
   2. Tweaks to the `__init__` to handle validation across all three lists (allowed, skipped, failed)
   3. Logic to skip if the target reaches a skipped state
   4. Test of the new behaviour
   5. Newsfragment describing the change


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] argibbs commented on a diff in pull request #29933: Explicit skipped states list for ExternalTaskSensor

Posted by "argibbs (via GitHub)" <gi...@apache.org>.
argibbs commented on code in PR #29933:
URL: https://github.com/apache/airflow/pull/29933#discussion_r1126159457


##########
airflow/sensors/external_task.py:
##########
@@ -273,6 +284,31 @@ def poke(self, context, session=None):
                     )
                 raise AirflowException(f"The external DAG {self.external_dag_id} failed.")
 
+        count_skipped = -1
+        if self.skipped_states:
+            count_skipped = self.get_count(dttm_filter, session, self.skipped_states)
+
+        # Skip if anything in the list has skipped. Note if we are checking multiple tasks and one skips
+        # before another errors, we'll skip first.

Review Comment:
   This initial change simply fixes the problem I have, where I only monitor a single task (or DAG). As you're noting, multiple tasks complicate things. I did give this some thought, hence the comment in the code...
   
   > Maybe we can improve the sensor by adding something similar to TriggerRule  
   
   Can you expand with an explicit example? Given that this change is entirely additive in terms of behaviour, I'm concerned about scope creep. Or to put it another way, YAGNI. This PR solves a problem I actually have. I don't want to go and complicate the change without an actual use case, which realistically isn't going to come from me (yet!). If this makes it into the main branch, and people start using it and then ask for more levers on the behaviour (e.g. something like a min-skip-count threshold), I'd want to make the change then.
   
   > or just skip it when nothing has failed and we have at least one task in a skipped state.  
   
   I believe this is what the code does? If anything's failed, we'll raise an AirflowException and fail. If not, we then check for skips, and if anything's skipped we'll raise an AirflowSkipException. (And then obviously we check if everything's in a good state, and go good if so)





[GitHub] [airflow] argibbs commented on a diff in pull request #29933: Explicit skipped states list for ExternalTaskSensor

Posted by "argibbs (via GitHub)" <gi...@apache.org>.
argibbs commented on code in PR #29933:
URL: https://github.com/apache/airflow/pull/29933#discussion_r1130571727


##########
airflow/sensors/external_task.py:
##########
@@ -130,29 +142,31 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.allowed_states = list(allowed_states) if allowed_states else [State.SUCCESS]
+        self.skipped_states = list(skipped_states) if skipped_states else []
         self.failed_states = list(failed_states) if failed_states else []
 
-        total_states = set(self.allowed_states + self.failed_states)
+        total_states = set(self.allowed_states + self.skipped_states + self.failed_states)
 
-        if set(self.failed_states).intersection(set(self.allowed_states)):
+        if len(total_states) != len(self.allowed_states + self.skipped_states + self.failed_states):

Review Comment:
   Ah nice one, cross with myself for not thinking of that.





[GitHub] [airflow] argibbs commented on a diff in pull request #29933: Explicit skipped states list for ExternalTaskSensor

Posted by "argibbs (via GitHub)" <gi...@apache.org>.
argibbs commented on code in PR #29933:
URL: https://github.com/apache/airflow/pull/29933#discussion_r1126170057


##########
airflow/sensors/external_task.py:
##########
@@ -130,29 +142,31 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.allowed_states = list(allowed_states) if allowed_states else [State.SUCCESS]
+        self.skipped_states = list(skipped_states) if skipped_states else []
         self.failed_states = list(failed_states) if failed_states else []
 
-        total_states = set(self.allowed_states + self.failed_states)
+        total_states = set(self.allowed_states + self.skipped_states + self.failed_states)
 
-        if set(self.failed_states).intersection(set(self.allowed_states)):
+        if len(total_states) != len(self.allowed_states + self.skipped_states + self.failed_states):
             raise AirflowException(
-                f"Duplicate values provided as allowed "
-                f"`{self.allowed_states}` and failed states `{self.failed_states}`"
+                f"Duplicate values provided across allowed_states, skipped_states and failed_states."

Review Comment:
   Have fixed a few of these in the file





[GitHub] [airflow] argibbs commented on a diff in pull request #29933: Explicit skipped states list for ExternalTaskSensor

Posted by "argibbs (via GitHub)" <gi...@apache.org>.
argibbs commented on code in PR #29933:
URL: https://github.com/apache/airflow/pull/29933#discussion_r1130578134


##########
airflow/sensors/external_task.py:
##########
@@ -130,29 +142,31 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.allowed_states = list(allowed_states) if allowed_states else [State.SUCCESS]
+        self.skipped_states = list(skipped_states) if skipped_states else []
         self.failed_states = list(failed_states) if failed_states else []
 
-        total_states = set(self.allowed_states + self.failed_states)
+        total_states = set(self.allowed_states + self.skipped_states + self.failed_states)
 
-        if set(self.failed_states).intersection(set(self.allowed_states)):
+        if len(total_states) != len(self.allowed_states + self.skipped_states + self.failed_states):

Review Comment:
   (FWIW though, in our install, by far and away the most expensive part of the external task sensor's poke method is the DB query for the state of the target. Depending on the state of the rest of the system it can take tens of seconds: we have some big DAGs and the scheduler polling puts a fairly massive load on the DB. Someone appears to have reported this as #25448. Not that that's any excuse for inefficient code, mind you!)





[GitHub] [airflow] argibbs commented on a diff in pull request #29933: Explicit skipped states list for ExternalTaskSensor

Posted by "argibbs (via GitHub)" <gi...@apache.org>.
argibbs commented on code in PR #29933:
URL: https://github.com/apache/airflow/pull/29933#discussion_r1126172946


##########
tests/sensors/test_external_task_sensor.py:
##########
@@ -307,6 +307,27 @@ def test_external_task_sensor_soft_fail_failed_states_as_skipped(self):
         assert len(task_instances) == 1, "Unexpected number of task instances"
         assert task_instances[0].state == State.SKIPPED, "Unexpected external task state"
 
+    def test_external_task_sensor_skipped_states_as_skipped(self):
+        self.add_time_sensor()
+        op = ExternalTaskSensor(
+            task_id="test_external_task_sensor_check",
+            external_dag_id=TEST_DAG_ID,
+            external_task_id=TEST_TASK_ID,
+            allowed_states=[State.FAILED],
+            skipped_states=[State.SUCCESS],
+            dag=self.dag,
+        )
+
+        # when
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+        # then
+        session = settings.Session()

Review Comment:
   Ah sorry, I was just copying an existing test... Will fix.





[GitHub] [airflow] argibbs commented on a diff in pull request #29933: Explicit skipped states list for ExternalTaskSensor

Posted by "argibbs (via GitHub)" <gi...@apache.org>.
argibbs commented on code in PR #29933:
URL: https://github.com/apache/airflow/pull/29933#discussion_r1133636334


##########
airflow/sensors/external_task.py:
##########
@@ -273,6 +284,31 @@ def poke(self, context, session=None):
                     )
                 raise AirflowException(f"The external DAG {self.external_dag_id} failed.")
 
+        count_skipped = -1
+        if self.skipped_states:
+            count_skipped = self.get_count(dttm_filter, session, self.skipped_states)
+
+        # Skip if anything in the list has skipped. Note if we are checking multiple tasks and one skips
+        # before another errors, we'll skip first.

Review Comment:
   Going to assume this is ok 😀





[GitHub] [airflow] hussein-awala commented on a diff in pull request #29933: Explicit skipped states list for ExternalTaskSensor

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on code in PR #29933:
URL: https://github.com/apache/airflow/pull/29933#discussion_r1125758899


##########
airflow/sensors/external_task.py:
##########
@@ -130,29 +142,31 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.allowed_states = list(allowed_states) if allowed_states else [State.SUCCESS]
+        self.skipped_states = list(skipped_states) if skipped_states else []
         self.failed_states = list(failed_states) if failed_states else []
 
-        total_states = set(self.allowed_states + self.failed_states)
+        total_states = set(self.allowed_states + self.skipped_states + self.failed_states)
 
-        if set(self.failed_states).intersection(set(self.allowed_states)):
+        if len(total_states) != len(self.allowed_states + self.skipped_states + self.failed_states):
             raise AirflowException(
-                f"Duplicate values provided as allowed "
-                f"`{self.allowed_states}` and failed states `{self.failed_states}`"
+                f"Duplicate values provided across allowed_states, skipped_states and failed_states."

Review Comment:
   ```suggestion
                   "Duplicate values provided across allowed_states, skipped_states and failed_states."
   ```



##########
airflow/sensors/external_task.py:
##########
@@ -273,6 +284,31 @@ def poke(self, context, session=None):
                     )
                 raise AirflowException(f"The external DAG {self.external_dag_id} failed.")
 
+        count_skipped = -1
+        if self.skipped_states:
+            count_skipped = self.get_count(dttm_filter, session, self.skipped_states)
+
+        # Skip if anything in the list has skipped. Note if we are checking multiple tasks and one skips
+        # before another errors, we'll skip first.

Review Comment:
   Currently we fail the sensor if anything in the list has failed, and we return `True` when all is done. But I don't think doing the same thing with skipped tasks is a good idea.
   Maybe we can improve the sensor by adding something similar to [TriggerRule](https://github.com/apache/airflow/blob/main/airflow/utils/trigger_rule.py#L23-L38), or just skip it when nothing has failed and we have at least one task in a skipped state.
   WDYT?





[GitHub] [airflow] uranusjr commented on a diff in pull request #29933: Explicit skipped states list for ExternalTaskSensor

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29933:
URL: https://github.com/apache/airflow/pull/29933#discussion_r1125957401


##########
tests/sensors/test_external_task_sensor.py:
##########
@@ -307,6 +307,27 @@ def test_external_task_sensor_soft_fail_failed_states_as_skipped(self):
         assert len(task_instances) == 1, "Unexpected number of task instances"
         assert task_instances[0].state == State.SKIPPED, "Unexpected external task state"
 
+    def test_external_task_sensor_skipped_states_as_skipped(self):
+        self.add_time_sensor()
+        op = ExternalTaskSensor(
+            task_id="test_external_task_sensor_check",
+            external_dag_id=TEST_DAG_ID,
+            external_task_id=TEST_TASK_ID,
+            allowed_states=[State.FAILED],
+            skipped_states=[State.SUCCESS],
+            dag=self.dag,
+        )
+
+        # when
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+        # then
+        session = settings.Session()

Review Comment:
   Use the `session` fixture instead please.





[GitHub] [airflow] argibbs commented on a diff in pull request #29933: Explicit skipped states list for ExternalTaskSensor

Posted by "argibbs (via GitHub)" <gi...@apache.org>.
argibbs commented on code in PR #29933:
URL: https://github.com/apache/airflow/pull/29933#discussion_r1128449758


##########
tests/sensors/test_external_task_sensor.py:
##########
@@ -307,6 +307,27 @@ def test_external_task_sensor_soft_fail_failed_states_as_skipped(self):
         assert len(task_instances) == 1, "Unexpected number of task instances"
         assert task_instances[0].state == State.SKIPPED, "Unexpected external task state"
 
+    def test_external_task_sensor_skipped_states_as_skipped(self):
+        self.add_time_sensor()
+        op = ExternalTaskSensor(
+            task_id="test_external_task_sensor_check",
+            external_dag_id=TEST_DAG_ID,
+            external_task_id=TEST_TASK_ID,
+            allowed_states=[State.FAILED],
+            skipped_states=[State.SUCCESS],
+            dag=self.dag,
+        )
+
+        # when
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+        # then
+        session = settings.Session()

Review Comment:
   (this is fixed)





[GitHub] [airflow] uranusjr commented on a diff in pull request #29933: Explicit skipped states list for ExternalTaskSensor

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29933:
URL: https://github.com/apache/airflow/pull/29933#discussion_r1128988677


##########
airflow/sensors/external_task.py:
##########
@@ -130,29 +142,31 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.allowed_states = list(allowed_states) if allowed_states else [State.SUCCESS]
+        self.skipped_states = list(skipped_states) if skipped_states else []
         self.failed_states = list(failed_states) if failed_states else []
 
-        total_states = set(self.allowed_states + self.failed_states)
+        total_states = set(self.allowed_states + self.skipped_states + self.failed_states)
 
-        if set(self.failed_states).intersection(set(self.allowed_states)):
+        if len(total_states) != len(self.allowed_states + self.skipped_states + self.failed_states):

Review Comment:
   ```suggestion
           if len(total_states) != len(self.allowed_states) + len(self.skipped_states) + len(self.failed_states):
   ```
   
   Adding the lists is somewhat expensive and it’s better to avoid it if possible.
   
   (The `total_states` line above can also be improved with `itertools.chain`.)
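A self-contained sketch of the validation with both suggestions applied, assuming plain strings for `State` values. `validate_state_lists` is a hypothetical name, `ValueError` stands in for the `AirflowException` the sensor raises, and the defaults mirror the diff above.

```python
from itertools import chain
from typing import Iterable, List, Optional, Tuple


def validate_state_lists(
    allowed_states: Optional[Iterable[str]] = None,
    skipped_states: Optional[Iterable[str]] = None,
    failed_states: Optional[Iterable[str]] = None,
) -> Tuple[List[str], List[str], List[str]]:
    """Normalise the three lists and reject any state that appears more than once."""
    allowed = list(allowed_states) if allowed_states else ["success"]
    skipped = list(skipped_states) if skipped_states else []
    failed = list(failed_states) if failed_states else []

    # chain() iterates the lists in turn, so no concatenated temporary list is built.
    total_states = set(chain(allowed, skipped, failed))

    # Summing lengths also avoids concatenation. A repeat anywhere, within one
    # list or across lists, makes the set smaller than the summed lengths.
    if len(total_states) != len(allowed) + len(skipped) + len(failed):
        raise ValueError(
            "Duplicate values provided across allowed_states, skipped_states and failed_states."
        )
    return allowed, skipped, failed
```

Unlike the previous intersection check, the length comparison also rejects a value repeated within a single list, for example `skipped_states=["skipped", "skipped"]`.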





[GitHub] [airflow] argibbs commented on a diff in pull request #29933: Explicit skipped states list for ExternalTaskSensor

Posted by "argibbs (via GitHub)" <gi...@apache.org>.
argibbs commented on code in PR #29933:
URL: https://github.com/apache/airflow/pull/29933#discussion_r1130571124


##########
airflow/sensors/external_task.py:
##########
@@ -273,6 +284,31 @@ def poke(self, context, session=None):
                     )
                 raise AirflowException(f"The external DAG {self.external_dag_id} failed.")
 
+        count_skipped = -1
+        if self.skipped_states:
+            count_skipped = self.get_count(dttm_filter, session, self.skipped_states)
+
+        # Skip if anything in the list has skipped. Note if we are checking multiple tasks and one skips
+        # before another errors, we'll skip first.

Review Comment:
   So just to follow up on this, I'm not opposed to making the behaviour more nuanced (in fact, if I see an issue in this area in the future I might pick it up), but if you're OK with it as is, I'd prefer to keep the change fairly minimal to start with?





[GitHub] [airflow] potiuk merged pull request #29933: Explicit skipped states list for ExternalTaskSensor

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk merged PR #29933:
URL: https://github.com/apache/airflow/pull/29933

