You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jl...@apache.org on 2017/03/13 22:04:24 UTC
incubator-airflow git commit: [AIRFLOW-770] Refactor BaseHook so env vars are always read
Repository: incubator-airflow
Updated Branches:
refs/heads/master fdefe1f82 -> 261b65670
[AIRFLOW-770] Refactor BaseHook so env vars are always read
The WebHDFS and HDFS hooks ignore connections set in environment
variables because they use `BaseHook.get_connections()` directly,
which fetches a list of connections from the DB. I moved that method's
logic to `_get_connections_from_db()` and made a new
`get_connections()` that first checks environment variables before
falling back on connections in the DB. Also, because connection extras
cannot be specified when using environment variables, I added an arg
to HDFSHook for using Snakebite's AutoConfigClient, which can be
initialized without any connection info.
Closes #2056 from dhuang/AIRFLOW-770
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/261b6567
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/261b6567
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/261b6567
Branch: refs/heads/master
Commit: 261b65670a610d33a25f96b824207ba5771524f2
Parents: fdefe1f
Author: Daniel Huang <dx...@gmail.com>
Authored: Mon Mar 13 18:04:14 2017 -0400
Committer: Jeremiah Lowin <jl...@apache.org>
Committed: Mon Mar 13 18:04:14 2017 -0400
----------------------------------------------------------------------
airflow/hooks/base_hook.py | 18 +++++++++--
airflow/hooks/hdfs_hook.py | 60 ++++++++++++++++++++++++------------
tests/core.py | 67 +++++++++++++++++++++++++++++++++++++++++
3 files changed, 123 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/261b6567/airflow/hooks/base_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/base_hook.py b/airflow/hooks/base_hook.py
index f5eeb6a..cef8c97 100644
--- a/airflow/hooks/base_hook.py
+++ b/airflow/hooks/base_hook.py
@@ -41,7 +41,7 @@ class BaseHook(object):
pass
@classmethod
- def get_connections(cls, conn_id):
+ def _get_connections_from_db(cls, conn_id):
session = settings.Session()
db = (
session.query(Connection)
@@ -56,13 +56,25 @@ class BaseHook(object):
return db
@classmethod
- def get_connection(cls, conn_id):
+ def _get_connection_from_env(cls, conn_id):
environment_uri = os.environ.get(CONN_ENV_PREFIX + conn_id.upper())
conn = None
if environment_uri:
conn = Connection(conn_id=conn_id, uri=environment_uri)
+ return conn
+
+ @classmethod
+ def get_connections(cls, conn_id):
+ conn = cls._get_connection_from_env(conn_id)
+ if conn:
+ conns = [conn]
else:
- conn = random.choice(cls.get_connections(conn_id))
+ conns = cls._get_connections_from_db(conn_id)
+ return conns
+
+ @classmethod
+ def get_connection(cls, conn_id):
+ conn = random.choice(cls.get_connections(conn_id))
if conn.host:
logging.info("Using connection to: " + conn.host)
return conn
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/261b6567/airflow/hooks/hdfs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hdfs_hook.py b/airflow/hooks/hdfs_hook.py
index 69dccd0..549b609 100644
--- a/airflow/hooks/hdfs_hook.py
+++ b/airflow/hooks/hdfs_hook.py
@@ -31,8 +31,16 @@ class HDFSHookException(AirflowException):
class HDFSHook(BaseHook):
"""
Interact with HDFS. This class is a wrapper around the snakebite library.
+
+ :param hdfs_conn_id: Connection id to fetch connection info
+ :type conn_id: string
+ :param proxy_user: effective user for HDFS operations
+ :type proxy_user: string
+ :param autoconfig: use snakebite's automatically configured client
+ :type autoconfig: bool
"""
- def __init__(self, hdfs_conn_id='hdfs_default', proxy_user=None):
+ def __init__(self, hdfs_conn_id='hdfs_default', proxy_user=None,
+ autoconfig=False):
if not snakebite_imported:
raise ImportError(
'This HDFSHook implementation requires snakebite, but '
@@ -41,33 +49,47 @@ class HDFSHook(BaseHook):
'this hook -- or help by submitting a PR!')
self.hdfs_conn_id = hdfs_conn_id
self.proxy_user = proxy_user
+ self.autoconfig = autoconfig
def get_conn(self):
"""
Returns a snakebite HDFSClient object.
"""
- connections = self.get_connections(self.hdfs_conn_id)
- use_sasl = False
- if configuration.get('core', 'security') == 'kerberos':
- use_sasl = True
+ # When using HAClient, proxy_user must be the same, so is ok to always
+ # take the first.
+ effective_user = self.proxy_user
+ autoconfig = self.autoconfig
+ use_sasl = configuration.get('core', 'security') == 'kerberos'
+
+ try:
+ connections = self.get_connections(self.hdfs_conn_id)
+
+ if not effective_user:
+ effective_user = connections[0].login
+ if not autoconfig:
+ autoconfig = connections[0].extra_dejson.get('autoconfig',
+ False)
+ hdfs_namenode_principal = connections[0].extra_dejson.get(
+ 'hdfs_namenode_principal')
+ except AirflowException:
+ if not autoconfig:
+ raise
- # When using HAClient, proxy_user must be the same, so is ok to always take the first.
- effective_user = self.proxy_user or connections[0].login
- if len(connections) == 1:
- autoconfig = connections[0].extra_dejson.get('autoconfig', False)
- if autoconfig:
- client = AutoConfigClient(effective_user=effective_user, use_sasl=use_sasl)
- else:
- hdfs_namenode_principal = connections[0].extra_dejson.get('hdfs_namenode_principal')
- client = Client(connections[0].host, connections[0].port,
- effective_user=effective_user, use_sasl=use_sasl,
- hdfs_namenode_principal=hdfs_namenode_principal)
+ if autoconfig:
+ # will read config info from $HADOOP_HOME conf files
+ client = AutoConfigClient(effective_user=effective_user,
+ use_sasl=use_sasl)
+ elif len(connections) == 1:
+ client = Client(connections[0].host, connections[0].port,
+ effective_user=effective_user, use_sasl=use_sasl,
+ hdfs_namenode_principal=hdfs_namenode_principal)
elif len(connections) > 1:
- hdfs_namenode_principal = connections[0].extra_dejson.get('hdfs_namenode_principal')
nn = [Namenode(conn.host, conn.port) for conn in connections]
- client = HAClient(nn, effective_user=effective_user, use_sasl=use_sasl,
+ client = HAClient(nn, effective_user=effective_user,
+ use_sasl=use_sasl,
hdfs_namenode_principal=hdfs_namenode_principal)
else:
- raise HDFSHookException("conn_id doesn't exist in the repository")
+ raise HDFSHookException("conn_id doesn't exist in the repository "
+ "and autoconfig is not specified")
return client
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/261b6567/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 67400e1..e17920a 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -15,6 +15,7 @@
from __future__ import print_function
import doctest
+import json
import os
import re
import unittest
@@ -2045,6 +2046,22 @@ class ConnectionTest(unittest.TestCase):
self.assertIsInstance(engine, sqlalchemy.engine.Engine)
self.assertEqual('postgres://username:password@ec2.compute.com:5432/the_database', str(engine.url))
+ def test_get_connections_env_var(self):
+ conns = SqliteHook.get_connections(conn_id='test_uri')
+ assert len(conns) == 1
+ assert conns[0].host == 'ec2.compute.com'
+ assert conns[0].schema == 'the_database'
+ assert conns[0].login == 'username'
+ assert conns[0].password == 'password'
+ assert conns[0].port == 5432
+
+ def test_get_connections_db(self):
+ conns = BaseHook.get_connections(conn_id='airflow_db')
+ assert len(conns) == 1
+ assert conns[0].host == 'localhost'
+ assert conns[0].schema == 'airflow'
+ assert conns[0].login == 'root'
+
class WebHDFSHookTest(unittest.TestCase):
def setUp(self):
@@ -2062,6 +2079,56 @@ class WebHDFSHookTest(unittest.TestCase):
try:
+ from airflow.hooks.hdfs_hook import HDFSHook
+ import snakebite
+except ImportError:
+ HDFSHook = None
+
+
+@unittest.skipIf(HDFSHook is None,
+ "Skipping test because HDFSHook is not installed")
+class HDFSHookTest(unittest.TestCase):
+ def setUp(self):
+ configuration.load_test_config()
+ os.environ['AIRFLOW_CONN_HDFS_DEFAULT'] = ('hdfs://localhost:8020')
+
+ def test_get_client(self):
+ client = HDFSHook(proxy_user='foo').get_conn()
+ self.assertIsInstance(client, snakebite.client.Client)
+ self.assertEqual('localhost', client.host)
+ self.assertEqual(8020, client.port)
+ self.assertEqual('foo', client.service.channel.effective_user)
+
+ @mock.patch('airflow.hooks.hdfs_hook.AutoConfigClient')
+ @mock.patch('airflow.hooks.hdfs_hook.HDFSHook.get_connections')
+ def test_get_autoconfig_client(self, mock_get_connections,
+ MockAutoConfigClient):
+ c = models.Connection(conn_id='hdfs', conn_type='hdfs',
+ host='localhost', port=8020, login='foo',
+ extra=json.dumps({'autoconfig': True}))
+ mock_get_connections.return_value = [c]
+ HDFSHook(hdfs_conn_id='hdfs').get_conn()
+ MockAutoConfigClient.assert_called_once_with(effective_user='foo',
+ use_sasl=False)
+
+ @mock.patch('airflow.hooks.hdfs_hook.AutoConfigClient')
+ def test_get_autoconfig_client_no_conn(self, MockAutoConfigClient):
+ HDFSHook(hdfs_conn_id='hdfs_missing', autoconfig=True).get_conn()
+ MockAutoConfigClient.assert_called_once_with(effective_user=None,
+ use_sasl=False)
+
+ @mock.patch('airflow.hooks.hdfs_hook.HDFSHook.get_connections')
+ def test_get_ha_client(self, mock_get_connections):
+ c1 = models.Connection(conn_id='hdfs_default', conn_type='hdfs',
+ host='localhost', port=8020)
+ c2 = models.Connection(conn_id='hdfs_default', conn_type='hdfs',
+ host='localhost2', port=8020)
+ mock_get_connections.return_value = [c1, c2]
+ client = HDFSHook().get_conn()
+ self.assertIsInstance(client, snakebite.client.HAClient)
+
+
+try:
from airflow.hooks.S3_hook import S3Hook
except ImportError:
S3Hook = None