You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spot.apache.org by na...@apache.org on 2018/03/19 15:13:01 UTC

[09/13] incubator-spot git commit: [SPOT-213] [INGEST] adding common functions for hdfs, hive with support for kerberos

[SPOT-213] [INGEST] adding common functions for hdfs, hive with support for kerberos


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/1582c4c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/1582c4c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/1582c4c1

Branch: refs/heads/master
Commit: 1582c4c1ad10358d672f0ebef5d2c88ac225c65b
Parents: 13e35fc
Author: natedogs911 <na...@gmail.com>
Authored: Fri Jan 19 09:40:50 2018 -0800
Committer: natedogs911 <na...@gmail.com>
Committed: Fri Jan 19 09:40:50 2018 -0800

----------------------------------------------------------------------
 spot-ingest/common/configurator.py | 119 ++++++++++++++++
 spot-ingest/common/hdfs_client.py  | 233 ++++++++++++++++++++++++++++++++
 spot-ingest/common/hive_engine.py  |  73 ++++++++++
 3 files changed, 425 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1582c4c1/spot-ingest/common/configurator.py
----------------------------------------------------------------------
diff --git a/spot-ingest/common/configurator.py b/spot-ingest/common/configurator.py
new file mode 100644
index 0000000..6fe0ede
--- /dev/null
+++ b/spot-ingest/common/configurator.py
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import ConfigParser
+from io import open
+
+
def configuration():
    """Read /etc/spot.conf and return it as a ConfigParser instance.

    The flat key=value file is wrapped in a synthetic ``[conf]`` section
    (via SecHead) so ConfigParser can parse it.

    :returns: ConfigParser loaded with the contents of /etc/spot.conf.
    :raises IOError: if /etc/spot.conf cannot be opened or read.
    """
    config = ConfigParser.ConfigParser()

    try:
        # 'with' closes the handle once readfp has consumed the file;
        # the original leaked the open file object.
        with open("/etc/spot.conf", "r") as conf:
            config.readfp(SecHead(conf))
    except (OSError, IOError) as e:
        # e.errno is an int; the original concatenated it to a str,
        # raising TypeError and masking the real failure.
        print("Error opening: spot.conf" + " error: " + str(e.errno))
        raise

    return config
+
+
def db():
    """Return the Spot database name (DBNAME), with quoting removed."""
    raw_name = configuration().get('conf', 'DBNAME')
    return raw_name.replace("'", "").replace('"', '')
+
+
def impala():
    """Return the Impala daemon host and port from /etc/spot.conf."""
    settings = configuration()
    host = settings.get('conf', 'IMPALA_DEM')
    port = settings.get('conf', 'IMPALA_PORT')
    return host, port
+
+
def hive():
    """Return the HiveServer2 host and port from /etc/spot.conf."""
    settings = configuration()
    host = settings.get('conf', 'HS2_HOST')
    port = settings.get('conf', 'HS2_PORT')
    return host, port
+
+
def hdfs():
    """Return (name_node, web_port, hdfs_user) for the HDFS cluster.

    HUSER is configured as a path; the user name is its final path
    component, stripped of any surrounding quotes.
    """
    settings = configuration()
    node = settings.get('conf', "NAME_NODE")
    port = settings.get('conf', "WEB_PORT")
    user_path = settings.get('conf', "HUSER")
    user = user_path.split("/")[-1].replace("'", "").replace('"', '')
    return node, port, user
+
+
def spot():
    """Return the Spot HDFS base path (HUSER), with quoting removed."""
    base_path = configuration().get('conf', "HUSER")
    return base_path.replace("'", "").replace('"', '')
+
+
def kerberos_enabled():
    """Return True when the KERBEROS flag in /etc/spot.conf is 'true'."""
    flag = configuration().get('conf', 'KERBEROS')
    flag = flag.replace("'", "").replace('"', '')
    return flag.lower() == 'true'
