You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/11 20:06:31 UTC

[GitHub] forsberg closed pull request #2319: [AIRFLOW-1236] SlackPostOperator using Slack Incoming WebHook

forsberg closed pull request #2319: [AIRFLOW-1236] SlackPostOperator using Slack Incoming WebHook
URL: https://github.com/apache/incubator-airflow/pull/2319
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/hooks/slack_webhook_hook.py b/airflow/hooks/slack_webhook_hook.py
new file mode 100644
index 0000000000..d48221e4a4
--- /dev/null
+++ b/airflow/hooks/slack_webhook_hook.py
@@ -0,0 +1,24 @@
+import logging
+import requests
+from airflow.hooks.base_hook import BaseHook
+
+class SlackWebHookHook(BaseHook):
+    """Slack WebHook hook. Connects to a slack webhook URL using a Connection.
+
+    The Slack WebHook URL should be defined in the 'extra' field of the connection as defined
+    in the Airflow admin interface.
+    """
+    def __init__(self, conn_id):
+        self.conn_id = conn_id
+
+    def _get_webhook_url(self):
+        webhook_url = self.get_connection(self.conn_id).extra
+        if not webhook_url:
+            logging.error("'extra' not defined on connection '%s'. SlackWebHookHook will fail" % self.conn_id)
+        return webhook_url
+
+    def post(self, json):
+        response = requests.post(self._get_webhook_url(), json=json,
+                                 headers={'Content-Type': 'application/json'})
+        response.raise_for_status()
+
diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py
index 50b05ff461..a1b173f3bf 100644
--- a/airflow/operators/__init__.py
+++ b/airflow/operators/__init__.py
@@ -88,7 +88,7 @@
     'jdbc_operator': ['JdbcOperator'],
     'mssql_operator': ['MsSqlOperator'],
     'mssql_to_hive': ['MsSqlToHiveTransfer'],
-    'slack_operator': ['SlackAPIOperator', 'SlackAPIPostOperator'],
+    'slack_operator': ['SlackAPIOperator', 'SlackAPIPostOperator', 'SlackPostOperator'],
     'generic_transfer': ['GenericTransfer'],
     'oracle_operator': ['OracleOperator']
 }
diff --git a/airflow/operators/slack_operator.py b/airflow/operators/slack_operator.py
index 2e6d4269fa..fbf4c076b7 100644
--- a/airflow/operators/slack_operator.py
+++ b/airflow/operators/slack_operator.py
@@ -16,10 +16,12 @@
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.exceptions import AirflowException
+from airflow.hooks.slack_webhook_hook import SlackWebHookHook
 import json
 import logging
 
 
+
 class SlackAPIOperator(BaseOperator):
     """
     Base Slack Operator
@@ -61,6 +63,7 @@ def execute(self, **kwargs):
         SlackAPIOperator calls will not fail even if the call is not unsuccessful.
         It should not prevent a DAG from completing in success
         """
+
         if not self.api_params:
             self.construct_api_call_params()
         sc = SlackClient(self.token)
@@ -116,3 +119,55 @@ def construct_api_call_params(self):
             'icon_url': self.icon_url,
             'attachments': json.dumps(self.attachments),
         }
