You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/03/06 21:46:48 UTC

[airflow] 04/37: listener plugin example added (#27905)

This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1456f11a99a1a3fe5f8513b4082a89e64e417866
Author: Bowrna <ma...@gmail.com>
AuthorDate: Sat Jan 21 02:58:27 2023 +0530

    listener plugin example added (#27905)
    
    (cherry picked from commit 100bb8d79a1e0c5fe6fca4b69c529b447cc992d1)
---
 airflow/example_dags/plugins/event_listener.py  | 156 ++++++++++++++++++++++++
 airflow/example_dags/plugins/listener_plugin.py |  26 ++++
 docs/apache-airflow/howto/index.rst             |   1 +
 docs/apache-airflow/howto/listener-plugin.rst   |  95 +++++++++++++++
 docs/spelling_wordlist.txt                      |   1 +
 5 files changed, 279 insertions(+)

diff --git a/airflow/example_dags/plugins/event_listener.py b/airflow/example_dags/plugins/event_listener.py
new file mode 100644
index 0000000000..2e2d01800b
--- /dev/null
+++ b/airflow/example_dags/plugins/event_listener.py
@@ -0,0 +1,156 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from airflow.listeners import hookimpl
+
+if TYPE_CHECKING:
+    from airflow.models.dagrun import DagRun
+    from airflow.models.taskinstance import TaskInstance
+    from airflow.utils.state import TaskInstanceState
+
+
+# [START howto_listen_ti_running_task]
+@hookimpl
+def on_task_instance_running(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
+    """
+    Called when a task instance's state changes to RUNNING.
+
+    Receives ``previous_state``, ``task_instance`` and ``session`` (unused here). This example
+    prints the task's id, state and start date, the dag id, and the state of its dag run.
+    """
+    print("Task instance is in running state")
+    print(" Previous state of the Task instance:", previous_state)
+
+    state: TaskInstanceState = task_instance.state
+    name: str = task_instance.task_id
+    start_date = task_instance.start_date
+
+    dagrun = task_instance.dag_run
+    dagrun_status = dagrun.state
+
+    task = task_instance.task
+
+    dag = task.dag
+    dag_name = None  # stays None when task.dag is falsy
+    if dag:
+        dag_name = dag.dag_id
+    print(f"Current task name:{name} state:{state} start_date:{start_date}")
+    print(f"Dag name:{dag_name} and current dag run status:{dagrun_status}")
+
+
+# [END howto_listen_ti_running_task]
+
+# [START howto_listen_ti_success_task]
+@hookimpl
+def on_task_instance_success(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
+    """
+    Called when a task instance's state changes to SUCCESS.
+
+    Receives ``previous_state``, ``task_instance`` and ``session`` (unused here). This example
+    prints the dag id, the dag run's ``queued_at`` value, and the task's hostname and operator.
+    """
+    print("Task instance in success state")
+    print(" Previous state of the Task instance:", previous_state)
+
+    dag_id = task_instance.dag_id
+    hostname = task_instance.hostname
+    operator = task_instance.operator
+
+    dagrun = task_instance.dag_run
+    queued_at = dagrun.queued_at
+    print(f"Dag name:{dag_id} queued_at:{queued_at}")
+    print(f"Task hostname:{hostname} operator:{operator}")
+
+
+# [END howto_listen_ti_success_task]
+
+# [START howto_listen_ti_failure_task]
+@hookimpl
+def on_task_instance_failed(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
+    """
+    Called when a task instance's state changes to FAILED.
+
+    Receives ``previous_state``, ``task_instance`` and ``session`` (both unused here besides
+    ``task_instance``). Prints the task's start, end and duration, then the task, dag and dag run.
+    """
+    print("Task instance in failure state")
+
+    start_date = task_instance.start_date
+    end_date = task_instance.end_date
+    duration = task_instance.duration
+
+    dagrun = task_instance.dag_run
+
+    task = task_instance.task
+
+    dag = task_instance.task.dag
+
+    print(f"Task start:{start_date} end:{end_date} duration:{duration}")
+    print(f"Task:{task} dag:{dag} dagrun:{dagrun}")
+
+
+# [END howto_listen_ti_failure_task]
+
+# [START howto_listen_dagrun_success_task]
+@hookimpl
+def on_dag_run_success(dag_run: DagRun, message: str):
+    """
+    Called when a dag run's state changes to SUCCESS; prints the run's start and end dates.
+    """
+    print("Dag run in success state")
+    start_date = dag_run.start_date
+    end_date = dag_run.end_date
+
+    print(f"Dag run start:{start_date} end:{end_date}")
+
+
+# [END howto_listen_dagrun_success_task]
+
+# [START howto_listen_dagrun_failure_task]
+@hookimpl
+def on_dag_run_failed(dag_run: DagRun, message: str):
+    """
+    Called when a dag run's state changes to FAILED; prints its dag id, run id and external_trigger.
+    """
+    print("Dag run  in failure state")
+    dag_id = dag_run.dag_id
+    run_id = dag_run.run_id
+    external_trigger = dag_run.external_trigger
+
+    print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}")
+
+
+# [END howto_listen_dagrun_failure_task]
+
+# [START howto_listen_dagrun_running_task]
+@hookimpl
+def on_dag_run_running(dag_run: DagRun, message: str):
+    """
+    Called when a dag run's state changes to RUNNING; prints its queued_at and dag_hash values.
+    """
+    print("Dag run  in running state")
+    queued_at = dag_run.queued_at
+    dag_hash_info = dag_run.dag_hash
+
+    print(f"Dag information Queued at: {queued_at} hash info: {dag_hash_info}")
+
+
+# [END howto_listen_dagrun_running_task]
diff --git a/airflow/example_dags/plugins/listener_plugin.py b/airflow/example_dags/plugins/listener_plugin.py
new file mode 100644
index 0000000000..a365d57e3f
--- /dev/null
+++ b/airflow/example_dags/plugins/listener_plugin.py
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from airflow.example_dags.plugins import event_listener
+from airflow.plugins_manager import AirflowPlugin
+
+
+class MetadataCollectionPlugin(AirflowPlugin):
+    name = "MetadataCollectionPlugin"
+    listeners = [event_listener]  # module containing the @hookimpl listener functions
diff --git a/docs/apache-airflow/howto/index.rst b/docs/apache-airflow/howto/index.rst
index 5d6b54fd77..7a170828d5 100644
--- a/docs/apache-airflow/howto/index.rst
+++ b/docs/apache-airflow/howto/index.rst
@@ -37,6 +37,7 @@ configuring an Airflow environment.
     operator/index
     timetable
     custom-view-plugin
+    listener-plugin
     customize-ui
     custom-operator
     create-custom-decorator
diff --git a/docs/apache-airflow/howto/listener-plugin.rst b/docs/apache-airflow/howto/listener-plugin.rst
new file mode 100644
index 0000000000..7b46a9de8a
--- /dev/null
+++ b/docs/apache-airflow/howto/listener-plugin.rst
@@ -0,0 +1,95 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Listener Plugin of Airflow
+==========================
+
+Airflow has a feature that allows adding listeners for monitoring and tracking
+the task state using plugins.
+
+This is a simple example listener plugin of Airflow that helps to track the task
+state and collect useful metadata information about the task, dag run and dag.
+
+This example shows how to create a listener plugin for Airflow.
+The plugin works by using SQLAlchemy's event mechanism. It watches
+task instance state changes at the table level and triggers an event.
+Listeners are notified for all the tasks across all the DAGs.
+
+In this plugin, an object reference is derived from the base class
+``airflow.plugins_manager.AirflowPlugin``.
+
+Listener plugins use the pluggy library under the hood. Pluggy is a library built for plugin
+management and hook calling for pytest. Pluggy enables function hooking, so it allows
+building "pluggable" systems with your own customization over that hooking.
+
+Using this plugin, the following events can be listened to:
+    * task instance is in running state.
+    * task instance is in success state.
+    * task instance is in failure state.
+    * dag run is in running state.
+    * dag run is in success state.
+    * dag run is in failure state.
+    * on starting, for components like airflow job, scheduler or backfilljob
+    * before stopping, for components like airflow job, scheduler or backfilljob
+
+Listener Registration
+---------------------
+
+A listener plugin with an object reference to the listener object is registered
+as part of an Airflow plugin. The following is a
+skeleton for us to implement a new listener:
+
+.. code-block:: python
+
+    from airflow.plugins_manager import AirflowPlugin
+
+    # This is the listener file created where custom code to monitor is added over hookimpl
+    import listener
+
+
+    class MetadataCollectionPlugin(AirflowPlugin):
+        name = "MetadataCollectionPlugin"
+        listeners = [listener]
+
+
+Next, we can check the code added into ``listener`` and see the implementation
+methods for each of those listeners. After the implementation, the listener part
+gets executed during every task execution across all the DAGs.
+
+For reference, here's the plugin code within the ``listener.py`` module that implements the listeners:
+
+This example listens for when the task instance is in the running state:
+
+.. exampleinclude:: ../../../airflow/example_dags/plugins/event_listener.py
+    :language: python
+    :start-after: [START howto_listen_ti_running_task]
+    :end-before: [END howto_listen_ti_running_task]
+
+Similarly, code to listen after task_instance success and failure can be implemented.
+
+This example listens for when the dag run is changed to the failed state:
+
+.. exampleinclude:: ../../../airflow/example_dags/plugins/event_listener.py
+    :language: python
+    :start-after: [START howto_listen_dagrun_failure_task]
+    :end-before: [END howto_listen_dagrun_failure_task]
+
+Similarly, code to listen after dag_run success and during running state can be implemented.
+
+The listener plugin files required to add the listener implementation are added as part of the
+Airflow plugin into the ``$AIRFLOW_HOME/plugins/`` folder and are loaded during Airflow startup.
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 734723894a..5a62b70939 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -123,6 +123,7 @@ backfill
 backfillable
 backfilled
 backfilling
+backfilljob
 BackfillJobTest
 Backfills
 backfills