+
+
def kerberos():
    """Return the Kerberos settings tuple from /etc/spot.conf.

    :returns: (principal, keytab, sasl_mech, security_proto).
    :raises KeyError: if Kerberos is not enabled in the configuration.
    """
    if not kerberos_enabled():
        # Same exception type as before, but with a diagnosable message
        # (the original raised a bare KeyError with no context).
        raise KeyError('kerberos is not enabled in /etc/spot.conf')

    conf = configuration()
    principal = conf.get('conf', 'PRINCIPAL')
    keytab = conf.get('conf', 'KEYTAB')
    sasl_mech = conf.get('conf', 'SASL_MECH')
    security_proto = conf.get('conf', 'SECURITY_PROTO')
    return principal, keytab, sasl_mech, security_proto
+
+
def ssl_enabled():
    """Return True when the SSL flag in /etc/spot.conf is 'true'.

    Strips surrounding quotes before comparing, consistent with
    kerberos_enabled(); the original compared the raw value, so a
    quoted "'true'" in the config was silently treated as disabled.
    """
    conf = configuration()
    enabled = conf.get('conf', 'SSL').replace("'", "").replace('"', '')
    return enabled.lower() == 'true'
+
+
def ssl():
    """Return the SSL settings tuple from /etc/spot.conf.

    :returns: (ssl_verify, ca_location, cert, key).
    :raises KeyError: if SSL is not enabled in the configuration.
    """
    if not ssl_enabled():
        # Same exception type as before, but with a diagnosable message
        # (the original raised a bare KeyError with no context).
        raise KeyError('SSL is not enabled in /etc/spot.conf')

    conf = configuration()
    ssl_verify = conf.get('conf', 'SSL_VERIFY')
    ca_location = conf.get('conf', 'CA_LOCATION')
    cert = conf.get('conf', 'CERT')
    key = conf.get('conf', 'KEY')
    return ssl_verify, ca_location, cert, key
+
+
class SecHead(object):
    """File-like wrapper that injects a '[conf]' section header.

    ConfigParser requires at least one section header, but
    /etc/spot.conf is a flat key=value file. The first readline() call
    returns the synthetic '[conf]' header; every later call delegates
    to the wrapped file object.
    """

    def __init__(self, fp):
        self.fp = fp
        self.sechead = '[conf]\n'

    def readline(self):
        # Hand out the header exactly once, then fall through to the
        # underlying file for the rest of its lines.
        header, self.sechead = self.sechead, None
        if header:
            return header
        return self.fp.readline()

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1582c4c1/spot-ingest/common/hdfs_client.py
----------------------------------------------------------------------
diff --git a/spot-ingest/common/hdfs_client.py b/spot-ingest/common/hdfs_client.py
new file mode 100644
index 0000000..5605e9c
--- /dev/null
+++ b/spot-ingest/common/hdfs_client.py
@@ -0,0 +1,233 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from hdfs import InsecureClient
+from hdfs.util import HdfsError
+from hdfs import Client
+from hdfs.ext.kerberos import KerberosClient
+from requests import Session
+from json import dump
+from threading import Lock
+import logging
+import configurator as Config
+from sys import stderr
+
+
class Progress(object):

    """Basic progress tracker callback.

    Records the byte count per HDFS path; a negative count signals
    completion, at which point the running total of all recorded
    counts is written to stderr.
    """

    def __init__(self, hdfs_path, nbytes):
        self._data = {}
        self._lock = Lock()
        self._hpath = hdfs_path
        self._nbytes = nbytes

    def __call__(self):
        # Serialize updates: the hdfs library may invoke the callback
        # from worker threads.
        with self._lock:
            if self._nbytes < 0:
                stderr.write('%s\n' % (sum(self._data.values()), ))
            else:
                self._data[self._hpath] = self._nbytes
+
+
class SecureKerberosClient(KerberosClient):

    """A client subclass for handling HTTPS connections with Kerberos.

    :param url: URL to namenode.
    :param mutual_auth: Kerberos mutual-authentication mode, forwarded
      to the parent KerberosClient.
    :param cert: Local certificate; a comma-separated value is split
      into a [cert, key] pair. See `requests` documentation for details.
    :param verify: 'true' enables host certificate checks, 'false'
      disables them. WARNING: non production use only
    :param \*\*kwargs: Keyword arguments passed to the default `Client`
      constructor.

    """

    def __init__(self, url, mutual_auth, cert=None, verify='true', **kwargs):

        self._logger = logging.getLogger("SPOT.INGEST.HDFS_client")
        session = Session()

        if verify == 'false':
            session.verify = False
        elif verify == 'true':
            self._logger.info('SSL verification enabled')
            session.verify = True
            if cert is not None:
                self._logger.info('SSL Cert: ' + cert)
                session.cert = ([part.strip() for part in cert.split(',')]
                                if ',' in cert else cert)

        super(SecureKerberosClient, self).__init__(url, mutual_auth, session=session, **kwargs)
