You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by da...@apache.org on 2017/11/15 22:54:26 UTC

incubator-airflow git commit: [AIRFLOW-1805] Allow Slack token to be passed through connection

Repository: incubator-airflow
Updated Branches:
  refs/heads/master d04519e60 -> d8e8f9014


[AIRFLOW-1805] Allow Slack token to be passed through connection

Allow users to pass in Slack token through
connection which can provide better security. This
enables user to expose token only to workers
instead to both workers and schedulers.

Closes #2789 from
yrqls21/add_conn_supp_in_slack_op


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d8e8f901
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d8e8f901
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d8e8f901

Branch: refs/heads/master
Commit: d8e8f90142246ae5b02c1a0f9649ea5a419a5afc
Parents: d04519e
Author: Kevin Yang <ke...@airbnb.com>
Authored: Wed Nov 15 14:53:56 2017 -0800
Committer: Dan Davydov <da...@airbnb.com>
Committed: Wed Nov 15 14:53:58 2017 -0800

----------------------------------------------------------------------
 airflow/hooks/__init__.py           |   1 +
 airflow/hooks/slack_hook.py         |  56 ++++++++++++
 airflow/operators/slack_operator.py |  25 ++++--
 tests/hooks/test_slack_hook.py      | 101 ++++++++++++++++++++++
 tests/operators/slack_operator.py   | 141 +++++++++++++++++++++++++++++++
 5 files changed, 315 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8e8f901/airflow/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/__init__.py b/airflow/hooks/__init__.py
index 6e96e2a..6372b2f 100644
--- a/airflow/hooks/__init__.py
+++ b/airflow/hooks/__init__.py
@@ -55,6 +55,7 @@ _hooks = {
     'dbapi_hook': ['DbApiHook'],
     'mssql_hook': ['MsSqlHook'],
     'oracle_hook': ['OracleHook'],
+    'slack_hook': ['SlackHook'],
 }
 
 import os as _os

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8e8f901/airflow/hooks/slack_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/slack_hook.py b/airflow/hooks/slack_hook.py
new file mode 100644
index 0000000..cd47573
--- /dev/null
+++ b/airflow/hooks/slack_hook.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from slackclient import SlackClient
+from airflow.hooks.base_hook import BaseHook
+from airflow.exceptions import AirflowException
+
+
+class SlackHook(BaseHook):
+    """
+       Interact with Slack, using slackclient library.
+    """
+
+    def __init__(self, token=None, slack_conn_id=None):
+        """
+        Takes both Slack API token directly and connection that has Slack API token.
+
+        If both supplied, Slack API token will be used.
+
+        :param token: Slack API token
+        :type token: string
+        :param slack_conn_id: connection that has Slack API token in the password field
+        :type slack_conn_id: string
+        """
+        self.token = self.__get_token(token, slack_conn_id)
+
+    def __get_token(self, token, slack_conn_id):
+        if token is not None:
+            return token
+        elif slack_conn_id is not None:
+            conn = self.get_connection(slack_conn_id)
+
+            if not getattr(conn, 'password', None):
+                raise AirflowException('Missing token(password) in Slack connection')
+            return conn.password
+        else:
+            raise AirflowException('Cannot get token: No valid Slack token nor slack_conn_id supplied.')
+
+    def call(self, method, api_params):
+        sc = SlackClient(self.token)
+        rc = sc.api_call(method, **api_params)
+
+        if not rc['ok']:
+            msg = "Slack API call failed (%s)".format(rc['error'])
+            raise AirflowException(msg)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8e8f901/airflow/operators/slack_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/slack_operator.py b/airflow/operators/slack_operator.py
index 8b21211..8398a7a 100644
--- a/airflow/operators/slack_operator.py
+++ b/airflow/operators/slack_operator.py
@@ -14,9 +14,9 @@
 
 import json
 
-from slackclient import SlackClient
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
+from airflow.hooks.slack_hook import SlackHook
 from airflow.exceptions import AirflowException
 
 
@@ -26,6 +26,8 @@ class SlackAPIOperator(BaseOperator):
     The SlackAPIPostOperator is derived from this operator.
     In the future additional Slack API Operators will be derived from this class as well
 
