You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spot.apache.org by na...@apache.org on 2018/03/19 19:28:32 UTC
[23/42] incubator-spot git commit: [SPOT-213] [INGEST] adding common
functions for hdfs, hive with support for kerberos
[SPOT-213] [INGEST] adding common functions for hdfs, hive with support for kerberos
Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/1582c4c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/1582c4c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/1582c4c1
Branch: refs/heads/SPOT-181_ODM
Commit: 1582c4c1ad10358d672f0ebef5d2c88ac225c65b
Parents: 13e35fc
Author: natedogs911 <na...@gmail.com>
Authored: Fri Jan 19 09:40:50 2018 -0800
Committer: natedogs911 <na...@gmail.com>
Committed: Fri Jan 19 09:40:50 2018 -0800
----------------------------------------------------------------------
spot-ingest/common/configurator.py | 119 ++++++++++++++++
spot-ingest/common/hdfs_client.py | 233 ++++++++++++++++++++++++++++++++
spot-ingest/common/hive_engine.py | 73 ++++++++++
3 files changed, 425 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1582c4c1/spot-ingest/common/configurator.py
----------------------------------------------------------------------
diff --git a/spot-ingest/common/configurator.py b/spot-ingest/common/configurator.py
new file mode 100644
index 0000000..6fe0ede
--- /dev/null
+++ b/spot-ingest/common/configurator.py
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import ConfigParser
+from io import open
+
+
def configuration():
    """Load /etc/spot.conf and return it as a ConfigParser.

    The flat key=value file has no section header, so it is wrapped in
    SecHead, which injects a synthetic ``[conf]`` section on first read.

    :returns: ConfigParser with every spot.conf key under section 'conf'.
    :raises OSError, IOError: when /etc/spot.conf cannot be opened.
    """
    config = ConfigParser.ConfigParser()

    try:
        conf = open("/etc/spot.conf", "r")
    except (OSError, IOError) as e:
        # e.errno is an int; concatenating it to a str raised TypeError
        # and masked the real failure — format it instead.
        print("Error opening: spot.conf" + " error: {0}".format(e.errno))
        raise e

    config.readfp(SecHead(conf))
    return config
+
+
def db():
    """Return the Spot database name (DBNAME) with any quoting stripped."""
    name = configuration().get('conf', 'DBNAME')
    return name.replace("'", "").replace('"', '')
+
+
def impala():
    """Return the (daemon_host, port) pair for Impala from spot.conf."""
    cfg = configuration()
    daemon = cfg.get('conf', 'IMPALA_DEM')
    port = cfg.get('conf', 'IMPALA_PORT')
    return daemon, port
+
+
def hive():
    """Return the (host, port) pair for HiveServer2 from spot.conf."""
    cfg = configuration()
    host = cfg.get('conf', 'HS2_HOST')
    port = cfg.get('conf', 'HS2_PORT')
    return host, port
+
+
def hdfs():
    """Return (name_node, web_port, hdfs_user) from spot.conf.

    HUSER is stored as a path (e.g. ``/user/spot``); only its last
    component, with quotes stripped, is the user name.
    """
    cfg = configuration()
    user = cfg.get('conf', "HUSER").split("/")[-1]
    user = user.replace("'", "").replace('"', '')
    return cfg.get('conf', "NAME_NODE"), cfg.get('conf', "WEB_PORT"), user
+
+
def spot():
    """Return the full HUSER path from spot.conf with quoting stripped."""
    huser = configuration().get('conf', "HUSER")
    return huser.replace("'", "").replace('"', '')
+
+
def kerberos_enabled():
    """Return True when KERBEROS is set to 'true' (any case) in spot.conf."""
    enabled = configuration().get('conf', 'KERBEROS')
    # Strip shell-style quoting before comparing; the comparison already
    # yields the bool, so no if/else-return-True/False is needed.
    return enabled.replace("'", "").replace('"', '').lower() == 'true'
