You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by sa...@apache.org on 2016/11/14 07:21:30 UTC

incubator-airflow git commit: [AIRFLOW-591] Add datadog hook & sensor

Repository: incubator-airflow
Updated Branches:
  refs/heads/master f8f7d1af2 -> d8383038a


[AIRFLOW-591] Add datadog hook & sensor

Closes #1851 from gtoonstra/contrib_datadog


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d8383038
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d8383038
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d8383038

Branch: refs/heads/master
Commit: d8383038ac65afab9205c4ba57caf126b7c5dcb7
Parents: f8f7d1a
Author: gtoonstra <gt...@gmail.com>
Authored: Mon Nov 14 07:19:03 2016 +0000
Committer: Siddharth Anand <si...@yahoo.com>
Committed: Mon Nov 14 07:21:08 2016 +0000

----------------------------------------------------------------------
 airflow/contrib/hooks/datadog_hook.py     | 136 +++++++++++++++++++++++++
 airflow/contrib/sensors/datadog_sensor.py |  81 +++++++++++++++
 setup.py                                  |   2 +
 tests/contrib/sensors/datadog_sensor.py   |  91 +++++++++++++++++
 4 files changed, 310 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8383038/airflow/contrib/hooks/datadog_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/datadog_hook.py b/airflow/contrib/hooks/datadog_hook.py
new file mode 100644
index 0000000..2125701
--- /dev/null
+++ b/airflow/contrib/hooks/datadog_hook.py
@@ -0,0 +1,136 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import logging
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.exceptions import AirflowException
+from datadog import initialize, api
+
+
+class DatadogHook(BaseHook):
+    """
+    Uses datadog API to send metrics of practically anything measurable,
+    so it's possible to track # of db records inserted/deleted, records read
+    from file and many other useful metrics.
+
+    Depends on the datadog API, which has to be deployed on the same server where
+    Airflow runs.
+
+    :param datadog_conn_id: The connection to datadog, containing metadata for api keys.
+    :param datadog_conn_id: string
+    """
+
+    def __init__(self, datadog_conn_id='datadog_default'):
+        conn = self.get_connection(datadog_conn_id)
+        self.api_key = conn.extra_dejson.get('api_key', None)
+        self.app_key = conn.extra_dejson.get('app_key', None)
+        self.source_type_name = conn.extra_dejson.get('source_type_name', None)
+
+        # If the host is populated, it will use that hostname instead.
+        # for all metric submissions.
+        self.host = conn.host
+
+        if self.api_key is None:
+            raise AirflowException("api_key must be specified in the Datadog connection details")
+        if self.app_key is None:
+            raise AirflowException("app_key must be specified in the Datadog connection details")
+
+        logging.info("Setting up api keys for datadog")
+        options = {
+            'api_key': self.api_key,
+            'app_key': self.app_key
+        }
+        initialize(**options)
+
+    def validate_response(self, response):
+        if response['status'] != 'ok':
+            logging.error("Data dog returned: " + response)
+            raise AirflowException("Error status received from datadog")
+
+    def send_metric(self, metric_name, datapoint, tags=None):
+        """
+        Sends a single datapoint metric to DataDog
+
+        :param metric_name: The name of the metric
+        :type metric_name: string
+        :param datapoint: A single integer or float related to the metric
+        :type datapoint: integer or float
+        :param tags: A list of tags associated with the metric
+        :type tags: list
+        """
+        response = api.Metric.send(
+            metric=metric_name,
+            points=datapoint,
+            host=self.host,
+            tags=tags)
+
+        self.validate_response(response)
+        return response
+
+    def query_metric(self,
+                     query,
+                     from_seconds_ago,
+                     to_seconds_ago):
+        """
+        Queries datadog for a specific metric, potentially with some function applied to it
+        and returns the results.
+
+        :param query: The datadog query to execute (see datadog docs)
+        :type query: string
+        :param from_seconds_ago: How many seconds ago to start querying for.
+        :type from_seconds_ago: int
+        :param to_seconds_ago: Up to how many seconds ago to query for.
+        :type to_seconds_ago: int
+        """
+        now = int(time.time())
+
+        response = api.Metric.query(
+            start=now - from_seconds_ago,
+            end=now - to_seconds_ago,
+            query=query)
+
+        self.validate_response(response)
+        return response
+
+    def post_event(self, title, text, tags=None, alert_type=None, aggregation_key=None):
+        """
+        Posts an event to datadog (processing finished, potentially alerts, other issues)
+        Think about this as a means to maintain persistence of alerts, rather than alerting
+        itself.
+
+        :param title: The title of the event
+        :type title: string
+        :param text: The body of the event (more information)
+        :type text: string
+        :param tags: List of string tags to apply to the event
+        :type tags: list
+        :param alert_type: The alert type for the event, one of
+            ["error", "warning", "info", "success"]
+        :type alert_type: string
+        :param aggregation_key: Key that can be used to aggregate this event in a stream
+        :type aggregation_key: string
+        """
+        response = api.Event.create(
+            title=title,
+            text=text,
+            host=self.host,
+            tags=tags,
+            alert_type=alert_type,
+            aggregation_key=aggregation_key,
+            source_type_name=self.source_type_name)
+
+        self.validate_response(response)
+        return response

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8383038/airflow/contrib/sensors/datadog_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/datadog_sensor.py b/airflow/contrib/sensors/datadog_sensor.py
new file mode 100644
index 0000000..d8660f7
--- /dev/null
+++ b/airflow/contrib/sensors/datadog_sensor.py
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.contrib.hooks.datadog_hook import DatadogHook
+from airflow.utils import apply_defaults
+from airflow.exceptions import AirflowException
+from datadog import api
+
+
+class DatadogSensor(BaseSensorOperator):
+    """
+    A sensor to listen, with a filter, to datadog event streams and determine
+    if some event was emitted.
+
+    Depends on the datadog API, which has to be deployed on the same server where
+    Airflow runs.
+
+    :param datadog_conn_id: The connection to datadog, containing metadata for api keys.
+    :param datadog_conn_id: string
+    """
+    ui_color = '#66c3dd'
+
+    @apply_defaults
+    def __init__(
+            self,
+            datadog_conn_id='datadog_default',
+            from_seconds_ago=3600,
+            up_to_seconds_from_now=0,
+            priority=None,
+            sources=None,
+            tags=None,
+            response_check=None,
+            *args,
+            **kwargs):
+        super(DatadogSensor, self).__init__(*args, **kwargs)
+        self.datadog_conn_id = datadog_conn_id
+        self.from_seconds_ago = from_seconds_ago
+        self.up_to_seconds_from_now = up_to_seconds_from_now
+        self.priority = priority
+        self.sources = sources
+        self.tags = tags
+        self.response_check = response_check
+
+    def poke(self, context):
+        # This instantiates the hook, but doesn't need it further,
+        # because the API authenticates globally (unfortunately),
+        # but for airflow this shouldn't matter too much, because each
+        # task instance runs in its own process anyway.
+        DatadogHook(datadog_conn_id=self.datadog_conn_id)
+
+        response = api.Event.query(
+            start=self.from_seconds_ago,
+            end=self.up_to_seconds_from_now,
+            priority=self.priority,
+            sources=self.sources,
+            tags=self.tags)
+
+        if isinstance(response, dict) and response.get('status', 'ok') != 'ok':
+            logging.error("Unexpected datadog result: %s" % (response))
+            raise AirflowException("Datadog returned unexpected result")
+
+        if self.response_check:
+            # run content check on response
+            return self.response_check(response)
+
+        # If no check was inserted, assume any event that matched yields true.
+        return len(response) > 0

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8383038/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 848c4e2..3a75168 100644
--- a/setup.py
+++ b/setup.py
@@ -109,6 +109,7 @@ celery = [
     'flower>=0.7.3'
 ]
 crypto = ['cryptography>=0.9.3']
