You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spot.apache.org by na...@apache.org on 2018/03/19 15:12:54 UTC

[02/13] incubator-spot git commit: [SPOT-213][SPOT-250][OA][API] add kerberos support

[SPOT-213][SPOT-250][OA][API] add kerberos support


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/d1f5a67f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/d1f5a67f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/d1f5a67f

Branch: refs/heads/master
Commit: d1f5a67f929090e2bad865d53d4389c69b176fc5
Parents: 7376c5e
Author: natedogs911 <na...@gmail.com>
Authored: Thu Jan 18 11:02:39 2018 -0800
Committer: natedogs911 <na...@gmail.com>
Committed: Thu Jan 18 11:02:39 2018 -0800

----------------------------------------------------------------------
 spot-oa/api/resources/configurator.py  |  69 +++++++++-
 spot-oa/api/resources/hdfs_client.py   | 201 ++++++++++++++++++++++++----
 spot-oa/api/resources/impala_engine.py |  29 +++-
 3 files changed, 262 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/d1f5a67f/spot-oa/api/resources/configurator.py
----------------------------------------------------------------------
diff --git a/spot-oa/api/resources/configurator.py b/spot-oa/api/resources/configurator.py
index 5bda045..017732d 100644
--- a/spot-oa/api/resources/configurator.py
+++ b/spot-oa/api/resources/configurator.py
@@ -14,35 +14,90 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
 import ConfigParser
-import os
+from io import open
+
 
 def configuration():
 
-    conf_file = "/etc/spot.conf"
     config = ConfigParser.ConfigParser()
-    config.readfp(SecHead(open(conf_file)))
+
+    try:
+        conf = open("/etc/spot.conf", "r")
+    except (OSError, IOError) as e:
+        print("Error opening: spot.conf" + " error: " + str(e.errno))
+        raise
+    with conf:
+        config.readfp(SecHead(conf))
     return config
 
+
 def db():
     conf = configuration()
-    return conf.get('conf', 'DBNAME').replace("'","").replace('"','')
+    return conf.get('conf', 'DBNAME').replace("'", "").replace('"', '')
+
 
 def impala():
     conf = configuration()
-    return conf.get('conf', 'IMPALA_DEM'),conf.get('conf', 'IMPALA_PORT')
+    return conf.get('conf', 'IMPALA_DEM'), conf.get('conf', 'IMPALA_PORT')
+
 
 def hdfs():
     conf = configuration()
     name_node = conf.get('conf',"NAME_NODE")
     web_port = conf.get('conf',"WEB_PORT")
     hdfs_user = conf.get('conf',"HUSER")
-    hdfs_user = hdfs_user.split("/")[-1].replace("'","").replace('"','')
+    hdfs_user = hdfs_user.split("/")[-1].replace("'", "").replace('"', '')
     return name_node,web_port,hdfs_user
 
+
 def spot():
     conf = configuration()
-    return conf.get('conf',"HUSER").replace("'","").replace('"','')
+    return conf.get('conf',"HUSER").replace("'", "").replace('"', '')
+
+
+def kerberos_enabled():
+    conf = configuration()
+    enabled = conf.get('conf', 'KERBEROS').replace("'", "").replace('"', '')
+    if enabled.lower() == 'true':
+        return True
+    else:
+        return False
+
+
+def kerberos():
+    conf = configuration()
+    if kerberos_enabled():
+        principal = conf.get('conf', 'PRINCIPAL').replace("'", "").replace('"', '')
+        keytab = conf.get('conf', 'KEYTAB').replace("'", "").replace('"', '')
+        sasl_mech = conf.get('conf', 'SASL_MECH').replace("'", "").replace('"', '')
+        security_proto = conf.get('conf', 'SECURITY_PROTO').replace("'", "").replace('"', '')
+        return principal, keytab, sasl_mech, security_proto
+    else:
+        raise KeyError('KERBEROS is not enabled in /etc/spot.conf')
+
+
+def ssl_enabled():
+    conf = configuration()
+    enabled = conf.get('conf', 'SSL').replace("'", "").replace('"', '')
+    if enabled.lower() == 'true':
+        return True
+    else:
+        return False
+
+
+def ssl():
+    conf = configuration()
+    if ssl_enabled():
+        ssl_verify = conf.get('conf', 'SSL_VERIFY').replace("'", "").replace('"', '')
+        ca_location = conf.get('conf', 'CA_LOCATION').replace("'", "").replace('"', '')
+        cert = conf.get('conf', 'CERT').replace("'", "").replace('"', '')
+        key = conf.get('conf', 'KEY').replace("'", "").replace('"', '')
+        return ssl_verify, ca_location, cert, key
+    else:
+        raise KeyError('SSL is not enabled in /etc/spot.conf')
+
 
 class SecHead(object):
     def __init__(self, fp):

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/d1f5a67f/spot-oa/api/resources/hdfs_client.py
----------------------------------------------------------------------
diff --git a/spot-oa/api/resources/hdfs_client.py b/spot-oa/api/resources/hdfs_client.py
index 31c5eba..e7f6bec 100644
--- a/spot-oa/api/resources/hdfs_client.py
+++ b/spot-oa/api/resources/hdfs_client.py
@@ -14,63 +14,216 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from hdfs import InsecureClient
+
 from hdfs.util import HdfsError
