You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ar...@apache.org on 2017/03/20 18:14:21 UTC
incubator-airflow git commit: [AIRFLOW-999] Add support for Redis
database
Repository: incubator-airflow
Updated Branches:
refs/heads/master 23a16f7ad -> 8de850162
[AIRFLOW-999] Add support for Redis database
This PR includes a redis_hook and a redis_key_sensor to enable
checking for key existence in Redis. It also updates the
documentation and adds the relevant unit tests.
- [x] Opened a PR on Github
- [x] My PR addresses the following Airflow JIRA
issues:
-
https://issues.apache.org/jira/browse/AIRFLOW-999
- [x] The PR title references the JIRA issues. For
example, "[AIRFLOW-1] My Airflow PR"
- [x] My PR adds unit tests
- [ ] __OR__ my PR does not need testing for this
extremely good reason:
- [x] Here are some details about my PR:
- [ ] Here are screenshots of any UI changes, if
appropriate:
- [x] Each commit subject references a JIRA issue.
For example, "[AIRFLOW-1] Add new feature"
- [x] Multiple commits addressing the same JIRA
issue have been squashed
- [x] My commits follow the guidelines from "[How
to write a good git commit
message](http://chris.beams.io/posts/git-commit/)":
1. Subject is separated from body by a blank line
2. Subject is limited to 50 characters
3. Subject does not end with a period
4. Subject uses the imperative mood ("add", not
"adding")
5. Body wraps at 72 characters
6. Body explains "what" and "why", not "how"
Closes #2165 from msempere/AIRFLOW-999/support-for-redis-database
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8de85016
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8de85016
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8de85016
Branch: refs/heads/master
Commit: 8de85016265443987a0e0fff406e996d421dc9d6
Parents: 23a16f7
Author: MSempere <ms...@gmx.com>
Authored: Mon Mar 20 11:10:55 2017 -0700
Committer: Arthur Wiedmer <ar...@gmail.com>
Committed: Mon Mar 20 11:11:31 2017 -0700
----------------------------------------------------------------------
airflow/contrib/hooks/redis_hook.py | 92 ++++++++++++++++++++++++
airflow/contrib/sensors/redis_key_sensor.py | 46 ++++++++++++
airflow/models.py | 4 ++
airflow/utils/db.py | 5 ++
docs/installation.rst | 2 +
setup.py | 2 +
tests/contrib/hooks/test_redis_hook.py | 46 ++++++++++++
tests/contrib/sensors/redis_sensor.py | 64 +++++++++++++++++
8 files changed, 261 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/airflow/contrib/hooks/redis_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/redis_hook.py b/airflow/contrib/hooks/redis_hook.py
new file mode 100644
index 0000000..936eff8
--- /dev/null
+++ b/airflow/contrib/hooks/redis_hook.py
@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+RedisHook module
+"""
+
+import logging
+
+from redis import StrictRedis
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base_hook import BaseHook
+
+
+class RedisHook(BaseHook):
+ """
+ Hook to interact with Redis database
+ """
+ def __init__(self, redis_conn_id='redis_default'):
+ """
+ Prepares hook to connect to a Redis database.
+
+ :param conn_id: the name of the connection that has the parameters
+ we need to connect to Redis.
+ """
+ self.redis_conn_id = redis_conn_id
+ self.client = None
+ conn = self.get_connection(self.redis_conn_id)
+ self.host = conn.host
+ self.port = int(conn.port)
+ self.password = conn.password
+ self.db = int(conn.extra_dejson.get('db', 0))
+ self.logger = logging.getLogger(__name__)
+ self.logger.debug(
+ '''Connection "{conn}":
+ \thost: {host}
+ \tport: {port}
+ \textra: {extra}
+ '''.format(
+ conn=self.redis_conn_id,
+ host=self.host,
+ port=self.port,
+ extra=conn.extra_dejson
+ )
+ )
+
+ def get_conn(self):
+ """
+ Returns a Redis connection.
+ """
+ if not self.client:
+ self.logger.debug(
+ 'generating Redis client for conn_id "{conn}" on '
+ '{host}:{port}:{db}'.format(conn=self.redis_conn_id,
+ host=self.host,
+ port=self.port,
+ db=self.db))
+ try:
+ self.client = StrictRedis(
+ host=self.host,
+ port=self.port,
+ password=self.password,
+ db=self.db)
+ except Exception as general_error:
+ raise AirflowException(
+ 'Failed to create Redis client, error: {error}'.format(
+ error=str(general_error)
+ )
+ )
+
+ return self.client
+
+ def key_exists(self, key):
+ """
+ Checks if a key exists in Redis database
+
+ :param key: The key to check the existence.
+ :type key: string
+ """
+ return self.get_conn().exists(key)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/airflow/contrib/sensors/redis_key_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/redis_key_sensor.py b/airflow/contrib/sensors/redis_key_sensor.py
new file mode 100644
index 0000000..4cab407
--- /dev/null
+++ b/airflow/contrib/sensors/redis_key_sensor.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.contrib.hooks.redis_hook import RedisHook
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class RedisKeySensor(BaseSensorOperator):
+ """
+ Checks for the existence of a key in a Redis database
+ """
+ template_fields = ('key',)
+ ui_color = '#f0eee4'
+
+ @apply_defaults
+ def __init__(self, key, redis_conn_id, *args, **kwargs):
+ """
+ Create a new RedisKeySensor
+
+ :param key: The key to be monitored
+ :type key: string
+ :param redis_conn_id: The connection ID to use when connecting to Redis DB.
+ :type redis_conn_id: string
+ """
+ super(RedisKeySensor, self).__init__(*args, **kwargs)
+ self.logger = logging.getLogger(__name__)
+ self.redis_conn_id = redis_conn_id
+ self.key = key
+
+ def poke(self, context):
+ self.logger.info('Sensor check existence of key: %s', self.key)
+ return RedisHook(self.redis_conn_id).key_exists(self.key)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index f2d955b..a7d2916 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -541,6 +541,7 @@ class Connection(Base):
('mssql', 'Microsoft SQL Server'),
('mesos_framework-id', 'Mesos Framework ID'),
('jira', 'JIRA',),
+ ('redis', 'Redis',),
]
def __init__(
@@ -670,6 +671,9 @@ class Connection(Base):
elif self.conn_type == 'jira':
from airflow.contrib.hooks.jira_hook import JiraHook
return JiraHook(jira_conn_id=self.conn_id)
+ elif self.conn_type == 'redis':
+ from airflow.contrib.hooks.redis_hook import RedisHook
+ return RedisHook(redis_conn_id=self.conn_id)
except:
pass
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 49a8d62..618e0020 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -191,6 +191,11 @@ def initdb():
host='yarn', extra='{"queue": "root.default"}'))
merge_conn(
models.Connection(
+ conn_id='redis_default', conn_type='redis',
+ host='localhost', port=6379,
+ extra='{"db": 0}'))
+ merge_conn(
+ models.Connection(
conn_id='emr_default', conn_type='emr',
extra='''
{ "Name": "default_job_flow_name",
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/docs/installation.rst
----------------------------------------------------------------------
diff --git a/docs/installation.rst b/docs/installation.rst
index 289f64f..c001ccf 100644
--- a/docs/installation.rst
+++ b/docs/installation.rst
@@ -88,3 +88,5 @@ Here's the list of the subpackages and what they enable:
+---------------+-------------------------------------+-------------------------------------------------+
| cloudant | ``pip install airflow[cloudant]`` | Cloudant hook |
+---------------+-------------------------------------+-------------------------------------------------+
+| redis | ``pip install airflow[redis]`` | Redis hooks and sensors |
++---------------+-------------------------------------+-------------------------------------------------+
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 481d427..26a0e27 100644
--- a/setup.py
+++ b/setup.py
@@ -168,6 +168,7 @@ password = [
github_enterprise = ['Flask-OAuthlib>=0.9.1']
qds = ['qds-sdk>=1.9.0']
cloudant = ['cloudant>=0.5.9,<2.0'] # major update coming soon, clamp to 0.x
+redis = ['redis>=2.10.5']
all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant
devel = [
@@ -269,6 +270,7 @@ def do_setup():
'vertica': vertica,
'webhdfs': webhdfs,
'jira': jira,
+ 'redis': redis,
},
classifiers=[
'Development Status :: 5 - Production/Stable',
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/tests/contrib/hooks/test_redis_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_redis_hook.py b/tests/contrib/hooks/test_redis_hook.py
new file mode 100644
index 0000000..ab9a4bc
--- /dev/null
+++ b/tests/contrib/hooks/test_redis_hook.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+from mock import patch
+
+from airflow import configuration
+from airflow.contrib.hooks.redis_hook import RedisHook
+
+
+class TestRedisHook(unittest.TestCase):
+ def setUp(self):
+ configuration.load_test_config()
+
+ def test_get_conn(self):
+ hook = RedisHook(redis_conn_id='redis_default')
+ self.assertEqual(hook.client, None)
+ self.assertEqual(
+ repr(hook.get_conn()),
+ (
+ 'StrictRedis<ConnectionPool'
+ '<Connection<host=localhost,port=6379,db=0>>>'
+ )
+ )
+
+ @patch("airflow.contrib.hooks.redis_hook.RedisHook.get_conn")
+ def test_first_conn_instantiation(self, get_conn):
+ hook = RedisHook(redis_conn_id='redis_default')
+ hook.key_exists('test_key')
+ self.assertTrue(get_conn.called_once())
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/tests/contrib/sensors/redis_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/redis_sensor.py b/tests/contrib/sensors/redis_sensor.py
new file mode 100644
index 0000000..8022a92
--- /dev/null
+++ b/tests/contrib/sensors/redis_sensor.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+import datetime
+
+from mock import patch
+
+from airflow import DAG
+from airflow import configuration
+from airflow.contrib.sensors.redis_key_sensor import RedisKeySensor
+
+DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+
+
+class TestRedisSensor(unittest.TestCase):
+
+ def setUp(self):
+ configuration.load_test_config()
+ args = {
+ 'owner': 'airflow',
+ 'start_date': DEFAULT_DATE
+ }
+
+ self.dag = DAG('test_dag_id', default_args=args)
+ self.sensor = RedisKeySensor(
+ task_id='test_task',
+ redis_conn_id='redis_default',
+ dag=self.dag,
+ key='test_key'
+ )
+
+ @patch("airflow.contrib.hooks.redis_hook.RedisHook.key_exists")
+ def test_poke(self, key_exists):
+ key_exists.return_value = True
+ self.assertTrue(self.sensor.poke(None))
+
+ key_exists.return_value = False
+ self.assertFalse(self.sensor.poke(None))
+
+ @patch("airflow.contrib.hooks.redis_hook.StrictRedis.exists")
+ def test_existing_key_called(self, redis_client_exists):
+ self.sensor.run(
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE, ignore_ti_state=True
+ )
+
+ self.assertTrue(redis_client_exists.called_with('test_key'))
+
+
+if __name__ == '__main__':
+ unittest.main()