You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/03/28 21:23:51 UTC

incubator-airflow git commit: [AIRFLOW-2217] Add Slack webhook operator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 721bc0927 -> 64206615a


[AIRFLOW-2217] Add Slack webhook operator

Add the Slack webhook hook/operator pair. This allows posting
messages to Slack in an easy, lightweight manner.

Closes #3129 from danielvdende/AIRFLOW-2217-add-slack-webhook-operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/64206615
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/64206615
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/64206615

Branch: refs/heads/master
Commit: 64206615a790c90893d5836da8d2f7159bda23ac
Parents: 721bc09
Author: Daniel van der Ende <da...@gmail.com>
Authored: Wed Mar 28 23:23:44 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed Mar 28 23:23:44 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/slack_webhook_hook.py     | 124 +++++++++++++++++++
 .../contrib/operators/slack_webhook_operator.py |  87 +++++++++++++
 tests/contrib/hooks/test_slack_webhook_hook.py  |  87 +++++++++++++
 .../operators/test_slack_webhook_operator.py    |  65 ++++++++++
 4 files changed, 363 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/64206615/airflow/contrib/hooks/slack_webhook_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/slack_webhook_hook.py b/airflow/contrib/hooks/slack_webhook_hook.py
new file mode 100644
index 0000000..d946e51
--- /dev/null
+++ b/airflow/contrib/hooks/slack_webhook_hook.py
@@ -0,0 +1,124 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+
+from airflow.hooks.http_hook import HttpHook
+from airflow.exceptions import AirflowException
+
+
+class SlackWebhookHook(HttpHook):
+    """
+    This hook allows you to post messages to Slack using incoming webhooks.
+    Accepts either a Slack webhook token directly or a connection that holds the token.
+    If both are supplied, the directly supplied Slack webhook token will be used.
+
+    Each Slack webhook token can be pre-configured to use a specific channel, username and
+    icon. You can override these defaults in this hook.
+
+    :param http_conn_id: connection that has Slack webhook token in the extra field
+    :type http_conn_id: str
+    :param webhook_token: Slack webhook token
+    :type webhook_token: str
+    :param message: The message you want to send on Slack
+    :type message: str
+    :param channel: The channel the message should be posted to
+    :type channel: str
+    :param username: The username to post to slack with
+    :type username: str
+    :param icon_emoji: The emoji to use as icon for the user posting to Slack
+    :type icon_emoji: str
+    :param link_names: Whether or not to find and link channel and usernames in your
+                       message
+    :type link_names: bool
+    :param proxy: Proxy to use to make the Slack webhook call
+    :type proxy: str
+    """
+    def __init__(self,
+                 http_conn_id=None,
+                 webhook_token=None,
+                 message="",
+                 channel=None,
+                 username=None,
+                 icon_emoji=None,
+                 link_names=False,
+                 proxy=None,
+                 *args,
+                 **kwargs
+                 ):
+        super(SlackWebhookHook, self).__init__(*args, **kwargs)
+        self.http_conn_id = http_conn_id
+        self.webhook_token = self._get_token(webhook_token, http_conn_id)
+        self.message = message
+        self.channel = channel
+        self.username = username
+        self.icon_emoji = icon_emoji
+        self.link_names = link_names
+        self.proxy = proxy
+
+    def _get_token(self, token, http_conn_id):
+        """
+        Given either a manually set token or a conn_id, return the webhook_token to use
+        :param token: The manually provided token (takes precedence when truthy)
+        :param http_conn_id: conn_id whose extra holds 'webhook_token' ('' if absent)
+        :return: webhook_token (str); raises AirflowException if neither is supplied
+        """
+        if token:
+            return token
+        elif http_conn_id:
+            conn = self.get_connection(http_conn_id)
+            extra = conn.extra_dejson
+            return extra.get('webhook_token', '')
+        else:
+            raise AirflowException('Cannot get token: No valid Slack '
+                                   'webhook token nor conn_id supplied')
+
+    def _build_slack_message(self):
+        """
+        Construct the Slack message. All relevant parameters are combined here to a valid
+        Slack json message
+        :return: Slack message (str) to send
+        """
+        cmd = {}
+
+        if self.channel:
+            cmd['channel'] = self.channel
+        if self.username:
+            cmd['username'] = self.username
+        if self.icon_emoji:
+            cmd['icon_emoji'] = self.icon_emoji
+        if self.link_names:
+            cmd['link_names'] = 1
+
+        # 'text' is always set, even when the message is the empty-string default
+        cmd['text'] = self.message
+        return json.dumps(cmd)
+
+    def execute(self):
+        """
+        Execute the Slack webhook call: send the built message via self.run
+
+        The webhook token is the endpoint and the JSON message is the request data;
+        the proxy, if set, is passed through extra_options as the 'https' proxy.
+        """
+        proxies = {}
+        if self.proxy:
+            # we only need https proxy for Slack, as the endpoint is https
+            proxies = {'https': self.proxy}
+
+        slack_message = self._build_slack_message()
+        self.run(endpoint=self.webhook_token,
+                 data=slack_message,
+                 headers={'Content-type': 'application/json'},
+                 extra_options={'proxies': proxies})

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/64206615/airflow/contrib/operators/slack_webhook_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/slack_webhook_operator.py b/airflow/contrib/operators/slack_webhook_operator.py
new file mode 100644
index 0000000..dff1c40
--- /dev/null
+++ b/airflow/contrib/operators/slack_webhook_operator.py
@@ -0,0 +1,87 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from airflow.operators.http_operator import SimpleHttpOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.contrib.hooks.slack_webhook_hook import SlackWebhookHook
+
+
+class SlackWebhookOperator(SimpleHttpOperator):
+    """
+    This operator allows you to post messages to Slack using incoming webhooks.
+    Accepts either a Slack webhook token directly or a connection that holds the token.
+    If both are supplied, the directly supplied Slack webhook token will be used.
+
+    Each Slack webhook token can be pre-configured to use a specific channel, username and
+    icon. You can override these defaults in this operator.
+
+    :param http_conn_id: connection that has Slack webhook token in the extra field
+    :type http_conn_id: str
+    :param webhook_token: Slack webhook token
+    :type webhook_token: str
+    :param message: The message you want to send on Slack
+    :type message: str
+    :param channel: The channel the message should be posted to
+    :type channel: str
+    :param username: The username to post to slack with
+    :type username: str
+    :param icon_emoji: The emoji to use as icon for the user posting to Slack
+    :type icon_emoji: str
+    :param link_names: Whether or not to find and link channel and usernames in your
+                       message
+    :type link_names: bool
+    :param proxy: Proxy to use to make the Slack webhook call
+    :type proxy: str
+    """
+
+    @apply_defaults
+    def __init__(self,
+                 http_conn_id=None,
+                 webhook_token=None,
+                 message="",
+                 channel=None,
+                 username=None,
+                 icon_emoji=None,
+                 link_names=False,
+                 proxy=None,
+                 *args,
+                 **kwargs):
+        super(SlackWebhookOperator, self).__init__(endpoint=webhook_token,
+                                                   *args,
+                                                   **kwargs)
+        self.http_conn_id = http_conn_id
+        self.webhook_token = webhook_token
+        self.message = message
+        self.channel = channel
+        self.username = username
+        self.icon_emoji = icon_emoji
+        self.link_names = link_names
+        self.proxy = proxy
+        self.hook = None
+
+    def execute(self, context):
+        """
+        Build a SlackWebhookHook from this operator's settings and post the message
+        """
+        self.hook = SlackWebhookHook(
+            self.http_conn_id,
+            self.webhook_token,
+            self.message,
+            self.channel,
+            self.username,
+            self.icon_emoji,
+            self.link_names,
+            self.proxy
+        )
+        self.hook.execute()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/64206615/tests/contrib/hooks/test_slack_webhook_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_slack_webhook_hook.py b/tests/contrib/hooks/test_slack_webhook_hook.py
new file mode 100644
index 0000000..7977ee6
--- /dev/null
+++ b/tests/contrib/hooks/test_slack_webhook_hook.py
@@ -0,0 +1,87 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import unittest
+
+from airflow import configuration, models
+from airflow.utils import db
+
+from airflow.contrib.hooks.slack_webhook_hook import SlackWebhookHook
+
+
+class TestSlackWebhookHook(unittest.TestCase):
+
+    _config = {
+        'http_conn_id': 'slack-webhook-default',
+        'webhook_token': 'manual_token',
+        'message': 'Awesome message to put on Slack',
+        'channel': '#general',
+        'username': 'SlackMcSlackFace',
+        'icon_emoji': ':hankey:',
+        'link_names': True,
+        'proxy': 'https://my-horrible-proxy.proxyist.com:8080'
+    }
+    expected_message_dict = {'channel': _config['channel'],
+                             'username': _config['username'],
+                             'icon_emoji': _config['icon_emoji'],
+                             'link_names': 1,
+                             'text': _config['message']
+                             }
+    expected_message = json.dumps(expected_message_dict)
+
+    def setUp(self):
+        configuration.load_test_config()
+        db.merge_conn(
+            models.Connection(
+                conn_id='slack-webhook-default',
+                extra='{"webhook_token": "your_token_here"}')
+        )
+
+    def test_get_token_manual_token(self):
+        # Given a hook built with only a manually supplied token
+        manual_token = 'manual_token_here'
+        hook = SlackWebhookHook(webhook_token=manual_token)
+
+        # When the token is resolved without a conn_id
+        webhook_token = hook._get_token(manual_token, None)
+
+        # Then the manual token is returned unchanged
+        self.assertEqual(webhook_token, manual_token)
+
+    def test_get_token_conn_id(self):
+        # Given a hook built with only the conn_id registered in setUp
+        conn_id = 'slack-webhook-default'
+        hook = SlackWebhookHook(http_conn_id=conn_id)
+        expected_webhook_token = 'your_token_here'
+
+        # When the token is resolved without a manual token
+        webhook_token = hook._get_token(None, conn_id)
+
+        # Then the 'webhook_token' value from the connection's extra is returned
+        self.assertEqual(webhook_token, expected_webhook_token)
+
+    def test_build_slack_message(self):
+        # Given a hook with every option from _config set
+        hook = SlackWebhookHook(**self._config)
+
+        # When the Slack message is built
+        message = hook._build_slack_message()
+
+        # Then it equals the JSON dump of expected_message_dict
+        self.assertEqual(self.expected_message, message)
+
+
+if __name__ == '__main__':  # run the tests when this module is executed directly
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/64206615/tests/contrib/operators/test_slack_webhook_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_slack_webhook_operator.py b/tests/contrib/operators/test_slack_webhook_operator.py
new file mode 100644
index 0000000..4ea0a60
--- /dev/null
+++ b/tests/contrib/operators/test_slack_webhook_operator.py
@@ -0,0 +1,65 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from airflow import DAG, configuration
+
+from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
+from airflow.utils import timezone
+
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
+
+
+class TestSlackWebhookOperator(unittest.TestCase):
+    _config = {
+        'http_conn_id': 'slack-webhook-default',
+        'webhook_token': 'manual_token',
+        'message': 'your message here',
+        'channel': '#general',
+        'username': 'SlackMcSlackFace',
+        'icon_emoji': ':hankey',  # NOTE(review): no closing ':' (hook test uses ':hankey:')
+        'link_names': True,
+        'proxy': 'https://my-horrible-proxy.proxyist.com:8080'
+    }
+
+    def setUp(self):
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': DEFAULT_DATE
+        }
+        self.dag = DAG('test_dag_id', default_args=args)
+
+    def test_execute(self):
+        # Given / When: only construct the operator; execute() itself is never called here
+        operator = SlackWebhookOperator(
+            task_id='slack_webhook_job',
+            dag=self.dag,
+            **self._config
+        )
+
+        self.assertEqual(self._config['http_conn_id'], operator.http_conn_id)
+        self.assertEqual(self._config['webhook_token'], operator.webhook_token)
+        self.assertEqual(self._config['message'], operator.message)
+        self.assertEqual(self._config['channel'], operator.channel)
+        self.assertEqual(self._config['username'], operator.username)
+        self.assertEqual(self._config['icon_emoji'], operator.icon_emoji)
+        self.assertEqual(self._config['link_names'], operator.link_names)
+        self.assertEqual(self._config['proxy'], operator.proxy)
+
+
+if __name__ == '__main__':  # run the tests when this module is executed directly
+    unittest.main()