+
+
def kerberos():
    """Return the Kerberos settings tuple from spot.conf.

    :returns: (principal, keytab, sasl_mech, security_proto).
    :raises KeyError: when Kerberos is not enabled in spot.conf; the
        message now says so instead of raising a bare, silent KeyError.
    """
    conf = configuration()
    if not kerberos_enabled():
        raise KeyError('KERBEROS is not enabled in /etc/spot.conf')
    principal = conf.get('conf', 'PRINCIPAL')
    keytab = conf.get('conf', 'KEYTAB')
    sasl_mech = conf.get('conf', 'SASL_MECH')
    security_proto = conf.get('conf', 'SECURITY_PROTO')
    return principal, keytab, sasl_mech, security_proto
+
+
def ssl_enabled():
    """Return True when SSL is set to 'true' (any case) in spot.conf.

    Quotes are stripped before comparison for consistency with
    kerberos_enabled(); previously a quoted value like ``'true'`` in the
    conf file would silently read as disabled.
    """
    enabled = configuration().get('conf', 'SSL')
    return enabled.replace("'", "").replace('"', '').lower() == 'true'
+
+
def ssl():
    """Return the SSL settings tuple from spot.conf.

    :returns: (ssl_verify, ca_location, cert, key).
    :raises KeyError: when SSL is not enabled in spot.conf; the message
        now says so instead of raising a bare, silent KeyError.
    """
    conf = configuration()
    if not ssl_enabled():
        raise KeyError('SSL is not enabled in /etc/spot.conf')
    ssl_verify = conf.get('conf', 'SSL_VERIFY')
    ca_location = conf.get('conf', 'CA_LOCATION')
    cert = conf.get('conf', 'CERT')
    key = conf.get('conf', 'KEY')
    return ssl_verify, ca_location, cert, key
+
+
class SecHead(object):
    """File-like shim that prepends a ``[conf]`` section header.

    ConfigParser requires at least one section; spot.conf is a flat
    key=value file, so the first readline() yields the synthetic header
    and every later call delegates to the wrapped file object.
    """

    def __init__(self, fp):
        self.fp = fp
        self.sechead = '[conf]\n'

    def readline(self):
        if self.sechead:
            header, self.sechead = self.sechead, None
            return header
        return self.fp.readline()
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1582c4c1/spot-ingest/common/hdfs_client.py
----------------------------------------------------------------------
diff --git a/spot-ingest/common/hdfs_client.py b/spot-ingest/common/hdfs_client.py
new file mode 100644
index 0000000..5605e9c
--- /dev/null
+++ b/spot-ingest/common/hdfs_client.py
@@ -0,0 +1,233 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from hdfs import InsecureClient
+from hdfs.util import HdfsError
+from hdfs import Client
+from hdfs.ext.kerberos import KerberosClient
+from requests import Session
+from json import dump
+from threading import Lock
+import logging
+import configurator as Config
+from sys import stderr
+
+
class Progress(object):

    """Basic progress tracker callback.

    Records the byte count for one HDFS path; when the count is negative
    (the library's end-of-transfer sentinel) the running total is written
    to stderr instead.

    NOTE(review): upload_file passes this class *uninstantiated* as
    ``progress=Progress``, so the hook constructs a fresh instance per
    callback and ``__call__`` appears unreachable — confirm intent.
    """

    def __init__(self, hdfs_path, nbytes):
        self._data = {}
        self._lock = Lock()          # guards _data across callback threads
        self._hpath = hdfs_path
        self._nbytes = nbytes

    def __call__(self):
        with self._lock:
            if self._nbytes < 0:
                stderr.write('%s\n' % (sum(self._data.values()), ))
            else:
                self._data[self._hpath] = self._nbytes
+
+
class SecureKerberosClient(KerberosClient):

    """Kerberos HDFS client with HTTPS session support.

    :param url: URL to namenode.
    :param mutual_auth: mutual-authentication mode, forwarded to
        `KerberosClient`.
    :param cert: Local certificate path; a comma-separated pair is split
        into a [cert, key] list. See `requests` documentation for details.
    :param verify: 'true' to check the host's certificate, 'false' to
        disable verification. WARNING: non production use only.
    :param \*\*kwargs: Keyword arguments passed to the default `Client`
        constructor.
    """

    def __init__(self, url, mutual_auth, cert=None, verify='true', **kwargs):
        self._logger = logging.getLogger("SPOT.INGEST.HDFS_client")
        session = Session()

        if verify == 'false':
            session.verify = False
        elif verify == 'true':
            self._logger.info('SSL verification enabled')
            session.verify = True
            if cert is not None:
                self._logger.info('SSL Cert: ' + cert)
                # comma-separated value means a separate cert/key pair
                session.cert = ([p.strip() for p in cert.split(',')]
                                if ',' in cert else cert)

        super(SecureKerberosClient, self).__init__(url, mutual_auth, session=session, **kwargs)