+
+
+class SlackPostOperator(BaseOperator):
+    """
+    Posts messages to a slack channel using the Slack Webhook API. Same functionality as
+    :class:`.SlackAPIPostOperator`, but uses Connection info from Airflow, and does not rely on a
+    deprecated API.
+
+    :param conn_id: Connection ID (type HTTP) to use as Slack WebHook URL. Configure under Connections
+                    in the Airflow administration interface, and add the Slack WebHook URL as 'extra'
+    :param channel: channel in which to post message on slack name (#general) or ID (C12318391)
+    :type channel: string
+    :param username: Username that airflow will be posting to Slack as
+    :type username: string
+    :param text: message to send to slack
+    :type text: string
+    :param icon_url: url to icon used for this message
+    :type icon_url: string
+    :param attachments: extra formatting details - see https://api.slack.com/docs/attachments
+    :type attachments: array of hashes
+    """
+
+    template_fields = ('username', 'text', 'attachments')
+    ui_color = '#FFBA40'
+
+    @apply_defaults
+    def __init__(self,
+                 conn_id='default_slack_webhook',
+                 channel='#general',
+                 username='Airflow',
+                 text='No message has been set.\n'
+                      'Here is a video of the Royal Guards in Stockholm, Sweden instead\n'
+                      'https://www.youtube.com/watch?v=qGeZd6SovFI',
+                 icon_url='https://raw.githubusercontent.com/airbnb/airflow/master/airflow/www/static/pin_100.png',
+                 attachments=None,
+                 *args, **kwargs):
+        self.conn_id = conn_id
+        self.channel = channel
+        self.username = username
+        self.text = text
+        self.icon_url = icon_url
+        self.attachments = attachments
+        super(SlackPostOperator, self).__init__(*args, **kwargs)
+
+    def execute(self, context):
+        hook = SlackWebHookHook(self.conn_id)
+        hook.post({'text': self.text,
+                   'channel': self.channel,
+                   'username': self.username,
+                   'icon': self.icon_url,
+                   'attachments': self.attachments,
+                   })
diff --git a/tests/hooks/test_slack_webhook_hook.py b/tests/hooks/test_slack_webhook_hook.py
new file mode 100644
index 0000000000..3449a4cf3a
--- /dev/null
+++ b/tests/hooks/test_slack_webhook_hook.py
@@ -0,0 +1,80 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+import unittest
+import requests
+from airflow.hooks.slack_webhook_hook import SlackWebHookHook
+
+class SlackWebhookHookTest(unittest.TestCase):
+    @mock.patch('requests.post')
+    def test_post(self, requests_post):
+        webhook_url = object()
+
+        hook = SlackWebHookHook(None)
+        hook._get_webhook_url = mock.Mock()
+        hook._get_webhook_url.return_value = webhook_url
+
+        data = {'attr':'value'}
+        hook.post(data)
+
+        requests_post.assert_called_once_with(webhook_url, json=data,
+                                              headers={'Content-Type': 'application/json'})
+
+    @mock.patch('logging.error')
+    def test_get_webhook_url(self, error):
+        conn_id = object()
+        webhook_url = mock.Mock()
+        connection = mock.Mock()
+        connection.extra = webhook_url
+
+        hook = SlackWebHookHook(conn_id)
+
+        hook.get_connection = mock.Mock()
+        hook.get_connection.return_value = connection
+
+        self.assertEqual(hook._get_webhook_url(), webhook_url)
+        hook.get_connection.assert_called_once_with(conn_id)
+
+        error.assert_not_called()
+
+    @mock.patch('logging.error')
+    def test_get_webhook_url_no_extra(self, error):
+        conn_id = 'connection name'
+        webhook_url = None
+        connection = mock.Mock()
+        connection.extra = None
+
+        hook = SlackWebHookHook(conn_id)
+
+        hook.get_connection = mock.Mock()
+        hook.get_connection.return_value = connection
+
+        self.assertEqual(hook._get_webhook_url(), None)
+        hook.get_connection.assert_called_once_with(conn_id)
+
+        error.assert_called_with("'extra' not defined on connection 'connection name'. SlackWebHookHook will fail")
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/operators/test_slack_operator.py b/tests/operators/test_slack_operator.py
new file mode 100644
index 0000000000..b4bfb32342
--- /dev/null
+++ b/tests/operators/test_slack_operator.py
@@ -0,0 +1,34 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+import unittest
+
+from airflow.operators.slack_operator import SlackPostOperator
+
+class TestSlackPostOperator(unittest.TestCase):
+    @mock.patch('airflow.hooks.slack_webhook_hook.SlackWebHookHook.post')
+    def test_execute(self, hook_post):
+        operator = SlackPostOperator(task_id='test_task_id', text='something',
+                                     icon_url='icon_url')
+        operator.execute({})
+
+        hook_post.assert_called_once_with({'text': 'something',
+                                           'channel': '#general',
+                                           'username': 'Airflow',
+                                           'icon': 'icon_url',
+                                           'attachments': None})
+
+
+


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services