You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by sa...@apache.org on 2016/11/14 07:21:30 UTC
incubator-airflow git commit: [AIRFLOW-591] Add datadog hook & sensor
Repository: incubator-airflow
Updated Branches:
refs/heads/master f8f7d1af2 -> d8383038a
[AIRFLOW-591] Add datadog hook & sensor
Closes #1851 from gtoonstra/contrib_datadog
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d8383038
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d8383038
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d8383038
Branch: refs/heads/master
Commit: d8383038ac65afab9205c4ba57caf126b7c5dcb7
Parents: f8f7d1a
Author: gtoonstra <gt...@gmail.com>
Authored: Mon Nov 14 07:19:03 2016 +0000
Committer: Siddharth Anand <si...@yahoo.com>
Committed: Mon Nov 14 07:21:08 2016 +0000
----------------------------------------------------------------------
airflow/contrib/hooks/datadog_hook.py | 136 +++++++++++++++++++++++++
airflow/contrib/sensors/datadog_sensor.py | 81 +++++++++++++++
setup.py | 2 +
tests/contrib/sensors/datadog_sensor.py | 91 +++++++++++++++++
4 files changed, 310 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8383038/airflow/contrib/hooks/datadog_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/datadog_hook.py b/airflow/contrib/hooks/datadog_hook.py
new file mode 100644
index 0000000..2125701
--- /dev/null
+++ b/airflow/contrib/hooks/datadog_hook.py
@@ -0,0 +1,136 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import logging
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.exceptions import AirflowException
+from datadog import initialize, api
+
+
class DatadogHook(BaseHook):
    """
    Uses datadog API to send metrics of practically anything measurable,
    so it's possible to track # of db records inserted/deleted, records read
    from file and many other useful metrics.

    Depends on the datadog API, which has to be deployed on the same server where
    Airflow runs.

    :param datadog_conn_id: The connection to datadog, containing metadata for api keys.
    :type datadog_conn_id: string
    :raises AirflowException: if the connection extras lack ``api_key`` or ``app_key``.
    """

    def __init__(self, datadog_conn_id='datadog_default'):
        conn = self.get_connection(datadog_conn_id)
        extras = conn.extra_dejson
        self.api_key = extras.get('api_key', None)
        self.app_key = extras.get('app_key', None)
        self.source_type_name = extras.get('source_type_name', None)

        # If the host is populated, it will use that hostname instead
        # for all metric submissions.
        self.host = conn.host

        if self.api_key is None:
            raise AirflowException("api_key must be specified in the Datadog connection details")
        if self.app_key is None:
            raise AirflowException("app_key must be specified in the Datadog connection details")

        logging.info("Setting up api keys for datadog")
        options = {
            'api_key': self.api_key,
            'app_key': self.app_key
        }
        # datadog.initialize configures the client module-wide rather than
        # returning a client object; the api.* calls below rely on it.
        initialize(**options)

    def validate_response(self, response):
        """
        Check that a datadog API response reports an 'ok' status.

        :param response: The decoded response returned by the datadog client.
        :type response: dict
        :raises AirflowException: if the response is not a dict or its
            'status' is anything other than 'ok'.
        """
        status = response.get('status') if isinstance(response, dict) else None
        if status != 'ok':
            # Lazy %-formatting: the response is a dict, so concatenating it
            # onto a str would raise TypeError and mask the real failure.
            logging.error("Data dog returned: %s", response)
            raise AirflowException("Error status received from datadog")

    def send_metric(self, metric_name, datapoint, tags=None):
        """
        Sends a single datapoint metric to DataDog

        :param metric_name: The name of the metric
        :type metric_name: string
        :param datapoint: A single integer or float related to the metric
        :type datapoint: integer or float
        :param tags: A list of tags associated with the metric
        :type tags: list
        :returns: the datadog API response
        :raises AirflowException: if datadog does not report an 'ok' status
        """
        response = api.Metric.send(
            metric=metric_name,
            points=datapoint,
            host=self.host,
            tags=tags)

        self.validate_response(response)
        return response

    def query_metric(self,
                     query,
                     from_seconds_ago,
                     to_seconds_ago):
        """
        Queries datadog for a specific metric, potentially with some function applied to it
        and returns the results.

        :param query: The datadog query to execute (see datadog docs)
        :type query: string
        :param from_seconds_ago: How many seconds ago to start querying for.
        :type from_seconds_ago: int
        :param to_seconds_ago: Up to how many seconds ago to query for.
        :type to_seconds_ago: int
        :returns: the datadog API response
        :raises AirflowException: if datadog does not report an 'ok' status
        """
        # The metrics query endpoint takes absolute epoch seconds, so turn the
        # relative offsets into a window ending `to_seconds_ago` before now.
        now = int(time.time())

        response = api.Metric.query(
            start=now - from_seconds_ago,
            end=now - to_seconds_ago,
            query=query)

        self.validate_response(response)
        return response

    def post_event(self, title, text, tags=None, alert_type=None, aggregation_key=None):
        """
        Posts an event to datadog (processing finished, potentially alerts, other issues)
        Think about this as a means to maintain persistence of alerts, rather than alerting
        itself.

        :param title: The title of the event
        :type title: string
        :param text: The body of the event (more information)
        :type text: string
        :param tags: List of string tags to apply to the event
        :type tags: list
        :param alert_type: The alert type for the event, one of
            ["error", "warning", "info", "success"]
        :type alert_type: string
        :param aggregation_key: Key that can be used to aggregate this event in a stream
        :type aggregation_key: string
        :returns: the datadog API response
        :raises AirflowException: if datadog does not report an 'ok' status
        """
        response = api.Event.create(
            title=title,
            text=text,
            host=self.host,
            tags=tags,
            alert_type=alert_type,
            aggregation_key=aggregation_key,
            source_type_name=self.source_type_name)

        self.validate_response(response)
        return response
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8383038/airflow/contrib/sensors/datadog_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/datadog_sensor.py b/airflow/contrib/sensors/datadog_sensor.py
new file mode 100644
index 0000000..d8660f7
--- /dev/null
+++ b/airflow/contrib/sensors/datadog_sensor.py
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.contrib.hooks.datadog_hook import DatadogHook
+from airflow.utils import apply_defaults
+from airflow.exceptions import AirflowException
+from datadog import api
+
+
class DatadogSensor(BaseSensorOperator):
    """
    A sensor to listen, with a filter, to datadog event streams and determine
    if some event was emitted.

    Depends on the datadog API, which has to be deployed on the same server where
    Airflow runs.

    :param datadog_conn_id: The connection to datadog, containing metadata for api keys.
    :type datadog_conn_id: string
    :param from_seconds_ago: Start of the event query window, in seconds before now.
    :type from_seconds_ago: int
    :param up_to_seconds_from_now: End of the event query window, in seconds after now.
    :type up_to_seconds_from_now: int
    :param priority: Optional priority filter, passed through to ``api.Event.query``.
    :param sources: Optional sources filter, passed through to ``api.Event.query``.
    :param tags: Optional tags filter, passed through to ``api.Event.query``.
    :param response_check: Optional callable receiving the raw datadog response;
        its return value becomes the poke result. When omitted, the sensor
        succeeds as soon as at least one event matched.
    :type response_check: callable
    """
    ui_color = '#66c3dd'

    @apply_defaults
    def __init__(
            self,
            datadog_conn_id='datadog_default',
            from_seconds_ago=3600,
            up_to_seconds_from_now=0,
            priority=None,
            sources=None,
            tags=None,
            response_check=None,
            *args,
            **kwargs):
        super(DatadogSensor, self).__init__(*args, **kwargs)
        self.datadog_conn_id = datadog_conn_id
        self.from_seconds_ago = from_seconds_ago
        self.up_to_seconds_from_now = up_to_seconds_from_now
        self.priority = priority
        self.sources = sources
        self.tags = tags
        self.response_check = response_check

    def poke(self, context):
        """
        Query the datadog event stream once and report whether the condition holds.

        :raises AirflowException: if datadog returns a dict whose status is not 'ok'.
        """
        import time

        # This instantiates the hook, but doesn't need it further,
        # because the API authenticates globally (unfortunately),
        # but for airflow this shouldn't matter too much, because each
        # task instance runs in its own process anyway.
        DatadogHook(datadog_conn_id=self.datadog_conn_id)

        # The events endpoint takes absolute POSIX timestamps for start/end,
        # so convert the relative offsets into a window around "now".
        now = int(time.time())
        response = api.Event.query(
            start=now - self.from_seconds_ago,
            end=now + self.up_to_seconds_from_now,
            priority=self.priority,
            sources=self.sources,
            tags=self.tags)

        if isinstance(response, dict) and response.get('status', 'ok') != 'ok':
            logging.error("Unexpected datadog result: %s", response)
            raise AirflowException("Datadog returned unexpected result")

        if self.response_check:
            # run content check on response
            return self.response_check(response)

        # If no check was inserted, assume any event that matched yields true.
        # The events API wraps matches in an 'events' list; counting the
        # dict's keys instead would always be truthy. A bare list is accepted too.
        if isinstance(response, dict):
            events = response.get('events', [])
        else:
            events = response
        return len(events) > 0
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8383038/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 848c4e2..3a75168 100644
--- a/setup.py
+++ b/setup.py
@@ -109,6 +109,7 @@ celery = [
'flower>=0.7.3'
]
crypto = ['cryptography>=0.9.3']
+datadog = ['datadog>=0.14.0']
doc = [
'sphinx>=1.2.3',
'sphinx-argparse>=0.1.13',
@@ -213,6 +214,7 @@ def do_setup():
'celery': celery,
'cloudant': cloudant,
'crypto': crypto,
+ 'datadog': datadog,
'devel': devel_minreq,
'devel_hadoop': devel_hadoop,
'doc': doc,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8383038/tests/contrib/sensors/datadog_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/datadog_sensor.py b/tests/contrib/sensors/datadog_sensor.py
new file mode 100644
index 0000000..4d601e1
--- /dev/null
+++ b/tests/contrib/sensors/datadog_sensor.py
@@ -0,0 +1,91 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import patch
+
+from airflow.contrib.sensors.datadog_sensor import DatadogSensor
+
+
def _fake_event(event_id, date_happened):
    """Return a canned datadog-style event record used as mocked query output."""
    return {
        'alert_type': 'info',
        'comments': [],
        'date_happened': date_happened,
        'device_name': None,
        'host': None,
        'id': event_id,
        'is_aggregate': False,
        'priority': 'normal',
        'resource': '/api/v1/events/%d' % event_id,
        'source': 'My Apps',
        'tags': ['application:web', 'version:1'],
        'text': 'And let me tell you all about it here!',
        'title': 'Something big happened!',
        'url': '/event/jump_to?event_id=%d' % event_id,
    }


# A non-empty mocked query result: two matching events.
at_least_one_event = [
    _fake_event(2603387619536318140, 1419436860),
    _fake_event(2603387619536318141, 1419436865),
]

# A mocked query result in which nothing matched.
zero_events = []
+
+
class TestDatadogSensor(unittest.TestCase):
    """Unit tests for DatadogSensor.poke with the datadog API mocked out."""

    # DatadogHook is patched where the sensor looks it up: the real hook calls
    # get_connection('datadog_default') and raises without api/app keys, which
    # would make these tests depend on a configured connection.
    # api.Event.query is patched once; the hook and sensor modules share the
    # same `datadog.api` object, so a second patch is redundant.

    @patch('airflow.contrib.sensors.datadog_sensor.DatadogHook')
    @patch('airflow.contrib.sensors.datadog_sensor.api.Event.query')
    def test_sensor_ok(self, api_query, mock_hook):
        api_query.return_value = at_least_one_event

        sensor = DatadogSensor(
            task_id='test_datadog',
            datadog_conn_id='datadog_default',
            from_seconds_ago=3600,
            up_to_seconds_from_now=0,
            priority=None,
            sources=None,
            tags=None,
            response_check=None)

        self.assertTrue(sensor.poke({}))
        mock_hook.assert_called_once_with(datadog_conn_id='datadog_default')
        self.assertEqual(api_query.call_count, 1)

    @patch('airflow.contrib.sensors.datadog_sensor.DatadogHook')
    @patch('airflow.contrib.sensors.datadog_sensor.api.Event.query')
    def test_sensor_fail(self, api_query, mock_hook):
        api_query.return_value = zero_events

        sensor = DatadogSensor(
            task_id='test_datadog',
            datadog_conn_id='datadog_default',
            from_seconds_ago=0,
            up_to_seconds_from_now=0,
            priority=None,
            sources=None,
            tags=None,
            response_check=None)

        self.assertFalse(sensor.poke({}))

    @patch('airflow.contrib.sensors.datadog_sensor.DatadogHook')
    @patch('airflow.contrib.sensors.datadog_sensor.api.Event.query')
    def test_sensor_response_check_overrides_default(self, api_query, mock_hook):
        # Events matched, but a custom check that rejects them must win.
        api_query.return_value = at_least_one_event
        seen = []

        def reject(response):
            seen.append(response)
            return False

        sensor = DatadogSensor(
            task_id='test_datadog',
            datadog_conn_id='datadog_default',
            response_check=reject)

        self.assertFalse(sensor.poke({}))
        self.assertEqual(seen, [at_least_one_event])


if __name__ == '__main__':
    unittest.main()