+datadog = ['datadog>=0.14.0']
 doc = [
     'sphinx>=1.2.3',
     'sphinx-argparse>=0.1.13',
@@ -213,6 +214,7 @@ def do_setup():
             'celery': celery,
             'cloudant': cloudant,
             'crypto': crypto,
+            'datadog': datadog,
             'devel': devel_minreq,
             'devel_hadoop': devel_hadoop,
             'doc': doc,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8383038/tests/contrib/sensors/datadog_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/datadog_sensor.py b/tests/contrib/sensors/datadog_sensor.py
new file mode 100644
index 0000000..4d601e1
--- /dev/null
+++ b/tests/contrib/sensors/datadog_sensor.py
@@ -0,0 +1,91 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import patch
+
+from airflow.contrib.sensors.datadog_sensor import DatadogSensor
+
+
+at_least_one_event = [{'alert_type': 'info',
+                       'comments': [],
+                       'date_happened': 1419436860,
+                       'device_name': None,
+                       'host': None,
+                       'id': 2603387619536318140,
+                       'is_aggregate': False,
+                       'priority': 'normal',
+                       'resource': '/api/v1/events/2603387619536318140',
+                       'source': 'My Apps',
+                       'tags': ['application:web', 'version:1'],
+                       'text': 'And let me tell you all about it here!',
+                       'title': 'Something big happened!',
+                       'url': '/event/jump_to?event_id=2603387619536318140'},
+                      {'alert_type': 'info',
+                       'comments': [],
+                       'date_happened': 1419436865,
+                       'device_name': None,
+                       'host': None,
+                       'id': 2603387619536318141,
+                       'is_aggregate': False,
+                       'priority': 'normal',
+                       'resource': '/api/v1/events/2603387619536318141',
+                       'source': 'My Apps',
+                       'tags': ['application:web', 'version:1'],
+                       'text': 'And let me tell you all about it here!',
+                       'title': 'Something big happened!',
+                       'url': '/event/jump_to?event_id=2603387619536318141'}]
+
+zero_events = []
+
+
+class TestDatadogSensor(unittest.TestCase):
+    @patch('airflow.contrib.hooks.datadog_hook.api.Event.query')
+    @patch('airflow.contrib.sensors.datadog_sensor.api.Event.query')
+    def test_sensor_ok(self, api1, api2):
+        api1.return_value = at_least_one_event
+        api2.return_value = at_least_one_event
+
+        sensor = DatadogSensor(
+            task_id='test_datadog',
+            datadog_conn_id='datadog_default',
+            from_seconds_ago=3600,
+            up_to_seconds_from_now=0,
+            priority=None,
+            sources=None,
+            tags=None,
+            response_check=None)
+
+        self.assertTrue(sensor.poke({}))
+
+    @patch('airflow.contrib.hooks.datadog_hook.api.Event.query')
+    @patch('airflow.contrib.sensors.datadog_sensor.api.Event.query')
+    def test_sensor_fail(self, api1, api2):
+        api1.return_value = zero_events
+        api2.return_value = zero_events
+
+        sensor = DatadogSensor(
+            task_id='test_datadog',
+            datadog_conn_id='datadog_default',
+            from_seconds_ago=0,
+            up_to_seconds_from_now=0,
+            priority=None,
+            sources=None,
+            tags=None,
+            response_check=None)
+
+        self.assertFalse(sensor.poke({}))
+
+# Run the test cases in this module when the file is executed as a script.
+if __name__ == '__main__':
+    unittest.main()