You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "yermalov-here (via GitHub)" <gi...@apache.org> on 2023/09/19 14:16:27 UTC

[GitHub] [airflow] yermalov-here commented on a diff in pull request #34464: Consolidate ExternalTaskSensor deferrable and non deferrable logic

yermalov-here commented on code in PR #34464:
URL: https://github.com/apache/airflow/pull/34464#discussion_r1330208166


##########
airflow/sensors/external_task.py:
##########
@@ -233,100 +232,18 @@ def _get_dttm_filter(self, context):
 
     @provide_session
     def poke(self, context: Context, session: Session = NEW_SESSION) -> bool:
-        # delay check to poke rather than __init__ in case it was supplied as XComArgs
-        if self.external_task_ids and len(self.external_task_ids) > len(set(self.external_task_ids)):
-            raise ValueError("Duplicate task_ids passed in external_task_ids parameter")
-
-        dttm_filter = self._get_dttm_filter(context)
-        serialized_dttm_filter = ",".join(dt.isoformat() for dt in dttm_filter)
-
-        if self.external_task_ids:
-            self.log.info(
-                "Poking for tasks %s in dag %s on %s ... ",
-                self.external_task_ids,
-                self.external_dag_id,
-                serialized_dttm_filter,
-            )
-
-        if self.external_task_group_id:
-            self.log.info(
-                "Poking for task_group '%s' in dag '%s' on %s ... ",
-                self.external_task_group_id,
-                self.external_dag_id,
-                serialized_dttm_filter,
-            )
-
-        if self.external_dag_id and not self.external_task_group_id and not self.external_task_ids:
-            self.log.info(
-                "Poking for DAG '%s' on %s ... ",
-                self.external_dag_id,
-                serialized_dttm_filter,
-            )
-
-        # In poke mode this will check dag existence only once
-        if self.check_existence and not self._has_checked_existence:
-            self._check_for_existence(session=session)
-
-        count_failed = -1
-        if self.failed_states:
-            count_failed = self.get_count(dttm_filter, session, self.failed_states)
-
-        # Fail if anything in the list has failed.
-        if count_failed > 0:
-            if self.external_task_ids:
-                if self.soft_fail:
-                    raise AirflowSkipException(
-                        f"Some of the external tasks {self.external_task_ids} "
-                        f"in DAG {self.external_dag_id} failed. Skipping due to soft_fail."
-                    )
-                raise AirflowException(
-                    f"Some of the external tasks {self.external_task_ids} "
-                    f"in DAG {self.external_dag_id} failed."
-                )
-            elif self.external_task_group_id:
-                if self.soft_fail:
-                    raise AirflowSkipException(
-                        f"The external task_group '{self.external_task_group_id}' "
-                        f"in DAG '{self.external_dag_id}' failed. Skipping due to soft_fail."
-                    )
-                raise AirflowException(
-                    f"The external task_group '{self.external_task_group_id}' "
-                    f"in DAG '{self.external_dag_id}' failed."
-                )
-
-            else:
-                if self.soft_fail:
-                    raise AirflowSkipException(
-                        f"The external DAG {self.external_dag_id} failed. Skipping due to soft_fail."
-                    )
-                raise AirflowException(f"The external DAG {self.external_dag_id} failed.")
-
-        count_skipped = -1
-        if self.skipped_states:
-            count_skipped = self.get_count(dttm_filter, session, self.skipped_states)
-
-        # Skip if anything in the list has skipped. Note if we are checking multiple tasks and one skips
-        # before another errors, we'll skip first.
-        if count_skipped > 0:
-            if self.external_task_ids:
-                raise AirflowSkipException(
-                    f"Some of the external tasks {self.external_task_ids} "
-                    f"in DAG {self.external_dag_id} reached a state in our states-to-skip-on list. Skipping."
-                )
-            elif self.external_task_group_id:
-                raise AirflowSkipException(
-                    f"The external task_group '{self.external_task_group_id}' "
-                    f"in DAG {self.external_dag_id} reached a state in our states-to-skip-on list. Skipping."
-                )
-            else:
-                raise AirflowSkipException(
-                    f"The external DAG {self.external_dag_id} reached a state in our states-to-skip-on list. "
-                    "Skipping."
-                )
-
-        # only go green if every single task has reached an allowed state
-        count_allowed = self.get_count(dttm_filter, session, self.allowed_states)
-        return count_allowed == len(dttm_filter)
+        return TaskStateTrigger.check_external_dag(

Review Comment:
   Does it make sense to create a hook for external tasks, move the shared code there, and call the hook from both the operator and the trigger?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org