Posted to commits@airflow.apache.org by "hussein-awala (via GitHub)" <gi...@apache.org> on 2023/03/06 00:14:21 UTC

[GitHub] [airflow] hussein-awala commented on a diff in pull request #29933: Explicit skipped states list for ExternalTaskSensor

hussein-awala commented on code in PR #29933:
URL: https://github.com/apache/airflow/pull/29933#discussion_r1125758899


##########
airflow/sensors/external_task.py:
##########
@@ -130,29 +142,31 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.allowed_states = list(allowed_states) if allowed_states else [State.SUCCESS]
+        self.skipped_states = list(skipped_states) if skipped_states else []
         self.failed_states = list(failed_states) if failed_states else []
 
-        total_states = set(self.allowed_states + self.failed_states)
+        total_states = set(self.allowed_states + self.skipped_states + self.failed_states)
 
-        if set(self.failed_states).intersection(set(self.allowed_states)):
+        if len(total_states) != len(self.allowed_states + self.skipped_states + self.failed_states):
             raise AirflowException(
-                f"Duplicate values provided as allowed "
-                f"`{self.allowed_states}` and failed states `{self.failed_states}`"
+                f"Duplicate values provided across allowed_states, skipped_states and failed_states."

Review Comment:
   ```suggestion
                   "Duplicate values provided across allowed_states, skipped_states and failed_states."
   ```
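The validation in the diff can be sketched as a standalone function. This is a minimal sketch, not Airflow code: `validate_states` is a hypothetical helper, it uses plain strings instead of `State` members, and it raises `ValueError` instead of `AirflowException` so it runs without Airflow installed. It follows the suggestion above and uses a plain string literal, because the message has no placeholders.

```python
def validate_states(allowed_states, skipped_states, failed_states):
    """Hypothetical standalone version of the duplicate check in the diff."""
    # Same defaults as the diff: allowed falls back to "success", the others to empty.
    allowed = list(allowed_states) if allowed_states else ["success"]
    skipped = list(skipped_states) if skipped_states else []
    failed = list(failed_states) if failed_states else []

    combined = allowed + skipped + failed
    # A set collapses repeated values, so a length mismatch means
    # some state appears more than once somewhere in the three lists.
    if len(set(combined)) != len(combined):
        # No placeholders in the message, so no f-prefix is needed.
        raise ValueError(
            "Duplicate values provided across allowed_states, skipped_states and failed_states."
        )
    return allowed, skipped, failed
```

Note that the length comparison is stricter than the old intersection check: `validate_states(["success", "success"], None, None)` raises, whereas `set(failed_states).intersection(set(allowed_states))` accepted a value repeated within a single list.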



##########
airflow/sensors/external_task.py:
##########
@@ -273,6 +284,31 @@ def poke(self, context, session=None):
                     )
                 raise AirflowException(f"The external DAG {self.external_dag_id} failed.")
 
+        count_skipped = -1
+        if self.skipped_states:
+            count_skipped = self.get_count(dttm_filter, session, self.skipped_states)
+
+        # Skip if anything in the list has skipped. Note if we are checking multiple tasks and one skips
+        # before another errors, we'll skip first.

Review Comment:
   Currently we fail the sensor if anything in the list has failed, and we return `True` when everything is done. But I don't think doing the same thing with skipped tasks is a good idea: with this check, a single skipped task skips the sensor even if another task in the list fails later.
   Maybe we can improve the sensor by adding something similar to [TriggerRule](https://github.com/apache/airflow/blob/main/airflow/utils/trigger_rule.py#L23-L38), or simply skip it when nothing has failed and at least one task is in a skipped state.
   WDYT?
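A minimal sketch of the second option (skip only when nothing has failed and at least one task is in a skipped state), under one possible reading: the sensor keeps waiting until every watched task is in an allowed or skipped state, so an early skip cannot hide a later failure. `decide` and its string results are hypothetical. In a real sensor, "fail" would correspond to raising `AirflowException`, "skip" to raising `AirflowSkipException`, "success" to `poke` returning `True`, and "wait" to `poke` returning `False`.

```python
def decide(total, n_allowed, n_skipped, n_failed):
    """Hypothetical decision rule for a sensor watching `total` external tasks.

    Returns "fail", "skip", "success", or "wait".
    """
    # Keep the existing behaviour: any failure fails the sensor immediately.
    if n_failed > 0:
        return "fail"
    # Some tasks are not yet in an allowed or skipped state: keep poking,
    # so a task that fails later is not masked by an earlier skip.
    if n_allowed + n_skipped < total:
        return "wait"
    # Everything finished without failures: skip if at least one task skipped.
    if n_skipped > 0:
        return "skip"
    return "success"
```

For three watched tasks, `decide(3, 1, 1, 0)` keeps waiting, `decide(3, 2, 1, 0)` skips, `decide(3, 2, 0, 1)` fails, and `decide(3, 3, 0, 0)` succeeds. A TriggerRule-like option would generalize this by letting the user pick which combination of counts leads to each outcome.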



-- 
This is an automated message from the Apache Git Service.