You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ar...@apache.org on 2017/03/20 18:14:21 UTC

incubator-airflow git commit: [AIRFLOW-999] Add support for Redis database

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 23a16f7ad -> 8de850162


[AIRFLOW-999] Add support for Redis database

This PR adds a redis_hook and a redis_key_sensor to enable
checking for key existence in Redis. It also updates the
documentation and adds the relevant unit tests.

- [x] Opened a PR on Github

- [x] My PR addresses the following Airflow JIRA
issues:
    - https://issues.apache.org/jira/browse/AIRFLOW-999
- [x] The PR title references the JIRA issues. For
example, "[AIRFLOW-1] My Airflow PR"

- [x] My PR adds unit tests
- [ ] __OR__ my PR does not need testing for this
extremely good reason:

- [x] Here are some details about my PR:
- [ ] Here are screenshots of any UI changes, if
appropriate:

- [x] Each commit subject references a JIRA issue.
For example, "[AIRFLOW-1] Add new feature"
- [x] Multiple commits addressing the same JIRA
issue have been squashed
- [x] My commits follow the guidelines from "[How
to write a good git commit
message](http://chris.beams.io/posts/git-commit/)":
  1. Subject is separated from body by a blank line
  2. Subject is limited to 50 characters
  3. Subject does not end with a period
  4. Subject uses the imperative mood ("add", not
"adding")
  5. Body wraps at 72 characters
  6. Body explains "what" and "why", not "how"

Closes #2165 from msempere/AIRFLOW-999/support-for-redis-database


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8de85016
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8de85016
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8de85016

Branch: refs/heads/master
Commit: 8de85016265443987a0e0fff406e996d421dc9d6
Parents: 23a16f7
Author: MSempere <ms...@gmx.com>
Authored: Mon Mar 20 11:10:55 2017 -0700
Committer: Arthur Wiedmer <ar...@gmail.com>
Committed: Mon Mar 20 11:11:31 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/redis_hook.py         | 92 ++++++++++++++++++++++++
 airflow/contrib/sensors/redis_key_sensor.py | 46 ++++++++++++
 airflow/models.py                           |  4 ++
 airflow/utils/db.py                         |  5 ++
 docs/installation.rst                       |  2 +
 setup.py                                    |  2 +
 tests/contrib/hooks/test_redis_hook.py      | 46 ++++++++++++
 tests/contrib/sensors/redis_sensor.py       | 64 +++++++++++++++++
 8 files changed, 261 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/airflow/contrib/hooks/redis_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/redis_hook.py b/airflow/contrib/hooks/redis_hook.py
new file mode 100644
index 0000000..936eff8
--- /dev/null
+++ b/airflow/contrib/hooks/redis_hook.py
@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+RedisHook module
+"""
+
+import logging
+
+from redis import StrictRedis
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base_hook import BaseHook
+
+
+class RedisHook(BaseHook):
+    """
+    Hook to interact with Redis database
+    """
+    def __init__(self, redis_conn_id='redis_default'):
+        """
+        Prepares hook to connect to a Redis database.
+
+        :param conn_id:     the name of the connection that has the parameters
+                            we need to connect to Redis.
+        """
+        self.redis_conn_id = redis_conn_id
+        self.client = None
+        conn = self.get_connection(self.redis_conn_id)
+        self.host = conn.host
+        self.port = int(conn.port)
+        self.password = conn.password
+        self.db = int(conn.extra_dejson.get('db', 0))
+        self.logger = logging.getLogger(__name__)
+        self.logger.debug(
+            '''Connection "{conn}":
+            \thost: {host}
+            \tport: {port}
+            \textra: {extra}
+            '''.format(
+                conn=self.redis_conn_id,
+                host=self.host,
+                port=self.port,
+                extra=conn.extra_dejson
+            )
+        )
+
+    def get_conn(self):
+        """
+        Returns a Redis connection.
+        """
+        if not self.client:
+            self.logger.debug(
+                'generating Redis client for conn_id "{conn}" on '
+                '{host}:{port}:{db}'.format(conn=self.redis_conn_id,
+                                            host=self.host,
+                                            port=self.port,
+                                            db=self.db))
+            try:
+                self.client = StrictRedis(
+                    host=self.host,
+                    port=self.port,
+                    password=self.password,
+                    db=self.db)
+            except Exception as general_error:
+                raise AirflowException(
+                    'Failed to create Redis client, error: {error}'.format(
+                        error=str(general_error)
+                    )
+                )
+
+        return self.client
+
+    def key_exists(self, key):
+        """
+        Checks if a key exists in Redis database
+
+        :param key: The key to check the existence.
+        :type key: string
+        """
+        return self.get_conn().exists(key)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/airflow/contrib/sensors/redis_key_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/redis_key_sensor.py b/airflow/contrib/sensors/redis_key_sensor.py
new file mode 100644
index 0000000..4cab407
--- /dev/null
+++ b/airflow/contrib/sensors/redis_key_sensor.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.contrib.hooks.redis_hook import RedisHook
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class RedisKeySensor(BaseSensorOperator):
+    """
+    Checks for the existence of a key in a Redis database
+    """
+    template_fields = ('key',)
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(self, key, redis_conn_id, *args, **kwargs):
+        """
+        Create a new RedisKeySensor
+
+        :param key: The key to be monitored
+        :type key: string
+        :param redis_conn_id: The connection ID to use when connecting to Redis DB.
+        :type redis_conn_id: string
+        """
+        super(RedisKeySensor, self).__init__(*args, **kwargs)
+        self.logger = logging.getLogger(__name__)
+        self.redis_conn_id = redis_conn_id
+        self.key = key
+
+    def poke(self, context):
+        self.logger.info('Sensor check existence of key: %s', self.key)
+        return RedisHook(self.redis_conn_id).key_exists(self.key)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index f2d955b..a7d2916 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -541,6 +541,7 @@ class Connection(Base):
         ('mssql', 'Microsoft SQL Server'),
         ('mesos_framework-id', 'Mesos Framework ID'),
         ('jira', 'JIRA',),
+        ('redis', 'Redis',),
     ]
 
     def __init__(
@@ -670,6 +671,9 @@ class Connection(Base):
             elif self.conn_type == 'jira':
                 from airflow.contrib.hooks.jira_hook import JiraHook
                 return JiraHook(jira_conn_id=self.conn_id)
+            elif self.conn_type == 'redis':
+                from airflow.contrib.hooks.redis_hook import RedisHook
+                return RedisHook(redis_conn_id=self.conn_id)
         except:
             pass
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 49a8d62..618e0020 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -191,6 +191,11 @@ def initdb():
             host='yarn', extra='{"queue": "root.default"}'))
     merge_conn(
         models.Connection(
+            conn_id='redis_default', conn_type='redis',
+            host='localhost', port=6379,
+            extra='{"db": 0}'))
+    merge_conn(
+        models.Connection(
             conn_id='emr_default', conn_type='emr',
             extra='''
                 {   "Name": "default_job_flow_name",

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/docs/installation.rst
----------------------------------------------------------------------
diff --git a/docs/installation.rst b/docs/installation.rst
index 289f64f..c001ccf 100644
--- a/docs/installation.rst
+++ b/docs/installation.rst
@@ -88,3 +88,5 @@ Here's the list of the subpackages and what they enable:
 +---------------+-------------------------------------+-------------------------------------------------+
 |  cloudant     | ``pip install airflow[cloudant]``   | Cloudant hook                                   |
 +---------------+-------------------------------------+-------------------------------------------------+
+|  redis        | ``pip install airflow[redis]``      | Redis hooks and sensors                         |
++---------------+-------------------------------------+-------------------------------------------------+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 481d427..26a0e27 100644
--- a/setup.py
+++ b/setup.py
@@ -168,6 +168,7 @@ password = [
 github_enterprise = ['Flask-OAuthlib>=0.9.1']
 qds = ['qds-sdk>=1.9.0']
 cloudant = ['cloudant>=0.5.9,<2.0'] # major update coming soon, clamp to 0.x
+redis = ['redis>=2.10.5']
 
 all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant
 devel = [
@@ -269,6 +270,7 @@ def do_setup():
             'vertica': vertica,
             'webhdfs': webhdfs,
             'jira': jira,
+            'redis': redis,
         },
         classifiers=[
             'Development Status :: 5 - Production/Stable',

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/tests/contrib/hooks/test_redis_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_redis_hook.py b/tests/contrib/hooks/test_redis_hook.py
new file mode 100644
index 0000000..ab9a4bc
--- /dev/null
+++ b/tests/contrib/hooks/test_redis_hook.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+from mock import patch
+
+from airflow import configuration
+from airflow.contrib.hooks.redis_hook import RedisHook
+
+
+class TestRedisHook(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+
+    def test_get_conn(self):
+        hook = RedisHook(redis_conn_id='redis_default')
+        self.assertEqual(hook.client, None)
+        self.assertEqual(
+            repr(hook.get_conn()),
+            (
+                'StrictRedis<ConnectionPool'
+                '<Connection<host=localhost,port=6379,db=0>>>'
+            )
+        )
+
+    @patch("airflow.contrib.hooks.redis_hook.RedisHook.get_conn")
+    def test_first_conn_instantiation(self, get_conn):
+        hook = RedisHook(redis_conn_id='redis_default')
+        hook.key_exists('test_key')
+        self.assertTrue(get_conn.called_once())
+
+
+if __name__ == '__main__':
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/tests/contrib/sensors/redis_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/redis_sensor.py b/tests/contrib/sensors/redis_sensor.py
new file mode 100644
index 0000000..8022a92
--- /dev/null
+++ b/tests/contrib/sensors/redis_sensor.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+import datetime
+
+from mock import patch
+
+from airflow import DAG
+from airflow import configuration
+from airflow.contrib.sensors.redis_key_sensor import RedisKeySensor
+
+DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+
+
+class TestRedisSensor(unittest.TestCase):
+
+    def setUp(self):
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': DEFAULT_DATE
+        }
+
+        self.dag = DAG('test_dag_id', default_args=args)
+        self.sensor = RedisKeySensor(
+            task_id='test_task',
+            redis_conn_id='redis_default',
+            dag=self.dag,
+            key='test_key'
+        )
+
+    @patch("airflow.contrib.hooks.redis_hook.RedisHook.key_exists")
+    def test_poke(self, key_exists):
+        key_exists.return_value = True
+        self.assertTrue(self.sensor.poke(None))
+
+        key_exists.return_value = False
+        self.assertFalse(self.sensor.poke(None))
+
+    @patch("airflow.contrib.hooks.redis_hook.StrictRedis.exists")
+    def test_existing_key_called(self, redis_client_exists):
+        self.sensor.run(
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE, ignore_ti_state=True
+        )
+
+        self.assertTrue(redis_client_exists.called_with('test_key'))
+
+
+if __name__ == '__main__':
+    unittest.main()