+from hdfs import Client
+from hdfs.ext.kerberos import KerberosClient
+from requests import Session
 from json import dump
-import api.resources.configurator as Config
+from threading import Lock
+import logging
+import configurator as Config
+from sys import stderr
+
+
+class Progress(object):
+
+    """Progress callback for `hdfs` transfers: called as (path, nbytes)."""
+
+    def __init__(self):
+        # Maps each transferred file path to the bytes transferred so far.
+        self._data = {}
+        # hdfs may invoke the callback from several transfer threads.
+        self._lock = Lock()
+
+    def __call__(self, hdfs_path, nbytes):
+        with self._lock:
+            if nbytes >= 0:
+                self._data[hdfs_path] = nbytes
+            else:
+                stderr.write('%s\n' % (sum(self._data.values()), ))
+
+
+class SecureKerberosClient(KerberosClient):
+
+    """A new client subclass for handling HTTPS connections with Kerberos.
+
+    :param url: URL to namenode.
+    :param cert: Local certificate. See `requests` documentation for details
+      on how to use this.
+    :param verify: Whether to check the host's certificate. WARNING: non production use only
+    :param \*\*kwargs: Keyword arguments passed to the default `Client`
+      constructor.
+
+    """
+
+    def __init__(self, url, mutual_auth, cert=None, verify='true', **kwargs):
+
+        self._logger = logging.getLogger("SPOT.INGEST.HDFS_client")
+        session = Session()
+
+        if verify == 'true':
+            self._logger.info('SSL verification enabled')
+            session.verify = True
+        elif verify == 'false':
+            session.verify = False
+        if cert is not None:
+            self._logger.info('SSL Cert: ' + cert)
+            if ',' in cert:
+                session.cert = [path.strip() for path in cert.split(',')]
+            else:
+                session.cert = cert
+
+        super(SecureKerberosClient, self).__init__(url, mutual_auth, session=session, **kwargs)
+
 
+class HdfsException(HdfsError):
+    def __init__(self, message):
+        super(HdfsException, self).__init__(message)
+        self.message = message
+
+
+def get_client(user=None):
+    # type: (object) -> Client
+
+    logger = logging.getLogger('SPOT.INGEST.HDFS.get_client')
+    hdfs_nm, hdfs_port, hdfs_user = Config.hdfs()
+    conf = {'url': '{0}:{1}'.format(hdfs_nm, hdfs_port)}
+
+    if Config.ssl_enabled():
+        ssl_verify, ca_location, cert, key = Config.ssl()
+        conf.update({'verify': ssl_verify.lower()})
+        if cert:
+            conf.update({'cert': cert})
+
+    # mutual_auth is a required SecureKerberosClient argument, so it must be set
+    # even when KERBEROS is disabled; OPTIONAL won't fail if the server skips it.
+    conf.update({'mutual_auth': 'OPTIONAL'})
+
+    # TODO: possible user parameter
+    logger.info('Client conf:')
+    for k,v in conf.iteritems():
+        logger.info('%s: %s', k, v)
+
+    client = SecureKerberosClient(**conf)
 
-def _get_client(user=None):
-    hdfs_nm,hdfs_port,hdfs_user = Config.hdfs()
-    client = InsecureClient('http://{0}:{1}'.format(hdfs_nm,hdfs_port), user= user if user else hdfs_user)
     return client
 
-def get_file(hdfs_file):
-    client = _get_client()
+
+def get_file(hdfs_file, client=None):
+    if not client:
+        client = get_client()
+
     with client.read(hdfs_file) as reader:
         results = reader.read()
         return results
 
-def put_file_csv(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False):
-    
+
+def upload_file(hdfs_fp, local_fp, overwrite=False, client=None):
+    if not client:
+        client = get_client()
+
+    try:
+        result = client.upload(hdfs_fp, local_fp, overwrite=overwrite, progress=Progress())
+        return result
+    except HdfsError as err:
+        return err
+
+
+def download_file(hdfs_path, local_path, overwrite=False, client=None):
+    if not client:
+        client = get_client()
+
+    try:
+        client.download(hdfs_path, local_path, overwrite=overwrite)
+        return True
+    except HdfsError:
+        return False
+
+
+def mkdir(hdfs_path, client=None):
+    if not client:
+        client = get_client()
+
+    try:
+        client.makedirs(hdfs_path)
+        return True
+    except HdfsError:
+        return False
+
+
+def put_file_csv(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False, client=None):
+    if not client:
+        client = get_client()
+
     try:
