Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/12/21 16:22:22 UTC

[GitHub] [airflow] mobuchowski opened a new pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

mobuchowski opened a new pull request #20443:
URL: https://github.com/apache/airflow/pull/20443


   This PR adds a new plugin API: listeners. It enables plugin authors to write [`pluggy` hook implementations](https://pluggy.readthedocs.io/en/stable/) that are called at certain formalized extension points. To distinguish them from existing Airflow extension points (such as plugins) and from existing Airflow hooks, implementations of these hooks are called `listeners`.
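
A minimal sketch of the pattern described above, using pluggy directly. The project name `"airflow"` and the hook name `on_task_instance_running` come from this PR. The `ListenerSpec` and `SummaryListener` classes and the plain-string states are illustrative assumptions, not Airflow's actual modules.

```python
import pluggy

# Markers tie specs and implementations to the same project name.
hookspec = pluggy.HookspecMarker("airflow")
hookimpl = pluggy.HookimplMarker("airflow")


class ListenerSpec:
    """Hypothetical spec namespace; the PR keeps its specs in airflow/listeners/spec.py."""

    @hookspec
    def on_task_instance_running(self, previous_state, task_instance, session):
        """Called when a TaskInstance moves to RUNNING."""


class SummaryListener:
    """A listener: a namespace holding one or more hook implementations."""

    @hookimpl
    def on_task_instance_running(self, previous_state, task_instance, session):
        return f"{task_instance} left {previous_state}"


pm = pluggy.PluginManager("airflow")
pm.add_hookspecs(ListenerSpec)
pm.register(SummaryListener())

# pluggy hooks accept keyword arguments only; results come back as a list.
results = pm.hook.on_task_instance_running(
    previous_state="queued", task_instance="ti-1", session=None
)
print(results)  # ['ti-1 left queued']
```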
   
   The API is meant to be called across all DAGs and all operators. This contrasts with the existing `on_success_callback`, `pre_execute` and related family, which provide callbacks for particular DAG authors or operator creators. 
   
   The `pluggy` mechanism lets us execute any number of listeners (including none) that implement a particular extension point, so users can combine multiple listeners seamlessly.
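
The "multiple, or none" behaviour can be checked directly with pluggy. In this sketch, `AuditListener` and `MetricsListener` are hypothetical names. Calling a hook that has a spec but no implementations simply returns an empty list.

```python
import pluggy

hookspec = pluggy.HookspecMarker("airflow")
hookimpl = pluggy.HookimplMarker("airflow")


class Spec:
    @hookspec
    def on_task_instance_success(self, previous_state, task_instance, session):
        """Called when a TaskInstance moves to SUCCESS."""


class AuditListener:
    @hookimpl
    def on_task_instance_success(self, previous_state, task_instance, session):
        return "audit"


class MetricsListener:
    @hookimpl
    def on_task_instance_success(self, previous_state, task_instance, session):
        return "metrics"


pm = pluggy.PluginManager("airflow")
pm.add_hookspecs(Spec)

# With no listeners registered, calling the hook is a harmless no-op.
no_listeners = pm.hook.on_task_instance_success(
    previous_state="running", task_instance="ti-1", session=None
)

pm.register(AuditListener())
pm.register(MetricsListener())

# Every registered implementation runs; by default pluggy calls the most
# recently registered one first (LIFO).
both = pm.hook.on_task_instance_success(
    previous_state="running", task_instance="ti-1", session=None
)
```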
   
   This PR adds three such extension points. 
   When a TaskInstance's state changes to `RUNNING`, the `on_task_instance_running` hook is called. On a change to `SUCCESS`, `on_task_instance_success` is called. Similarly, on `FAILED`, `on_task_instance_failed` is called.
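
The mapping from the new state to the extension point can be pictured as a small lookup table. This stdlib-only sketch makes two assumptions: `TaskInstanceState` is a hypothetical stand-in enum, and `hook_name_for` is an illustrative helper, not code from the PR.

```python
from enum import Enum
from typing import Optional


class TaskInstanceState(str, Enum):
    """Hypothetical stand-in for Airflow's task-instance state values."""

    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


# Only these three target states have an extension point in this PR.
HOOK_FOR_STATE = {
    TaskInstanceState.RUNNING: "on_task_instance_running",
    TaskInstanceState.SUCCESS: "on_task_instance_success",
    TaskInstanceState.FAILED: "on_task_instance_failed",
}


def hook_name_for(new_state: str) -> Optional[str]:
    """Return the hook to call for a transition into new_state, or None if there is none."""
    try:
        return HOOK_FOR_STATE.get(TaskInstanceState(new_state))
    except ValueError:
        # Not a state value this sketch knows about.
        return None
```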
   
   The actual notification mechanism is implemented using [SQLAlchemy’s events mechanism](https://docs.sqlalchemy.org/en/13/orm/session_events.html#after-flush). This ensures that plugins receive every state change, regardless of where in the codebase it happened.
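
Inside an `after_flush` handler, the PR reads the `state` attribute's history as `(added, unchanged, deleted)`. The stdlib-only helper below sketches how that triple could be turned into a `(previous_state, new_state)` pair. The local `History` namedtuple only mimics the shape of SQLAlchemy's `History`, and `state_transition` is hypothetical. The sketch treats a missing deleted value (as for a newly inserted row) as a `previous_state` of `None`.

```python
from collections import namedtuple
from typing import Optional, Tuple

# Mirrors the field order of SQLAlchemy's attribute History: (added, unchanged, deleted).
History = namedtuple("History", ["added", "unchanged", "deleted"])


def state_transition(history: History) -> Optional[Tuple[Optional[str], str]]:
    """Return (previous_state, new_state) if the flush changed the column, else None."""
    if not history.added:
        # Nothing new was written to the column in this flush.
        return None
    new_state = history.added[0]
    previous_state = history.deleted[0] if history.deleted else None
    if new_state == previous_state:
        return None
    return previous_state, new_state
```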
   
   To make sure this change does not affect performance, running this mechanism on the scheduler is disabled by default. The SQLAlchemy event mechanism is also unaffected by default: the event listener is only added if at least one plugin actually registers a listener. 
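
The "only pay when used" behaviour described above boils down to a guard like the one below. This stdlib-only sketch assumes two names: `enable_listeners_if_needed` is a hypothetical helper, and `register_events` stands in for the PR's `register_task_instance_state_events`.

```python
from typing import Callable

# Module-level guard, similar in spirit to the `_is_listening` flag in the PR's events.py.
_is_listening = False


def enable_listeners_if_needed(has_listeners: bool, register_events: Callable[[], None]) -> bool:
    """Attach the ORM event handler only when at least one listener exists, and only once."""
    global _is_listening
    if _is_listening or not has_listeners:
        return _is_listening
    register_events()
    _is_listening = True
    return True
```

Repeated calls are cheap and idempotent, so the guard can safely sit on a code path that runs for every task.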
   
   
   Design doc:
   https://docs.google.com/document/d/1L3xfdlWVUrdnFXng1Di4nMQYQtzMfhvvWDR9K4wXnDU/edit?usp=sharing
   
   Background to this change: 
   While discussing changes to the LineageBackend API for the OpenLineage plugin, we reached consensus that the changes proposed there would be better suited to the new API: https://github.com/apache/airflow/issues/17984 
   
   related: https://github.com/apache/airflow/issues/17984


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r783036547



##########
File path: tests/listeners/test_listeners.py
##########
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest as pytest
+
+from airflow import AirflowException
+from airflow.listeners import hookimpl
+from airflow.listeners.events import register_task_instance_state_events
+from airflow.listeners.listener import get_listener_manager
+from airflow.operators.bash import BashOperator
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+from tests.listeners import test_full_listener, test_partial_listener, test_throwing_listener
+
+DAG_ID = "test_listener_dag"
+TASK_ID = "test_listener_task"
+EXECUTION_DATE = timezone.utcnow()
+
+
+@pytest.fixture(scope="module", autouse=True)
+def register_events():
+    register_task_instance_state_events()
+
+
+@pytest.fixture(autouse=True)
+def clean_listener_manager():
+    lm = get_listener_manager()
+    lm.clear()
+    yield
+    lm = get_listener_manager()
+    lm.clear()
+    test_full_listener.state = []
+
+
+@provide_session
+def test_listener_gets_calls(create_task_instance, session=None):
+    lm = get_listener_manager()
+    lm.add_listener(test_full_listener)
+
+    ti = create_task_instance(session=session, state=State.QUEUED)
+    ti.run()

Review comment:
       Added comment:
   
   ```
       # Using ti.run() instead of ti._run_raw_task() to capture state change to RUNNING
       # that only happens on `check_and_change_state_before_execution()` that is called before
       # `run()` calls `_run_raw_task()`
   ```

##########
File path: airflow/plugins_manager.py
##########
@@ -458,6 +463,20 @@ def integrate_macros_plugins() -> None:
             setattr(macros, plugin.name, macros_module)
 
 
+def integrate_listener_plugins(listener_manager: "ListenerManager") -> None:
+    global plugins
+    global macros_modules

Review comment:
       Removed.

##########
File path: docs/apache-airflow/listeners.rst
##########
@@ -0,0 +1,39 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Listeners
+=========
+
+Airflow gives you an option to be notified of events happening in Airflow
+by writing listeners. Listeners are powered by `pluggy <https://pluggy.readthedocs.io/en/stable/>`__
+
+Listener API is meant to be called across all dags, and all operators - in contrast to methods like
+``on_success_callback``, ``pre_execute`` and related family which are meant to provide callbacks
+for particular dag authors, or operator creators. There is no possibility to listen on events generated
+by particular dag.
+
+To include listener in your Airflow installation, include it as a part of an :doc:`Airflow Plugin </plugins>`

Review comment:
       Done.

##########
File path: tests/listeners/test_empty_listener.py
##########
@@ -0,0 +1,24 @@
+#

Review comment:
       Done.







[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r773821632



##########
File path: airflow/utils/orm_event_handlers.py
##########
@@ -86,3 +86,7 @@ def after_cursor_execute(conn, cursor, statement, parameters, context, executema
                 stack_info,
                 statement.replace("\n", " "),
             )
+
+    if conf.getboolean('core', 'execute_listeners_on_scheduler', fallback=False):
+        # On import, register sqlalchemy event handlers which call airflow.listeners
+        import airflow.listeners.events  # noqa

Review comment:
       Fixed. Now it explicitly calls `register_task_instance_state_events`.







[GitHub] [airflow] ashb commented on pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#issuecomment-999053527


   I think this change is small enough and isolated enough not to need an AIP.





[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r778142221



##########
File path: docs/apache-airflow/plugins.rst
##########
@@ -268,6 +285,7 @@ definitions in Airflow.
         operator_extra_links = [
             S3LogLink(),
         ]
+        listeners = [PrintingRunningListener]

Review comment:
       Generally, in pluggy, we need to register a `plugin`: a namespace that may contain multiple hookimpls, not single functions. 
   If we passed just a list of functions, we'd later need to construct a "synthetic" namespace in order to register them.
   
   I think requiring a class, or possibly a Python module, is a good idea. This is what the pluggy docs present:
   
   - class https://pluggy.readthedocs.io/en/stable/index.html#a-toy-example
   - module https://pluggy.readthedocs.io/en/stable/index.html#the-plugin
   
   Now, it does not need to look like 
   ```python
           listeners = [PrintingRunningListener]
   ```
   
   We could require plugin authors to directly implement hookimpls in plugins, for example
   
   ```python
   class SomeListenerPlugin(AirflowPlugin):
       name = 'some-plugin'
       @hookimpl
       def on_task_instance_running(...):
           print("running")
   ``` 
   
   Or we could require them to add a single pluggy plugin (a class or a module), if for some reason we don't want them to implement multiple listener functions in a single plugin.
   
   I agree that we should discourage trying to keep state in plugins. My tests don't help here, as I do exactly that :)
   
   Note: this is a separate discussion from https://github.com/apache/airflow/pull/20443#discussion_r778047132
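
Both namespace styles from the pluggy docs can be registered with the same manager. This sketch uses pluggy directly. The names `ClassListener` and `module_listener` are illustrative. The in-memory `types.ModuleType` only stands in for a real listener module file.

```python
import types

import pluggy

hookspec = pluggy.HookspecMarker("airflow")
hookimpl = pluggy.HookimplMarker("airflow")


class Spec:
    @hookspec
    def on_task_instance_running(self, previous_state, task_instance, session):
        """Called when a TaskInstance moves to RUNNING."""


# Option 1: a class instance as the namespace (pluggy ignores `self`).
class ClassListener:
    @hookimpl
    def on_task_instance_running(self, previous_state, task_instance, session):
        return "class"


# Option 2: a module as the namespace. A real plugin would be a .py file;
# types.ModuleType builds one in memory so the sketch stays self-contained.
@hookimpl
def on_task_instance_running(previous_state, task_instance, session):
    return "module"


module_listener = types.ModuleType("module_listener")
module_listener.on_task_instance_running = on_task_instance_running

pm = pluggy.PluginManager("airflow")
pm.add_hookspecs(Spec)
pm.register(ClassListener())
pm.register(module_listener)

results = pm.hook.on_task_instance_running(
    previous_state="queued", task_instance="ti-1", session=None
)
```

Either way, pluggy only sees a namespace with marked callables. The difference is mostly whether authors are nudged toward instance state, the concern raised in this thread.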







[GitHub] [airflow] ashb commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r778145720



##########
File path: docs/apache-airflow/plugins.rst
##########
@@ -268,6 +285,7 @@ definitions in Airflow.
         operator_extra_links = [
             S3LogLink(),
         ]
+        listeners = [PrintingRunningListener]

Review comment:
       Oh yes, you are right. Thinking... 🤔 







[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r774679120



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -291,3 +293,19 @@ def _update_dagrun_state_for_paused_dag(self, session=None):
             if dag_run:
                 dag_run.dag = dag
                 dag_run.update_state(session=session, execute_callbacks=True)
+
+    @staticmethod
+    def _enable_task_listeners():
+        """
+        Check if we have any registered listeners, then register sqlalchemy hooks for
+        TI state change if we do.
+        """
+        from airflow.plugins_manager import integrate_listener_plugins
+
+        integrate_listener_plugins()
+        from airflow.listeners.listener import get_listener_manager
+
+        if get_listener_manager().has_listeners():
+            from airflow.listeners.events import register_task_instance_state_events

Review comment:
       I'm only importing `register_task_instance_state_events` now.







[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r778751047



##########
File path: airflow/listeners/listener.py
##########
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import inspect
+import logging
+from typing import TYPE_CHECKING, Set
+
+import pluggy
+
+if TYPE_CHECKING:
+    from pluggy._hooks import _HookRelay
+
+from airflow.listeners import spec

Review comment:
       Done; importing this in `__init__` now.







[GitHub] [airflow] ashb commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r778038070



##########
File path: airflow/__init__.py
##########
@@ -80,6 +80,11 @@ def __getattr__(name):
     manager.initialize_providers_hooks()
     manager.initialize_providers_extra_links()
 
+if settings.EXECUTE_LISTENERS_ON_SCHEDULER:
+    from airflow.plugins_manager import integrate_listener_plugins
+
+    integrate_listener_plugins()

Review comment:
       Given the name of this setting I wonder if this code should be somewhere in SchedulerJob instead?

##########
File path: airflow/jobs/local_task_job.py
##########
@@ -71,6 +74,8 @@ def __init__(
         # terminate multiple times
         self.terminating = False
 
+        self._enable_task_listeners()

Review comment:
       I think this should be moved into `_execute` instead of the constructor. It currently doesn't make much difference to anything other than tests, but the constructor otherwise does nothing beyond creating an object, so let's keep it that way.

##########
File path: docs/apache-airflow/plugins.rst
##########
@@ -268,6 +285,7 @@ definitions in Airflow.
         operator_extra_links = [
             S3LogLink(),
         ]
+        listeners = [PrintingRunningListener]

Review comment:
       I'm not sure about the need for the class here. Is there a reason we don't follow the "pluggy" way here and do:
   
   ```suggestion
           listeners = [on_task_instance_running]
   ```

##########
File path: docs/apache-airflow/plugins.rst
##########
@@ -167,9 +172,14 @@ definitions in Airflow.
 
     # Importing base classes that we need to derive
     from airflow.hooks.base import BaseHook
+    from airflow.listeners.listener import Listener
     from airflow.models.baseoperator import BaseOperatorLink
     from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator
 
+    from pluggy import HookimplMarker
+
+    hookimpl = HookimplMarker("airflow")

Review comment:
       Looking at the pluggy docs it appears the normal pattern here is to import the hookimpl from a module rather than to create it here: https://pluggy.readthedocs.io/en/latest/#the-plugin

##########
File path: docs/apache-airflow/plugins.rst
##########
@@ -268,6 +285,7 @@ definitions in Airflow.
         operator_extra_links = [
             S3LogLink(),
         ]
+        listeners = [PrintingRunningListener]

Review comment:
       The disadvantage of using a class is that it might imply you can store state in the instance between plugin invocations (which isn't the case, not in general terms anyway), whereas a function might make that clearer.

##########
File path: airflow/jobs/local_task_job.py
##########
@@ -291,3 +296,13 @@ def _update_dagrun_state_for_paused_dag(self, session=None):
             if dag_run:
                 dag_run.dag = dag
                 dag_run.update_state(session=session, execute_callbacks=True)
+
+    @staticmethod
+    def _enable_task_listeners():
+        """
+        Check if we have any registered listeners, then register sqlalchemy hooks for
+        TI state change if we do.
+        """
+        integrate_listener_plugins()
+        if get_listener_manager().has_listeners():

Review comment:
       `integrate_listener_plugins` should be called automatically from inside `get_listener_manager` I think.
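
A stdlib-only sketch of that suggestion: the accessor creates the singleton on first use and integrates plugins exactly once, so callers never have to remember a separate call. `PLUGIN_LISTENERS` and this simplified `ListenerManager` are hypothetical stand-ins. The `integrate_listener_plugins` name mirrors the PR, but its body here is assumed.

```python
from typing import List, Optional

# Hypothetical stand-in for listeners discovered from installed Airflow plugins.
PLUGIN_LISTENERS: List[object] = []


class ListenerManager:
    """Minimal stand-in; the PR's manager wraps a pluggy.PluginManager."""

    def __init__(self) -> None:
        self.listeners: List[object] = []

    def add_listener(self, listener: object) -> None:
        if listener not in self.listeners:
            self.listeners.append(listener)


def integrate_listener_plugins(manager: ListenerManager) -> None:
    """Sketch of loading plugin-provided listeners into the manager."""
    for listener in PLUGIN_LISTENERS:
        manager.add_listener(listener)


_listener_manager: Optional[ListenerManager] = None


def get_listener_manager() -> ListenerManager:
    """Create the singleton on first use and integrate plugins exactly once."""
    global _listener_manager
    if _listener_manager is None:
        _listener_manager = ListenerManager()
        integrate_listener_plugins(_listener_manager)
    return _listener_manager
```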

##########
File path: airflow/listeners/listener.py
##########
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import inspect
+import logging
+from typing import TYPE_CHECKING, Set
+
+import pluggy
+
+if TYPE_CHECKING:
+    from pluggy._hooks import _HookRelay
+
+from airflow.listeners import spec
+
+log = logging.getLogger(__name__)
+
+
+class Listener:
+    """Class used as a namespace for listener hook implementation namespace"""
+
+
+_listener_manager = None
+
+
+class ListenerManager:
+    """Class that manages registration of listeners and provides hook property for calling them"""
+
+    def __init__(self):
+        self.pm = pluggy.PluginManager("airflow")
+        self.pm.add_hookspecs(spec)
+        self.listener_names: Set[str] = set()
+
+    def has_listeners(self) -> bool:

Review comment:
       Maybe make this a property?
   
   ```suggestion
       @property
       def has_listeners(self) -> bool:
   ```
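
With the suggested `@property`, callers read `manager.has_listeners` without parentheses. Existing call sites such as `get_listener_manager().has_listeners()` would then drop the `()`. This sketch builds on pluggy. `Spec`, `FailureListener`, and the `clear` implementation are assumptions for illustration, not the PR's code.

```python
import pluggy

hookspec = pluggy.HookspecMarker("airflow")
hookimpl = pluggy.HookimplMarker("airflow")


class Spec:
    @hookspec
    def on_task_instance_failed(self, previous_state, task_instance, session):
        """Called when a TaskInstance moves to FAILED."""


class ListenerManager:
    """Sketch of the suggested API: has_listeners is read as an attribute, not called."""

    def __init__(self) -> None:
        self.pm = pluggy.PluginManager("airflow")
        self.pm.add_hookspecs(Spec)

    @property
    def has_listeners(self) -> bool:
        return len(self.pm.get_plugins()) > 0

    def add_listener(self, listener) -> None:
        if not self.pm.is_registered(listener):
            self.pm.register(listener)

    def clear(self) -> None:
        """Unregister every listener, e.g. between tests."""
        for plugin in list(self.pm.get_plugins()):
            self.pm.unregister(plugin)


class FailureListener:
    @hookimpl
    def on_task_instance_failed(self, previous_state, task_instance, session):
        return "failed"
```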

##########
File path: airflow/listeners/events.py
##########
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+
+from sqlalchemy import event
+from sqlalchemy.orm import Session
+
+from airflow.listeners.listener import get_listener_manager
+from airflow.models import TaskInstance
+from airflow.utils.state import State
+
+_is_listening = False
+
+
+def register_task_instance_state_events():
+    logger = logging.getLogger()
+    global _is_listening
+
+    def on_task_instance_state_session_flush(session, flush_context):
+        """
+        Listens for session.flush() events that modify TaskInstance's state, and notify listeners that listen
+        for that event. Doing it this way enable us to be stateless in the SQLAlchemy event listener.
+        """
+        if not get_listener_manager().has_listeners():
+            return
+        for state in flush_context.states:
+            if isinstance(state.object, TaskInstance) and session.is_modified(
+                state.object, include_collections=False
+            ):
+                added, unchanged, deleted = flush_context.get_attribute_history(state, 'state')
+
+                logger.warning(
+                    "session flush listener: added %s unchanged %s deleted %s - %s",
+                    added,
+                    unchanged,
+                    deleted,
+                    state.object,
+                )

Review comment:
       Move this to debug level now?
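
Acting on this suggestion could look like the sketch below. It emits the same message at DEBUG through a module-level logger rather than through the root logger returned by `logging.getLogger()`. `log_flush_history` is a hypothetical helper; in the PR the call sits inline in the flush handler.

```python
import logging

# A module-level logger (instead of the root logger from logging.getLogger())
# lets operators raise or lower verbosity for just this module.
log = logging.getLogger(__name__)


def log_flush_history(added, unchanged, deleted, obj) -> None:
    """Emit per-flush diagnostics at DEBUG, since they fire on every TaskInstance flush."""
    log.debug(
        "session flush listener: added %s unchanged %s deleted %s - %s",
        added,
        unchanged,
        deleted,
        obj,
    )
```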

##########
File path: airflow/listeners/listener.py
##########
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import inspect
+import logging
+from typing import TYPE_CHECKING, Set
+
+import pluggy
+
+if TYPE_CHECKING:
+    from pluggy._hooks import _HookRelay
+
+from airflow.listeners import spec
+
+log = logging.getLogger(__name__)
+
+
+class Listener:
+    """Class used as a namespace for listener hook implementation namespace"""
+
+
+_listener_manager = None
+
+
+class ListenerManager:
+    """Class that manages registration of listeners and provides hook property for calling them"""
+
+    def __init__(self):
+        self.pm = pluggy.PluginManager("airflow")
+        self.pm.add_hookspecs(spec)
+        self.listener_names: Set[str] = set()
+
+    def has_listeners(self) -> bool:
+        return len(self.pm.get_plugins()) > 0
+
+    @property
+    def hook(self) -> "_HookRelay":
+        """Returns hook, on which plugin methods specified in spec can be called."""
+        return self.pm.hook
+
+    def add_listener(self, listener: Listener):
+        if listener.__class__.__name__ in self.listener_names:
+            return
+        if self.pm.is_registered(listener):
+            return
+
+        listener_type = type(listener)
+        if not (
+            inspect.isclass(listener_type)
+            and issubclass(listener_type, Listener)
+            and (listener_type is not Listener)
+        ):
+            log.warning("Can't register listener: %s - is not a Listener subclass", listener_type)
+            return

Review comment:
       We shouldn't need to do any of this -- one of the main features of Pluggy is that it handles signature validation for us!
   
   https://pluggy.readthedocs.io/en/latest/#enforcing
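
To illustrate the reviewer's point: once a spec is added, pluggy itself rejects implementations whose arguments are not in the spec. It also reports misspelled hook names via `check_pending()`, so a subclass check is not needed for signature safety. This sketch uses pluggy directly. `WrongArgs`, `Misspelled`, and `registration_error` are hypothetical names.

```python
import pluggy

hookspec = pluggy.HookspecMarker("airflow")
hookimpl = pluggy.HookimplMarker("airflow")


class Spec:
    @hookspec
    def on_task_instance_running(self, previous_state, task_instance, session):
        """Called when a TaskInstance moves to RUNNING."""


class WrongArgs:
    # `dag_run` is not part of the spec, so pluggy rejects this at register() time.
    @hookimpl
    def on_task_instance_running(self, previous_state, task_instance, dag_run):
        pass


class Misspelled:
    # Unknown hook names are accepted by register() but reported by check_pending().
    @hookimpl
    def on_task_instance_runing(self, previous_state, task_instance, session):
        pass


def registration_error(listener, check_pending=False):
    """Return pluggy's validation message for `listener`, or None if it is accepted."""
    pm = pluggy.PluginManager("airflow")  # fresh manager per check
    pm.add_hookspecs(Spec)
    try:
        pm.register(listener)
        if check_pending:
            pm.check_pending()
    except pluggy.PluginValidationError as exc:
        return str(exc)
    return None
```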

##########
File path: airflow/listeners/listener.py
##########
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import inspect
+import logging
+from typing import TYPE_CHECKING, Set
+
+import pluggy
+
+if TYPE_CHECKING:
+    from pluggy._hooks import _HookRelay
+
+from airflow.listeners import spec

Review comment:
       Delay this import until ListenerManager init?

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -3434,6 +3435,12 @@ def test_timeout_triggers(self, dag_maker):
         assert ti1.next_method == "__fail__"
         assert ti2.state == State.DEFERRED
 
+    @pytest.fixture(autouse=True, scope='function')
+    def clean_listener(self):
+        get_listener_manager().clear()
+        yield
+        get_listener_manager().clear()

Review comment:
       Why is this needed in SchedulerJob tests?

##########
File path: airflow/listeners/spec.py
##########
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import TYPE_CHECKING, Optional
+
+from pluggy import HookspecMarker
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm.session import Session
+
+    from airflow.models import TaskInstance
+    from airflow.utils.state import State
+
+hookspec = HookspecMarker("airflow")
+
+
+@hookspec
+def on_task_instance_running(
+    previous_state: "State", task_instance: "TaskInstance", session: Optional["Session"]

Review comment:
       ```suggestion
       previous_state: "TaskInstanceState", task_instance: "TaskInstance", session: Optional["Session"]
   ```
   
   in a few places.

##########
File path: tests/listeners/test_listeners.py
##########
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pluggy
+import pytest as pytest
+
+from airflow import AirflowException
+from airflow.listeners.events import register_task_instance_state_events
+from airflow.listeners.listener import Listener, get_listener_manager
+from airflow.operators.bash import BashOperator
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+
+DAG_ID = "test_listener_dag"
+TASK_ID = "test_listener_task"
+EXECUTION_DATE = timezone.utcnow()
+
+
+hookimpl = pluggy.HookimplMarker("airflow")
+
+
+class CollectingListener(Listener):
+    def __init__(self):
+        self.states = []
+
+
+class PartialListener(CollectingListener):
+    @hookimpl
+    def on_task_instance_running(self, previous_state, task_instance, session):
+        self.states.append(State.RUNNING)
+
+
+class FullListener(PartialListener):
+    @hookimpl
+    def on_task_instance_success(self, previous_state, task_instance, session):
+        self.states.append(State.SUCCESS)
+
+    @hookimpl
+    def on_task_instance_failed(self, previous_state, task_instance, session):
+        self.states.append(State.FAILED)
+
+
+class ThrowingListener(Listener):
+    @hookimpl
+    def on_task_instance_running(self, previous_state, task_instance, session):
+        raise RuntimeError()
+
+
+@pytest.fixture(scope="module", autouse=True)
+def register_events():
+    register_task_instance_state_events()
+
+
+@pytest.fixture(autouse=True)
+def clean_listener_manager():
+    lm = get_listener_manager()
+    lm.clear()
+    yield
+    lm = get_listener_manager()
+    lm.clear()
+
+
+@provide_session
+def test_listener_gets_calls(create_task_instance, session=None):
+    lm = get_listener_manager()
+    listener = FullListener()
+    lm.add_listener(listener)
+
+    ti = create_task_instance(session=session, state=State.QUEUED)
+    ti.run()
+
+    assert len(listener.states) == 2
+    assert listener.states == [State.RUNNING, State.SUCCESS]
+
+
+@provide_session
+def test_listener_gets_only_subscribed_calls(create_task_instance, session=None):
+    lm = get_listener_manager()
+    listener = PartialListener()
+    lm.add_listener(listener)
+
+    ti = create_task_instance(session=session, state=State.QUEUED)
+    ti.run()
+
+    assert len(listener.states) == 1
+    assert listener.states == [State.RUNNING]
+
+
+@provide_session
+def test_listener_throws_exceptions(create_task_instance, session=None):
+    lm = get_listener_manager()
+    listener = ThrowingListener()
+    lm.add_listener(listener)
+
+    with pytest.raises(RuntimeError):
+        ti = create_task_instance(session=session, state=State.QUEUED)
+        ti.run()
+
+
+def test_listener_needs_to_subclass_listener():
+    lm = get_listener_manager()
+
+    class Dummy:
+        @hookimpl
+        def on_task_instance_running(self, previous_state, task_instance, session):
+            pass
+
+    lm.add_listener(Dummy())
+    assert not lm.has_listeners()
+
+
+@provide_session
+def test_listener_captures_failed_taskinstances(create_task_instance_of_operator, session=None):
+    lm = get_listener_manager()
+    listener = FullListener()
+    lm.add_listener(listener)
+
+    with pytest.raises(AirflowException):
+        ti = create_task_instance_of_operator(
+            BashOperator, dag_id=DAG_ID, execution_date=EXECUTION_DATE, task_id=TASK_ID, bash_command="exit 1"
+        )
+        ti.run()

Review comment:
       Hmmm, if we can avoid calling `ti.run()` in this test, that would be good. (It does _a lot_ and is a relatively heavyweight approach, so something lighter here would be preferable.)
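One lighter option, sketched here under stated assumptions: check the listener contract against a stand-in notifier and a `unittest.mock.MagicMock` listener, so no task has to execute. `HOOKS_BY_STATE` and `notify` are hypothetical names that only model the state-to-hook mapping described in this PR (RUNNING, SUCCESS, FAILED). They are not Airflow's SQLAlchemy-driven code path, so that path would still need its own (smaller) integration test.

```python
from unittest import mock

# Hypothetical mapping from a TaskInstance's new state to the hook a listener implements.
HOOKS_BY_STATE = {
    "running": "on_task_instance_running",
    "success": "on_task_instance_success",
    "failed": "on_task_instance_failed",
}


def notify(listener, previous_state, new_state, task_instance, session=None):
    """Hypothetical notifier: call the listener hook matching new_state, if any."""
    hook_name = HOOKS_BY_STATE.get(new_state)
    if hook_name is None:
        return  # transitions without a hook (e.g. to "queued") are ignored
    hook = getattr(listener, hook_name, None)
    if hook is not None:
        hook(previous_state=previous_state, task_instance=task_instance, session=session)


# A MagicMock records every call, so the assertions need no real task run.
listener = mock.MagicMock()
ti = mock.sentinel.task_instance
notify(listener, "queued", "running", ti)
notify(listener, "running", "success", ti)

listener.on_task_instance_running.assert_called_once_with(
    previous_state="queued", task_instance=ti, session=None
)
listener.on_task_instance_success.assert_called_once_with(
    previous_state="running", task_instance=ti, session=None
)
listener.on_task_instance_failed.assert_not_called()
```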

##########
File path: airflow/listeners/events.py
##########
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+
+from sqlalchemy import event
+from sqlalchemy.orm import Session
+
+from airflow.listeners.listener import get_listener_manager
+from airflow.models import TaskInstance
+from airflow.utils.state import State
+
+_is_listening = False
+
+
+def register_task_instance_state_events():
+    logger = logging.getLogger()

Review comment:
       ```suggestion
       logger = logging.getLogger(__name__)
   ```
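For context on the suggestion: a bare `logging.getLogger()` returns the root logger, so the listener's debug output could not be filtered or tuned separately from everything else. A small stdlib illustration; the logger name is written out literally as what `__name__` would evaluate to inside `airflow/listeners/events.py`, so the snippet runs standalone.

```python
import logging

# logging.getLogger() with no argument returns the shared root logger.
root_logger = logging.getLogger()

# Inside airflow/listeners/events.py, logging.getLogger(__name__) resolves to this name.
module_logger = logging.getLogger("airflow.listeners.events")

print(root_logger.name)    # root
print(module_logger.name)  # airflow.listeners.events

# The module logger can be made more verbose on its own, without lowering
# the level of the root logger that every other library shares.
module_logger.setLevel(logging.DEBUG)
print(module_logger.getEffectiveLevel() == logging.DEBUG)  # True
```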

##########
File path: tests/plugins/test_plugins_manager.py
##########
@@ -350,6 +357,21 @@ class MacroPlugin(AirflowPlugin):
             # rendering templates.
             assert hasattr(macros, MacroPlugin.name)
 
+    def test_registering_plugin_listeners(self):
+        from airflow import plugins_manager
+
+        with mock.patch('airflow.plugins_manager.plugins', []):
+            plugins_manager.load_plugins_from_plugin_directory()
+            plugins_manager.integrate_listener_plugins()
+
+            assert get_listener_manager().has_listeners()
+            assert get_listener_manager().pm.get_plugins().pop().__class__.__name__ == "PluginListener"
+
+    @pytest.fixture(autouse=True)
+    def clear_listeners(self):
+        yield
+        get_listener_manager().clear()

Review comment:
       ```suggestion
   ```
   
   We already have this as a module-level fixture.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r780226659



##########
File path: docs/apache-airflow/plugins.rst
##########
@@ -268,6 +285,7 @@ definitions in Airflow.
         operator_extra_links = [
             S3LogLink(),
         ]
+        listeners = [PrintingRunningListener]

Review comment:
       So looking at pytest itself (which is where pluggy came out of), the plugins are definitely just modules, not classes https://docs.pytest.org/en/6.2.x/writing_plugins.html?highlight=plugin







[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r780309756



##########
File path: docs/apache-airflow/plugins.rst
##########
@@ -268,6 +285,7 @@ definitions in Airflow.
         operator_extra_links = [
             S3LogLink(),
         ]
+        listeners = [PrintingRunningListener]

Review comment:
       @ashb I added a check for modules in `add_listener.py` and refactored the tests from classes to modules.
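The check itself is not quoted in this thread. Here is a minimal sketch of what a module-only check could look like. It assumes non-module objects are silently ignored, which is consistent with the earlier `test_listener_needs_to_subclass_listener` expectation that `has_listeners()` stays false. `ListenerRegistry` is a hypothetical stand-in for the listener manager, not Airflow's class.

```python
import inspect
import types


class ListenerRegistry:
    """Hypothetical stand-in for a listener manager that accepts only modules."""

    def __init__(self):
        self._listeners = []

    def add_listener(self, listener):
        # Assumption: anything that is not a module is ignored rather than registered.
        if not inspect.ismodule(listener):
            return
        # Registering the same module twice keeps a single entry.
        if listener not in self._listeners:
            self._listeners.append(listener)

    def has_listeners(self) -> bool:
        return bool(self._listeners)


class ClassBasedListener:
    def on_task_instance_running(self, previous_state, task_instance, session):
        pass


registry = ListenerRegistry()
registry.add_listener(ClassBasedListener())
print(registry.has_listeners())  # False

# types.ModuleType builds an in-memory module, standing in for a listener module file.
listener_module = types.ModuleType("example_listener")
registry.add_listener(listener_module)
registry.add_listener(listener_module)
print(registry.has_listeners())  # True
```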







[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r783036364



##########
File path: tests/listeners/test_listeners.py
##########
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+
+from airflow import AirflowException
+from airflow.listeners import hookimpl
+from airflow.listeners.events import register_task_instance_state_events
+from airflow.listeners.listener import get_listener_manager
+from airflow.operators.bash import BashOperator
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+from tests.listeners import test_full_listener, test_partial_listener, test_throwing_listener
+
+DAG_ID = "test_listener_dag"
+TASK_ID = "test_listener_task"
+EXECUTION_DATE = timezone.utcnow()
+
+
+@pytest.fixture(scope="module", autouse=True)
+def register_events():
+    register_task_instance_state_events()
+
+
+@pytest.fixture(autouse=True)
+def clean_listener_manager():
+    lm = get_listener_manager()
+    lm.clear()
+    yield
+    lm = get_listener_manager()
+    lm.clear()
+    test_full_listener.state = []
+
+
+@provide_session
+def test_listener_gets_calls(create_task_instance, session=None):
+    lm = get_listener_manager()
+    lm.add_listener(test_full_listener)
+
+    ti = create_task_instance(session=session, state=State.QUEUED)
+    ti.run()
+
+    assert len(test_full_listener.state) == 2
+    assert test_full_listener.state == [State.RUNNING, State.SUCCESS]
+
+
+@provide_session
+def test_listener_gets_only_subscribed_calls(create_task_instance, session=None):
+    lm = get_listener_manager()
+    lm.add_listener(test_partial_listener)
+
+    ti = create_task_instance(session=session, state=State.QUEUED)
+    ti.run()
+
+    assert len(test_partial_listener.state) == 1
+    assert test_partial_listener.state == [State.RUNNING]
+
+
+@provide_session
+def test_listener_throws_exceptions(create_task_instance, session=None):
+    lm = get_listener_manager()
+    lm.add_listener(test_throwing_listener)
+
+    with pytest.raises(RuntimeError):
+        ti = create_task_instance(session=session, state=State.QUEUED)
+        ti._run_raw_task()
+
+
+@provide_session
+def test_listener_captures_failed_taskinstances(create_task_instance_of_operator, session=None):
+    lm = get_listener_manager()
+    lm.add_listener(test_full_listener)
+
+    with pytest.raises(AirflowException):
+        ti = create_task_instance_of_operator(
+            BashOperator, dag_id=DAG_ID, execution_date=EXECUTION_DATE, task_id=TASK_ID, bash_command="exit 1"
+        )
+        ti._run_raw_task()

Review comment:
       Fixed.







[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r783037348



##########
File path: tests/listeners/test_listeners.py
##########
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+
+from airflow import AirflowException
+from airflow.listeners import hookimpl
+from airflow.listeners.events import register_task_instance_state_events
+from airflow.listeners.listener import get_listener_manager
+from airflow.operators.bash import BashOperator
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+from tests.listeners import test_full_listener, test_partial_listener, test_throwing_listener
+
+DAG_ID = "test_listener_dag"
+TASK_ID = "test_listener_task"
+EXECUTION_DATE = timezone.utcnow()
+
+
+@pytest.fixture(scope="module", autouse=True)
+def register_events():
+    register_task_instance_state_events()

Review comment:
       Added `unregister_task_instance_state_events` function - please take a look at `airflow.listeners.events` now.







[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r774678147



##########
File path: airflow/settings.py
##########
@@ -543,6 +543,9 @@ def initialize():
 # loaded from module.
 LAZY_LOAD_PROVIDERS = conf.getboolean('core', 'lazy_discover_providers', fallback=True)
 
+# By default, running listeners on the scheduler is disabled. Set it to True to execute them there.
+EXECUTE_LISTENERS_ON_SCHEDULER = conf.getboolean('core', 'execute_listeners_on_scheduler', fallback=False)

Review comment:
       Done. Also, added basic docs.

##########
File path: airflow/utils/orm_event_handlers.py
##########
@@ -86,3 +86,9 @@ def after_cursor_execute(conn, cursor, statement, parameters, context, executema
                 stack_info,
                 statement.replace("\n", " "),
             )
+
+    if conf.getboolean('core', 'execute_listeners_on_scheduler', fallback=False):

Review comment:
       Done.

##########
File path: airflow/listeners/listener.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pluggy
+
+from airflow.listeners import spec
+
+
+class Listener:
+    """Class used as a namespace for listener hook implementations"""
+
+
+_listener_manager = None
+
+
+class ListenerManager:
+    """Class that manages registration of listeners and provides hook property for calling them"""
+
+    def __init__(self):
+        self.pm = pluggy.PluginManager("airflow")
+        self.pm.add_hookspecs(spec)
+        self.listener_names = set()

Review comment:
       Done.

##########
File path: airflow/listeners/listener.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pluggy
+
+from airflow.listeners import spec
+
+
+class Listener:
+    """Class used as a namespace for listener hook implementations"""
+
+
+_listener_manager = None
+
+
+class ListenerManager:
+    """Class that manages registration of listeners and provides hook property for calling them"""
+
+    def __init__(self):
+        self.pm = pluggy.PluginManager("airflow")
+        self.pm.add_hookspecs(spec)
+        self.listener_names = set()
+
+    def has_listeners(self) -> bool:
+        return len(self.pm.get_plugins()) > 0
+
+    @property
+    def hook(self):

Review comment:
       Done.

##########
File path: airflow/listeners/listener.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pluggy
+
+from airflow.listeners import spec
+
+
+class Listener:
+    """Class used as a namespace for listener hook implementations"""
+
+
+_listener_manager = None
+
+
+class ListenerManager:
+    """Class that manages registration of listeners and provides hook property for calling them"""

Review comment:
       Done.

##########
File path: airflow/listeners/events.py
##########
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+
+from sqlalchemy import event
+from sqlalchemy.orm import Session
+
+from airflow.listeners.listener import get_listener_manager
+from airflow.models import TaskInstance
+from airflow.utils.state import State
+
+
+def register_task_instance_state_events():
+    logger = logging.getLogger()
+
+    @event.listens_for(Session, 'after_flush', propagate=True)
+    def on_task_instance_state_session_flush(session, flush_context):
+        """
+        Listens for session.flush() events that modify a TaskInstance's state, and notifies listeners that listen
+        for that event. Doing it this way enables us to be stateless in the SQLAlchemy event listener.
+        """
+        for state in flush_context.states:
+            if isinstance(state.object, TaskInstance) and session.is_modified(
+                state.object, include_collections=False
+            ):
+                added, unchanged, deleted = flush_context.get_attribute_history(state, 'state')
+
+                logger.debug(f"session flush listener: added {added} unchanged {unchanged} deleted {deleted}")

Review comment:
       Done.
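The handler above reads `(added, unchanged, deleted)` from `flush_context.get_attribute_history(state, 'state')`. Below is a pure-function sketch of turning that triple into a `(previous_state, new_state)` pair. It assumes SQLAlchemy's usual history semantics: the new value is in `added`, the replaced value in `deleted`, and an untouched value in `unchanged`. `History` and `state_transition` are illustrative names. Keeping this logic in a pure function is one way to unit-test it without a database.

```python
from typing import Any, NamedTuple, Optional, Sequence, Tuple


class History(NamedTuple):
    """Same (added, unchanged, deleted) shape the flush handler unpacks."""

    added: Sequence[Any]
    unchanged: Sequence[Any]
    deleted: Sequence[Any]


def state_transition(history: History) -> Optional[Tuple[Any, Any]]:
    """Return (previous_state, new_state) if the state column changed, else None."""
    if not history.added:
        # No new value was written in this flush, so there is nothing to report.
        return None
    new_state = history.added[0]
    # A freshly inserted row has no replaced value.
    previous_state = history.deleted[0] if history.deleted else None
    return previous_state, new_state


print(state_transition(History(added=["running"], unchanged=(), deleted=["queued"])))
# ('queued', 'running')
print(state_transition(History(added=(), unchanged=["running"], deleted=())))
# None
```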







[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r774679590



##########
File path: tests/listeners/test_listeners.py
##########
@@ -0,0 +1,127 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+
+import pluggy
+import pytest
+
+os.environ["AIRFLOW__CORE__EXECUTE_LISTENERS_ON_SCHEDULER"] = "True"

Review comment:
       Replaced with
   
   ```
   @pytest.fixture(scope="module", autouse=True)
   def register_events():
       register_task_instance_state_events()
   ```







[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r778184751



##########
File path: airflow/__init__.py
##########
@@ -80,6 +80,11 @@ def __getattr__(name):
     manager.initialize_providers_hooks()
     manager.initialize_providers_extra_links()
 
+if settings.EXECUTE_LISTENERS_ON_SCHEDULER:
+    from airflow.plugins_manager import integrate_listener_plugins
+
+    integrate_listener_plugins()

Review comment:
       Actually, I've removed it for now. It will be easy to add it later.







[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r778286692



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -71,6 +74,8 @@ def __init__(
         # terminate multiple times
         self.terminating = False
 
+        self._enable_task_listeners()

Review comment:
       Done.







[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r778142221



##########
File path: docs/apache-airflow/plugins.rst
##########
@@ -268,6 +285,7 @@ definitions in Airflow.
         operator_extra_links = [
             S3LogLink(),
         ]
+        listeners = [PrintingRunningListener]

Review comment:
       Generally, in pluggy, we need to register a `plugin`: a namespace that may contain multiple hookimpls, not single functions.
   If we accepted just a list of functions, we'd later need to construct a "synthetic" namespace around them in order to register it.
   
   I think requiring a class, or possibly a Python module, is a good idea; this is what the pluggy docs present:
   
   - class https://pluggy.readthedocs.io/en/stable/index.html#a-toy-example
   - module https://pluggy.readthedocs.io/en/stable/index.html#the-plugin
   
   Now, it does not need to look like 
   ```python
           listeners = [PrintingRunningListener]
   ```
   
   We could require plugin authors to implement hookimpls directly in plugins, for example:
   
   ```python
   class SomeListenerPlugin(AirflowPlugin):
       name = 'some-plugin'
       @hookimpl
       def on_task_instance_running(...):
           print("running")
   ``` 
   
   or we could require each plugin to add a single pluggy plugin (a class or a module), if for some reason we don't want authors to implement multiple listener functions in a single plugin.
   
   Note that this is a separate discussion from https://github.com/apache/airflow/pull/20443#discussion_r778047132
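To make the class-versus-module point concrete, here is a toy, stdlib-only model of what registering a namespace means: scan an object's attributes for functions carrying a marker. It is conceptually similar to how pluggy inspects a registered plugin. However, `hookimpl`, `_HOOKIMPL_FLAG` and `collect_hookimpls` are hypothetical and are not pluggy's API. The point is only that a class instance and a module both work as namespaces, whereas a bare list of functions would need one built around it.

```python
import types

_HOOKIMPL_FLAG = "_toy_hookimpl"  # hypothetical marker attribute, not pluggy's


def hookimpl(func):
    """Toy marker: tag a function as a hook implementation."""
    setattr(func, _HOOKIMPL_FLAG, True)
    return func


def collect_hookimpls(namespace):
    """Return {name: callable} for every marked attribute of a class instance or module."""
    found = {}
    for name in dir(namespace):
        attr = getattr(namespace, name)
        # Bound methods forward attribute lookups to the underlying function.
        if callable(attr) and getattr(attr, _HOOKIMPL_FLAG, False):
            found[name] = attr
    return found


class PrintingRunningListener:
    @hookimpl
    def on_task_instance_running(self, previous_state, task_instance, session):
        return "running (class)"


@hookimpl
def _module_on_running(previous_state, task_instance, session):
    return "running (module)"


# An in-memory module standing in for a listener module file.
listener_module = types.ModuleType("printing_running_listener")
listener_module.on_task_instance_running = _module_on_running

for namespace in (PrintingRunningListener(), listener_module):
    hooks = collect_hookimpls(namespace)
    print(sorted(hooks), hooks["on_task_instance_running"](None, None, None))
# ['on_task_instance_running'] running (class)
# ['on_task_instance_running'] running (module)
```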







[GitHub] [airflow] ashb merged pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #20443:
URL: https://github.com/apache/airflow/pull/20443


   





[GitHub] [airflow] potiuk commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r773694853



##########
File path: airflow/utils/orm_event_handlers.py
##########
@@ -86,3 +86,7 @@ def after_cursor_execute(conn, cursor, statement, parameters, context, executema
                 stack_info,
                 statement.replace("\n", " "),
             )
+
+    if conf.getboolean('core', 'execute_listeners_on_scheduler', fallback=False):
+        # On import, register sqlalchemy event handlers which call airflow.listeners
+        import airflow.listeners.events  # noqa

Review comment:
       I definitely agree with @uranusjr. We already have some of those import-time side effects in Airflow, and they have bitten us more than once.







[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r780200803



##########
File path: docs/apache-airflow/plugins.rst
##########
@@ -167,9 +172,14 @@ definitions in Airflow.
 
     # Importing base classes that we need to derive
     from airflow.hooks.base import BaseHook
+    from airflow.listeners.listener import Listener
     from airflow.models.baseoperator import BaseOperatorLink
     from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator
 
+    from pluggy import HookimplMarker
+
+    hookimpl = HookimplMarker("airflow")

Review comment:
       Done.







[GitHub] [airflow] ashb commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r780226659



##########
File path: docs/apache-airflow/plugins.rst
##########
@@ -268,6 +285,7 @@ definitions in Airflow.
         operator_extra_links = [
             S3LogLink(),
         ]
+        listeners = [PrintingRunningListener]

Review comment:
       So looking at pytest itself (which is where pluggy came out of), the plugins are definitely just modules, not classes https://docs.pytest.org/en/6.2.x/writing_plugins.html
   
   I think we should follow pytest's lead here unless we have a very good reason to diverge.







[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r778287426



##########
File path: airflow/listeners/spec.py
##########
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import TYPE_CHECKING, Optional
+
+from pluggy import HookspecMarker
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm.session import Session
+
+    from airflow.models import TaskInstance
+    from airflow.utils.state import State
+
+hookspec = HookspecMarker("airflow")
+
+
+@hookspec
+def on_task_instance_running(
+    previous_state: "State", task_instance: "TaskInstance", session: Optional["Session"]

Review comment:
       Done.







[GitHub] [airflow] ashb commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r782570095



##########
File path: docs/apache-airflow/listeners.rst
##########
@@ -0,0 +1,39 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Listeners
+=========
+
+Airflow gives you an option to be notified of events happening in Airflow
+by writing listeners. Listeners are powered by `pluggy <https://pluggy.readthedocs.io/en/stable/>`__.
+
+The listener API is meant to be called across all DAGs and all operators, in contrast to methods like
+``on_success_callback``, ``pre_execute`` and related methods, which are meant to provide callbacks
+for particular DAG authors or operator creators. It is not possible to listen only to events generated
+by a particular DAG.
+
+To include listener in your Airflow installation, include it as a part of an :doc:`Airflow Plugin </plugins>`

Review comment:
       ```suggestion
   To include listener in your Airflow installation, include it as a part of an :doc:`Airflow Plugin </plugins>`
   
   |experimental|
   ```
   
   This is a substitution that just links to the experimental page. It gives us some wiggle room if we need to change anything here in the next release, without worrying about backwards compatibility.





[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r774679120



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -291,3 +293,19 @@ def _update_dagrun_state_for_paused_dag(self, session=None):
             if dag_run:
                 dag_run.dag = dag
                 dag_run.update_state(session=session, execute_callbacks=True)
+
+    @staticmethod
+    def _enable_task_listeners():
+        """
+        Check if we have any registered listeners, then register sqlalchemy hooks for
+        TI state change if we do.
+        """
+        from airflow.plugins_manager import integrate_listener_plugins
+
+        integrate_listener_plugins()
+        from airflow.listeners.listener import get_listener_manager
+
+        if get_listener_manager().has_listeners():
+            from airflow.listeners.events import register_task_instance_state_events

Review comment:
       Now I'm only importing `register_task_instance_state_events` locally, to prevent circular dependencies.





[GitHub] [airflow] kaxil commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r774029297



##########
File path: airflow/listeners/events.py
##########
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+
+from sqlalchemy import event
+from sqlalchemy.orm import Session
+
+from airflow.listeners.listener import get_listener_manager
+from airflow.models import TaskInstance
+from airflow.utils.state import State
+
+
+def register_task_instance_state_events():
+    logger = logging.getLogger()
+
+    @event.listens_for(Session, 'after_flush', propagate=True)
+    def on_task_instance_state_session_flush(session, flush_context):
+        """
+        Listens for session.flush() events that modify TaskInstance's state, and notifies listeners that listen
+        for that event. Doing it this way enables us to be stateless in the SQLAlchemy event listener.
+        """
+        for state in flush_context.states:
+            if isinstance(state.object, TaskInstance) and session.is_modified(
+                state.object, include_collections=False
+            ):
+                added, unchanged, deleted = flush_context.get_attribute_history(state, 'state')
+
+                logger.debug(f"session flush listener: added {added} unchanged {unchanged} deleted {deleted}")

Review comment:
       ```suggestion
                   logger.debug("Session flush listener: added %s unchanged %s deleted %s", added, unchanged, deleted)
   ```
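For context on this suggestion: with `%s` placeholders the `logging` module only interpolates the arguments if the record is actually going to be emitted, whereas an f-string is formatted before `logger.debug` is even called. A small stdlib-only sketch of the difference; `CountingValue` and the logger name are hypothetical stand-ins used only to count how often a value gets stringified.

```python
import logging


class CountingValue:
    """Hypothetical stand-in for a logged value; counts how often str() is called on it."""

    def __init__(self) -> None:
        self.str_calls = 0

    def __str__(self) -> str:
        self.str_calls += 1
        return "CountingValue()"


logger = logging.getLogger("listener_flush_demo")
logger.setLevel(logging.INFO)  # DEBUG records are filtered out

eager = CountingValue()
# The f-string is built before debug() runs, so __str__ is called even though nothing is logged.
logger.debug(f"Session flush listener: added {eager}")

lazy = CountingValue()
# With %-style arguments, debug() checks the level first and never formats the message.
logger.debug("Session flush listener: added %s", lazy)

print(eager.str_calls, lazy.str_calls)  # prints: 1 0
```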
   
   

##########
File path: airflow/listeners/listener.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pluggy
+
+from airflow.listeners import spec
+
+
+class Listener:
+    """Class used as a namespace for listener hook implementation namespace"""
+
+
+_listener_manager = None
+
+
+class ListenerManager:
+    """Class that manager registration of listeners and provides hook property for calling them"""

Review comment:
       ```suggestion
       """Class that manages registration of listeners and provides hook property for calling them"""
   ```

##########
File path: airflow/listeners/listener.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pluggy
+
+from airflow.listeners import spec
+
+
+class Listener:
+    """Class used as a namespace for listener hook implementation namespace"""
+
+
+_listener_manager = None
+
+
+class ListenerManager:
+    """Class that manager registration of listeners and provides hook property for calling them"""
+
+    def __init__(self):
+        self.pm = pluggy.PluginManager("airflow")
+        self.pm.add_hookspecs(spec)
+        self.listener_names = set()

Review comment:
       ```suggestion
           self.listener_names: Set[str] = set()
   ```

##########
File path: airflow/settings.py
##########
@@ -543,6 +543,9 @@ def initialize():
 # loaded from module.
 LAZY_LOAD_PROVIDERS = conf.getboolean('core', 'lazy_discover_providers', fallback=True)
 
+# By default running listeners on scheduler is disabled. Set it to True if you want to execute them.
+EXECUTE_LISTENERS_ON_SCHEDULER = conf.getboolean('core', 'execute_listeners_on_scheduler', fallback=False)

Review comment:
       It needs to be added to https://github.com/apache/airflow/blob/main/airflow/config_templates/default_airflow.cfg and https://github.com/apache/airflow/blob/main/airflow/config_templates/config.yml too, as the configuration reference docs are built automatically from there: https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html

##########
File path: airflow/utils/orm_event_handlers.py
##########
@@ -86,3 +86,9 @@ def after_cursor_execute(conn, cursor, statement, parameters, context, executema
                 stack_info,
                 statement.replace("\n", " "),
             )
+
+    if conf.getboolean('core', 'execute_listeners_on_scheduler', fallback=False):

Review comment:
       ```suggestion
       if settings.EXECUTE_LISTENERS_ON_SCHEDULER:
   ```

##########
File path: airflow/listeners/listener.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pluggy
+
+from airflow.listeners import spec
+
+
+class Listener:
+    """Class used as a namespace for listener hook implementation namespace"""
+
+
+_listener_manager = None
+
+
+class ListenerManager:
+    """Class that manager registration of listeners and provides hook property for calling them"""
+
+    def __init__(self):
+        self.pm = pluggy.PluginManager("airflow")
+        self.pm.add_hookspecs(spec)
+        self.listener_names = set()
+
+    def has_listeners(self) -> bool:
+        return len(self.pm.get_plugins()) > 0
+
+    @property
+    def hook(self):

Review comment:
       Can we add type hints for this property?

##########
File path: tests/listeners/test_listeners.py
##########
@@ -0,0 +1,127 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+
+import pluggy
+import pytest
+
+os.environ["AIRFLOW__CORE__EXECUTE_LISTENERS_ON_SCHEDULER"] = "True"

Review comment:
       This will cause issues if, for some reason, this file is imported by any other tests; unlikely, but still. 
   
   Can we instead use a module-level pytest fixture that patches `os.environ`?
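A minimal stdlib-only sketch of the suggested approach: `mock.patch.dict` sets the variable for the duration of the `with` block and restores `os.environ` to its previous contents afterwards. The function name `listeners_on_scheduler_env` is hypothetical; in the test module it would be a generator decorated with `@pytest.fixture(scope="module", autouse=True)`. Here the generator is driven by hand to mimic the set-up and tear-down that pytest would perform.

```python
import os
from unittest import mock

ENV_VAR = "AIRFLOW__CORE__EXECUTE_LISTENERS_ON_SCHEDULER"


def listeners_on_scheduler_env():
    """Body of a hypothetical module-scoped fixture that enables listeners on the scheduler."""
    # patch.dict restores os.environ when the block exits, so the setting
    # cannot leak into other test modules that run afterwards.
    with mock.patch.dict(os.environ, {ENV_VAR: "True"}):
        yield


# Drive the generator manually, the way pytest would for a yield fixture.
fixture = listeners_on_scheduler_env()
before = os.environ.get(ENV_VAR)
next(fixture)          # set-up: enters the `with` block
during = os.environ.get(ENV_VAR)
next(fixture, None)    # tear-down: exits the `with` block and finishes the generator
after = os.environ.get(ENV_VAR)
```

One caveat, assuming the flag is read through `settings.EXECUTE_LISTENERS_ON_SCHEDULER` as in the `airflow/settings.py` diff: that constant is evaluated at import time, so patching the environment only helps if the fixture runs before `airflow.settings` is imported; otherwise the cached attribute itself would need to be patched.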

##########
File path: airflow/jobs/local_task_job.py
##########
@@ -291,3 +293,19 @@ def _update_dagrun_state_for_paused_dag(self, session=None):
             if dag_run:
                 dag_run.dag = dag
                 dag_run.update_state(session=session, execute_callbacks=True)
+
+    @staticmethod
+    def _enable_task_listeners():
+        """
+        Check if we have any registered listeners, then register sqlalchemy hooks for
+        TI state change if we do.
+        """
+        from airflow.plugins_manager import integrate_listener_plugins
+
+        integrate_listener_plugins()
+        from airflow.listeners.listener import get_listener_manager
+
+        if get_listener_manager().has_listeners():
+            from airflow.listeners.events import register_task_instance_state_events

Review comment:
       Is there a reason the imports are in this order? If so, can we add comments explaining it, please?





[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r778750727



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -291,3 +296,13 @@ def _update_dagrun_state_for_paused_dag(self, session=None):
             if dag_run:
                 dag_run.dag = dag
                 dag_run.update_state(session=session, execute_callbacks=True)
+
+    @staticmethod
+    def _enable_task_listeners():
+        """
+        Check if we have any registered listeners, then register sqlalchemy hooks for
+        TI state change if we do.
+        """
+        integrate_listener_plugins()
+        if get_listener_manager().has_listeners():

Review comment:
       Done. Changed `integrate_listener_plugins` to require a `ListenerManager` parameter, since otherwise the two modules would depend on each other circularly.
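A rough sketch of the dependency-injection shape described here, using hypothetical stand-ins so it runs on its own: the plugins-manager side receives a manager object and only calls methods on it, so it never needs to import the listener module. `SimpleListenerManager`, the extra `plugins` argument and the `SimpleNamespace` plugin objects are illustrative only; the real `integrate_listener_plugins` in the diff takes just the manager and reads the module-level `plugins` global.

```python
from types import SimpleNamespace
from typing import Iterable, List


class SimpleListenerManager:
    """Hypothetical stand-in for ListenerManager; keeps registered listeners in a list."""

    def __init__(self) -> None:
        self.listeners: List[object] = []

    def add_listener(self, listener: object) -> None:
        self.listeners.append(listener)

    def has_listeners(self) -> bool:
        return len(self.listeners) > 0


def integrate_listener_plugins(listener_manager: SimpleListenerManager, plugins: Iterable) -> None:
    """Register every listener declared by every plugin on the manager that was passed in.

    Because the manager arrives as an argument, this function's module does not have
    to import the listener module, which is what breaks the import cycle.
    """
    for plugin in plugins:
        for listener in getattr(plugin, "listeners", []):
            listener_manager.add_listener(listener)


# Illustrative plugins: one declares a listener, the other declares none.
printing_listener = SimpleNamespace()
plugins = [
    SimpleNamespace(name="with-listener", listeners=[printing_listener]),
    SimpleNamespace(name="without-listener"),
]

manager = SimpleListenerManager()
was_empty = not manager.has_listeners()
integrate_listener_plugins(manager, plugins)
```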





[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r778751491



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -3434,6 +3435,12 @@ def test_timeout_triggers(self, dag_maker):
         assert ti1.next_method == "__fail__"
         assert ti2.state == State.DEFERRED
 
+    @pytest.fixture(autouse=True, scope='function')
+    def clean_listener(self):
+        get_listener_manager().clear()
+        yield
+        get_listener_manager().clear()

Review comment:
       Removed; it was a leftover from debugging.





[GitHub] [airflow] ashb commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r780226659



##########
File path: docs/apache-airflow/plugins.rst
##########
@@ -268,6 +285,7 @@ definitions in Airflow.
         operator_extra_links = [
             S3LogLink(),
         ]
+        listeners = [PrintingRunningListener]

Review comment:
       So looking at pytest itself (which is where pluggy came from), the plugins are definitely just modules, not classes: https://docs.pytest.org/en/6.2.x/writing_plugins.html
   
   I think we should follow pytest's lead here unless we have a very good reason to diverge.





[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r778142221



##########
File path: docs/apache-airflow/plugins.rst
##########
@@ -268,6 +285,7 @@ definitions in Airflow.
         operator_extra_links = [
             S3LogLink(),
         ]
+        listeners = [PrintingRunningListener]

Review comment:
       Generally, in pluggy, we need to register a `plugin`: a namespace that may contain multiple hookimpls, not single functions. 
   By passing just a list of functions, we'd later need to construct a "synthetic" namespace to register them.
   
   I think requiring a class, or possibly a Python module, is a good idea; this is what the pluggy docs present:
   
   - class https://pluggy.readthedocs.io/en/stable/index.html#a-toy-example
   - module https://pluggy.readthedocs.io/en/stable/index.html#the-plugin
   
   Now, it does not need to look like:
   ```python
           listeners = [PrintingRunningListener]
   ```
   
   We could require plugin authors to implement hookimpls directly in plugins, for example:
   
   ```python
   class SomeListenerPlugin(AirflowPlugin):
       name = 'some-plugin'
       @hookimpl
       def on_task_instance_running(...):
           print("running")
   ``` 
   
   Or we could require each plugin to add a single pluggy plugin (as a class or module), if for some reason we don't want authors to implement multiple listener functions in a single plugin.
   
   I agree that we should discourage keeping state in plugins; my tests don't set a good example, since I do exactly that there :)
   Changing the tests and mentioning this directly in the docs might be a good idea.
   
   Note that this is a separate discussion from https://github.com/apache/airflow/pull/20443#discussion_r778047132
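To make the "synthetic namespace" point concrete: pluggy discovers hook implementations by looking up attributes on the registered object, so a bare list of functions would first have to be wrapped in something module-like. A stdlib-only sketch of such a wrapper; `namespace_from_functions` is a hypothetical helper, not part of Airflow or pluggy, and it assumes each function's `__name__` is the hook name. In a real listener each function would also carry the `@hookimpl` marker (imported from `airflow.listeners` in the test module in this thread) so that pluggy recognises it.

```python
import types
from typing import Callable, Iterable


def namespace_from_functions(name: str, functions: Iterable[Callable]) -> types.ModuleType:
    """Hypothetical helper: wrap loose functions into a module object keyed by __name__."""
    module = types.ModuleType(name)
    for func in functions:
        if hasattr(module, func.__name__):
            # Two functions with the same name would silently shadow each other.
            raise ValueError(f"duplicate hook name: {func.__name__}")
        setattr(module, func.__name__, func)
    return module


def on_task_instance_running(previous_state, task_instance, session):
    print("running")


def on_task_instance_success(previous_state, task_instance, session):
    print("success")


listener = namespace_from_functions(
    "synthetic_listener", [on_task_instance_running, on_task_instance_success]
)
```

A plain `.py` module with top-level hook functions already is such a namespace, which is why the pytest-style, module-based approach avoids this wrapping step entirely.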





[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r778750799



##########
File path: airflow/listeners/events.py
##########
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+
+from sqlalchemy import event
+from sqlalchemy.orm import Session
+
+from airflow.listeners.listener import get_listener_manager
+from airflow.models import TaskInstance
+from airflow.utils.state import State
+
+_is_listening = False
+
+
+def register_task_instance_state_events():
+    logger = logging.getLogger()

Review comment:
       Done.

##########
File path: airflow/listeners/events.py
##########
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+
+from sqlalchemy import event
+from sqlalchemy.orm import Session
+
+from airflow.listeners.listener import get_listener_manager
+from airflow.models import TaskInstance
+from airflow.utils.state import State
+
+_is_listening = False
+
+
+def register_task_instance_state_events():
+    logger = logging.getLogger()
+    global _is_listening
+
+    def on_task_instance_state_session_flush(session, flush_context):
+        """
+        Listens for session.flush() events that modify TaskInstance's state, and notifies listeners that listen
+        for that event. Doing it this way enables us to be stateless in the SQLAlchemy event listener.
+        """
+        if not get_listener_manager().has_listeners():
+            return
+        for state in flush_context.states:
+            if isinstance(state.object, TaskInstance) and session.is_modified(
+                state.object, include_collections=False
+            ):
+                added, unchanged, deleted = flush_context.get_attribute_history(state, 'state')
+
+                logger.warning(
+                    "session flush listener: added %s unchanged %s deleted %s - %s",
+                    added,
+                    unchanged,
+                    deleted,
+                    state.object,
+                )

Review comment:
       Done.

##########
File path: airflow/listeners/listener.py
##########
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import inspect
+import logging
+from typing import TYPE_CHECKING, Set
+
+import pluggy
+
+if TYPE_CHECKING:
+    from pluggy._hooks import _HookRelay
+
+from airflow.listeners import spec
+
+log = logging.getLogger(__name__)
+
+
+class Listener:
+    """Class used as a namespace for listener hook implementation namespace"""
+
+
+_listener_manager = None
+
+
+class ListenerManager:
+    """Class that manages registration of listeners and provides hook property for calling them"""
+
+    def __init__(self):
+        self.pm = pluggy.PluginManager("airflow")
+        self.pm.add_hookspecs(spec)
+        self.listener_names: Set[str] = set()
+
+    def has_listeners(self) -> bool:

Review comment:
       Done.





[GitHub] [airflow] ashb commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r782855132



##########
File path: tests/listeners/test_empty_listener.py
##########
@@ -0,0 +1,24 @@
+#

Review comment:
       Let's rename the files that don't contain tests to just `empty_listener` etc. The `test_` prefix is confusing here, as they don't contain any tests themselves but are used by them.

##########
File path: tests/listeners/test_listeners.py
##########
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+
+from airflow import AirflowException
+from airflow.listeners import hookimpl
+from airflow.listeners.events import register_task_instance_state_events
+from airflow.listeners.listener import get_listener_manager
+from airflow.operators.bash import BashOperator
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+from tests.listeners import test_full_listener, test_partial_listener, test_throwing_listener
+
+DAG_ID = "test_listener_dag"
+TASK_ID = "test_listener_task"
+EXECUTION_DATE = timezone.utcnow()
+
+
+@pytest.fixture(scope="module", autouse=True)
+def register_events():
+    register_task_instance_state_events()

Review comment:
       Do we need to remove this setting/config when the test finishes?

##########
File path: tests/listeners/test_listeners.py
##########
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+
+from airflow import AirflowException
+from airflow.listeners import hookimpl
+from airflow.listeners.events import register_task_instance_state_events
+from airflow.listeners.listener import get_listener_manager
+from airflow.operators.bash import BashOperator
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+from tests.listeners import test_full_listener, test_partial_listener, test_throwing_listener
+
+DAG_ID = "test_listener_dag"
+TASK_ID = "test_listener_task"
+EXECUTION_DATE = timezone.utcnow()
+
+
+@pytest.fixture(scope="module", autouse=True)
+def register_events():
+    register_task_instance_state_events()
+
+
+@pytest.fixture(autouse=True)
+def clean_listener_manager():
+    lm = get_listener_manager()
+    lm.clear()
+    yield
+    lm = get_listener_manager()
+    lm.clear()
+    test_full_listener.state = []
+
+
+@provide_session
+def test_listener_gets_calls(create_task_instance, session=None):
+    lm = get_listener_manager()
+    lm.add_listener(test_full_listener)
+
+    ti = create_task_instance(session=session, state=State.QUEUED)
+    ti.run()

Review comment:
       Is this one using `ti.run()` and not `ti._run_raw_task()` for a reason? If so, please add a short comment.

##########
File path: airflow/plugins_manager.py
##########
@@ -458,6 +463,20 @@ def integrate_macros_plugins() -> None:
             setattr(macros, plugin.name, macros_module)
 
 
+def integrate_listener_plugins(listener_manager: "ListenerManager") -> None:
+    global plugins
+    global macros_modules

Review comment:
       ```suggestion
   ```
   
   Not used?

##########
File path: tests/listeners/test_listeners.py
##########
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+
+from airflow import AirflowException
+from airflow.listeners import hookimpl
+from airflow.listeners.events import register_task_instance_state_events
+from airflow.listeners.listener import get_listener_manager
+from airflow.operators.bash import BashOperator
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+from tests.listeners import test_full_listener, test_partial_listener, test_throwing_listener
+
+DAG_ID = "test_listener_dag"
+TASK_ID = "test_listener_task"
+EXECUTION_DATE = timezone.utcnow()
+
+
+@pytest.fixture(scope="module", autouse=True)
+def register_events():
+    register_task_instance_state_events()
+
+
+@pytest.fixture(autouse=True)
+def clean_listener_manager():
+    lm = get_listener_manager()
+    lm.clear()
+    yield
+    lm = get_listener_manager()
+    lm.clear()
+    test_full_listener.state = []
+
+
+@provide_session
+def test_listener_gets_calls(create_task_instance, session=None):
+    lm = get_listener_manager()
+    lm.add_listener(test_full_listener)
+
+    ti = create_task_instance(session=session, state=State.QUEUED)
+    ti.run()
+
+    assert len(test_full_listener.state) == 2
+    assert test_full_listener.state == [State.RUNNING, State.SUCCESS]
+
+
+@provide_session
+def test_listener_gets_only_subscribed_calls(create_task_instance, session=None):
+    lm = get_listener_manager()
+    lm.add_listener(test_partial_listener)
+
+    ti = create_task_instance(session=session, state=State.QUEUED)
+    ti.run()
+
+    assert len(test_partial_listener.state) == 1
+    assert test_partial_listener.state == [State.RUNNING]
+
+
+@provide_session
+def test_listener_throws_exceptions(create_task_instance, session=None):
+    lm = get_listener_manager()
+    lm.add_listener(test_throwing_listener)
+
+    with pytest.raises(RuntimeError):
+        ti = create_task_instance(session=session, state=State.QUEUED)
+        ti._run_raw_task()
+
+
+@provide_session
+def test_listener_captures_failed_taskinstances(create_task_instance_of_operator, session=None):
+    lm = get_listener_manager()
+    lm.add_listener(test_full_listener)
+
+    with pytest.raises(AirflowException):
+        ti = create_task_instance_of_operator(
+            BashOperator, dag_id=DAG_ID, execution_date=EXECUTION_DATE, task_id=TASK_ID, bash_command="exit 1"
+        )
+        ti._run_raw_task()
+
+    assert test_full_listener.state == [State.FAILED]
+    assert len(test_full_listener.state) == 1
+
+
+def test_non_module_listener_is_not_registered():
+    class NotAListener:
+        @hookimpl
+        def on_task_instance_running(self, previous_state, task_instance, session):
+            pass
+
+    lm = get_listener_manager()
+    lm.add_listener(NotAListener())

Review comment:
       Shouldn't this throw a TypeError instead?

##########
File path: tests/listeners/test_listeners.py
##########
@@ -0,0 +1,108 @@
+@provide_session
+def test_listener_captures_failed_taskinstances(create_task_instance_of_operator, session=None):
+    lm = get_listener_manager()
+    lm.add_listener(test_full_listener)
+
+    with pytest.raises(AirflowException):
+        ti = create_task_instance_of_operator(
+            BashOperator, dag_id=DAG_ID, execution_date=EXECUTION_DATE, task_id=TASK_ID, bash_command="exit 1"
+        )
+        ti._run_raw_task()

Review comment:
       Please scope the `raises` tighter (in a few places)
   
   ```suggestion
   
       ti = create_task_instance_of_operator(
           BashOperator, dag_id=DAG_ID, execution_date=EXECUTION_DATE, task_id=TASK_ID, bash_command="exit 1"
       )
       with pytest.raises(AirflowException):
           ti._run_raw_task()
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#issuecomment-1012329467


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] uranusjr commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r773555651



##########
File path: airflow/utils/orm_event_handlers.py
##########
@@ -86,3 +86,7 @@ def after_cursor_execute(conn, cursor, statement, parameters, context, executema
                 stack_info,
                 statement.replace("\n", " "),
             )
+
+    if conf.getboolean('core', 'execute_listeners_on_scheduler', fallback=False):
+        # On import, register sqlalchemy event handlers which call airflow.listeners
+        import airflow.listeners.events  # noqa

Review comment:
       IMO this is generally bad practice. I’d much prefer the import to have no side effects, with an explicit init function call registering the handlers. This would make tests much easier to write as well, since you can do the init in a module-scoped pytest fixture instead.







[GitHub] [airflow] mobuchowski commented on pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#issuecomment-998920018


   FYI @bolkedebruin @potiuk @julienledem





[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r780200803



##########
File path: docs/apache-airflow/plugins.rst
##########
@@ -167,9 +172,14 @@ definitions in Airflow.
 
     # Importing base classes that we need to derive
     from airflow.hooks.base import BaseHook
+    from airflow.listeners.listener import Listener
     from airflow.models.baseoperator import BaseOperatorLink
     from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator
 
+    from pluggy import HookimplMarker
+
+    hookimpl = HookimplMarker("airflow")

Review comment:
       Done.

##########
File path: docs/apache-airflow/plugins.rst
##########
@@ -268,6 +285,7 @@ definitions in Airflow.
         operator_extra_links = [
             S3LogLink(),
         ]
+        listeners = [PrintingRunningListener]

Review comment:
       @ashb added a check for modules in `add_listener` and refactored the tests from classes to modules.







[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r783037510



##########
File path: tests/listeners/test_listeners.py
##########
@@ -0,0 +1,108 @@
+def test_non_module_listener_is_not_registered():
+    class NotAListener:
+        @hookimpl
+        def on_task_instance_running(self, previous_state, task_instance, session):
+            pass
+
+    lm = get_listener_manager()
+    lm.add_listener(NotAListener())

Review comment:
       Yes. Fixed.
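   A pluggy-free sketch of the stricter behavior discussed in this thread: `add_listener` raises `TypeError` for anything that is not a module, instead of logging a warning and returning. `ListenerManagerSketch` is hypothetical, and using `inspect.ismodule` for the check is an assumption about how the module check could be written.

```python
import inspect
import types
from typing import List


class ListenerManagerSketch:
    """Hypothetical, pluggy-free stand-in for ListenerManager."""

    def __init__(self) -> None:
        self._listeners: List[types.ModuleType] = []

    def add_listener(self, listener) -> None:
        # Reject class instances (and anything else) loudly instead of skipping them.
        if not inspect.ismodule(listener):
            raise TypeError(f"Listener {listener!r} is not a module")
        if listener not in self._listeners:
            self._listeners.append(listener)

    def has_listeners(self) -> bool:
        return bool(self._listeners)


class NotAListener:
    def on_task_instance_running(self, previous_state, task_instance, session):
        pass


lm = ListenerManagerSketch()
try:
    lm.add_listener(NotAListener())
except TypeError as err:
    print(err)
lm.add_listener(types.ModuleType("my_listener"))
```

   A test like `test_non_module_listener_is_not_registered` would then wrap the call in `pytest.raises(TypeError)`.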







[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r778751767



##########
File path: tests/listeners/test_listeners.py
##########
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pluggy
+import pytest
+
+from airflow import AirflowException
+from airflow.listeners.events import register_task_instance_state_events
+from airflow.listeners.listener import Listener, get_listener_manager
+from airflow.operators.bash import BashOperator
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+
+DAG_ID = "test_listener_dag"
+TASK_ID = "test_listener_task"
+EXECUTION_DATE = timezone.utcnow()
+
+
+hookimpl = pluggy.HookimplMarker("airflow")
+
+
+class CollectingListener(Listener):
+    def __init__(self):
+        self.states = []
+
+
+class PartialListener(CollectingListener):
+    @hookimpl
+    def on_task_instance_running(self, previous_state, task_instance, session):
+        self.states.append(State.RUNNING)
+
+
+class FullListener(PartialListener):
+    @hookimpl
+    def on_task_instance_success(self, previous_state, task_instance, session):
+        self.states.append(State.SUCCESS)
+
+    @hookimpl
+    def on_task_instance_failed(self, previous_state, task_instance, session):
+        self.states.append(State.FAILED)
+
+
+class ThrowingListener(Listener):
+    @hookimpl
+    def on_task_instance_running(self, previous_state, task_instance, session):
+        raise RuntimeError()
+
+
+@pytest.fixture(scope="module", autouse=True)
+def register_events():
+    register_task_instance_state_events()
+
+
+@pytest.fixture(autouse=True)
+def clean_listener_manager():
+    lm = get_listener_manager()
+    lm.clear()
+    yield
+    lm = get_listener_manager()
+    lm.clear()
+
+
+@provide_session
+def test_listener_gets_calls(create_task_instance, session=None):
+    lm = get_listener_manager()
+    listener = FullListener()
+    lm.add_listener(listener)
+
+    ti = create_task_instance(session=session, state=State.QUEUED)
+    ti.run()
+
+    assert len(listener.states) == 2
+    assert listener.states == [State.RUNNING, State.SUCCESS]
+
+
+@provide_session
+def test_listener_gets_only_subscribed_calls(create_task_instance, session=None):
+    lm = get_listener_manager()
+    listener = PartialListener()
+    lm.add_listener(listener)
+
+    ti = create_task_instance(session=session, state=State.QUEUED)
+    ti.run()
+
+    assert len(listener.states) == 1
+    assert listener.states == [State.RUNNING]
+
+
+@provide_session
+def test_listener_throws_exceptions(create_task_instance, session=None):
+    lm = get_listener_manager()
+    listener = ThrowingListener()
+    lm.add_listener(listener)
+
+    with pytest.raises(RuntimeError):
+        ti = create_task_instance(session=session, state=State.QUEUED)
+        ti.run()
+
+
+def test_listener_needs_to_subclass_listener():
+    lm = get_listener_manager()
+
+    class Dummy:
+        @hookimpl
+        def on_task_instance_running(self, previous_state, task_instance, session):
+            pass
+
+    lm.add_listener(Dummy())
+    assert not lm.has_listeners()
+
+
+@provide_session
+def test_listener_captures_failed_taskinstances(create_task_instance_of_operator, session=None):
+    lm = get_listener_manager()
+    listener = FullListener()
+    lm.add_listener(listener)
+
+    with pytest.raises(AirflowException):
+        ti = create_task_instance_of_operator(
+            BashOperator, dag_id=DAG_ID, execution_date=EXECUTION_DATE, task_id=TASK_ID, bash_command="exit 1"
+        )
+        ti.run()

Review comment:
       I can do `_run_raw_task`.

##########
File path: tests/plugins/test_plugins_manager.py
##########
@@ -350,6 +357,21 @@ class MacroPlugin(AirflowPlugin):
             # rendering templates.
             assert hasattr(macros, MacroPlugin.name)
 
+    def test_registering_plugin_listeners(self):
+        from airflow import plugins_manager
+
+        with mock.patch('airflow.plugins_manager.plugins', []):
+            plugins_manager.load_plugins_from_plugin_directory()
+            plugins_manager.integrate_listener_plugins()
+
+            assert get_listener_manager().has_listeners()
+            assert get_listener_manager().pm.get_plugins().pop().__class__.__name__ == "PluginListener"
+
+    @pytest.fixture(autouse=True)
+    def clear_listeners(self):
+        yield
+        get_listener_manager().clear()

Review comment:
       Removed.







[GitHub] [airflow] mobuchowski commented on a change in pull request #20443: Add Listener Plugin API that tracks TaskInstance state changes

Posted by GitBox <gi...@apache.org>.
mobuchowski commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r778751353



##########
File path: airflow/listeners/listener.py
##########
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import inspect
+import logging
+from typing import TYPE_CHECKING, Set
+
+import pluggy
+
+if TYPE_CHECKING:
+    from pluggy._hooks import _HookRelay
+
+from airflow.listeners import spec
+
+log = logging.getLogger(__name__)
+
+
+class Listener:
+    """Class used as a namespace for listener hook implementations"""
+
+
+_listener_manager = None
+
+
+class ListenerManager:
+    """Class that manages registration of listeners and provides hook property for calling them"""
+
+    def __init__(self):
+        self.pm = pluggy.PluginManager("airflow")
+        self.pm.add_hookspecs(spec)
+        self.listener_names: Set[str] = set()
+
+    def has_listeners(self) -> bool:
+        return len(self.pm.get_plugins()) > 0
+
+    @property
+    def hook(self) -> "_HookRelay":
+        """Returns hook, on which plugin methods specified in spec can be called."""
+        return self.pm.hook
+
+    def add_listener(self, listener: Listener):
+        if listener.__class__.__name__ in self.listener_names:
+            return
+        if self.pm.is_registered(listener):
+            return
+
+        listener_type = type(listener)
+        if not (
+            inspect.isclass(listener_type)
+            and issubclass(listener_type, Listener)
+            and (listener_type is not Listener)
+        ):
+            log.warning("Can't register listener: %s - is not a Listener subclass", listener_type)
+            return

Review comment:
       Done. Right now `add_listener` accepts anything that `pluggy` would.