+
+
class HdfsException(HdfsError):
    # Thin project-specific subclass of hdfs.util.HdfsError that keeps
    # the message available as an attribute for callers to inspect.
    def __init__(self, message):
        super(HdfsException, self).__init__(message)
        self.message = message  # retained for callers reading .message
+
+
def get_client(user=None):
    # type: (object) -> Client
    """Build a SecureKerberosClient from the settings in /etc/spot.conf.

    :param user: currently unused; reserved for a future per-user
      connection option (see TODO below).
    :returns: configured HDFS client.
    """
    logger = logging.getLogger('SPOT.INGEST.HDFS.get_client')
    hdfs_nm, hdfs_port, hdfs_user = Config.hdfs()
    conf = {'url': '{0}:{1}'.format(hdfs_nm, hdfs_port),
            'mutual_auth': 'OPTIONAL'
            }

    if Config.ssl_enabled():
        ssl_verify, ca_location, cert, key = Config.ssl()
        conf.update({'verify': ssl_verify.lower()})
        if cert:
            conf.update({'cert': cert})

    if Config.kerberos_enabled():
        # TODO: handle other conditions
        krb_conf = {'mutual_auth': 'OPTIONAL'}
        conf.update(krb_conf)

    # TODO: possible user parameter
    logger.info('Client conf:')
    # dict.iteritems() is Python-2-only, and 'k + ": " + v' raised
    # TypeError for any non-string value; items() + format() is safe
    # on both interpreters and for all value types.
    for conf_key, conf_value in conf.items():
        logger.info('{0}: {1}'.format(conf_key, conf_value))

    client = SecureKerberosClient(**conf)

    return client
+
+
def get_file(hdfs_file, client=None):
    """Read an HDFS file and return its entire contents.

    :param hdfs_file: path of the file on HDFS.
    :param client: optional client to reuse; a new one is built when omitted.
    """
    client = client or get_client()
    with client.read(hdfs_file) as reader:
        return reader.read()
+
+
def upload_file(hdfs_fp, local_fp, overwrite=False, client=None):
    """Upload a local file to HDFS with progress tracking.

    :returns: the client's upload result on success, or the HdfsError
      instance on failure (errors are returned, not raised, to match
      the existing call sites).
    """
    client = client or get_client()
    try:
        return client.upload(hdfs_fp, local_fp, overwrite=overwrite, progress=Progress)
    except HdfsError as err:
        return err
+
+
def download_file(hdfs_path, local_path, overwrite=False, client=None):
    """Download an HDFS file to a local path.

    :returns: True on success, False on HdfsError.
    """
    client = client or get_client()
    try:
        client.download(hdfs_path, local_path, overwrite=overwrite)
    except HdfsError:
        return False
    return True
+
+
def mkdir(hdfs_path, client=None):
    """Create a directory (and any missing parents) on HDFS.

    :returns: True on success, False on HdfsError.
    """
    client = client or get_client()
    try:
        client.makedirs(hdfs_path)
    except HdfsError:
        return False
    return True
+
+
def put_file_csv(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False, client=None):
    """Write rows as comma-separated lines to <hdfs_path>/<hdfs_file_name>.

    :param hdfs_file_content: iterable of rows; each row is an iterable
      of values that are str()-converted and joined with commas.
    :returns: True on success, False on HdfsError.
    """
    client = client or get_client()
    target = "{0}/{1}".format(hdfs_path, hdfs_file_name)
    try:
        with client.write(target, append=append_file, overwrite=overwrite_file) as writer:
            for row in hdfs_file_content:
                line = ','.join(str(field) for field in row)
                writer.write("{0}\n".format(line))
    except HdfsError:
        return False
    return True
