You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/03/06 21:46:48 UTC
[airflow] 04/37: listener plugin example added (#27905)
This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 1456f11a99a1a3fe5f8513b4082a89e64e417866
Author: Bowrna <ma...@gmail.com>
AuthorDate: Sat Jan 21 02:58:27 2023 +0530
listener plugin example added (#27905)
(cherry picked from commit 100bb8d79a1e0c5fe6fca4b69c529b447cc992d1)
---
airflow/example_dags/plugins/event_listener.py | 156 ++++++++++++++++++++++++
airflow/example_dags/plugins/listener_plugin.py | 26 ++++
docs/apache-airflow/howto/index.rst | 1 +
docs/apache-airflow/howto/listener-plugin.rst | 95 +++++++++++++++
docs/spelling_wordlist.txt | 1 +
5 files changed, 279 insertions(+)
diff --git a/airflow/example_dags/plugins/event_listener.py b/airflow/example_dags/plugins/event_listener.py
new file mode 100644
index 0000000000..2e2d01800b
--- /dev/null
+++ b/airflow/example_dags/plugins/event_listener.py
@@ -0,0 +1,156 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from airflow.listeners import hookimpl
+
+if TYPE_CHECKING:
+ from airflow.models.dagrun import DagRun
+ from airflow.models.taskinstance import TaskInstance
+ from airflow.utils.state import TaskInstanceState
+
+
+# [START howto_listen_ti_running_task]
@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
    """
    Print details about a task instance that has just entered the RUNNING state.

    The output covers the state the task instance came from, its task id, current state and
    start date, and the id of the DAG it belongs to together with the state of its DAG run.

    :param previous_state: state of the task instance before it became RUNNING.
    :param task_instance: the task instance that is now running.
    :param session: session passed in by the caller; not used by this example.
    """
    print("Task instance is in running state")
    print(" Previous state of the Task instance:", previous_state)

    ti_state: TaskInstanceState = task_instance.state
    task_id: str = task_instance.task_id
    started_at = task_instance.start_date

    run_state = task_instance.dag_run.state

    # The task may not be attached to a DAG; report None in that case.
    parent_dag = task_instance.task.dag
    parent_dag_id = parent_dag.dag_id if parent_dag else None

    print(f"Current task name:{task_id} state:{ti_state} start_date:{started_at}")
    print(f"Dag name:{parent_dag_id} and current dag run status:{run_state}")
+
+
+# [END howto_listen_ti_running_task]
+
+# [START howto_listen_ti_success_task]
@hookimpl
def on_task_instance_success(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
    """
    Print details about a task instance that has just entered the SUCCESS state.

    Reports the state the task instance came from, its DAG id, the time its DAG run was
    queued, and the hostname and operator recorded on the task instance.

    :param previous_state: state of the task instance before it became SUCCESS.
    :param task_instance: the task instance that has succeeded.
    :param session: session passed in by the caller; not used by this example.
    """
    print("Task instance in success state")
    print(" Previous state of the Task instance:", previous_state)

    ti_dag_id, host, operator_name = task_instance.dag_id, task_instance.hostname, task_instance.operator
    run_queued_at = task_instance.dag_run.queued_at

    print(f"Dag name:{ti_dag_id} queued_at:{run_queued_at}")
    print(f"Task hostname:{host} operator:{operator_name}")
+
+
+# [END howto_listen_ti_success_task]
+
+# [START howto_listen_ti_failure_task]
@hookimpl
def on_task_instance_failed(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
    """
    Print details about a task instance that has just entered the FAILED state.

    Reports the state the task instance came from, its start date, end date and duration,
    and the task, DAG and DAG run objects it belongs to.

    :param previous_state: state of the task instance before it became FAILED.
    :param task_instance: the task instance that has failed.
    :param session: session passed in by the caller; not used by this example.
    """
    print("Task instance in failure state")
    # Report the prior state as well, consistent with the running and success listeners.
    print(" Previous state of the Task instance:", previous_state)

    start_date = task_instance.start_date
    end_date = task_instance.end_date
    duration = task_instance.duration

    dagrun = task_instance.dag_run

    task = task_instance.task
    # Reuse the task fetched above instead of resolving task_instance.task a second time.
    dag = task.dag

    print(f"Task start:{start_date} end:{end_date} duration:{duration}")
    print(f"Task:{task} dag:{dag} dagrun:{dagrun}")
+
+
+# [END howto_listen_ti_failure_task]
+
+# [START howto_listen_dagrun_success_task]
@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
    """
    Print the start and end dates of a DAG run that has just entered the SUCCESS state.

    :param dag_run: the DAG run that succeeded.
    :param msg: message supplied by the caller; not used by this example. The name must be
        ``msg`` to match the dag-run hookspec, because pluggy passes hook arguments by name
        and rejects implementations that declare arguments missing from the spec.
    """
    print("Dag run in success state")
    start_date = dag_run.start_date
    end_date = dag_run.end_date

    print(f"Dag run start:{start_date} end:{end_date}")
+
+
+# [END howto_listen_dagrun_success_task]
+
+# [START howto_listen_dagrun_failure_task]
@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """
    Print identifying details of a DAG run that has just entered the FAILED state.

    Reports the DAG id, the run id and whether the run was externally triggered.

    :param dag_run: the DAG run that failed.
    :param msg: message supplied by the caller; not used by this example. The name must be
        ``msg`` to match the dag-run hookspec, because pluggy passes hook arguments by name
        and rejects implementations that declare arguments missing from the spec.
    """
    print("Dag run in failure state")
    dag_id = dag_run.dag_id
    run_id = dag_run.run_id
    external_trigger = dag_run.external_trigger

    print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}")
+
+
+# [END howto_listen_dagrun_failure_task]
+
+# [START howto_listen_dagrun_running_task]
@hookimpl
def on_dag_run_running(dag_run: DagRun, msg: str):
    """
    Print queueing details of a DAG run that has just entered the RUNNING state.

    Reports when the run was queued and the DAG hash stored on the run.

    :param dag_run: the DAG run that is now running.
    :param msg: message supplied by the caller; not used by this example. The name must be
        ``msg`` to match the dag-run hookspec, because pluggy passes hook arguments by name
        and rejects implementations that declare arguments missing from the spec.
    """
    print("Dag run in running state")
    queued_at = dag_run.queued_at
    dag_hash_info = dag_run.dag_hash

    print(f"Dag information Queued at: {queued_at} hash info: {dag_hash_info}")
+
+
+# [END howto_listen_dagrun_running_task]
diff --git a/airflow/example_dags/plugins/listener_plugin.py b/airflow/example_dags/plugins/listener_plugin.py
new file mode 100644
index 0000000000..a365d57e3f
--- /dev/null
+++ b/airflow/example_dags/plugins/listener_plugin.py
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from airflow.example_dags.plugins import event_listener
+from airflow.plugins_manager import AirflowPlugin
+
+
class MetadataCollectionPlugin(AirflowPlugin):
    """
    Example Airflow plugin that registers the ``event_listener`` module as a listener.

    The ``@hookimpl`` functions defined in that module print task instance and DAG run
    state changes.
    """

    name = "MetadataCollectionPlugin"
    # The listener is the imported module object itself, not an instance of a class.
    listeners = [event_listener]
diff --git a/docs/apache-airflow/howto/index.rst b/docs/apache-airflow/howto/index.rst
index 5d6b54fd77..7a170828d5 100644
--- a/docs/apache-airflow/howto/index.rst
+++ b/docs/apache-airflow/howto/index.rst
@@ -37,6 +37,7 @@ configuring an Airflow environment.
operator/index
timetable
custom-view-plugin
+ listener-plugin
customize-ui
custom-operator
create-custom-decorator
diff --git a/docs/apache-airflow/howto/listener-plugin.rst b/docs/apache-airflow/howto/listener-plugin.rst
new file mode 100644
index 0000000000..7b46a9de8a
--- /dev/null
+++ b/docs/apache-airflow/howto/listener-plugin.rst
@@ -0,0 +1,95 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Listener Plugin of Airflow
+==========================
+
+Airflow has a feature that allows you to add listeners for monitoring and tracking
+task state using plugins.
+
+This is a simple example listener plugin for Airflow that helps to track task
+state and collect useful metadata about the task, the DAG run and the DAG.
+
+This example plugin shows how to create a listener plugin for Airflow.
+The plugin works by using SQLAlchemy's event mechanism: it watches
+task instance state changes at the table level and triggers an event.
+Listeners are notified for all tasks across all DAGs.
+
+In this plugin, the plugin class is derived from the base class
+``airflow.plugins_manager.AirflowPlugin``.
+
+The listener plugin uses pluggy under the hood. Pluggy is a library built for plugin
+management and hook calling in pytest. Pluggy enables function hooking, so it allows
+building "pluggable" systems with your own customization over that hooking.
+
+Using this plugin, the following events can be listened to:
+ * task instance is in running state.
+ * task instance is in success state.
+ * task instance is in failure state.
+ * dag run is in running state.
+ * dag run is in success state.
+ * dag run is in failure state.
+ * a component such as an Airflow job, the scheduler or a ``BackfillJob`` is starting.
+ * a component such as an Airflow job, the scheduler or a ``BackfillJob`` is about to stop.
+
+Listener Registration
+---------------------
+
+A listener plugin registers a reference to the listener module
+as part of an Airflow plugin. The following is a
+skeleton for implementing a new listener:
+
+.. code-block:: python
+
+ from airflow.plugins_manager import AirflowPlugin
+
+ # The listener module containing your custom monitoring code, written as @hookimpl functions
+ import listener
+
+
+ class MetadataCollectionPlugin(AirflowPlugin):
+ name = "MetadataCollectionPlugin"
+ listeners = [listener]
+
+
+Next, we can look at the code added to ``listener`` and see the implementation
+of each of those listener methods. Once implemented, the listener is executed
+for every task execution across all DAGs.
+
+For reference, here are excerpts of the listener code in ``event_listener.py``:
+
+This example listens for a task instance entering the running state:
+
+.. exampleinclude:: ../../../airflow/example_dags/plugins/event_listener.py
+ :language: python
+ :start-after: [START howto_listen_ti_running_task]
+ :end-before: [END howto_listen_ti_running_task]
+
+Similarly, code to listen for task instance success and failure can be implemented.
+
+This example listens for a DAG run changing to the failed state:
+
+.. exampleinclude:: ../../../airflow/example_dags/plugins/event_listener.py
+ :language: python
+ :start-after: [START howto_listen_dagrun_failure_task]
+ :end-before: [END howto_listen_dagrun_failure_task]
+
+Similarly, code to listen for DAG run success and running states can be implemented.
+
+The listener plugin files containing the listener implementation are added as part of the
+Airflow plugin to the ``$AIRFLOW_HOME/plugins/`` folder and loaded during Airflow startup.
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 734723894a..5a62b70939 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -123,6 +123,7 @@ backfill
backfillable
backfilled
backfilling
+backfilljob
BackfillJobTest
Backfills
backfills