Posted to commits@airflow.apache.org by jl...@apache.org on 2017/03/13 22:04:24 UTC

incubator-airflow git commit: [AIRFLOW-770] Refactor BaseHook so env vars are always read

Repository: incubator-airflow
Updated Branches:
  refs/heads/master fdefe1f82 -> 261b65670


[AIRFLOW-770] Refactor BaseHook so env vars are always read

The WebHDFS and HDFS hooks ignore connections set in environment
variables because they use `BaseHook.get_connections()` directly,
which fetches the list of connections from the DB. I moved that
method's logic to `_get_connections_from_db()` and made a new
`get_connections()` that checks environment variables first before
falling back on connections in the DB. Also, because connection
extras cannot be specified when using environment variables, I added
an arg to HDFSHook for using Snakebite's AutoConfigClient, which can
be initialized without any connection info.

Closes #2056 from dhuang/AIRFLOW-770
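
For context, a minimal sketch of the behavior this change enables,
assuming an illustrative conn id and URI (the AIRFLOW_CONN_ prefix is
the one CONN_ENV_PREFIX expands to):

    import os
    from airflow.hooks.base_hook import BaseHook

    # Env var naming scheme: CONN_ENV_PREFIX + conn_id.upper()
    os.environ['AIRFLOW_CONN_WEBHDFS_DEFAULT'] = (
        'hdfs://namenode.example.com:50070')

    # Before this change, this queried the DB directly and ignored the
    # variable above; now the env var takes precedence.
    conns = BaseHook.get_connections('webhdfs_default')
    assert conns[0].host == 'namenode.example.com'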


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/261b6567
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/261b6567
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/261b6567

Branch: refs/heads/master
Commit: 261b65670a610d33a25f96b824207ba5771524f2
Parents: fdefe1f
Author: Daniel Huang <dx...@gmail.com>
Authored: Mon Mar 13 18:04:14 2017 -0400
Committer: Jeremiah Lowin <jl...@apache.org>
Committed: Mon Mar 13 18:04:14 2017 -0400

----------------------------------------------------------------------
 airflow/hooks/base_hook.py | 18 +++++++++--
 airflow/hooks/hdfs_hook.py | 60 ++++++++++++++++++++++++------------
 tests/core.py              | 67 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 123 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/261b6567/airflow/hooks/base_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/base_hook.py b/airflow/hooks/base_hook.py
index f5eeb6a..cef8c97 100644
--- a/airflow/hooks/base_hook.py
+++ b/airflow/hooks/base_hook.py
@@ -41,7 +41,7 @@ class BaseHook(object):
         pass
 
     @classmethod
-    def get_connections(cls, conn_id):
+    def _get_connections_from_db(cls, conn_id):
         session = settings.Session()
         db = (
             session.query(Connection)
@@ -56,13 +56,25 @@ class BaseHook(object):
         return db
 
     @classmethod
-    def get_connection(cls, conn_id):
+    def _get_connection_from_env(cls, conn_id):
         environment_uri = os.environ.get(CONN_ENV_PREFIX + conn_id.upper())
         conn = None
         if environment_uri:
             conn = Connection(conn_id=conn_id, uri=environment_uri)
+        return conn
+
+    @classmethod
+    def get_connections(cls, conn_id):
+        conn = cls._get_connection_from_env(conn_id)
+        if conn:
+            conns = [conn]
         else:
-            conn = random.choice(cls.get_connections(conn_id))
+            conns = cls._get_connections_from_db(conn_id)
+        return conns
+
+    @classmethod
+    def get_connection(cls, conn_id):
+        conn = random.choice(cls.get_connections(conn_id))
         if conn.host:
             logging.info("Using connection to: " + conn.host)
         return conn
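
A minimal sketch of the resolution order this gives `get_connection()`
(the conn id below is illustrative):

    from airflow.hooks.base_hook import BaseHook

    # 1. If AIRFLOW_CONN_MY_CONN is set, the env URI wins and the DB is
    #    never queried.
    # 2. Otherwise _get_connections_from_db() is used, which raises
    #    AirflowException when the conn_id is unknown.
    # 3. With several DB rows sharing a conn_id, get_connection() still
    #    picks one at random.
    conn = BaseHook.get_connection('my_conn')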

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/261b6567/airflow/hooks/hdfs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hdfs_hook.py b/airflow/hooks/hdfs_hook.py
index 69dccd0..549b609 100644
--- a/airflow/hooks/hdfs_hook.py
+++ b/airflow/hooks/hdfs_hook.py
@@ -31,8 +31,16 @@ class HDFSHookException(AirflowException):
 class HDFSHook(BaseHook):
     """
     Interact with HDFS. This class is a wrapper around the snakebite library.
+
+    :param hdfs_conn_id: Connection id to fetch connection info
+    :type hdfs_conn_id: string
+    :param proxy_user: effective user for HDFS operations
+    :type proxy_user: string
+    :param autoconfig: use snakebite's automatically configured client
+    :type autoconfig: bool
     """
-    def __init__(self, hdfs_conn_id='hdfs_default', proxy_user=None):
+    def __init__(self, hdfs_conn_id='hdfs_default', proxy_user=None,
+                 autoconfig=False):
         if not snakebite_imported:
             raise ImportError(
                 'This HDFSHook implementation requires snakebite, but '
@@ -41,33 +49,47 @@ class HDFSHook(BaseHook):
                 'this hook  -- or help by submitting a PR!')
         self.hdfs_conn_id = hdfs_conn_id
         self.proxy_user = proxy_user
+        self.autoconfig = autoconfig
 
     def get_conn(self):
         """
         Returns a snakebite HDFSClient object.
         """
-        connections = self.get_connections(self.hdfs_conn_id)
-        use_sasl = False
-        if configuration.get('core', 'security') == 'kerberos':
-            use_sasl = True
+        # When using HAClient, proxy_user must be the same, so it is OK
+        # to always take the first.
+        effective_user = self.proxy_user
+        autoconfig = self.autoconfig
+        use_sasl = configuration.get('core', 'security') == 'kerberos'
+
+        try:
+            connections = self.get_connections(self.hdfs_conn_id)
+
+            if not effective_user:
+                effective_user = connections[0].login
+            if not autoconfig:
+                autoconfig = connections[0].extra_dejson.get('autoconfig',
+                                                             False)
+            hdfs_namenode_principal = connections[0].extra_dejson.get(
+                'hdfs_namenode_principal')
+        except AirflowException:
+            if not autoconfig:
+                raise
 
-        # When using HAClient, proxy_user must be the same, so is ok to always take the first.
-        effective_user = self.proxy_user or connections[0].login
-        if len(connections) == 1:
-            autoconfig = connections[0].extra_dejson.get('autoconfig', False)
-            if autoconfig:
-                client = AutoConfigClient(effective_user=effective_user, use_sasl=use_sasl)
-            else:
-                hdfs_namenode_principal = connections[0].extra_dejson.get('hdfs_namenode_principal')
-                client = Client(connections[0].host, connections[0].port,
-                                effective_user=effective_user, use_sasl=use_sasl,
-                                hdfs_namenode_principal=hdfs_namenode_principal)
+        if autoconfig:
+            # will read config info from $HADOOP_HOME conf files
+            client = AutoConfigClient(effective_user=effective_user,
+                                      use_sasl=use_sasl)
+        elif len(connections) == 1:
+            client = Client(connections[0].host, connections[0].port,
+                            effective_user=effective_user, use_sasl=use_sasl,
+                            hdfs_namenode_principal=hdfs_namenode_principal)
         elif len(connections) > 1:
-            hdfs_namenode_principal = connections[0].extra_dejson.get('hdfs_namenode_principal')
             nn = [Namenode(conn.host, conn.port) for conn in connections]
-            client = HAClient(nn, effective_user=effective_user, use_sasl=use_sasl,
+            client = HAClient(nn, effective_user=effective_user,
+                              use_sasl=use_sasl,
                               hdfs_namenode_principal=hdfs_namenode_principal)
         else:
-            raise HDFSHookException("conn_id doesn't exist in the repository")
+            raise HDFSHookException("conn_id doesn't exist in the repository "
+                                    "and autoconfig is not specified")
 
         return client
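
A hedged usage sketch of the new autoconfig path, assuming snakebite is
installed and readable Hadoop conf files exist under $HADOOP_HOME; no
connection row or env var is needed:

    from airflow.hooks.hdfs_hook import HDFSHook

    # AutoConfigClient reads namenode settings from the Hadoop conf
    # files, so a missing 'hdfs_default' connection no longer raises
    # HDFSHookException when autoconfig=True.
    client = HDFSHook(hdfs_conn_id='hdfs_default',
                      autoconfig=True).get_conn()
    for entry in client.ls(['/']):  # snakebite ls takes a list of paths
        print(entry['path'])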

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/261b6567/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 67400e1..e17920a 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -15,6 +15,7 @@
 from __future__ import print_function
 
 import doctest
+import json
 import os
 import re
 import unittest
@@ -2045,6 +2046,22 @@ class ConnectionTest(unittest.TestCase):
         self.assertIsInstance(engine, sqlalchemy.engine.Engine)
         self.assertEqual('postgres://username:password@ec2.compute.com:5432/the_database', str(engine.url))
 
+    def test_get_connections_env_var(self):
+        conns = SqliteHook.get_connections(conn_id='test_uri')
+        assert len(conns) == 1
+        assert conns[0].host == 'ec2.compute.com'
+        assert conns[0].schema == 'the_database'
+        assert conns[0].login == 'username'
+        assert conns[0].password == 'password'
+        assert conns[0].port == 5432
+
+    def test_get_connections_db(self):
+        conns = BaseHook.get_connections(conn_id='airflow_db')
+        assert len(conns) == 1
+        assert conns[0].host == 'localhost'
+        assert conns[0].schema == 'airflow'
+        assert conns[0].login == 'root'
+
 
 class WebHDFSHookTest(unittest.TestCase):
     def setUp(self):
@@ -2062,6 +2079,56 @@ class WebHDFSHookTest(unittest.TestCase):
 
 
 try:
+    from airflow.hooks.hdfs_hook import HDFSHook
+    import snakebite
+except ImportError:
+    HDFSHook = None
+
+
+@unittest.skipIf(HDFSHook is None,
+                 "Skipping test because HDFSHook is not installed")
+class HDFSHookTest(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+        os.environ['AIRFLOW_CONN_HDFS_DEFAULT'] = 'hdfs://localhost:8020'
+
+    def test_get_client(self):
+        client = HDFSHook(proxy_user='foo').get_conn()
+        self.assertIsInstance(client, snakebite.client.Client)
+        self.assertEqual('localhost', client.host)
+        self.assertEqual(8020, client.port)
+        self.assertEqual('foo', client.service.channel.effective_user)
+
+    @mock.patch('airflow.hooks.hdfs_hook.AutoConfigClient')
+    @mock.patch('airflow.hooks.hdfs_hook.HDFSHook.get_connections')
+    def test_get_autoconfig_client(self, mock_get_connections,
+                                   MockAutoConfigClient):
+        c = models.Connection(conn_id='hdfs', conn_type='hdfs',
+                              host='localhost', port=8020, login='foo',
+                              extra=json.dumps({'autoconfig': True}))
+        mock_get_connections.return_value = [c]
+        HDFSHook(hdfs_conn_id='hdfs').get_conn()
+        MockAutoConfigClient.assert_called_once_with(effective_user='foo',
+                                                     use_sasl=False)
+
+    @mock.patch('airflow.hooks.hdfs_hook.AutoConfigClient')
+    def test_get_autoconfig_client_no_conn(self, MockAutoConfigClient):
+        HDFSHook(hdfs_conn_id='hdfs_missing', autoconfig=True).get_conn()
+        MockAutoConfigClient.assert_called_once_with(effective_user=None,
+                                                     use_sasl=False)
+
+    @mock.patch('airflow.hooks.hdfs_hook.HDFSHook.get_connections')
+    def test_get_ha_client(self, mock_get_connections):
+        c1 = models.Connection(conn_id='hdfs_default', conn_type='hdfs',
+                               host='localhost', port=8020)
+        c2 = models.Connection(conn_id='hdfs_default', conn_type='hdfs',
+                               host='localhost2', port=8020)
+        mock_get_connections.return_value = [c1, c2]
+        client = HDFSHook().get_conn()
+        self.assertIsInstance(client, snakebite.client.HAClient)
+
+
+try:
     from airflow.hooks.S3_hook import S3Hook
 except ImportError:
     S3Hook = None