+    :param slack_conn_id: Slack connection ID which its password is Slack API token
+    :type slack_conn_id: string
     :param token: Slack API token (https://api.slack.com/web)
     :type token: string
     :param method: The Slack API Method to Call (https://api.slack.com/methods)
@@ -36,12 +38,21 @@ class SlackAPIOperator(BaseOperator):
 
     @apply_defaults
     def __init__(self,
-                 token='unset',
-                 method='unset',
+                 slack_conn_id=None,
+                 token=None,
+                 method=None,
                  api_params=None,
                  *args, **kwargs):
         super(SlackAPIOperator, self).__init__(*args, **kwargs)
+
+        if token is None and slack_conn_id is None:
+            raise AirflowException('No valid Slack token nor slack_conn_id supplied.')
+        if token is not None and slack_conn_id is not None:
+            raise AirflowException('Cannot determine Slack credential when both token and slack_conn_id are supplied.')
+
         self.token = token
+        self.slack_conn_id = slack_conn_id
+
         self.method = method
         self.api_params = api_params
 
@@ -63,12 +74,8 @@ class SlackAPIOperator(BaseOperator):
         """
         if not self.api_params:
             self.construct_api_call_params()
-        sc = SlackClient(self.token)
-        rc = sc.api_call(self.method, **self.api_params)
-        if not rc['ok']:
-            msg = "Slack API call failed (%s)".format(rc['error'])
-            self.log.error(msg)
-            raise AirflowException(msg)
+        slack = SlackHook(token=self.token, slack_conn_id=self.slack_conn_id)
+        slack.call(self.method, self.api_params)
 
 
 class SlackAPIPostOperator(SlackAPIOperator):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8e8f901/tests/hooks/test_slack_hook.py
----------------------------------------------------------------------
diff --git a/tests/hooks/test_slack_hook.py b/tests/hooks/test_slack_hook.py
new file mode 100644
index 0000000..7ad7459
--- /dev/null
+++ b/tests/hooks/test_slack_hook.py
@@ -0,0 +1,101 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from airflow.exceptions import AirflowException
+from airflow.hooks.slack_hook import SlackHook
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+class SlackHookTestCase(unittest.TestCase):
+    def test_init_with_token_only(self):
+        test_token = 'test_token'
+        slack_hook = SlackHook(token=test_token, slack_conn_id=None)
+
+        self.assertEqual(slack_hook.token, test_token)
+
+    @mock.patch('airflow.hooks.slack_hook.SlackHook.get_connection')
+    def test_init_with_valid_slack_conn_id_only(self, get_connection_mock):
+        test_password = 'test_password'
+        get_connection_mock.return_value = mock.Mock(password=test_password)
+
+        test_slack_conn_id = 'test_slack_conn_id'
+        slack_hook = SlackHook(token=None, slack_conn_id=test_slack_conn_id)
+
+        get_connection_mock.assert_called_with(test_slack_conn_id)
+        self.assertEqual(slack_hook.token, test_password)
+
+    @mock.patch('airflow.hooks.slack_hook.SlackHook.get_connection')
+    def test_init_with_no_password_slack_conn_id_only(self, get_connection_mock):
+        conn = mock.Mock()
+        del conn.password
+        get_connection_mock.return_value = conn
+
+        test_slack_conn_id = 'test_slack_conn_id'
+        self.assertRaises(AirflowException, SlackHook, token=None, slack_conn_id=test_slack_conn_id)
+
+    @mock.patch('airflow.hooks.slack_hook.SlackHook.get_connection')
+    def test_init_with_empty_password_slack_conn_id_only(self, get_connection_mock):
+        get_connection_mock.return_value = mock.Mock(password=None)
+
+        test_slack_conn_id = 'test_slack_conn_id'
+        self.assertRaises(AirflowException, SlackHook, token=None, slack_conn_id=test_slack_conn_id)
+
+    def test_init_with_token_and_slack_conn_id(self):
+        test_token = 'test_token'
+        test_slack_conn_id = 'test_slack_conn_id'
+        slack_hook = SlackHook(token=test_token, slack_conn_id=test_slack_conn_id)
+
+        self.assertEqual(slack_hook.token, test_token)
+
+    def test_init_with_out_token_nor_slack_conn_id(self):
+        self.assertRaises(AirflowException, SlackHook, token=None, slack_conn_id=None)
+
+    @mock.patch('airflow.hooks.slack_hook.SlackClient')
+    def test_call_with_success(self, slack_client_class_mock):
+        slack_client_mock = mock.Mock()
+        slack_client_class_mock.return_value = slack_client_mock
+        slack_client_mock.api_call.return_value = {'ok': True}
+
+        test_token = 'test_token'
+        test_slack_conn_id = 'test_slack_conn_id'
+        slack_hook = SlackHook(token=test_token, slack_conn_id=test_slack_conn_id)
+        test_method = 'test_method'
+        test_api_params = {'key1': 'value1', 'key2': 'value2'}
+
+        slack_hook.call(test_method, test_api_params)
+
+        slack_client_class_mock.assert_called_with(test_token)
+        slack_client_mock.api_call.assert_called_with(test_method, **test_api_params)
+
+    @mock.patch('airflow.hooks.slack_hook.SlackClient')
+    def test_call_with_failure(self, slack_client_class_mock):
+        slack_client_mock = mock.Mock()
+        slack_client_class_mock.return_value = slack_client_mock
+        slack_client_mock.api_call.return_value = {'ok': False, 'error': 'test_error'}
+
+        test_token = 'test_token'
+        test_slack_conn_id = 'test_slack_conn_id'
+        slack_hook = SlackHook(token=test_token, slack_conn_id=test_slack_conn_id)
+        test_method = 'test_method'
+        test_api_params = {'key1': 'value1', 'key2': 'value2'}
+
+        self.assertRaises(AirflowException, slack_hook.call, test_method, test_api_params)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8e8f901/tests/operators/slack_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/slack_operator.py b/tests/operators/slack_operator.py
new file mode 100644
index 0000000..5e40648
--- /dev/null
+++ b/tests/operators/slack_operator.py
@@ -0,0 +1,141 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import json
+from airflow.exceptions import AirflowException
+from airflow.operators.slack_operator import SlackAPIPostOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+class SlackAPIPostOperatorTestCase(unittest.TestCase):
+    def setUp(self):
+        self.test_username = 'test_username'
+        self.test_channel = '#test_slack_channel'
+        self.test_text = 'test_text'
+        self.test_icon_url = 'test_icon_url'
+        self.test_attachments = [
+            {
+                "fallback": "Required plain-text summary of the attachment.",
+                "color": "#36a64f",
+                "pretext": "Optional text that appears above the attachment block",
+                "author_name": "Bobby Tables",
+                "author_link": "http://flickr.com/bobby/",
+                "author_icon": "http://flickr.com/icons/bobby.jpg",
+                "title": "Slack API Documentation",
+                "title_link": "https://api.slack.com/",
+                "text": "Optional text that appears within the attachment",
+                "fields": [
+                    {
+                        "title": "Priority",
+                        "value": "High",
+                        "short": 'false'
+                    }
+                ],
+                "image_url": "http://my-website.com/path/to/image.jpg",
+                "thumb_url": "http://example.com/path/to/thumb.png",
+                "footer": "Slack API",
+                "footer_icon": "https://platform.slack-edge.com/img/default_application_icon.png",
+                "ts": 123456789
+            }
+        ]
+        self.test_attachments_in_json = json.dumps(self.test_attachments)
+        self.test_kwarg = 'test_kwarg'
+
+        self.expected_method = 'chat.postMessage'
+        self.expected_api_params = {
+            'channel': self.test_channel,
+            'username': self.test_username,
+            'text': self.test_text,
+            'icon_url': self.test_icon_url,
+            'attachments': self.test_attachments_in_json,
+        }
+
+    def __construct_operator(self, test_token, test_slack_conn_id):
+        return SlackAPIPostOperator(
+            task_id='slack',
+            username=self.test_username,
+            token=test_token,
+            slack_conn_id=test_slack_conn_id,
+            channel=self.test_channel,
+            text=self.test_text,
+            icon_url=self.test_icon_url,
+            attachments=self.test_attachments,
+            kwarg=self.test_kwarg
+        )
+
+    @mock.patch('airflow.operators.slack_operator.SlackHook')
+    def test_execute_with_token_only(self, slack_hook_class_mock):
+        slack_hook_mock = mock.Mock()
+        slack_hook_class_mock.return_value = slack_hook_mock
+
+        test_token = 'test_token'
+        slack_api_post_operator = self.__construct_operator(test_token, None)
+
+        slack_api_post_operator.execute()
+
+        slack_hook_class_mock.assert_called_with(token=test_token, slack_conn_id=None)
+
+        slack_hook_mock.call.assert_called_with(self.expected_method, self.expected_api_params)
+
+    @mock.patch('airflow.operators.slack_operator.SlackHook')
+    def test_execute_with_slack_conn_id_only(self, slack_hook_class_mock):
+        slack_hook_mock = mock.Mock()
+        slack_hook_class_mock.return_value = slack_hook_mock
+
+        test_slack_conn_id = 'test_slack_conn_id'
+        slack_api_post_operator = self.__construct_operator(None, test_slack_conn_id)
+
+        slack_api_post_operator.execute()
+
+        slack_hook_class_mock.assert_called_with(token=None, slack_conn_id=test_slack_conn_id)
+
+        slack_hook_mock.call.assert_called_with(self.expected_method, self.expected_api_params)
+
+    def test_init_with_invalid_params(self):
+        test_token = 'test_token'
+        test_slack_conn_id = 'test_slack_conn_id'
+        self.assertRaises(AirflowException, self.__construct_operator, test_token, test_slack_conn_id)
+
+        self.assertRaises(AirflowException, self.__construct_operator, None, None)
+
+    def test_init_with_valid_params(self):
+        test_token = 'test_token'
+        test_slack_conn_id = 'test_slack_conn_id'
+
+        slack_api_post_operator = self.__construct_operator(test_token, None)
+        self.assertEqual(slack_api_post_operator.token, test_token)
+        self.assertEqual(slack_api_post_operator.slack_conn_id, None)
+        self.assertEqual(slack_api_post_operator.method, self.expected_method)
+        self.assertEqual(slack_api_post_operator.text, self.test_text)
+        self.assertEqual(slack_api_post_operator.channel, self.test_channel)
+        self.assertEqual(slack_api_post_operator.api_params, self.expected_api_params)
+        self.assertEqual(slack_api_post_operator.username, self.test_username)
+        self.assertEqual(slack_api_post_operator.icon_url, self.test_icon_url)
+        self.assertEqual(slack_api_post_operator.attachments, self.test_attachments)
+
+        slack_api_post_operator = self.__construct_operator(None, test_slack_conn_id)
+        self.assertEqual(slack_api_post_operator.token, None)
+        self.assertEqual(slack_api_post_operator.slack_conn_id, test_slack_conn_id)
+
+
+if __name__ == "__main__":
+    unittest.main()