+
+
class HdfsException(HdfsError):
    """Project-level HDFS error that keeps the message as an attribute."""

    def __init__(self, message):
        self.message = message
        super(HdfsException, self).__init__(message)
+
+
def get_client(user=None):
    # type: (object) -> Client
    """Build an HDFS client from the settings in /etc/spot.conf.

    :param user: HDFS user for the non-Kerberos client; defaults to the
        HUSER account from spot.conf.
    :returns: SecureKerberosClient when Kerberos is enabled, otherwise a
        plain InsecureClient. (InsecureClient was imported but never
        used before — every deployment was routed through the Kerberos
        client regardless of configuration.)
    """
    logger = logging.getLogger('SPOT.INGEST.HDFS.get_client')
    hdfs_nm, hdfs_port, hdfs_user = Config.hdfs()
    conf = {'url': '{0}:{1}'.format(hdfs_nm, hdfs_port)}

    if Config.ssl_enabled():
        ssl_verify, ca_location, cert, key = Config.ssl()
        conf['verify'] = ssl_verify.lower()
        if cert:
            conf['cert'] = cert

    logger.info('Client conf:')
    for k, v in conf.items():  # items() works on both py2 and py3
        logger.info('%s: %s', k, v)

    if Config.kerberos_enabled():
        # TODO: handle other mutual-auth conditions
        conf['mutual_auth'] = 'OPTIONAL'
        client = SecureKerberosClient(**conf)
    else:
        # Plain WebHDFS transport; the ssl keys above only apply to the
        # Kerberos/HTTPS client, so only the url is forwarded here.
        client = InsecureClient(conf['url'], user=user or hdfs_user)

    return client
+
+
def get_file(hdfs_file, client=None):
    """Read and return the entire contents of hdfs_file.

    :param client: optional persistent client; one is built when omitted.
    """
    client = client or get_client()

    with client.read(hdfs_file) as reader:
        return reader.read()
+
+
def upload_file(hdfs_fp, local_fp, overwrite=False, client=None):
    """Upload local_fp to hdfs_fp with progress tracking.

    :returns: the remote path on success; on failure the HdfsError is
        returned (not raised), so callers must inspect the result type.
    """
    client = client or get_client()

    try:
        return client.upload(hdfs_fp, local_fp, overwrite=overwrite, progress=Progress)
    except HdfsError as err:
        return err
+
+
def download_file(hdfs_path, local_path, overwrite=False, client=None):
    """Download hdfs_path to local_path; True on success, False on error."""
    client = client or get_client()

    try:
        client.download(hdfs_path, local_path, overwrite=overwrite)
    except HdfsError:
        return False
    return True
+
+
def mkdir(hdfs_path, client=None):
    """Create hdfs_path (and parents); True on success, False on error."""
    client = client or get_client()

    try:
        client.makedirs(hdfs_path)
    except HdfsError:
        return False
    return True
+
+
def put_file_csv(hdfs_file_content, hdfs_path, hdfs_file_name, append_file=False, overwrite_file=False, client=None):
    """Write an iterable of rows as comma-separated lines to HDFS.

    :param hdfs_file_content: iterable of row iterables; each column is
        str()-ed and joined with commas, one row per line.
    :returns: True on success, False on HdfsError.
    """
    client = client or get_client()
    target = "{0}/{1}".format(hdfs_path, hdfs_file_name)

    try:
        with client.write(target, append=append_file, overwrite=overwrite_file) as writer:
            for row in hdfs_file_content:
                writer.write("{0}\n".format(','.join(str(col) for col in row)))
        return True
    except HdfsError:
        return False
+
+
def put_file_json(hdfs_file_content, hdfs_path, hdfs_file_name, append_file=False, overwrite_file=False, client=None):
    """Serialize hdfs_file_content as JSON into an HDFS file.

    :returns: True on success, False on HdfsError.
    """
    client = client or get_client()
    target = "{0}/{1}".format(hdfs_path, hdfs_file_name)

    try:
        with client.write(target, append=append_file, overwrite=overwrite_file, encoding='utf-8') as writer:
            dump(hdfs_file_content, writer)
        return True
    except HdfsError:
        return False
