You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2020/10/20 08:18:43 UTC

[airflow] branch master updated: Switch PagerdutyHook from pypd to use pdpyras instead (#11151)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ee6186  Switch PagerdutyHook from pypd to use pdpyras instead (#11151)
3ee6186 is described below

commit 3ee618623be6079ed177da793b490cb7436d5cb6
Author: Alex Begg <ab...@gocurrency.com>
AuthorDate: Tue Oct 20 01:17:58 2020 -0700

    Switch PagerdutyHook from pypd to use pdpyras instead (#11151)
---
 airflow/providers/pagerduty/README.md             |  2 +-
 airflow/providers/pagerduty/hooks/pagerduty.py    | 72 ++++++++++++++++++-----
 setup.py                                          |  2 +-
 tests/providers/pagerduty/hooks/test_pagerduty.py | 66 +++++++--------------
 4 files changed, 79 insertions(+), 63 deletions(-)

diff --git a/airflow/providers/pagerduty/README.md b/airflow/providers/pagerduty/README.md
index 4c86aa2..e95ff5f 100644
--- a/airflow/providers/pagerduty/README.md
+++ b/airflow/providers/pagerduty/README.md
@@ -49,7 +49,7 @@ You can install this package on top of an existing airflow 2.* installation via
 
 | PIP package   | Version required   |
 |:--------------|:-------------------|
-| pypd          | &gt;=1.1.0            |
+| pdpyras       | &gt;=4.1.2,&lt;5         |
 
 # Provider classes summary
 
diff --git a/airflow/providers/pagerduty/hooks/pagerduty.py b/airflow/providers/pagerduty/hooks/pagerduty.py
index 55f9a47..781ed9b 100644
--- a/airflow/providers/pagerduty/hooks/pagerduty.py
+++ b/airflow/providers/pagerduty/hooks/pagerduty.py
@@ -15,10 +15,10 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Hook for creating Pagerduty incidents."""
+"""Hook for sending or receiving data from PagerDuty as well as creating PagerDuty incidents."""
 from typing import Any, Dict, List, Optional
 
-import pypd
+import pdpyras
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
@@ -26,17 +26,18 @@ from airflow.hooks.base_hook import BaseHook
 
 class PagerdutyHook(BaseHook):
     """
-    Takes both Pagerduty API token directly and connection that has Pagerduty API token.
+    Takes both PagerDuty API token directly and connection that has PagerDuty API token.
 
-    If both supplied, Pagerduty API token will be used.
+    If both supplied, PagerDuty API token will be used.
 
-    :param token: Pagerduty API token
-    :param pagerduty_conn_id: connection that has Pagerduty API token in the password field
+    :param token: PagerDuty API token
+    :param pagerduty_conn_id: connection that has PagerDuty API token in the password field
     """
 
     def __init__(self, token: Optional[str] = None, pagerduty_conn_id: Optional[str] = None) -> None:
         super().__init__()
         self.routing_key = None
+        self._session = None
 
         if pagerduty_conn_id is not None:
             conn = self.get_connection(pagerduty_conn_id)
@@ -52,6 +53,19 @@ class PagerdutyHook(BaseHook):
         if self.token is None:
             raise AirflowException('Cannot get token: No valid api token nor pagerduty_conn_id supplied.')
 
+    def get_session(self) -> pdpyras.APISession:
+        """
+        Returns `pdpyras.APISession` for use with sending or receiving data through the PagerDuty REST API.
+
+        The `pdpyras` library supplies a class `pdpyras.APISession` extending `requests.Session` from the
+        Requests HTTP library.
+
+        Documentation on how to use the `APISession` class can be found at:
+        https://pagerduty.github.io/pdpyras/#data-access-abstraction
+        """
+        self._session = pdpyras.APISession(self.token)
+        return self._session
+
     # pylint: disable=too-many-arguments
     def create_event(
         self,
@@ -65,14 +79,15 @@ class PagerdutyHook(BaseHook):
         group: Optional[str] = None,
         component: Optional[str] = None,
         class_type: Optional[str] = None,
-        links: Optional[List[Dict]] = None,
+        images: Optional[List[Any]] = None,
+        links: Optional[List[Any]] = None,
     ) -> Dict:
         """
         Create event for service integration.
 
         :param summary: Summary for the event
         :type summary: str
-        :param severity: Severity for the event, needs to be one of: Info, Warning, Error, Critical
+        :param severity: Severity for the event, needs to be one of: info, warning, error, critical
         :type severity: str
         :param source: Specific human-readable unique identifier, such as a
             hostname, for the system having the problem.
@@ -83,10 +98,12 @@ class PagerdutyHook(BaseHook):
         :param routing_key: Integration key. If not specified, will try to read
             from connection's extra json blob.
         :type routing_key: str
-        :param dedup_key: A string which identifies the alert triggered for the given event
+        :param dedup_key: A string which identifies the alert triggered for the given event.
+            Required for the actions acknowledge and resolve.
         :type dedup_key: str
-        :param custom_details: Free-form details from the event
-        :type custom_details: str
+        :param custom_details: Free-form details from the event. Can be a dictionary or a string.
+            If a dictionary is passed it will show up in PagerDuty as a table.
+        :type custom_details: dict or str
         :param group: A cluster or grouping of sources. For example, sources
             “prod-datapipe-02” and “prod-datapipe-03” might both be part of “prod-datapipe”
         :type group: str
@@ -94,8 +111,19 @@ class PagerdutyHook(BaseHook):
         :type component: str
         :param class_type: The class/type of the event.
         :type class_type: str
-        :param links: List of links to include.
-        :type class_type: list(str)
+        :param images: List of images to include. Each dictionary in the list accepts the following keys:
+            `src`: The source (URL) of the image being attached to the incident. This image must be served via
+            HTTPS.
+            `href`: [Optional] URL to make the image a clickable link.
+            `alt`: [Optional] Alternative text for the image.
+        :type images: list[dict]
+        :param links: List of links to include. Each dictionary in the list accepts the following keys:
+            `href`: URL of the link to be attached.
+            `text`: [Optional] Plain text that describes the purpose of the link, and can be used as the
+            link's text.
+        :type links: list[dict]
+        :return: PagerDuty Events API v2 response.
+        :rtype: dict
         """
         if routing_key is None:
             routing_key = self.routing_key
@@ -115,13 +143,27 @@ class PagerdutyHook(BaseHook):
         if class_type:
             payload["class"] = class_type
 
+        actions = ('trigger', 'acknowledge', 'resolve')
+        if action not in actions:
+            raise ValueError("Event action must be one of: %s" % ', '.join(actions))
         data = {
-            "routing_key": routing_key,
             "event_action": action,
             "payload": payload,
         }
         if dedup_key:
             data["dedup_key"] = dedup_key
+        elif action != 'trigger':
+            raise ValueError(
+                "The dedup_key property is required for event_action=%s events, and it must \
+                be a string."
+                % action
+            )
+        if images is not None:
+            data["images"] = images
         if links is not None:
             data["links"] = links
-        return pypd.EventV2.create(api_key=self.token, data=data)
+
+        session = pdpyras.EventsAPISession(routing_key)
+        resp = session.post('/v2/enqueue', json=data)
+        resp.raise_for_status()
+        return resp.json()
diff --git a/setup.py b/setup.py
index e102d5b..ef3ee97 100644
--- a/setup.py
+++ b/setup.py
@@ -340,7 +340,7 @@ oracle = [
     'cx_Oracle>=5.1.2',
 ]
 pagerduty = [
-    'pypd>=1.1.0',
+    'pdpyras>=4.1.2,<5',
 ]
 papermill = [
     'papermill[all]>=1.2.1',
diff --git a/tests/providers/pagerduty/hooks/test_pagerduty.py b/tests/providers/pagerduty/hooks/test_pagerduty.py
index 77bafd2..50a1154 100644
--- a/tests/providers/pagerduty/hooks/test_pagerduty.py
+++ b/tests/providers/pagerduty/hooks/test_pagerduty.py
@@ -15,10 +15,8 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-
 import unittest
-from unittest import mock
+import requests_mock
 
 from airflow.models import Connection
 from airflow.providers.pagerduty.hooks.pagerduty import PagerdutyHook
@@ -63,59 +61,35 @@ class TestPagerdutyHook(unittest.TestCase):
         hook = PagerdutyHook(token="pagerduty_param_token", pagerduty_conn_id=DEFAULT_CONN_ID)
         self.assertEqual(hook.token, 'pagerduty_param_token', 'token initialised.')
 
-    @mock.patch('airflow.providers.pagerduty.hooks.pagerduty.pypd.EventV2.create')
-    def test_create_event(self, mock_event_create):
+    @requests_mock.mock()
+    def test_get_service(self, m):
         hook = PagerdutyHook(pagerduty_conn_id=DEFAULT_CONN_ID)
-        mock_event_create.return_value = {
-            "status": "success",
-            "message": "Event processed",
-            "dedup_key": "samplekeyhere",
+        mock_response_body = {
+            "id": "PZYX321",
+            "name": "Apache Airflow",
+            "status": "active",
+            "type": "service",
+            "summary": "Apache Airflow",
+            "self": "https://api.pagerduty.com/services/PZYX321",
         }
-        resp = hook.create_event(
-            routing_key="key",
-            summary="test",
-            source="airflow_test",
-            severity="error",
-        )
-        self.assertEqual(resp["status"], "success")
-        mock_event_create.assert_called_once_with(
-            api_key="pagerduty_token",
-            data={
-                "routing_key": "key",
-                "event_action": "trigger",
-                "payload": {
-                    "severity": "error",
-                    "source": "airflow_test",
-                    "summary": "test",
-                },
-            },
-        )
+        m.get('https://api.pagerduty.com/services/PZYX321', json={"service": mock_response_body})
+        session = hook.get_session()
+        resp = session.rget('/services/PZYX321')
+        self.assertEqual(resp, mock_response_body)
 
-    @mock.patch('airflow.providers.pagerduty.hooks.pagerduty.pypd.EventV2.create')
-    def test_create_event_with_default_routing_key(self, mock_event_create):
+    @requests_mock.mock()
+    def test_create_event(self, m):
         hook = PagerdutyHook(pagerduty_conn_id=DEFAULT_CONN_ID)
-        mock_event_create.return_value = {
+        mock_response_body = {
             "status": "success",
             "message": "Event processed",
             "dedup_key": "samplekeyhere",
         }
+        m.post('https://events.pagerduty.com/v2/enqueue', json=mock_response_body)
         resp = hook.create_event(
+            routing_key="different_key",
             summary="test",
             source="airflow_test",
             severity="error",
-            custom_details='{"foo": "bar"}',
-        )
-        self.assertEqual(resp["status"], "success")
-        mock_event_create.assert_called_once_with(
-            api_key="pagerduty_token",
-            data={
-                "routing_key": "route",
-                "event_action": "trigger",
-                "payload": {
-                    "severity": "error",
-                    "source": "airflow_test",
-                    "summary": "test",
-                    "custom_details": '{"foo": "bar"}',
-                },
-            },
         )
+        self.assertEqual(resp, mock_response_body)