-        client = _get_client()
         hdfs_full_name = "{0}/{1}".format(hdfs_path,hdfs_file_name)
         with client.write(hdfs_full_name,append=append_file,overwrite=overwrite_file) as writer:
             for item in hdfs_file_content:
                 data = ','.join(str(d) for d in item)
                 writer.write("{0}\n".format(data))
         return True
-        
+
     except HdfsError:
         return False
 
-def put_file_json(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False):
-    
+
+def put_file_json(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False, client=None):
+    if not client:
+        client = get_client()
+
     try:
-        client = _get_client()
         hdfs_full_name = "{0}/{1}".format(hdfs_path,hdfs_file_name)
         with client.write(hdfs_full_name,append=append_file,overwrite=overwrite_file,encoding='utf-8') as writer:
-	        dump(hdfs_file_content, writer)
+            dump(hdfs_file_content, writer)
         return True
     except HdfsError:
         return False
-    
 
-def delete_folder(hdfs_file,user=None):
-    client = _get_client(user)
-    client.delete(hdfs_file,recursive=True)
 
-def list_dir(hdfs_path):
+def delete_folder(hdfs_file, user=None, client=None):
+    if not client:
+        client = get_client(user)
+
+    try:
+        return client.delete(hdfs_file,recursive=True)
+    except HdfsError:
+        return False
+
+
+def check_dir(hdfs_path, client=None):
+    """
+    Returns True if directory exists
+    Returns False if the path does not exist or is not a directory
+    : param hdfs_path: path to check
+    : object client: hdfs client object for persistent connection
+    """
+    if not client:
+        client = get_client()
+
+    status = client.status(hdfs_path, strict=False)
+    if status is not None and status['type'] == 'DIRECTORY':
+        return True
+    else:
+        return False
+
+
+def list_dir(hdfs_path, client=None):
+    if not client:
+        client = get_client()
+
     try:
-        client = _get_client()
         return client.list(hdfs_path)
     except HdfsError:
         return {}
 
-def file_exists(hdfs_path,file_name):
-    files = list_dir(hdfs_path)
+
+def file_exists(hdfs_path, file_name, client=None):
+    if not client:
+        client = get_client()
+
+    files = list_dir(hdfs_path, client)
     if str(file_name) in files:
-	    return True
+        return True
     else:
         return False

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/d1f5a67f/spot-oa/api/resources/impala_engine.py
----------------------------------------------------------------------
diff --git a/spot-oa/api/resources/impala_engine.py b/spot-oa/api/resources/impala_engine.py
index b7d0148..542bbd0 100644
--- a/spot-oa/api/resources/impala_engine.py
+++ b/spot-oa/api/resources/impala_engine.py
@@ -15,15 +15,33 @@
 # limitations under the License.
 #
 from impala.dbapi import connect
-import api.resources.configurator as Config
+import api.resources.configurator as config
+
 
 def create_connection():
 
-    impala_host, impala_port =  Config.impala()
-    db = Config.db()
-    conn = connect(host=impala_host, port=int(impala_port),database=db)
+    impala_host, impala_port = config.impala()
+    conf = {}
+
+    # TODO: if using hive, kerberos service name must be changed, impyla sets 'impala' as default
+    service_name = {'kerberos_service_name': 'impala'}
+
+    if config.kerberos_enabled():
+        principal, keytab, sasl_mech, security_proto = config.kerberos()
+        conf.update({'auth_mechanism': 'GSSAPI'})
+        conf.update(service_name)
+
+    if config.ssl_enabled():
+        ssl_verify, ca_location, cert, key = config.ssl()
+        verify = ssl_verify.strip('\'"').lower() == 'true'
+        conf.update({'use_ssl': True,
+                     'ca_cert': (ca_location or None) if verify else None})
+
+    db = config.db()
+    conn = connect(host=impala_host, port=int(impala_port), database=db, **conf)
     return conn.cursor()
 
+
 def execute_query(query,fetch=False):
 
     impala_cursor = create_connection()
@@ -31,6 +49,7 @@ def execute_query(query,fetch=False):
 
     return impala_cursor if not fetch else impala_cursor.fetchall()
 
+
 def execute_query_as_list(query):
 
     query_results = execute_query(query)
@@ -46,5 +65,3 @@ def execute_query_as_list(query):
         row_result = {}
 
     return results
-
-