You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/02/14 22:55:47 UTC

incubator-airflow git commit: [AIRFLOW-826] Add Zendesk hook

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 485280a9f -> a097627d8


[AIRFLOW-826] Add Zendesk hook

Closes #2066 from shreyasjoshis/add-zendesk-hook


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a097627d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a097627d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a097627d

Branch: refs/heads/master
Commit: a097627d8236d9244d39eee0b8cbfa22f726fe8f
Parents: 485280a
Author: Shreyas Joshi <sh...@github.com>
Authored: Tue Feb 14 14:55:36 2017 -0800
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Tue Feb 14 14:55:36 2017 -0800

----------------------------------------------------------------------
 airflow/hooks/zendesk_hook.py       | 102 +++++++++++++++++++++++++++++++
 tests/contrib/hooks/zendesk_hook.py |  90 +++++++++++++++++++++++++++
 2 files changed, 192 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a097627d/airflow/hooks/zendesk_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/zendesk_hook.py b/airflow/hooks/zendesk_hook.py
new file mode 100644
index 0000000..438597f
--- /dev/null
+++ b/airflow/hooks/zendesk_hook.py
@@ -0,0 +1,102 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+"""
+A hook to talk to Zendesk
+"""
+
+import logging
+import time
+from zdesk import Zendesk, RateLimitError, ZendeskError
+from airflow.hooks import BaseHook
+
+
+class ZendeskHook(BaseHook):
+    def __init__(self, zendesk_conn_id):
+        self.__zendesk_conn_id = zendesk_conn_id
+        self.__url = None
+
+    def get_conn(self):
+        conn = self.get_connection(self.__zendesk_conn_id)
+        self.__url = "https://" + conn.host
+        return Zendesk(self.__url, conn.login, conn.password, True)
+
+    def __handle_rate_limit_exception(self, rate_limit_exception):
+        """
+        Sleep for the time specified in the exception. If not specified, wait
+        for 60 seconds.
+        """
+        retry_after = int(
+            rate_limit_exception.response.headers.get('Retry-After', 60))
+        logging.info(
+            "Hit Zendesk API rate limit. Pausing for {} "
+            "seconds".format(
+                retry_after))
+        time.sleep(retry_after)
+
+    def call(self, path, query=None, get_all_pages=True):
+        """
+        Call Zendesk API and return results
+
+        :param path: The Zendesk API to call
+        :param query: Query parameters
+        :param get_all_pages: Accumulate results over all pages before
+               returning. Due to strict rate limiting, this can often timeout.
+               Waits for recommended period between tries after a timeout.
+        """
+        zendesk = self.get_conn()
+        first_request_successful = False
+
+        while not first_request_successful:
+            try:
+                results = zendesk.call(path, query)
+                first_request_successful = True
+            except RateLimitError as rle:
+                self.__handle_rate_limit_exception(rle)
+
+        # Find the key with the results
+        key = path.split("/")[-1].split(".json")[0]
+        next_page = results['next_page']
+        results = results[key]
+
+        if get_all_pages:
+            while next_page is not None:
+                try:
+                    # Need to split because the next page URL has
+                    # `github.zendesk...`
+                    # in it, but the call function needs it removed.
+                    next_url = next_page.split(self.__url)[1]
+                    logging.info("Calling {}".format(next_url))
+                    more_res = zendesk.call(next_url)
+                    results.extend(more_res[key])
+                    if next_page == more_res['next_page']:
+                        # Unfortunately zdesk doesn't always throw ZendeskError
+                        # when we are done getting all the data. Sometimes the
+                        # next just refers to the current set of results. Hence,
+                        # need to deal with this special case
+                        break
+                    else:
+                        next_page = more_res['next_page']
+                except RateLimitError as rle:
+                    self.__handle_rate_limit_exception(rle)
+                except ZendeskError as ze:
+                    if b"Use a start_time older than 5 minutes" in ze.msg:
+                        # We have pretty up to date data
+                        break
+                    else:
+                        raise ze
+
+        return results

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a097627d/tests/contrib/hooks/zendesk_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/zendesk_hook.py b/tests/contrib/hooks/zendesk_hook.py
new file mode 100644
index 0000000..66b8e6b
--- /dev/null
+++ b/tests/contrib/hooks/zendesk_hook.py
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+from unittest.mock import Mock, patch
+from plugins.hooks.zendesk_hook import ZendeskHook
+from zdesk import RateLimitError
+from pytest import raises
+
+
+@patch("plugins.hooks.zendesk_hook.time")
+@patch("plugins.hooks.zendesk_hook.Zendesk")
+def test_sleeps_for_correct_interval(_, mocked_time):
+    sleep_time = 10
+
+    # To break out of the otherwise infinite tries
+    mocked_time.sleep = Mock(side_effect=ValueError)
+    conn_mock = Mock()
+    mock_response = Mock()
+    mock_response.headers.get.return_value = sleep_time
+    conn_mock.call = Mock(
+        side_effect=RateLimitError(msg="some message", code="some code",
+                                   response=mock_response))
+
+    zendesk_hook = ZendeskHook("conn_id")
+    zendesk_hook.get_conn = Mock(return_value=conn_mock)
+
+    with raises(ValueError):
+        zendesk_hook.call("some_path", get_all_pages=False)
+    mocked_time.sleep.assert_called_with(sleep_time)
+
+
+@patch("plugins.hooks.zendesk_hook.Zendesk")
+def test_returns_single_page_if_get_all_pages_false(_):
+    zendesk_hook = ZendeskHook("conn_id")
+    mock_connection = Mock()
+    mock_connection.host = "some_host"
+    zendesk_hook.get_connection = Mock(return_value=mock_connection)
+    zendesk_hook.get_conn()
+
+    mock_conn = Mock()
+    mock_call = Mock(
+        return_value={'next_page': 'https://some_host/something', 'path': []})
+    mock_conn.call = mock_call
+    zendesk_hook.get_conn = Mock(return_value=mock_conn)
+    zendesk_hook.call("path", get_all_pages=False)
+    mock_call.assert_called_once_with("path", None)
+
+
+@patch("plugins.hooks.zendesk_hook.Zendesk")
+def test_returns_multiple_pages_if_get_all_pages_true(_):
+    zendesk_hook = ZendeskHook("conn_id")
+    mock_connection = Mock()
+    mock_connection.host = "some_host"
+    zendesk_hook.get_connection = Mock(return_value=mock_connection)
+    zendesk_hook.get_conn()
+
+    mock_conn = Mock()
+    mock_call = Mock(
+        return_value={'next_page': 'https://some_host/something', 'path': []})
+    mock_conn.call = mock_call
+    zendesk_hook.get_conn = Mock(return_value=mock_conn)
+    zendesk_hook.call("path", get_all_pages=True)
+    assert mock_call.call_count == 2
+
+
+@patch("plugins.hooks.zendesk_hook.Zendesk")
+def test_zdesk_is_inited_correctly(mock_zendesk):
+    conn_mock = Mock()
+    conn_mock.host = "conn_host"
+    conn_mock.login = "conn_login"
+    conn_mock.password = "conn_pass"
+
+    zendesk_hook = ZendeskHook("conn_id")
+    zendesk_hook.get_connection = Mock(return_value=conn_mock)
+    zendesk_hook.get_conn()
+    mock_zendesk.assert_called_with('https://conn_host', 'conn_login',
+                                    'conn_pass', True)