You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spot.apache.org by na...@apache.org on 2018/03/19 15:12:54 UTC
[02/13] incubator-spot git commit: [SPOT-213][SPOT-250][OA][API] add kerberos support
[SPOT-213][SPOT-250][OA][API] add kerberos support
Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/d1f5a67f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/d1f5a67f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/d1f5a67f
Branch: refs/heads/master
Commit: d1f5a67f929090e2bad865d53d4389c69b176fc5
Parents: 7376c5e
Author: natedogs911 <na...@gmail.com>
Authored: Thu Jan 18 11:02:39 2018 -0800
Committer: natedogs911 <na...@gmail.com>
Committed: Thu Jan 18 11:02:39 2018 -0800
----------------------------------------------------------------------
spot-oa/api/resources/configurator.py | 69 +++++++++-
spot-oa/api/resources/hdfs_client.py | 201 ++++++++++++++++++++++++----
spot-oa/api/resources/impala_engine.py | 29 +++-
3 files changed, 262 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/d1f5a67f/spot-oa/api/resources/configurator.py
----------------------------------------------------------------------
diff --git a/spot-oa/api/resources/configurator.py b/spot-oa/api/resources/configurator.py
index 5bda045..017732d 100644
--- a/spot-oa/api/resources/configurator.py
+++ b/spot-oa/api/resources/configurator.py
@@ -14,35 +14,90 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
import ConfigParser
-import os
+from io import open
+
def configuration():
- conf_file = "/etc/spot.conf"
config = ConfigParser.ConfigParser()
- config.readfp(SecHead(open(conf_file)))
+
+    # Open separately from parsing so that only a missing/unreadable file is
+    # reported as an "opening" error.
+    try:
+        conf = open("/etc/spot.conf", "r")
+    except (OSError, IOError) as e:
+        # errno is an int: concatenating it to a str raised TypeError inside
+        # this handler and hid the original I/O failure.
+        print("Error opening: spot.conf" + " error: " + str(e.errno))
+        # Bare raise keeps the original traceback.
+        raise
+
+    # Release the file handle as soon as the parser has consumed it.
+    with conf:
+        config.readfp(SecHead(conf))
return config
+
def db():
conf = configuration()
- return conf.get('conf', 'DBNAME').replace("'","").replace('"','')
+ return conf.get('conf', 'DBNAME').replace("'", "").replace('"', '')
+
def impala():
conf = configuration()
- return conf.get('conf', 'IMPALA_DEM'),conf.get('conf', 'IMPALA_PORT')
+ return conf.get('conf', 'IMPALA_DEM'), conf.get('conf', 'IMPALA_PORT')
+
def hdfs():
conf = configuration()
name_node = conf.get('conf',"NAME_NODE")
web_port = conf.get('conf',"WEB_PORT")
hdfs_user = conf.get('conf',"HUSER")
- hdfs_user = hdfs_user.split("/")[-1].replace("'","").replace('"','')
+ hdfs_user = hdfs_user.split("/")[-1].replace("'", "").replace('"', '')
return name_node,web_port,hdfs_user
+
def spot():
conf = configuration()
- return conf.get('conf',"HUSER").replace("'","").replace('"','')
+ return conf.get('conf',"HUSER").replace("'", "").replace('"', '')
+
+
+def kerberos_enabled():
+    """Return True when the KERBEROS option of the 'conf' section is 'true'.
+
+    Single and double quotes are removed and case is ignored before the
+    comparison; any other value yields False.
+    """
+    raw_flag = configuration().get('conf', 'KERBEROS')
+    unquoted = raw_flag.replace("'", "").replace('"', '')
+    return unquoted.lower() == 'true'
+
+
+def kerberos():
+    """Return (principal, keytab, sasl_mech, security_proto) from the 'conf' section.
+
+    Raises KeyError when kerberos_enabled() reports that Kerberos is off.
+    """
+    conf = configuration()
+    if not kerberos_enabled():
+        raise KeyError
+
+    # Options are read in the same order as they are returned.
+    option_names = ('PRINCIPAL', 'KEYTAB', 'SASL_MECH', 'SECURITY_PROTO')
+    return tuple(conf.get('conf', name) for name in option_names)
+
+
+def ssl_enabled():
+    """Return True when the SSL option of the 'conf' section is 'true'.
+
+    Quotes, surrounding whitespace and case are ignored; any other value
+    yields False.
+    """
+    conf = configuration()
+    # Strip quotes the same way kerberos_enabled() and db() do; without this
+    # a quoted value such as SSL='true' compared unequal and SSL was
+    # silently treated as disabled.
+    enabled = conf.get('conf', 'SSL').replace("'", "").replace('"', '').strip()
+    return enabled.lower() == 'true'
+
+
+def ssl():
+    """Return (ssl_verify, ca_location, cert, key) from the 'conf' section.
+
+    Raises KeyError when ssl_enabled() reports that SSL is off.
+    """
+    conf = configuration()
+    if not ssl_enabled():
+        raise KeyError
+
+    # Options are read in the same order as they are returned.
+    option_names = ('SSL_VERIFY', 'CA_LOCATION', 'CERT', 'KEY')
+    return tuple(conf.get('conf', name) for name in option_names)
+
class SecHead(object):
def __init__(self, fp):
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/d1f5a67f/spot-oa/api/resources/hdfs_client.py
----------------------------------------------------------------------
diff --git a/spot-oa/api/resources/hdfs_client.py b/spot-oa/api/resources/hdfs_client.py
index 31c5eba..e7f6bec 100644
--- a/spot-oa/api/resources/hdfs_client.py
+++ b/spot-oa/api/resources/hdfs_client.py
@@ -14,63 +14,216 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-from hdfs import InsecureClient
+
from hdfs.util import HdfsError
+from hdfs import Client
+from hdfs.ext.kerberos import KerberosClient
+from requests import Session
from json import dump
-import api.resources.configurator as Config
+from threading import Lock
+import logging
+import configurator as Config
+from sys import stderr
+
+
+class Progress(object):
+
+    """Basic progress tracker callback.
+
+    Per the hdfs client documentation, a progress callback is invoked as
+    ``callback(hdfs_path, nbytes)`` and receives ``nbytes == -1`` once the
+    transfer completes. ``__call__`` therefore accepts those two arguments;
+    when they are omitted it falls back to the values given to the
+    constructor, so existing zero-argument calls keep working.
+
+    :param hdfs_path: default path recorded when ``__call__`` gets none.
+    :param nbytes: default byte count used when ``__call__`` gets none.
+    """
+
+    def __init__(self, hdfs_path, nbytes):
+        self._data = {}
+        self._lock = Lock()
+        self._hpath = hdfs_path
+        self._nbytes = nbytes
+
+    def __call__(self, hdfs_path=None, nbytes=None):
+        if hdfs_path is None:
+            hdfs_path = self._hpath
+        if nbytes is None:
+            nbytes = self._nbytes
+
+        with self._lock:
+            if nbytes >= 0:
+                # Remember the latest byte count reported for this path.
+                self._data[hdfs_path] = nbytes
+            else:
+                # Negative count: write the total of all recorded counts.
+                stderr.write('%s\n' % (sum(self._data.values()), ))
+
+
+class SecureKerberosClient(KerberosClient):
+
+ """A new client subclass for handling HTTPS connections with Kerberos.
+
+ :param url: URL to namenode.
+ :param mutual_auth: Forwarded unchanged to the `KerberosClient` constructor.
+ :param cert: Local certificate. See `requests` documentation for details
+ on how to use this. A comma-separated value is split into a list of paths.
+ :param verify: The string 'true' or 'false': whether to check the host's
+ certificate. WARNING: 'false' is for non production use only. Any other
+ value leaves the session's verify setting untouched.
+ :param \*\*kwargs: Keyword arguments passed to the default `Client`
+ constructor.
+
+ """
+
+ def __init__(self, url, mutual_auth, cert=None, verify='true', **kwargs):
+
+ self._logger = logging.getLogger("SPOT.INGEST.HDFS_client")
+ # A dedicated requests Session carries the TLS settings into the client.
+ session = Session()
+
+ # NOTE(review): indentation was lost in this archive; the cert handling
+ # below presumably sits inside the verify == 'true' branch - confirm
+ # against the repository.
+ if verify == 'true':
+ self._logger.info('SSL verification enabled')
+ session.verify = True
+ if cert is not None:
+ self._logger.info('SSL Cert: ' + cert)
+ if ',' in cert:
+ session.cert = [path.strip() for path in cert.split(',')]
+ else:
+ session.cert = cert
+ elif verify == 'false':
+ session.verify = False
+
+ super(SecureKerberosClient, self).__init__(url, mutual_auth, session=session, **kwargs)
+
+class HdfsException(HdfsError):
+ # HdfsError subclass that additionally keeps the text on `self.message`.
+ def __init__(self, message):
+ super(HdfsException, self).__init__(message)
+ self.message = message
+
+
+def get_client(user=None):
+ # type: (object) -> Client
+ # Build an HDFS client from the spot.conf values exposed by Config.
+ # NOTE(review): `user` is accepted but never used below (see the TODO).
+
+ logger = logging.getLogger('SPOT.INGEST.HDFS.get_client')
+ hdfs_nm, hdfs_port, hdfs_user = Config.hdfs()
+ # NOTE(review): the removed implementation prefixed 'http://'; here the
+ # scheme presumably has to be part of NAME_NODE - confirm.
+ conf = {'url': '{0}:{1}'.format(hdfs_nm, hdfs_port)}
+
+ if Config.ssl_enabled():
+ # ca_location and key are read but not used in this function.
+ ssl_verify, ca_location, cert, key = Config.ssl()
+ conf.update({'verify': ssl_verify.lower()})
+ if cert:
+ conf.update({'cert': cert})
+
+ if Config.kerberos_enabled():
+ krb_conf = {'mutual_auth': 'OPTIONAL'}
+ conf.update(krb_conf)
+
+ # TODO: possible user parameter
+ logger.info('Client conf:')
+ for k,v in conf.iteritems():
+ logger.info(k + ': ' + v)
+
+ # NOTE(review): SecureKerberosClient is built even when Kerberos is
+ # disabled; in that case conf has no 'mutual_auth', which
+ # SecureKerberosClient.__init__ declares as a required argument.
+ client = SecureKerberosClient(**conf)
-def _get_client(user=None):
- hdfs_nm,hdfs_port,hdfs_user = Config.hdfs()
- client = InsecureClient('http://{0}:{1}'.format(hdfs_nm,hdfs_port), user= user if user else hdfs_user)
return client
-def get_file(hdfs_file):
- client = _get_client()
+
+def get_file(hdfs_file, client=None):
+ # Return the full content of hdfs_file; a client is created via
+ # get_client() when the caller does not pass one.
+ if not client:
+ client = get_client()
+
with client.read(hdfs_file) as reader:
results = reader.read()
return results
-def put_file_csv(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False):
-
+
+def upload_file(hdfs_fp, local_fp, overwrite=False, client=None):
+    """Upload local_fp to hdfs_fp.
+
+    Returns whatever client.upload() returns; on HdfsError the exception
+    instance is returned (not raised).
+    """
+    hdfs_client = client if client else get_client()
+
+    # NOTE(review): the Progress class itself, not an instance, is handed to
+    # the client as the progress callback - confirm this is intended.
+    try:
+        return hdfs_client.upload(hdfs_fp, local_fp, overwrite=overwrite, progress=Progress)
+    except HdfsError as upload_error:
+        return upload_error
+
+
+def download_file(hdfs_path, local_path, overwrite=False, client=None):
+    """Download hdfs_path to local_path; True on success, False on HdfsError."""
+    hdfs_client = client if client else get_client()
+
+    try:
+        hdfs_client.download(hdfs_path, local_path, overwrite=overwrite)
+    except HdfsError:
+        return False
+    else:
+        return True
+
+
+def mkdir(hdfs_path, client=None):
+    """Create hdfs_path via client.makedirs(); True on success, False on HdfsError.
+
+    :param hdfs_path: directory path to create.
+    :param client: optional existing client; one is created when omitted.
+    """
+    # The original test was inverted (`is not None`): a caller-supplied
+    # client was thrown away, and with no client the code went on to call
+    # makedirs on None.
+    if not client:
+        client = get_client()
+
+    try:
+        client.makedirs(hdfs_path)
+        return True
+    except HdfsError:
+        return False
+
+
+def put_file_csv(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False, client=None):
+ # Write each item of hdfs_file_content as one comma-joined line to
+ # "<hdfs_path>/<hdfs_file_name>"; True on success, False on HdfsError.
+ if not client:
+ client = get_client()
+
try:
- client = _get_client()
hdfs_full_name = "{0}/{1}".format(hdfs_path,hdfs_file_name)
with client.write(hdfs_full_name,append=append_file,overwrite=overwrite_file) as writer:
for item in hdfs_file_content:
data = ','.join(str(d) for d in item)
writer.write("{0}\n".format(data))
return True
-
+
except HdfsError:
return False
-def put_file_json(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False):
-
+
+def put_file_json(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False, client=None):
+ # Serialize hdfs_file_content with json.dump into
+ # "<hdfs_path>/<hdfs_file_name>" (utf-8); True on success, False on
+ # HdfsError.
+ if not client:
+ client = get_client()
+
try:
- client = _get_client()
hdfs_full_name = "{0}/{1}".format(hdfs_path,hdfs_file_name)
with client.write(hdfs_full_name,append=append_file,overwrite=overwrite_file,encoding='utf-8') as writer:
- dump(hdfs_file_content, writer)
+ dump(hdfs_file_content, writer)
return True
except HdfsError:
return False
-
-def delete_folder(hdfs_file,user=None):
- client = _get_client(user)
- client.delete(hdfs_file,recursive=True)
-def list_dir(hdfs_path):
+def delete_folder(hdfs_file, user=None, client=None):
+    """Recursively delete hdfs_file; True on success, False on HdfsError.
+
+    :param hdfs_file: path to delete.
+    :param user: forwarded to get_client() when no client is supplied.
+    :param client: optional existing client.
+    """
+    if not client:
+        # The replaced implementation passed `user` to the client factory;
+        # keep forwarding it instead of silently dropping the argument.
+        client = get_client(user)
+
+    try:
+        client.delete(hdfs_file, recursive=True)
+    except HdfsError:
+        return False
+    # Previously success fell through and returned None, which callers
+    # could not tell apart from the False failure result by truthiness.
+    return True
+
+
+def check_dir(hdfs_path, client=None):
+    """
+    Returns True if directory exists
+    Returns False if directory does not exist
+    : param hdfs_path: path to check
+    : object client: hdfs client object for persistent connection
+    """
+    if not client:
+        client = get_client()
+
+    # Per the hdfs client documentation, list() reports a missing or
+    # non-directory path by raising HdfsError, not by returning None
+    # entries, so the former `None not in result` test could never yield
+    # False.
+    try:
+        client.list(hdfs_path)
+    except HdfsError:
+        return False
+    return True
+
+
+def list_dir(hdfs_path, client=None):
+ # Return client.list(hdfs_path); on HdfsError an empty dict is returned
+ # instead, so callers always get a container usable with `in`.
+ if not client:
+ client = get_client()
+
try:
- client = _get_client()
return client.list(hdfs_path)
except HdfsError:
return {}
-def file_exists(hdfs_path,file_name):
- files = list_dir(hdfs_path)
+
+def file_exists(hdfs_path, file_name, client=None):
+    # Return True when str(file_name) is among the entries list_dir()
+    # reports for hdfs_path, else False.
+    if not client:
+        client = get_client()
+
+    # list_dir is declared as list_dir(hdfs_path, client=None); the
+    # arguments were passed in swapped order, so the client object was used
+    # as the path and the path string as the client.
+    files = list_dir(hdfs_path, client)
if str(file_name) in files:
- return True
+        return True
else:
return False
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/d1f5a67f/spot-oa/api/resources/impala_engine.py
----------------------------------------------------------------------
diff --git a/spot-oa/api/resources/impala_engine.py b/spot-oa/api/resources/impala_engine.py
index b7d0148..542bbd0 100644
--- a/spot-oa/api/resources/impala_engine.py
+++ b/spot-oa/api/resources/impala_engine.py
@@ -15,15 +15,33 @@
# limitations under the License.
#
from impala.dbapi import connect
-import api.resources.configurator as Config
+import api.resources.configurator as config
+
def create_connection():
- impala_host, impala_port = Config.impala()
- db = Config.db()
- conn = connect(host=impala_host, port=int(impala_port),database=db)
+    # Open an Impala connection configured from spot.conf and return a
+    # cursor on it; Kerberos and SSL options are added only when enabled.
+    impala_host, impala_port = config.impala()
+    conf = {}
+
+    # TODO: if using hive, kerberos service name must be changed, impyla sets 'impala' as default
+    service_name = {'kerberos_service_name': 'impala'}
+
+    if config.kerberos_enabled():
+        # Still called so that missing Kerberos options fail here.
+        principal, keytab, sasl_mech, security_proto = config.kerberos()
+        conf.update({'auth_mechanism': 'GSSAPI'})
+        # Apply the service name explicitly; it was built but never passed.
+        conf.update(service_name)
+
+    if config.ssl_enabled():
+        ssl_verify, ca_location, cert, key = config.ssl()
+        # use_ssl is an on/off switch: passing the raw SSL_VERIFY string made
+        # any non-empty value, including 'false', count as enabled. This
+        # branch only runs when SSL is enabled, so say so explicitly.
+        # NOTE(review): ca_cert receives CERT rather than CA_LOCATION -
+        # confirm which option holds the CA bundle.
+        conf.update({'ca_cert': cert,
+                     'use_ssl': True,
+                     })
+
+    db = config.db()
+    conn = connect(host=impala_host, port=int(impala_port), database=db, **conf)
return conn.cursor()
+
def execute_query(query,fetch=False):
impala_cursor = create_connection()
@@ -31,6 +49,7 @@ def execute_query(query,fetch=False):
return impala_cursor if not fetch else impala_cursor.fetchall()
+
def execute_query_as_list(query):
query_results = execute_query(query)
@@ -46,5 +65,3 @@ def execute_query_as_list(query):
row_result = {}
return results
-
-