+
+
def put_file_json(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False, client=None):
    """Serialize an object as JSON into <hdfs_path>/<hdfs_file_name>.

    :returns: True on success, False on HdfsError.
    """
    client = client or get_client()
    target = "{0}/{1}".format(hdfs_path, hdfs_file_name)
    try:
        with client.write(target, append=append_file, overwrite=overwrite_file, encoding='utf-8') as writer:
            dump(hdfs_file_content, writer)
    except HdfsError:
        return False
    return True
+
+
def delete_folder(hdfs_file, user=None, client=None):
    """Recursively delete an HDFS path.

    :param user: currently unused; kept for interface compatibility.
    :returns: True on success, False on HdfsError. The original
      returned None on success, inconsistent with the other helpers
      in this module, so callers could never detect success.
    """
    if not client:
        client = get_client()

    try:
        client.delete(hdfs_file, recursive=True)
        return True
    except HdfsError:
        return False
+
+
def check_dir(hdfs_path, client=None):
    """
    Returns True if directory exists
    Returns False if directory does not exist
    : param hdfs_path: path to check
    : object client: hdfs client object for persistent connection
    """
    if not client:
        client = get_client()

    try:
        # client.list() raises HdfsError for a missing path; the original
        # let that propagate and then tested 'None not in result', which
        # is always True for a successful listing — so the function could
        # never actually return False as documented.
        client.list(hdfs_path)
        return True
    except HdfsError:
        return False
+
+
def list_dir(hdfs_path, client=None):
    """Return the entries under an HDFS path, or {} on HdfsError."""
    client = client or get_client()
    try:
        entries = client.list(hdfs_path)
    except HdfsError:
        return {}
    return entries
+
+
def file_exists(hdfs_path, file_name, client=None):
    """Return True when file_name appears in the listing of hdfs_path."""
    client = client or get_client()
    return str(file_name) in list_dir(hdfs_path, client)

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1582c4c1/spot-ingest/common/hive_engine.py
----------------------------------------------------------------------
diff --git a/spot-ingest/common/hive_engine.py b/spot-ingest/common/hive_engine.py
new file mode 100644
index 0000000..eb3d79e
--- /dev/null
+++ b/spot-ingest/common/hive_engine.py
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from impala.dbapi import connect
+import common.configurator as config
+
+
def create_connection():
    """Open a cursor to Hive via impyla using /etc/spot.conf settings.

    :returns: DB-API cursor bound to the configured database.
    """
    host, port = config.hive()
    conf = {}

    # TODO: if using hive, kerberos service name must be changed, impyla sets 'impala' as default
    conf.update({'kerberos_service_name': 'hive'})

    if config.kerberos_enabled():
        # principal/keytab come from the kerberos ticket cache; only the
        # mechanism name is needed here.
        conf.update({'auth_mechanism': 'GSSAPI',
                     })
    else:
        conf.update({'auth_mechanism': 'PLAIN',
                     })

    if config.ssl_enabled():
        ssl_verify, ca_location, cert, key = config.ssl()
        # impyla's use_ssl expects a boolean. The original passed the raw
        # config *string*; a non-empty string such as 'false' is truthy,
        # so SSL could never actually be disabled.
        use_ssl = ssl_verify.replace("'", "").replace('"', '').lower() == 'true'
        if use_ssl:
            conf.update({'ca_cert': cert,
                         'use_ssl': True
                         })
        else:
            conf.update({'use_ssl': False})

    db = config.db()
    conn = connect(host=host, port=int(port), database=db, **conf)
    return conn.cursor()
+
+
def execute_query(query,fetch=False):
    """Run a query; return the open cursor, or all rows when fetch=True."""
    cursor = create_connection()
    cursor.execute(query)
    if fetch:
        return cursor.fetchall()
    return cursor
+
+
def execute_query_as_list(query):
    """Run a query and return each row as a {column_name: value} dict."""
    cursor = execute_query(query)
    # cursor.description yields (name, type, ...) tuples, one per column,
    # in the same order as the values in each row.
    return [
        {column[0]: value for column, value in zip(cursor.description, row)}
        for row in cursor
    ]