You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/02/14 22:55:47 UTC
incubator-airflow git commit: [AIRFLOW-826] Add Zendesk hook
Repository: incubator-airflow
Updated Branches:
refs/heads/master 485280a9f -> a097627d8
[AIRFLOW-826] Add Zendesk hook
Closes #2066 from shreyasjoshis/add-zendesk-hook
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a097627d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a097627d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a097627d
Branch: refs/heads/master
Commit: a097627d8236d9244d39eee0b8cbfa22f726fe8f
Parents: 485280a
Author: Shreyas Joshi <sh...@github.com>
Authored: Tue Feb 14 14:55:36 2017 -0800
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Tue Feb 14 14:55:36 2017 -0800
----------------------------------------------------------------------
airflow/hooks/zendesk_hook.py | 102 +++++++++++++++++++++++++++++++
tests/contrib/hooks/zendesk_hook.py | 90 +++++++++++++++++++++++++++
2 files changed, 192 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a097627d/airflow/hooks/zendesk_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/zendesk_hook.py b/airflow/hooks/zendesk_hook.py
new file mode 100644
index 0000000..438597f
--- /dev/null
+++ b/airflow/hooks/zendesk_hook.py
@@ -0,0 +1,102 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+"""
+A hook to talk to Zendesk
+"""
+
+import logging
+import time
+from zdesk import Zendesk, RateLimitError, ZendeskError
+from airflow.hooks import BaseHook
+
+
+class ZendeskHook(BaseHook):
+ def __init__(self, zendesk_conn_id):
+ self.__zendesk_conn_id = zendesk_conn_id
+ self.__url = None
+
+ def get_conn(self):
+ conn = self.get_connection(self.__zendesk_conn_id)
+ self.__url = "https://" + conn.host
+ return Zendesk(self.__url, conn.login, conn.password, True)
+
+ def __handle_rate_limit_exception(self, rate_limit_exception):
+ """
+ Sleep for the time specified in the exception. If not specified, wait
+ for 60 seconds.
+ """
+ retry_after = int(
+ rate_limit_exception.response.headers.get('Retry-After', 60))
+ logging.info(
+ "Hit Zendesk API rate limit. Pausing for {} "
+ "seconds".format(
+ retry_after))
+ time.sleep(retry_after)
+
+ def call(self, path, query=None, get_all_pages=True):
+ """
+ Call Zendesk API and return results
+
+ :param path: The Zendesk API to call
+ :param query: Query parameters
+ :param get_all_pages: Accumulate results over all pages before
+ returning. Due to strict rate limiting, this can often timeout.
+ Waits for recommended period between tries after a timeout.
+ """
+ zendesk = self.get_conn()
+ first_request_successful = False
+
+ while not first_request_successful:
+ try:
+ results = zendesk.call(path, query)
+ first_request_successful = True
+ except RateLimitError as rle:
+ self.__handle_rate_limit_exception(rle)
+
+ # Find the key with the results
+ key = path.split("/")[-1].split(".json")[0]
+ next_page = results['next_page']
+ results = results[key]
+
+ if get_all_pages:
+ while next_page is not None:
+ try:
+ # Need to split because the next page URL has
+ # `github.zendesk...`
+ # in it, but the call function needs it removed.
+ next_url = next_page.split(self.__url)[1]
+ logging.info("Calling {}".format(next_url))
+ more_res = zendesk.call(next_url)
+ results.extend(more_res[key])
+ if next_page == more_res['next_page']:
+ # Unfortunately zdesk doesn't always throw ZendeskError
+ # when we are done getting all the data. Sometimes the
+ # next just refers to the current set of results. Hence,
+ # need to deal with this special case
+ break
+ else:
+ next_page = more_res['next_page']
+ except RateLimitError as rle:
+ self.__handle_rate_limit_exception(rle)
+ except ZendeskError as ze:
+ if b"Use a start_time older than 5 minutes" in ze.msg:
+ # We have pretty up to date data
+ break
+ else:
+ raise ze
+
+ return results
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a097627d/tests/contrib/hooks/zendesk_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/zendesk_hook.py b/tests/contrib/hooks/zendesk_hook.py
new file mode 100644
index 0000000..66b8e6b
--- /dev/null
+++ b/tests/contrib/hooks/zendesk_hook.py
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+from unittest.mock import Mock, patch
+from plugins.hooks.zendesk_hook import ZendeskHook
+from zdesk import RateLimitError
+from pytest import raises
+
+
+@patch("plugins.hooks.zendesk_hook.time")
+@patch("plugins.hooks.zendesk_hook.Zendesk")
+def test_sleeps_for_correct_interval(_, mocked_time):
+ sleep_time = 10
+
+ # To break out of the otherwise infinite tries
+ mocked_time.sleep = Mock(side_effect=ValueError)
+ conn_mock = Mock()
+ mock_response = Mock()
+ mock_response.headers.get.return_value = sleep_time
+ conn_mock.call = Mock(
+ side_effect=RateLimitError(msg="some message", code="some code",
+ response=mock_response))
+
+ zendesk_hook = ZendeskHook("conn_id")
+ zendesk_hook.get_conn = Mock(return_value=conn_mock)
+
+ with raises(ValueError):
+ zendesk_hook.call("some_path", get_all_pages=False)
+ mocked_time.sleep.assert_called_with(sleep_time)
+
+
+@patch("plugins.hooks.zendesk_hook.Zendesk")
+def test_returns_single_page_if_get_all_pages_false(_):
+ zendesk_hook = ZendeskHook("conn_id")
+ mock_connection = Mock()
+ mock_connection.host = "some_host"
+ zendesk_hook.get_connection = Mock(return_value=mock_connection)
+ zendesk_hook.get_conn()
+
+ mock_conn = Mock()
+ mock_call = Mock(
+ return_value={'next_page': 'https://some_host/something', 'path': []})
+ mock_conn.call = mock_call
+ zendesk_hook.get_conn = Mock(return_value=mock_conn)
+ zendesk_hook.call("path", get_all_pages=False)
+ mock_call.assert_called_once_with("path", None)
+
+
+@patch("plugins.hooks.zendesk_hook.Zendesk")
+def test_returns_multiple_pages_if_get_all_pages_true(_):
+ zendesk_hook = ZendeskHook("conn_id")
+ mock_connection = Mock()
+ mock_connection.host = "some_host"
+ zendesk_hook.get_connection = Mock(return_value=mock_connection)
+ zendesk_hook.get_conn()
+
+ mock_conn = Mock()
+ mock_call = Mock(
+ return_value={'next_page': 'https://some_host/something', 'path': []})
+ mock_conn.call = mock_call
+ zendesk_hook.get_conn = Mock(return_value=mock_conn)
+ zendesk_hook.call("path", get_all_pages=True)
+ assert mock_call.call_count == 2
+
+
+@patch("plugins.hooks.zendesk_hook.Zendesk")
+def test_zdesk_is_inited_correctly(mock_zendesk):
+ conn_mock = Mock()
+ conn_mock.host = "conn_host"
+ conn_mock.login = "conn_login"
+ conn_mock.password = "conn_pass"
+
+ zendesk_hook = ZendeskHook("conn_id")
+ zendesk_hook.get_connection = Mock(return_value=conn_mock)
+ zendesk_hook.get_conn()
+ mock_zendesk.assert_called_with('https://conn_host', 'conn_login',
+ 'conn_pass', True)