Posted to commits@spot.apache.org by na...@apache.org on 2018/03/19 15:13:03 UTC

[11/13] incubator-spot git commit: [SPOT-213][SPOT-77][SPOT-221] Update spot-ingest to support Kerberos, implement a Hive client, and add librdkafka support

[SPOT-213][SPOT-77][SPOT-221] Update spot-ingest to support Kerberos, implement a Hive client, and add librdkafka support


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/41e51b8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/41e51b8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/41e51b8f

Branch: refs/heads/master
Commit: 41e51b8fab0ba7ebccba10e8e3052c7131cb43dc
Parents: d7b1d37
Author: natedogs911 <na...@gmail.com>
Authored: Tue Jan 23 11:49:40 2018 -0800
Committer: natedogs911 <na...@gmail.com>
Committed: Tue Jan 23 11:49:40 2018 -0800

----------------------------------------------------------------------
 spot-ingest/common/kafka_client.py       | 193 ++++++++++++++++++++------
 spot-ingest/master_collector.py          |  21 +--
 spot-ingest/pipelines/dns/collector.py   | 133 +++++++++++-------
 spot-ingest/pipelines/dns/worker.py      | 141 ++++++++++++++-----
 spot-ingest/pipelines/flow/collector.py  | 111 +++++++++------
 spot-ingest/pipelines/flow/worker.py     | 193 ++++++++++++++++++++------
 spot-ingest/pipelines/proxy/collector.py |   6 +-
 spot-ingest/worker.py                    |   6 +-
 8 files changed, 588 insertions(+), 216 deletions(-)
----------------------------------------------------------------------
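
Note: the reworked client code below leans on a common/configurator.py module that is not part of this patch. The following is only a hypothetical sketch of the contract the diff assumes (the function names match the calls in the diff; the sources and defaults are guesses), included so the Kerberos/SSL branches are easier to follow:

    import os

    def kerberos_enabled():
        # assumption: a simple environment switch decides whether Kerberos is on
        return os.getenv('KRB_AUTH', '').lower() in ('1', 'true', 'yes')

    def kerberos():
        # assumption: returns (principal, keytab, sasl_mechanism, security_protocol),
        # the order in which the Kafka client code unpacks it
        return (os.getenv('KRB_USER', 'spot@EXAMPLE.COM'),
                os.getenv('KEYTABPATH', '/etc/security/keytabs/spot.keytab'),
                os.getenv('SASL_MECH', 'GSSAPI'),
                os.getenv('SECURITY_PROTO', 'sasl_plaintext'))

    def ssl_enabled():
        return os.getenv('SSL_ENABLED', '').lower() in ('1', 'true', 'yes')

    def ssl():
        # assumption: returns (ssl_verify, ca_location, cert, key)
        return (os.getenv('SSL_VERIFY', 'true'),
                os.getenv('SSL_CA_LOCATION', '/etc/ssl/certs/ca.pem'),
                os.getenv('SSL_CERT', '/etc/ssl/certs/spot.pem'),
                os.getenv('SSL_KEY', '/etc/ssl/private/spot.key'))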


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/common/kafka_client.py
----------------------------------------------------------------------
diff --git a/spot-ingest/common/kafka_client.py b/spot-ingest/common/kafka_client.py
index 977cb92..15441b2 100755
--- a/spot-ingest/common/kafka_client.py
+++ b/spot-ingest/common/kafka_client.py
@@ -19,23 +19,23 @@
 
 import logging
 import os
+import sys
 from common.utils import Util
-from kafka import KafkaProducer
-from kafka import KafkaConsumer as KC
-from kafka.partitioner.roundrobin import RoundRobinPartitioner
-from kafka.common import TopicPartition
+from confluent_kafka import Producer
+from confluent_kafka import Consumer
+import common.configurator as config
 
-class KafkaTopic(object):
 
+class KafkaProducer(object):
 
-    def __init__(self,topic,server,port,zk_server,zk_port,partitions):
+    def __init__(self, topic, server, port, zk_server, zk_port, partitions):
 
-        self._initialize_members(topic,server,port,zk_server,zk_port,partitions)
+        self._initialize_members(topic, server, port, zk_server, zk_port, partitions)
 
-    def _initialize_members(self,topic,server,port,zk_server,zk_port,partitions):
+    def _initialize_members(self, topic, server, port, zk_server, zk_port, partitions):
 
         # get logger isinstance
-        self._logger = logging.getLogger("SPOT.INGEST.KAFKA")
+        self._logger = logging.getLogger("SPOT.INGEST.KafkaProducer")
 
         # kafka requirements
         self._server = server
@@ -46,42 +46,93 @@ class KafkaTopic(object):
         self._num_of_partitions = partitions
         self._partitions = []
         self._partitioner = None
+        self._kafka_brokers = '{0}:{1}'.format(self._server, self._port)
 
         # create topic with partitions
         self._create_topic()
 
-    def _create_topic(self):
-
-        self._logger.info("Creating topic: {0} with {1} parititions".format(self._topic,self._num_of_partitions))     
+        self._kafka_conf = self._producer_config(self._kafka_brokers)
+
+        self._p = Producer(**self._kafka_conf)
+
+    def _producer_config(self, server):
+        # type: (str) -> dict
+        """Returns a configuration dictionary containing optional values"""
+
+        connection_conf = {
+            'bootstrap.servers': server,
+        }
+
+        if os.environ.get('KAFKA_DEBUG'):
+            connection_conf.update({'debug': 'all'})
+
+        if config.kerberos_enabled():
+            self._logger.info('Kerberos enabled')
+            principal, keytab, sasl_mech, security_proto = config.kerberos()
+            connection_conf.update({
+                'sasl.mechanisms': sasl_mech,
+                'security.protocol': security_proto,
+                'sasl.kerberos.principal': principal,
+                'sasl.kerberos.keytab': keytab,
+                'sasl.kerberos.min.time.before.relogin': 6000
+            })
+
+            sn = os.environ.get('KAFKA_SERVICE_NAME')
+            if sn:
+                self._logger.info('Setting Kerberos service name: ' + sn)
+                connection_conf.update({'sasl.kerberos.service.name': sn})
+
+            kinit_cmd = os.environ.get('KAFKA_KINIT')
+            if kinit_cmd:
+                self._logger.info('using kinit command: ' + kinit_cmd)
+                connection_conf.update({'sasl.kerberos.kinit.cmd': kinit_cmd})
+            else:
+                # Using -S %{sasl.kerberos.service.name}/%{broker.name} causes the ticket cache to refresh
+                # resulting in authentication errors for other services
+                connection_conf.update({
+                    'sasl.kerberos.kinit.cmd': 'kinit -S "%{sasl.kerberos.service.name}/%{broker.name}" -k -t "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal}'
+                })
+
+        if config.ssl_enabled():
+            self._logger.info('Using SSL connection settings')
+            ssl_verify, ca_location, cert, key = config.ssl()
+            connection_conf.update({
+                'ssl.certificate.location': cert,
+                'ssl.ca.location': ca_location,
+                'ssl.key.location': key
+            })
+
+        return connection_conf
 
-        # Create partitions for the workers.
-        self._partitions = [ TopicPartition(self._topic,p) for p in range(int(self._num_of_partitions))]        
+    def _create_topic(self):
 
-        # create partitioner
-        self._partitioner = RoundRobinPartitioner(self._partitions)
+        self._logger.info("Creating topic: {0} with {1} parititions".format(self._topic, self._num_of_partitions))
         
         # get script path 
-        zk_conf = "{0}:{1}".format(self._zk_server,self._zk_port)
-        create_topic_cmd = "{0}/kafka_topic.sh create {1} {2} {3}".format(os.path.dirname(os.path.abspath(__file__)),self._topic,zk_conf,self._num_of_partitions)
+        zk_conf = "{0}:{1}".format(self._zk_server, self._zk_port)
+        create_topic_cmd = "{0}/kafka_topic.sh create {1} {2} {3}".format(
+            os.path.dirname(os.path.abspath(__file__)),
+            self._topic,
+            zk_conf,
+            self._num_of_partitions
+        )
 
         # execute create topic cmd
-        Util.execute_cmd(create_topic_cmd,self._logger)
+        Util.execute_cmd(create_topic_cmd, self._logger)
 
-    def send_message(self,message,topic_partition):
+    def SendMessage(self, message, topic):
+        p = self._p
+        p.produce(topic, message.encode('utf-8'), callback=self._delivery_callback)
+        p.poll(0)
+        p.flush(timeout=3600000)
 
-        self._logger.info("Sending message to: Topic: {0} Partition:{1}".format(self._topic,topic_partition))
-        kafka_brokers = '{0}:{1}'.format(self._server,self._port)             
-        producer = KafkaProducer(bootstrap_servers=[kafka_brokers],api_version_auto_timeout_ms=3600000)
-        future = producer.send(self._topic,message,partition=topic_partition)
-        producer.flush(timeout=3600000)
-        producer.close()
-    
     @classmethod
-    def SendMessage(cls,message,kafka_servers,topic,partition=0):
-        producer = KafkaProducer(bootstrap_servers=kafka_servers,api_version_auto_timeout_ms=3600000)
-        future = producer.send(topic,message,partition=partition)
-        producer.flush(timeout=3600000)
-        producer.close()  
+    def _delivery_callback(cls, err, msg):
+        if err:
+            sys.stderr.write('%% Message failed delivery: %s\n' % err)
+        else:
+            sys.stderr.write('%% Message delivered to %s [%d]\n' %
+                             (msg.topic(), msg.partition()))
 
     @property
     def Topic(self):
@@ -93,22 +144,24 @@ class KafkaTopic(object):
 
     @property
     def Zookeeper(self):
-        zk = "{0}:{1}".format(self._zk_server,self._zk_port)
+        zk = "{0}:{1}".format(self._zk_server, self._zk_port)
         return zk
 
     @property
     def BootstrapServers(self):
-        servers = "{0}:{1}".format(self._server,self._port) 
+        servers = "{0}:{1}".format(self._server, self._port)
         return servers
 
 
 class KafkaConsumer(object):
     
-    def __init__(self,topic,server,port,zk_server,zk_port,partition):
+    def __init__(self, topic, server, port, zk_server, zk_port, partition):
+
+        self._initialize_members(topic, server, port, zk_server, zk_port, partition)
 
-        self._initialize_members(topic,server,port,zk_server,zk_port,partition)
+    def _initialize_members(self, topic, server, port, zk_server, zk_port, partition):
 
-    def _initialize_members(self,topic,server,port,zk_server,zk_port,partition):
+        self._logger = logging.getLogger("SPOT.INGEST.KafkaConsumer")
 
         self._topic = topic
         self._server = server
@@ -116,14 +169,64 @@ class KafkaConsumer(object):
         self._zk_server = zk_server
         self._zk_port = zk_port
         self._id = partition
+        self._kafka_brokers = '{0}:{1}'.format(self._server, self._port)
+        self._kafka_conf = self._consumer_config(self._id, self._kafka_brokers)
+
+    def _consumer_config(self, groupid, server):
+        # type: (str, str) -> dict
+        """Returns a configuration dictionary containing optional values"""
+
+        connection_conf = {
+            'bootstrap.servers': server,
+            'group.id': groupid,
+        }
+
+        if config.kerberos_enabled():
+            self._logger.info('Kerberos enabled')
+            principal, keytab, sasl_mech, security_proto = config.kerberos()
+            connection_conf.update({
+                'sasl.mechanisms': sasl_mech,
+                'security.protocol': security_proto,
+                'sasl.kerberos.principal': principal,
+                'sasl.kerberos.keytab': keytab,
+                'sasl.kerberos.min.time.before.relogin': 6000,
+                'default.topic.config': {
+                    'auto.commit.enable': 'true',
+                    'auto.commit.interval.ms': '60000',
+                    'auto.offset.reset': 'smallest'}
+            })
+
+            sn = os.environ.get('KAFKA_SERVICE_NAME')
+            if sn:
+                self._logger.info('Setting Kerberos service name: ' + sn)
+                connection_conf.update({'sasl.kerberos.service.name': sn})
+
+            kinit_cmd = os.environ.get('KAFKA_KINIT')
+            if kinit_cmd:
+                self._logger.info('using kinit command: ' + kinit_cmd)
+                connection_conf.update({'sasl.kerberos.kinit.cmd': kinit_cmd})
+            else:
+                # Using -S %{sasl.kerberos.service.name}/%{broker.name} causes the ticket cache to refresh
+                # resulting in authentication errors for other services
+                connection_conf.update({
+                    'sasl.kerberos.kinit.cmd': 'kinit -k -t "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal}'
+                })
+
+        if config.ssl_enabled():
+            self._logger.info('Using SSL connection settings')
+            ssl_verify, ca_location, cert, key = config.ssl()
+            connection_conf.update({
+                'ssl.certificate.location': cert,
+                'ssl.ca.location': ca_location,
+                'ssl.key.location': key
+            })
+
+        return connection_conf
 
     def start(self):
-        
-        kafka_brokers = '{0}:{1}'.format(self._server,self._port)
-        consumer =  KC(bootstrap_servers=[kafka_brokers],group_id=self._topic)
-        partition = [TopicPartition(self._topic,int(self._id))]
-        consumer.assign(partitions=partition)
-        consumer.poll()
+
+        consumer = Consumer(**self._kafka_conf)
+        consumer.subscribe([self._topic])
         return consumer
 
     @property
@@ -132,6 +235,4 @@ class KafkaConsumer(object):
 
     @property
     def ZookeperServer(self):
-        return "{0}:{1}".format(self._zk_server,self._zk_port)
-
-    
+        return "{0}:{1}".format(self._zk_server, self._zk_port)

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/master_collector.py
----------------------------------------------------------------------
diff --git a/spot-ingest/master_collector.py b/spot-ingest/master_collector.py
index 6f6ff7c..23be9f4 100755
--- a/spot-ingest/master_collector.py
+++ b/spot-ingest/master_collector.py
@@ -24,14 +24,15 @@ import sys
 import datetime
 from common.utils import Util
 from common.kerberos import Kerberos
-from common.kafka_client import KafkaTopic
-
+import common.configurator as Config
+from common.kafka_client import KafkaProducer
 
 # get master configuration.
 SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
 CONF_FILE = "{0}/ingest_conf.json".format(SCRIPT_PATH)
 MASTER_CONF = json.loads(open(CONF_FILE).read())
 
+
 def main():
 
     # input Parameters
@@ -49,6 +50,7 @@ def main():
     # start collector based on data source type.
     start_collector(args.type, args.workers_num, args.ingest_id)
 
+
 def start_collector(type, workers_num, id=None):
 
     # generate ingest id
@@ -68,7 +70,7 @@ def start_collector(type, workers_num, id=None):
         sys.exit(1)
 
     # validate if kerberos authentication is required.
-    if os.getenv('KRB_AUTH'):
+    if Config.kerberos_enabled():
         kb = Kerberos()
         kb.authenticate()
 
@@ -80,17 +82,20 @@ def start_collector(type, workers_num, id=None):
     # required zookeeper info.
     zk_server = MASTER_CONF["kafka"]['zookeper_server']
     zk_port = MASTER_CONF["kafka"]['zookeper_port']
-
-    topic = "SPOT-INGEST-{0}_{1}".format(type, ingest_id) if not id else id
-    kafka = KafkaTopic(topic, k_server, k_port, zk_server, zk_port, workers_num)
+         
+    topic = "{0}".format(type,ingest_id) if not id else id
+    producer = KafkaProducer(topic, k_server, k_port, zk_server, zk_port, workers_num)
 
     # create a collector instance based on data source type.
     logger.info("Starting {0} ingest instance".format(topic))
-    module = __import__("pipelines.{0}.collector".format(MASTER_CONF["pipelines"][type]["type"]), fromlist=['Collector'])
+    module = __import__("pipelines.{0}.collector".
+                        format(MASTER_CONF["pipelines"][type]["type"]),
+                        fromlist=['Collector'])
 
     # start collector.
-    ingest_collector = module.Collector(MASTER_CONF['hdfs_app_path'], kafka, type)
+    ingest_collector = module.Collector(MASTER_CONF['hdfs_app_path'], producer, type)
     ingest_collector.start()
 
+
 if __name__ == '__main__':
     main()
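
For orientation, the updated start_collector can be driven directly along these lines (the pipeline name and worker count are placeholders and must match an entry under "pipelines" in ingest_conf.json; the CLI wiring lives in the argparse block not shown in this hunk):

    from master_collector import start_collector

    # the topic now defaults to the pipeline type; pass id= to reuse an existing topic
    start_collector('flow', workers_num=1)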

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/pipelines/dns/collector.py
----------------------------------------------------------------------
diff --git a/spot-ingest/pipelines/dns/collector.py b/spot-ingest/pipelines/dns/collector.py
index c421c47..97c5ed6 100755
--- a/spot-ingest/pipelines/dns/collector.py
+++ b/spot-ingest/pipelines/dns/collector.py
@@ -18,26 +18,29 @@
 #
 
 import time
+import logging
 import os
-import subprocess
 import json
-import logging
 from multiprocessing import Process
 from common.utils import Util
+from common import hdfs_client as hdfs
+from common.hdfs_client import HdfsException
 from common.file_collector import FileWatcher
 from multiprocessing import Pool
-from common.kafka_client import KafkaTopic
+
 
 class Collector(object):
 
-    def __init__(self, hdfs_app_path, kafka_topic, conf_type):
-        self._initialize_members(hdfs_app_path, kafka_topic, conf_type)
+    def __init__(self, hdfs_app_path, kafkaproducer, conf_type):
+
+        self._initialize_members(hdfs_app_path, kafkaproducer, conf_type)
+
+    def _initialize_members(self, hdfs_app_path, kafkaproducer, conf_type):
 
-    def _initialize_members(self, hdfs_app_path, kafka_topic, conf_type):
         # getting parameters.
         self._logger = logging.getLogger('SPOT.INGEST.DNS')
         self._hdfs_app_path = hdfs_app_path
-        self._kafka_topic = kafka_topic
+        self._producer = kafkaproducer
 
         # get script path
         self._script_path = os.path.dirname(os.path.abspath(__file__))
@@ -64,6 +67,8 @@ class Collector(object):
         self._processes = conf["collector_processes"]
         self._ingestion_interval = conf["ingestion_interval"]
         self._pool = Pool(processes=self._processes)
+        # TODO: review re-use of hdfs.client
+        self._hdfs_client = hdfs.get_client()
 
     def start(self):
 
@@ -74,74 +79,108 @@ class Collector(object):
             while True:
                 self._ingest_files_pool()
                 time.sleep(self._ingestion_interval)
-
         except KeyboardInterrupt:
             self._logger.info("Stopping DNS collector...")
-            Util.remove_kafka_topic(self._kafka_topic.Zookeeper, self._kafka_topic.Topic, self._logger)
+            Util.remove_kafka_topic(self._producer.Zookeeper, self._producer.Topic, self._logger)
             self._watcher.stop()
             self._pool.terminate()
             self._pool.close()
             self._pool.join()
             SystemExit("Ingest finished...")
 
-
     def _ingest_files_pool(self):
+
         if self._watcher.HasFiles:
+
             for x in range(0, self._processes):
-                file = self._watcher.GetNextFile()
-                resutl = self._pool.apply_async(ingest_file, args=(file, self._pkt_num, self._pcap_split_staging, self._kafka_topic.Partition, self._hdfs_root_path, self._kafka_topic.Topic, self._kafka_topic.BootstrapServers, ))
-                #resutl.get() # to debug add try and catch.
-                if  not self._watcher.HasFiles: break    
+                self._logger.info('processes: {0}'.format(self._processes))
+                new_file = self._watcher.GetNextFile()
+                if self._processes <= 1:
+                    _ingest_file(
+                        self._hdfs_client,
+                        new_file,
+                        self._pkt_num,
+                        self._pcap_split_staging,
+                        self._hdfs_root_path,
+                        self._producer,
+                        self._producer.Topic
+                        )
+                else:
+                    result = self._pool.apply_async(_ingest_file, args=(
+                        self._hdfs_client,
+                        new_file,
+                        self._pkt_num,
+                        self._pcap_split_staging,
+                        self._hdfs_root_path,
+                        self._producer,
+                        self._producer.Topic
+                        ))
+                # result.get()  # to debug, wrap the apply_async call in try/except.
+                if not self._watcher.HasFiles:
+                    break
         return True
 
-def ingest_file(file,pkt_num,pcap_split_staging, partition,hdfs_root_path,topic,kafka_servers):
+
+def _ingest_file(hdfs_client, new_file, pkt_num, pcap_split_staging, hdfs_root_path, producer, topic):
 
     logger = logging.getLogger('SPOT.INGEST.DNS.{0}'.format(os.getpid()))
     
     try:
         # get file name and date.
-        org_file = file
-        file_name_parts = file.split('/')
+        org_file = new_file
+        file_name_parts = new_file.split('/')
         file_name = file_name_parts[len(file_name_parts)-1]
 
         # split file.
         name = file_name.split('.')[0]
-        split_cmd = "editcap -c {0} {1} {2}/{3}_spot.pcap".format(pkt_num,file,pcap_split_staging,name)
+        split_cmd = "editcap -c {0} {1} {2}/{3}_spot.pcap".format(pkt_num,
+                                                                  new_file,
+                                                                  pcap_split_staging,
+                                                                  name)
         logger.info("Splitting file: {0}".format(split_cmd))
         Util.execute_cmd(split_cmd,logger)
 
         logger.info("Removing file: {0}".format(org_file))
         rm_big_file = "rm {0}".format(org_file)
-        Util.execute_cmd(rm_big_file,logger)    
-
-        for currdir,subdir,files in os.walk(pcap_split_staging):
-            for file in files:
-                if file.endswith(".pcap") and "{0}_spot".format(name) in file:
-
-                        # get timestamp from the file name to build hdfs path.
-                        file_date = file.split('.')[0]
-                        pcap_hour = file_date[-6:-4]
-                        pcap_date_path = file_date[-14:-6]
-
-                        # hdfs path with timestamp.
-                        hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path,pcap_date_path,pcap_hour)
-
-                        # create hdfs path.
-                        Util.creat_hdfs_folder(hdfs_path,logger)
-
-                        # load file to hdfs.
-                        hadoop_pcap_file = "{0}/{1}".format(hdfs_path,file)
-                        Util.load_to_hdfs(os.path.join(currdir,file),hadoop_pcap_file,logger)
-
-                        # create event for workers to process the file.
-                        logger.info( "Sending split file to worker number: {0}".format(partition))
-                        KafkaTopic.SendMessage(hadoop_pcap_file,kafka_servers,topic,partition)
-                        logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file,topic))
-
+        Util.execute_cmd(rm_big_file,logger)
 
-  
     except Exception as err:
-        
-        logger.error("There was a problem, please check the following error message:{0}".format(err.message))
+        logger.error("There was a problem splitting the file: {0}".format(err.message))
         logger.error("Exception: {0}".format(err))
 
+    for currdir, subdir, files in os.walk(pcap_split_staging):
+        for file in files:
+            if file.endswith(".pcap") and "{0}_spot".format(name) in file:
+                # get timestamp from the file name to build hdfs path.
+                file_date = file.split('.')[0]
+                pcap_hour = file_date[-6:-4]
+                pcap_date_path = file_date[-14:-6]
+
+                # hdfs path with timestamp.
+                hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path, pcap_date_path, pcap_hour)
+
+                # create hdfs path.
+                try:
+                    if len(hdfs.list_dir(hdfs_path, hdfs_client)) == 0:
+                        logger.info('creating directory: ' + hdfs_path)
+                        hdfs_client.mkdir(hdfs_path, hdfs_client)
+
+                    # load file to hdfs.
+                    hadoop_pcap_file = "{0}/{1}".format(hdfs_path,file)
+                    result = hdfs_client.upload_file(hadoop_pcap_file, os.path.join(currdir,file))
+                    if not result:
+                        logger.error('File failed to upload: ' + hadoop_pcap_file)
+                        raise HdfsException
+
+                    # create event for workers to process the file.
+                    logger.info( "Sending split file to Topic: {0}".format(topic))
+                    producer.SendMessage(hadoop_pcap_file, topic)
+                    logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file,topic))
+
+                except HdfsException as err:
+                    logger.error('Exception: ' + err.exception)
+                    logger.info('Check Hdfs Connection settings and server health')
+
+                except Exception as err:
+                    logger.info("File {0} failed to be sent to Kafka Topic to: {1}".format(new_file,topic))
+                    logger.error("Error: {0}".format(err))
\ No newline at end of file
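
The HDFS calls above go through common/hdfs_client.py, which is not included in this patch; the sketch below shows the upload step in isolation, with the wrapper's signatures taken on trust from the calls made here and placeholder paths:

    from common import hdfs_client as hdfs
    from common.hdfs_client import HdfsException

    client = hdfs.get_client()
    hdfs_path = '/user/spot/dns/binary/20180101/00'      # placeholder

    try:
        if len(hdfs.list_dir(hdfs_path, client)) == 0:
            client.mkdir(hdfs_path, client)
        if not client.upload_file(hdfs_path + '/example_spot.pcap',
                                  '/tmp/example_spot.pcap'):
            raise HdfsException
    except HdfsException:
        # check HDFS connection settings and server health
        pass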

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/pipelines/dns/worker.py
----------------------------------------------------------------------
diff --git a/spot-ingest/pipelines/dns/worker.py b/spot-ingest/pipelines/dns/worker.py
index 6f51f45..f23fa8f 100755
--- a/spot-ingest/pipelines/dns/worker.py
+++ b/spot-ingest/pipelines/dns/worker.py
@@ -21,18 +21,22 @@ import logging
 import datetime
 import subprocess
 import json
+import sys
 import os
 from multiprocessing import Process
 from common.utils import Util
+from common import hive_engine
+from common import hdfs_client as hdfs
+from confluent_kafka import KafkaError, KafkaException
 
 
 class Worker(object):
 
-    def __init__(self,db_name,hdfs_app_path,kafka_consumer,conf_type,processes=None):
+    def __init__(self, db_name, hdfs_app_path, kafka_consumer, conf_type, processes=None):
         
-        self._initialize_members(db_name,hdfs_app_path,kafka_consumer,conf_type)
+        self._initialize_members(db_name,hdfs_app_path, kafka_consumer, conf_type)
 
-    def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type):
+    def _initialize_members(self, db_name, hdfs_app_path, kafka_consumer, conf_type):
 
         # get logger instance.
         self._logger = Util.get_logger('SPOT.INGEST.WRK.DNS')
@@ -44,32 +48,58 @@ class Worker(object):
         self._script_path = os.path.dirname(os.path.abspath(__file__))
         conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path)))
         conf = json.loads(open(conf_file).read())
-        self._conf = conf["pipelines"][conf_type] 
+        self._conf = conf["pipelines"][conf_type]
+        self._id = "spot-{0}-worker".format(conf_type)
 
         self._process_opt = self._conf['process_opt']
         self._local_staging = self._conf['local_staging']
         self.kafka_consumer = kafka_consumer
 
+        self._cursor = hive_engine.create_connection()
+
     def start(self):
 
         self._logger.info("Listening topic:{0}".format(self.kafka_consumer.Topic))
-        for message in self.kafka_consumer.start():
-            self._new_file(message.value)
-
-    def _new_file(self,file):
-
-        self._logger.info("-------------------------------------- New File received --------------------------------------")
+        consumer = self.kafka_consumer.start()
+
+        try:
+            while True:
+                message = consumer.poll(timeout=1.0)
+                if message is None:
+                    continue
+                if not message.error():
+                    self._new_file(message.value().decode('utf-8'))
+                elif message.error():
+                    if message.error().code() == KafkaError._PARTITION_EOF:
+                        continue
+                    else:
+                        raise KafkaException(message.error())
+
+        except KeyboardInterrupt:
+            sys.stderr.write('%% Aborted by user\n')
+
+        consumer.close()
+
+    def _new_file(self, nf):
+
+        self._logger.info(
+            "-------------------------------------- New File received --------------------------------------"
+        )
         self._logger.info("File: {0} ".format(file))        
-        p = Process(target=self._process_new_file, args=(file,))
+        p = Process(target=self._process_new_file, args=nf)
         p.start() 
         p.join()
 
-    def _process_new_file(self,file):
+    def _process_new_file(self, nf):
+
 
         # get file from hdfs
-        get_file_cmd = "hadoop fs -get {0} {1}.".format(file,self._local_staging)
-        self._logger.info("Getting file from hdfs: {0}".format(get_file_cmd))
-        Util.execute_cmd(get_file_cmd,self._logger)
+        self._logger.info("Getting file from hdfs: {0}".format(nf))
+        if hdfs.file_exists(nf):
+            hdfs.download_file(nf, self._local_staging)
+        else:
+            self._logger.info("file: {0} not found".format(nf))
+            # TODO: error handling
 
         # get file name and date
-        file_name_parts = file.split('/')
+        file_name_parts = nf.split('/')
@@ -82,37 +112,86 @@ class Worker(object):
         binary_day = binary_date_path[6:8]
 
         # build process cmd.
-        process_cmd = "tshark -r {0}{1} {2} > {0}{1}.csv".format(self._local_staging,file_name,self._process_opt)
+        process_cmd = "tshark -r {0}{1} {2} > {0}{1}.csv".format(self._local_staging, file_name, self._process_opt)
         self._logger.info("Processing file: {0}".format(process_cmd))
-        Util.execute_cmd(process_cmd,self._logger)
+        Util.execute_cmd(process_cmd, self._logger)
 
         # create hdfs staging.
         hdfs_path = "{0}/dns".format(self._hdfs_app_path)
         staging_timestamp = datetime.datetime.now().strftime('%M%S%f')[:-4]
         hdfs_staging_path =  "{0}/stage/{1}".format(hdfs_path,staging_timestamp)
-        create_staging_cmd = "hadoop fs -mkdir -p {0}".format(hdfs_staging_path)
-        self._logger.info("Creating staging: {0}".format(create_staging_cmd))
-        Util.execute_cmd(create_staging_cmd,self._logger)
+        self._logger.info("Creating staging: {0}".format(hdfs_staging_path))
+        hdfs.mkdir(hdfs_staging_path)
 
         # move to stage.
-        mv_to_staging ="hadoop fs -moveFromLocal {0}{1}.csv {2}/.".format(self._local_staging,file_name,hdfs_staging_path)
-        self._logger.info("Moving data to staging: {0}".format(mv_to_staging))
-        Util.execute_cmd(mv_to_staging,self._logger)
+        local_file = "{0}{1}.csv".format(self._local_staging, file_name)
+        self._logger.info("Moving data to staging: {0}".format(hdfs_staging_path))
+        hdfs.upload_file(hdfs_staging_path, local_file)
 
         #load to avro
-        load_to_avro_cmd = "hive -hiveconf dbname={0} -hiveconf y={1} -hiveconf m={2} -hiveconf d={3} -hiveconf h={4} -hiveconf data_location='{5}' -f pipelines/dns/load_dns_avro_parquet.hql".format(self._db_name,binary_year,binary_month,binary_day,binary_hour,hdfs_staging_path)
-
-        self._logger.info("Loading data to hive: {0}".format(load_to_avro_cmd))
-        Util.execute_cmd(load_to_avro_cmd,self._logger)
+        drop_table = 'DROP TABLE IF EXISTS {0}.dns_tmp'.format(self._db_name)
+        self._cursor.execute(drop_table)
+
+        # Create external table
+        create_external = ("\n"
+                           "CREATE EXTERNAL TABLE {0}.dns_tmp (\n"
+                           "  frame_day STRING,\n"
+                           "  frame_time STRING,\n"
+                           "  unix_tstamp BIGINT,\n"
+                           "  frame_len INT,\n"
+                           "  ip_src STRING,\n"
+                           "  ip_dst STRING,\n"
+                           "  dns_qry_name STRING,\n"
+                           "  dns_qry_type INT,\n"
+                           "  dns_qry_class STRING,\n"
+                           "  dns_qry_rcode INT,\n"
+                           "  dns_a STRING  \n"
+                           "  )\n"
+                           "  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n"
+                           "  STORED AS TEXTFILE\n"
+                           "  LOCATION '{1}'\n"
+                           "  TBLPROPERTIES ('avro.schema.literal'='{{\n"
+                           "  \"type\":   \"record\"\n"
+                           "  , \"name\":   \"RawDnsRecord\"\n"
+                           "  , \"namespace\" : \"com.cloudera.accelerators.dns.avro\"\n"
+                           "  , \"fields\": [\n"
+                           "      {{\"name\": \"frame_day\",        \"type\":[\"string\", \"null\"]}\n"
+                           "      , {{\"name\": \"frame_time\",     \"type\":[\"string\", \"null\"]}\n"
+                           "      , {{\"name\": \"unix_tstamp\",    \"type\":[\"bigint\", \"null\"]}\n"
+                           "      , {{\"name\": \"frame_len\",      \"type\":[\"int\",    \"null\"]}\n"
+                           "      , {{\"name\": \"ip_src\",         \"type\":[\"string\", \"null\"]}\n"
+                           "      , {{\"name\": \"ip_dst\",         \"type\":[\"string\", \"null\"]}\n"
+                           "      , {{\"name\": \"dns_qry_name\",   \"type\":[\"string\", \"null\"]}\n"
+                           "      , {{\"name\": \"dns_qry_type\",   \"type\":[\"int\",    \"null\"]}\n"
+                           "      , {{\"name\": \"dns_qry_class\",  \"type\":[\"string\", \"null\"]}\n"
+                           "      , {{\"name\": \"dns_qry_rcode\",  \"type\":[\"int\",    \"null\"]}\n"
+                           "      , {{\"name\": \"dns_a\",          \"type\":[\"string\", \"null\"]}\n"
+                           "      ]\n"
+                           "}')\n"
+                           ).format(self._db_name, hdfs_staging_path)
+        self._logger.info( "Creating external table: {0}".format(create_external))
+        self._cursor.execute(create_external)
+
+        # Insert data
+        insert_into_table = """
+            INSERT INTO TABLE {0}.dns
+            PARTITION (y={1}, m={2}, d={3}, h={4})
+            SELECT   CONCAT(frame_day , frame_time) as treceived, unix_tstamp, frame_len, ip_dst, ip_src, dns_qry_name,
+            dns_qry_class,dns_qry_type, dns_qry_rcode, dns_a 
+            FROM {0}.dns_tmp
+        """.format(self._db_name,binary_year,binary_month,binary_day,binary_hour)
+        self._logger.info( "Loading data to {0}: {1}"
+                           .format(self._db_name, insert_into_table)
+                           )
+        self._cursor.execute(insert_into_table)
 
         # remove from hdfs staging
-        rm_hdfs_staging_cmd = "hadoop fs -rm -R -skipTrash {0}".format(hdfs_staging_path)
-        self._logger.info("Removing staging path: {0}".format(rm_hdfs_staging_cmd))
-        Util.execute_cmd(rm_hdfs_staging_cmd,self._logger)
+        self._logger.info("Removing staging path: {0}".format(hdfs_staging_path))
+        hdfs.delete_folder(hdfs_staging_path)
 
         # remove from local staging.
         rm_local_staging = "rm {0}{1}".format(self._local_staging,file_name)
         self._logger.info("Removing files from local staging: {0}".format(rm_local_staging))
-        Util.execute_cmd(rm_local_staging,self._logger)
+        Util.execute_cmd(rm_local_staging, self._logger)
 
         self._logger.info("File {0} was successfully processed.".format(file_name))

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/pipelines/flow/collector.py
----------------------------------------------------------------------
diff --git a/spot-ingest/pipelines/flow/collector.py b/spot-ingest/pipelines/flow/collector.py
index b9a97f2..5e5cd49 100755
--- a/spot-ingest/pipelines/flow/collector.py
+++ b/spot-ingest/pipelines/flow/collector.py
@@ -23,22 +23,24 @@ import os
 import json
 from multiprocessing import Process
 from common.utils import Util
+from common import hdfs_client as hdfs
+from common.hdfs_client import HdfsException
 from common.file_collector import FileWatcher
 from multiprocessing import Pool
-from common.kafka_client import KafkaTopic
+
 
 class Collector(object):
 
-    def __init__(self,hdfs_app_path,kafka_topic,conf_type):
+    def __init__(self, hdfs_app_path, kafkaproducer, conf_type):
         
-        self._initialize_members(hdfs_app_path,kafka_topic,conf_type)
+        self._initialize_members(hdfs_app_path, kafkaproducer, conf_type)
+
+    def _initialize_members(self, hdfs_app_path, kafkaproducer, conf_type):
 
-    def _initialize_members(self,hdfs_app_path,kafka_topic,conf_type):
-  
         # getting parameters.
         self._logger = logging.getLogger('SPOT.INGEST.FLOW')
         self._hdfs_app_path = hdfs_app_path
-        self._kafka_topic = kafka_topic
+        self._producer = kafkaproducer
 
         # get script path
         self._script_path = os.path.dirname(os.path.abspath(__file__))
@@ -62,6 +64,8 @@ class Collector(object):
         self._processes = conf["collector_processes"]
         self._ingestion_interval = conf["ingestion_interval"]
         self._pool = Pool(processes=self._processes)
+        # TODO: review re-use of hdfs.client
+        self._hdfs_client = hdfs.get_client()
 
     def start(self):
 
@@ -74,54 +78,83 @@ class Collector(object):
                 time.sleep(self._ingestion_interval)
         except KeyboardInterrupt:
             self._logger.info("Stopping FLOW collector...")  
-            Util.remove_kafka_topic(self._kafka_topic.Zookeeper,self._kafka_topic.Topic,self._logger)          
+            Util.remove_kafka_topic(self._producer.Zookeeper, self._producer.Topic, self._logger)
             self._watcher.stop()
             self._pool.terminate()
             self._pool.close()            
             self._pool.join()
             SystemExit("Ingest finished...")
-    
 
     def _ingest_files_pool(self):            
        
         if self._watcher.HasFiles:
             
-            for x in range(0,self._processes):
-                file = self._watcher.GetNextFile()
-                resutl = self._pool.apply_async(ingest_file,args=(file,self._kafka_topic.Partition,self._hdfs_root_path ,self._kafka_topic.Topic,self._kafka_topic.BootstrapServers,))
-                #resutl.get() # to debug add try and catch.
-                if  not self._watcher.HasFiles: break    
+            for x in range(0, self._processes):
+                self._logger.info('processes: {0}'.format(self._processes))
+                new_file = self._watcher.GetNextFile()
+                if self._processes <= 1:
+                    _ingest_file(
+                                 new_file,
+                                 self._hdfs_root_path,
+                                 self._producer,
+                                 self._producer.Topic
+                                 )
+                else:
+                    result = self._pool.apply_async(_ingest_file, args=(
+                        new_file,
+                        self._hdfs_root_path,
+                        self._producer,
+                        self._producer.Topic
+                    ))
+                    # result.get()  # to debug add try and catch.
+                if not self._watcher.HasFiles:
+                    break
         return True
-    
 
 
-def ingest_file(file,partition,hdfs_root_path,topic,kafka_servers):
+def _ingest_file(new_file, hdfs_root_path, producer, topic):
 
-        logger = logging.getLogger('SPOT.INGEST.FLOW.{0}'.format(os.getpid()))
-
-        try:
+    logger = logging.getLogger('SPOT.INGEST.FLOW.{0}'.format(os.getpid()))
 
-            # get file name and date.
-            file_name_parts = file.split('/')
-            file_name = file_name_parts[len(file_name_parts)-1]
-            file_date = file_name.split('.')[1]
+    try:
 
-            file_date_path = file_date[0:8]
-            file_date_hour = file_date[8:10]
+        # get file name and date.
+        file_name_parts = new_file.split('/')
+        file_name = file_name_parts[len(file_name_parts)-1]
+        file_date = file_name.split('.')[1]
+        file_date_path = file_date[0:8]
+        file_date_hour = file_date[8:10]
 
-            # hdfs path with timestamp.
-            hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path,file_date_path,file_date_hour)
-            Util.creat_hdfs_folder(hdfs_path,logger)
+        # hdfs path with timestamp.
+        hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path, file_date_path, file_date_hour)
+        hdfs_file = "{0}/{1}".format(hdfs_path, file_name)
 
-            # load to hdfs.
-            hdfs_file = "{0}/{1}".format(hdfs_path,file_name)
-            Util.load_to_hdfs(file,hdfs_file,logger)
-
-            # create event for workers to process the file.
-            logger.info("Sending file to worker number: {0}".format(partition))
-            KafkaTopic.SendMessage(hdfs_file,kafka_servers,topic,partition)    
-            logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file,topic))
-
-        except Exception as err:
-            logger.error("There was a problem, please check the following error message:{0}".format(err.message))
-            logger.error("Exception: {0}".format(err))
+        try:
+            if len(hdfs.list_dir(hdfs_path)) == 0:
+                logger.info('creating directory: ' + hdfs_path)
+                hdfs.mkdir(hdfs_path)
+            logger.info('uploading file to hdfs: ' + hdfs_file)
+            result = hdfs.upload_file(hdfs_path, new_file)
+            if not result:
+                logger.error('File failed to upload: ' + hdfs_file)
+                raise HdfsException
+            else:
+                rm_file = "rm {0}".format(new_file)
+                logger.info("Removing files from local staging: {0}".format(rm_file))
+                Util.execute_cmd(rm_file, logger)
+
+        except HdfsException as err:
+            logger.error('Exception: ' + err.exception)
+            logger.info('Check Hdfs Connection settings and server health')
+
+    except Exception as err:
+        logger.error("There was a problem, Exception: {0}".format(err))
+
+    # create event for workers to process the file.
+    try:
+        producer.SendMessage(hdfs_file, topic)
+        logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(hdfs_file, topic))
+    except Exception as err:
+        logger.info("File {0} failed to be sent to Kafka Topic to: {1}".format(hdfs_file, topic))
+        logger.error("Error: {0}".format(err))

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/pipelines/flow/worker.py
----------------------------------------------------------------------
diff --git a/spot-ingest/pipelines/flow/worker.py b/spot-ingest/pipelines/flow/worker.py
index 1630022..bb957a5 100755
--- a/spot-ingest/pipelines/flow/worker.py
+++ b/spot-ingest/pipelines/flow/worker.py
@@ -22,17 +22,20 @@ import subprocess
 import datetime
 import logging
 import os
-import json 
+import json
+import sys
 from multiprocessing import Process
 from common.utils import Util
+from common import hive_engine
+from common import hdfs_client as hdfs
+from confluent_kafka import KafkaError, KafkaException
 
 
 class Worker(object):
 
-    def __init__(self,db_name,hdfs_app_path,kafka_consumer,conf_type,processes=None):
-        self._initialize_members(db_name,hdfs_app_path,kafka_consumer,conf_type)
+    def __init__(self, db_name, hdfs_app_path, kafka_consumer, conf_type, processes=None):
+        self._initialize_members(db_name, hdfs_app_path, kafka_consumer, conf_type)
 
-    def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type):
+    def _initialize_members(self, db_name, hdfs_app_path, kafka_consumer, conf_type):
 
         # get logger instance.
         self._logger = Util.get_logger('SPOT.INGEST.WRK.FLOW')
@@ -45,76 +48,186 @@ class Worker(object):
         conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path)))
         conf = json.loads(open(conf_file).read())
         self._conf = conf["pipelines"][conf_type]
+        self._id = "spot-{0}-worker".format(conf_type)
 
         self._process_opt = self._conf['process_opt']
         self._local_staging = self._conf['local_staging']
         self.kafka_consumer = kafka_consumer
 
+        # self._cursor = hive_engine.create_connection()
+        self._cursor = hive_engine
+
     def start(self):
 
         self._logger.info("Listening topic:{0}".format(self.kafka_consumer.Topic))
-        for message in self.kafka_consumer.start():
-            self._new_file(message.value)
-
-    def _new_file(self,file):
-
-        self._logger.info("-------------------------------------- New File received --------------------------------------")
-        self._logger.info("File: {0} ".format(file))        
-        p = Process(target=self._process_new_file, args=(file,))
+        consumer = self.kafka_consumer.start()
+        try:
+            while True:
+                message = consumer.poll(timeout=1.0)
+                if message is None:
+                    continue
+                if not message.error():
+                    self._new_file(message.value().decode('utf-8'))
+                elif message.error():
+                    if message.error().code() == KafkaError._PARTITION_EOF:
+                        continue
+                    else:
+                        raise KafkaException(message.error())
+
+        except KeyboardInterrupt:
+            sys.stderr.write('%% Aborted by user\n')
+
+        consumer.close()
+
+    def _new_file(self, nf):
+
+        self._logger.info(
+            "-------------------------------------- New File received --------------------------------------"
+        )
+        self._logger.info("File: {0} ".format(nf))
+
+        p = Process(target=self._process_new_file, args=(nf, ))
         p.start()
         p.join()
         
-    def _process_new_file(self,file):
-
-        # get file from hdfs
-        get_file_cmd = "hadoop fs -get {0} {1}.".format(file,self._local_staging)
-        self._logger.info("Getting file from hdfs: {0}".format(get_file_cmd))
-        Util.execute_cmd(get_file_cmd,self._logger)
+    def _process_new_file(self, nf):
 
         # get file name and date
-        file_name_parts = file.split('/')
+        file_name_parts = nf.split('/')
         file_name = file_name_parts[len(file_name_parts)-1]
-
+        nf_path = nf[:-len(file_name)]  # strip the file name, keep the trailing '/'
         flow_date = file_name.split('.')[1]
         flow_year = flow_date[0:4]
         flow_month = flow_date[4:6]
         flow_day = flow_date[6:8]
         flow_hour = flow_date[8:10]
 
+        # get file from hdfs
+        if hdfs.file_exists(nf_path, file_name):
+            self._logger.info("Getting file from hdfs: {0}".format(nf))
+            hdfs.download_file(nf, self._local_staging)
+        else:
+            self._logger.info("file: {0} not found".format(nf))
+            # TODO: error handling
+
         # build process cmd.
-        process_cmd = "nfdump -o csv -r {0}{1} {2} > {0}{1}.csv".format(self._local_staging,file_name,self._process_opt)
+        sf = "{0}{1}.csv".format(self._local_staging,file_name)
+        process_cmd = "nfdump -o csv -r {0}{1} {2} > {3}".format(self._local_staging, file_name, self._process_opt, sf)
         self._logger.info("Processing file: {0}".format(process_cmd))
-        Util.execute_cmd(process_cmd,self._logger)        
+        Util.execute_cmd(process_cmd,self._logger)
 
         # create hdfs staging.
         hdfs_path = "{0}/flow".format(self._hdfs_app_path)
         staging_timestamp = datetime.datetime.now().strftime('%M%S%f')[:-4]
-        hdfs_staging_path =  "{0}/stage/{1}".format(hdfs_path,staging_timestamp)
-        create_staging_cmd = "hadoop fs -mkdir -p {0}".format(hdfs_staging_path)
-        self._logger.info("Creating staging: {0}".format(create_staging_cmd))
-        Util.execute_cmd(create_staging_cmd,self._logger)
+        hdfs_staging_path = "{0}/stage/{1}".format(hdfs_path,staging_timestamp)
+        self._logger.info("Creating staging: {0}".format(hdfs_staging_path))
+        hdfs.mkdir(hdfs_staging_path)
 
         # move to stage.
-        mv_to_staging ="hadoop fs -moveFromLocal {0}{1}.csv {2}/.".format(self._local_staging,file_name,hdfs_staging_path)
-        self._logger.info("Moving data to staging: {0}".format(mv_to_staging))
-        subprocess.call(mv_to_staging,shell=True)
-
-        #load to avro
-        load_to_avro_cmd = "hive -hiveconf dbname={0} -hiveconf y={1} -hiveconf m={2} -hiveconf d={3} -hiveconf h={4} -hiveconf data_location='{5}' -f pipelines/flow/load_flow_avro_parquet.hql".format(self._db_name,flow_year,flow_month,flow_day,flow_hour,hdfs_staging_path)
-
-        self._logger.info( "Loading data to hive: {0}".format(load_to_avro_cmd))
-        Util.execute_cmd(load_to_avro_cmd,self._logger)
+        local_file = "{0}{1}.csv".format(self._local_staging, file_name)
+        self._logger.info("Moving data to staging: {0}".format(hdfs_staging_path))
+        hdfs.upload_file(hdfs_staging_path, local_file)
+
+        # load with impyla
+        drop_table = "DROP TABLE IF EXISTS {0}.flow_tmp".format(self._db_name)
+        self._logger.info( "Dropping temp table: {0}".format(drop_table))
+        self._cursor.execute_query(drop_table)
+
+        create_external = ("\n"
+                           "CREATE EXTERNAL TABLE {0}.flow_tmp (\n"
+                           "  treceived STRING,\n"
+                           "  tryear INT,\n"
+                           "  trmonth INT,\n"
+                           "  trday INT,\n"
+                           "  trhour INT,\n"
+                           "  trminute INT,\n"
+                           "  trsec INT,\n"
+                           "  tdur FLOAT,\n"
+                           "  sip  STRING,\n"
+                           "  dip STRING,\n"
+                           "  sport INT,\n"
+                           "  dport INT,\n"
+                           "  proto STRING,\n"
+                           "  flag STRING,\n"
+                           "  fwd INT,\n"
+                           "  stos INT,\n"
+                           "  ipkt BIGINT,\n"
+                           "  ibyt BIGINT,\n"
+                           "  opkt BIGINT,\n"
+                           "  obyt BIGINT,\n"
+                           "  input INT,\n"
+                           "  output INT,\n"
+                           "  sas INT,\n"
+                           "  das INT,\n"
+                           "  dtos INT,\n"
+                           "  dir INT,\n"
+                           "  rip STRING\n"
+                           "  )\n"
+                           "  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n"
+                           "  STORED AS TEXTFILE\n"
+                           "  LOCATION '{1}'\n"
+                           "  TBLPROPERTIES ('avro.schema.literal'='{{\n"
+                           "  \"type\":   \"record\"\n"
+                           "  , \"name\":   \"RawFlowRecord\"\n"
+                           "  , \"namespace\" : \"com.cloudera.accelerators.flows.avro\"\n"
+                           "  , \"fields\": [\n"
+                           "      {{\"name\": \"treceived\",             \"type\":[\"string\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"tryear\",              \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"trmonth\",             \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"trday\",               \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"trhour\",              \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"trminute\",            \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"trsec\",               \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"tdur\",                \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"sip\",                \"type\":[\"string\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"sport\",                 \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"dip\",                \"type\":[\"string\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"dport\",                 \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"proto\",              \"type\":[\"string\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"flag\",               \"type\":[\"string\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"fwd\",                   \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"stos\",                  \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"ipkt\",               \"type\":[\"bigint\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"ibytt\",              \"type\":[\"bigint\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"opkt\",               \"type\":[\"bigint\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"obyt\",               \"type\":[\"bigint\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"input\",                 \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"output\",                \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"sas\",                   \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"das\",                   \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"dtos\",                  \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"dir\",                   \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"rip\",                \"type\":[\"string\",   \"null\"]}}\n"
+                           "      ]\n"
+                           "}}')\n"
+                           ).format(self._db_name, hdfs_staging_path)
+        self._logger.info( "Creating external table: {0}".format(create_external))
+        self._cursor.execute_query(create_external)
+
+        insert_into_table = """
+        INSERT INTO TABLE {0}.flow
+        PARTITION (y={1}, m={2}, d={3}, h={4})
+        SELECT   treceived,  unix_timestamp(treceived) AS unix_tstamp, tryear,  trmonth, trday,  trhour,  trminute,  trsec,
+          tdur,  sip, dip, sport, dport,  proto,  flag,  fwd,  stos,  ipkt,  ibyt,  opkt,  obyt,  input,  output,
+          sas,  das,  dtos,  dir,  rip
+        FROM {0}.flow_tmp
+        """.format(self._db_name, flow_year, flow_month, flow_day, flow_hour)
+        self._logger.info( "Loading data to {0}: {1}"
+                           .format(self._db_name, insert_into_table)
+                           )
+        self._cursor.execute_query(insert_into_table)
 
         # remove from hdfs staging
-        rm_hdfs_staging_cmd = "hadoop fs -rm -R -skipTrash {0}".format(hdfs_staging_path)
-        self._logger.info("Removing staging path: {0}".format(rm_hdfs_staging_cmd))
-        Util.execute_cmd(rm_hdfs_staging_cmd,self._logger)
+        self._logger.info("Removing staging path: {0}".format(hdfs_staging_path))
+        hdfs.delete_folder(hdfs_staging_path)
 
         # remove from local staging.
         rm_local_staging = "rm {0}{1}".format(self._local_staging,file_name)
         self._logger.info("Removing files from local staging: {0}".format(rm_local_staging))
         Util.execute_cmd(rm_local_staging,self._logger)
 
-        self._logger.info("File {0} was successfully processed.".format(file_name))
-
+        rm_local_staging = "rm {0}".format(sf)
+        self._logger.info("Removing files from local staging: {0}".format(rm_local_staging))
+        Util.execute_cmd(rm_local_staging,self._logger)
 
+        self._logger.info("File {0} was successfully processed.".format(file_name))

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/pipelines/proxy/collector.py
----------------------------------------------------------------------
diff --git a/spot-ingest/pipelines/proxy/collector.py b/spot-ingest/pipelines/proxy/collector.py
index 69d708c..008b568 100644
--- a/spot-ingest/pipelines/proxy/collector.py
+++ b/spot-ingest/pipelines/proxy/collector.py
@@ -23,7 +23,7 @@ import os
 import sys
 import copy
 from common.utils import Util, NewFileEvent
-from common.kafka_client import KafkaTopic
+from common.kafka_client import KafkaProducer
 from multiprocessing import Pool
 from common.file_collector import FileWatcher
 import time
@@ -106,10 +106,10 @@ def ingest_file(file,message_size,topic,kafka_servers):
             for line in f:
                 message += line
                 if len(message) > message_size:
-                    KafkaTopic.SendMessage(message,kafka_servers,topic,0)
+                    KafkaProducer.SendMessage(message, kafka_servers, topic, 0)
                     message = ""
             #send the last package.        
-            KafkaTopic.SendMessage(message,kafka_servers,topic,0)            
+            KafkaProducer.SendMessage(message, kafka_servers, topic, 0)
         rm_file = "rm {0}".format(file)
         Util.execute_cmd(rm_file,logger)
         logger.info("File {0} has been successfully sent to Kafka Topic: {1}".format(file,topic))

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/worker.py
----------------------------------------------------------------------
diff --git a/spot-ingest/worker.py b/spot-ingest/worker.py
index 5c29148..ce758c5 100755
--- a/spot-ingest/worker.py
+++ b/spot-ingest/worker.py
@@ -24,11 +24,13 @@ import sys
 from common.utils import Util
 from common.kerberos import Kerberos
 from common.kafka_client import KafkaConsumer
+import common.configurator as Config
 
 SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
 CONF_FILE = "{0}/ingest_conf.json".format(SCRIPT_PATH)
 WORKER_CONF = json.loads(open(CONF_FILE).read())
 
+
 def main():
 
     # input parameters
@@ -63,8 +65,8 @@ def start_worker(type, topic, id, processes=None):
         logger.error("The provided data source {0} is not valid".format(type))
         sys.exit(1)
 
-    # validate if kerberos authentication is requiered.
-    if os.getenv('KRB_AUTH'):
+    # validate if kerberos authentication is required.
+    if Config.kerberos_enabled():
         kb = Kerberos()
         kb.authenticate()
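
The Kerberos helper invoked here lives in common/kerberos.py and is unchanged by this patch; as a reminder of what authenticate() is expected to do, a hedged sketch (the environment variable names and the kinit invocation are assumptions):

    import os
    import subprocess

    class Kerberos(object):
        def __init__(self):
            self._kinit = os.getenv('KINITPATH', 'kinit')
            self._keytab = os.getenv('KEYTABPATH', '')
            self._principal = os.getenv('KRB_USER', '')

        def authenticate(self):
            # assumption: a keytab-based kinit is enough to obtain the ticket
            cmd = '{0} -kt {1} {2}'.format(self._kinit, self._keytab, self._principal)
            if subprocess.call(cmd, shell=True) != 0:
                raise RuntimeError('kinit failed')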