+
+
def delete_folder(hdfs_file, user=None, client=None):
    """Recursively delete hdfs_file.

    :returns: True on success, False on HdfsError. The success path
        previously fell through and returned None — falsy, and thus
        indistinguishable from failure for truth-testing callers; now
        consistent with mkdir/download_file.
    """
    if not client:
        client = get_client()

    try:
        client.delete(hdfs_file, recursive=True)
        return True
    except HdfsError:
        return False
+
+
def check_dir(hdfs_path, client=None):
    """
    Returns True if directory exists
    Returns False if directory does not exist
    : param hdfs_path: path to check
    : object client: hdfs client object for persistent connection
    """
    if not client:
        client = get_client()

    # client.list raises HdfsError for a missing path, so the previous
    # 'None not in result' test could never return False — it either
    # returned True or let the error escape, breaking the documented
    # contract above. Catch the error instead.
    try:
        client.list(hdfs_path)
        return True
    except HdfsError:
        return False
+
+
def list_dir(hdfs_path, client=None):
    """Return the entry names under hdfs_path, or [] on error.

    The error fallback was ``{}`` (a dict) while the success path
    returns a list; an empty list keeps the return type consistent.
    Both are falsy with len() == 0, so truth-testing callers and the
    ``in`` checks in file_exists are unaffected.
    """
    if not client:
        client = get_client()

    try:
        return client.list(hdfs_path)
    except HdfsError:
        return []
+
+
def file_exists(hdfs_path, file_name, client=None):
    """Return True when file_name appears in the listing of hdfs_path."""
    if not client:
        client = get_client()

    # The membership test is already a bool; no if/else needed.
    return str(file_name) in list_dir(hdfs_path, client)
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1582c4c1/spot-ingest/common/hive_engine.py
----------------------------------------------------------------------
diff --git a/spot-ingest/common/hive_engine.py b/spot-ingest/common/hive_engine.py
new file mode 100644
index 0000000..eb3d79e
--- /dev/null
+++ b/spot-ingest/common/hive_engine.py
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from impala.dbapi import connect
+import common.configurator as config
+
+
def create_connection():
    """Open a HiveServer2 connection per /etc/spot.conf and return a cursor.

    :returns: impyla cursor bound to the configured database.
    """
    host, port = config.hive()
    conf = {}

    # TODO: if using hive, kerberos service name must be changed, impyla sets 'impala' as default
    conf['kerberos_service_name'] = 'hive'

    if config.kerberos_enabled():
        # principal/keytab/security_proto come from the ticket cache /
        # broker config; impyla itself only needs the auth mechanism.
        principal, keytab, sasl_mech, security_proto = config.kerberos()
        conf['auth_mechanism'] = 'GSSAPI'
    else:
        conf['auth_mechanism'] = 'PLAIN'

    if config.ssl_enabled():
        ssl_verify, ca_location, cert, key = config.ssl()
        # impyla expects a boolean for use_ssl. The config value is the
        # *string* 'true'/'false', and any non-empty string — including
        # 'false' — is truthy, so the old code force-enabled SSL either
        # way; make the intent explicit with a real bool.
        conf['use_ssl'] = True
        if ssl_verify.lower() != 'false':
            # NOTE(review): CERT is used as the trust anchor here, as in
            # the original; confirm whether CA_LOCATION was intended.
            conf['ca_cert'] = cert

    db = config.db()
    conn = connect(host=host, port=int(port), database=db, **conf)
    return conn.cursor()
+
+
def execute_query(query, fetch=False):
    """Run query against Hive.

    :param fetch: when True, return all rows; otherwise the live cursor.
    """
    cursor = create_connection()
    cursor.execute(query)

    if fetch:
        return cursor.fetchall()
    return cursor
+
+
def execute_query_as_list(query):
    """Run query and return its rows as a list of {column: value} dicts."""
    cursor = execute_query(query)
    results = []

    for row in cursor:
        # cursor.description holds (name, type, ...) tuples; pair each
        # column name with the positional value in this row.
        names = (header[0] for header in cursor.description)
        results.append(dict(zip(names, row)))

    return results