You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/19 16:42:44 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request, #28469: Support using a list of callbacks in `on_*_callback/sla_miss_callback`s

ephraimbuddy opened a new pull request, #28469:
URL: https://github.com/apache/airflow/pull/28469

   Previously, it was only possible to specify a single callback function when defining a DAG/task callbacks.
   This change allows users to specify a list of callback functions, which will be invoked in the order they are provided.
   
   This will not affect DAG/task that use a single callback function.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on pull request #28469: Support using a list of callbacks in `on_*_callback/sla_miss_callback`s

Posted by GitBox <gi...@apache.org>.
uranusjr commented on PR #28469:
URL: https://github.com/apache/airflow/pull/28469#issuecomment-1359207037

   Looks good to me in general, but there are a few log messages we should improve/unify.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #28469: Support using a list of callbacks in `on_*_callback/sla_miss_callback`s

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #28469:
URL: https://github.com/apache/airflow/pull/28469#discussion_r1053203962


##########
airflow/dag_processing/processor.py:
##########
@@ -473,13 +473,21 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
             notification_sent = False
             if dag.sla_miss_callback:
                 # Execute the alert callback
-                self.log.info("Calling SLA miss callback")
-                try:
-                    dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis)
-                    notification_sent = True
-                except Exception:
-                    Stats.incr("sla_callback_notification_failure")
-                    self.log.exception("Could not call sla_miss_callback for DAG %s", dag.dag_id)
+                callbacks = (
+                    dag.sla_miss_callback
+                    if isinstance(dag.sla_miss_callback, list)
+                    else [dag.sla_miss_callback]
+                )
+                for callback in callbacks:
+                    self.log.info("Calling SLA miss callback %s", callback)
+                    try:
+                        callback(dag, task_list, blocking_task_list, slas, blocking_tis)
+                        notification_sent = True
+                    except Exception:
+                        Stats.incr("sla_callback_notification_failure")
+                        self.log.exception(
+                            "Could not call sla_miss_callback(%s) for DAG %s", callback, dag.dag_id

Review Comment:
   ```suggestion
                               "Could not call sla_miss_callback %s for DAG %s",
                               qualname(callback),
                               dag.dag_id,
   ```
   
   Probably more readable? `qualname` is in `airflow.utils.module_loading`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #28469: Support using a list of callbacks in `on_*_callback/sla_miss_callback`s

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #28469:
URL: https://github.com/apache/airflow/pull/28469#discussion_r1053205669


##########
airflow/models/taskinstance.py:
##########
@@ -1528,14 +1528,19 @@ def signal_handler(signum, frame):
         Stats.incr("ti_successes")
 
     def _run_finished_callback(
-        self, callback: TaskStateChangeCallback | None, context: Context, callback_type: str
+        self,
+        callbacks: None | TaskStateChangeCallback | list[TaskStateChangeCallback],
+        context: Context,
+        callback_type: str,
     ) -> None:
         """Run callback after task finishes"""
-        try:
-            if callback:
-                callback(context)
-        except Exception:  # pylint: disable=broad-except
-            self.log.exception(f"Error when executing {callback_type} callback")
+        if callbacks:
+            callbacks = callbacks if isinstance(callbacks, list) else [callbacks]
+            for callback in callbacks:
+                try:
+                    callback(context)
+                except Exception:  # pylint: disable=broad-except
+                    self.log.exception(f"Error when executing {callback_type} callback")

Review Comment:
   Add callback name here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #28469: Support using a list of callbacks in `on_*_callback/sla_miss_callback`s

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #28469:
URL: https://github.com/apache/airflow/pull/28469#discussion_r1056253990


##########
airflow/dag_processing/processor.py:
##########
@@ -473,13 +473,21 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
             notification_sent = False
             if dag.sla_miss_callback:
                 # Execute the alert callback
-                self.log.info("Calling SLA miss callback")
-                try:
-                    dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis)
-                    notification_sent = True
-                except Exception:
-                    Stats.incr("sla_callback_notification_failure")
-                    self.log.exception("Could not call sla_miss_callback for DAG %s", dag.dag_id)
+                callbacks = (
+                    dag.sla_miss_callback
+                    if isinstance(dag.sla_miss_callback, list)
+                    else [dag.sla_miss_callback]
+                )
+                for callback in callbacks:
+                    self.log.info("Calling SLA miss callback %s", callback)
+                    try:
+                        callback(dag, task_list, blocking_task_list, slas, blocking_tis)
+                        notification_sent = True
+                    except Exception:
+                        Stats.incr("sla_callback_notification_failure")
+                        self.log.exception(
+                            "Could not call sla_miss_callback(%s) for DAG %s", callback, dag.dag_id

Review Comment:
   It’s OK to leave it as is if you prefer, this is a log message and there’s no one right answerr



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #28469: Support using a list of callbacks in `on_*_callback/sla_miss_callback`s

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #28469:
URL: https://github.com/apache/airflow/pull/28469#discussion_r1056077448


##########
airflow/dag_processing/processor.py:
##########
@@ -473,13 +473,21 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
             notification_sent = False
             if dag.sla_miss_callback:
                 # Execute the alert callback
-                self.log.info("Calling SLA miss callback")
-                try:
-                    dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis)
-                    notification_sent = True
-                except Exception:
-                    Stats.incr("sla_callback_notification_failure")
-                    self.log.exception("Could not call sla_miss_callback for DAG %s", dag.dag_id)
+                callbacks = (
+                    dag.sla_miss_callback
+                    if isinstance(dag.sla_miss_callback, list)
+                    else [dag.sla_miss_callback]
+                )
+                for callback in callbacks:
+                    self.log.info("Calling SLA miss callback %s", callback)
+                    try:
+                        callback(dag, task_list, blocking_task_list, slas, blocking_tis)
+                        notification_sent = True
+                    except Exception:
+                        Stats.incr("sla_callback_notification_failure")
+                        self.log.exception(
+                            "Could not call sla_miss_callback(%s) for DAG %s", callback, dag.dag_id

Review Comment:
   WDYT @uranusjr , should I use `inspect` or leave it as it was before?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #28469: Support using a list of callbacks in `on_*_callback/sla_miss_callback`s

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #28469:
URL: https://github.com/apache/airflow/pull/28469#discussion_r1055673182


##########
airflow/models/taskinstance.py:
##########
@@ -1528,14 +1528,19 @@ def signal_handler(signum, frame):
         Stats.incr("ti_successes")
 
     def _run_finished_callback(
-        self, callback: TaskStateChangeCallback | None, context: Context, callback_type: str
+        self,
+        callbacks: None | TaskStateChangeCallback | list[TaskStateChangeCallback],
+        context: Context,
+        callback_type: str,
     ) -> None:
         """Run callback after task finishes"""
-        try:
-            if callback:
-                callback(context)
-        except Exception:  # pylint: disable=broad-except
-            self.log.exception(f"Error when executing {callback_type} callback")
+        if callbacks:
+            callbacks = callbacks if isinstance(callbacks, list) else [callbacks]
+            for callback in callbacks:
+                try:
+                    callback(context)
+                except Exception:  # pylint: disable=broad-except
+                    self.log.exception(f"Error when executing {callback_type} callback")

Review Comment:
   `qualname` didn't work here, had to use the old `func_name`. I'm not sure why



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #28469: Support using a list of callbacks in `on_*_callback/sla_miss_callback`s

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #28469:
URL: https://github.com/apache/airflow/pull/28469#discussion_r1054265270


##########
airflow/dag_processing/processor.py:
##########
@@ -473,13 +473,21 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
             notification_sent = False
             if dag.sla_miss_callback:
                 # Execute the alert callback
-                self.log.info("Calling SLA miss callback")
-                try:
-                    dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis)
-                    notification_sent = True
-                except Exception:
-                    Stats.incr("sla_callback_notification_failure")
-                    self.log.exception("Could not call sla_miss_callback for DAG %s", dag.dag_id)
+                callbacks = (
+                    dag.sla_miss_callback
+                    if isinstance(dag.sla_miss_callback, list)
+                    else [dag.sla_miss_callback]
+                )
+                for callback in callbacks:
+                    self.log.info("Calling SLA miss callback %s", callback)
+                    try:
+                        callback(dag, task_list, blocking_task_list, slas, blocking_tis)
+                        notification_sent = True
+                    except Exception:
+                        Stats.incr("sla_callback_notification_failure")
+                        self.log.exception(
+                            "Could not call sla_miss_callback(%s) for DAG %s", callback, dag.dag_id

Review Comment:
   `qualname` looks like it's doing a lot, gives the importable name of the function



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ephraimbuddy merged pull request #28469: Support using a list of callbacks in `on_*_callback/sla_miss_callback`s

Posted by GitBox <gi...@apache.org>.
ephraimbuddy merged PR #28469:
URL: https://github.com/apache/airflow/pull/28469


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #28469: Support using a list of callbacks in `on_*_callback/sla_miss_callback`s

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #28469:
URL: https://github.com/apache/airflow/pull/28469#discussion_r1055671758


##########
airflow/dag_processing/processor.py:
##########
@@ -473,13 +473,21 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
             notification_sent = False
             if dag.sla_miss_callback:
                 # Execute the alert callback
-                self.log.info("Calling SLA miss callback")
-                try:
-                    dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis)
-                    notification_sent = True
-                except Exception:
-                    Stats.incr("sla_callback_notification_failure")
-                    self.log.exception("Could not call sla_miss_callback for DAG %s", dag.dag_id)
+                callbacks = (
+                    dag.sla_miss_callback
+                    if isinstance(dag.sla_miss_callback, list)
+                    else [dag.sla_miss_callback]
+                )
+                for callback in callbacks:
+                    self.log.info("Calling SLA miss callback %s", callback)
+                    try:
+                        callback(dag, task_list, blocking_task_list, slas, blocking_tis)
+                        notification_sent = True
+                    except Exception:
+                        Stats.incr("sla_callback_notification_failure")
+                        self.log.exception(
+                            "Could not call sla_miss_callback(%s) for DAG %s", callback, dag.dag_id

Review Comment:
   I think I prefer the previous implementation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #28469: Support using a list of callbacks in `on_*_callback/sla_miss_callback`s

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #28469:
URL: https://github.com/apache/airflow/pull/28469#discussion_r1056262687


##########
airflow/dag_processing/processor.py:
##########
@@ -473,13 +473,21 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
             notification_sent = False
             if dag.sla_miss_callback:
                 # Execute the alert callback
-                self.log.info("Calling SLA miss callback")
-                try:
-                    dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis)
-                    notification_sent = True
-                except Exception:
-                    Stats.incr("sla_callback_notification_failure")
-                    self.log.exception("Could not call sla_miss_callback for DAG %s", dag.dag_id)
+                callbacks = (
+                    dag.sla_miss_callback
+                    if isinstance(dag.sla_miss_callback, list)
+                    else [dag.sla_miss_callback]
+                )
+                for callback in callbacks:
+                    self.log.info("Calling SLA miss callback %s", callback)
+                    try:
+                        callback(dag, task_list, blocking_task_list, slas, blocking_tis)
+                        notification_sent = True
+                    except Exception:
+                        Stats.incr("sla_callback_notification_failure")
+                        self.log.exception(
+                            "Could not call sla_miss_callback(%s) for DAG %s", callback, dag.dag_id

Review Comment:
   Cool



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org