Posted to commits@spot.apache.org by na...@apache.org on 2018/03/19 15:13:03 UTC
[11/13] incubator-spot git commit: [SPOT-213][SPOT-77][SPOT-221] Update for spot-ingest to support Kerberos, implements hive client and Librdkafka support
[SPOT-213][SPOT-77][SPOT-221] Update for spot-ingest to support Kerberos, implements hive client and Librdkafka support
Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/41e51b8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/41e51b8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/41e51b8f
Branch: refs/heads/master
Commit: 41e51b8fab0ba7ebccba10e8e3052c7131cb43dc
Parents: d7b1d37
Author: natedogs911 <na...@gmail.com>
Authored: Tue Jan 23 11:49:40 2018 -0800
Committer: natedogs911 <na...@gmail.com>
Committed: Tue Jan 23 11:49:40 2018 -0800
----------------------------------------------------------------------
spot-ingest/common/kafka_client.py | 193 ++++++++++++++++++++------
spot-ingest/master_collector.py | 21 +--
spot-ingest/pipelines/dns/collector.py | 133 +++++++++++-------
spot-ingest/pipelines/dns/worker.py | 141 ++++++++++++++-----
spot-ingest/pipelines/flow/collector.py | 111 +++++++++------
spot-ingest/pipelines/flow/worker.py | 193 ++++++++++++++++++++------
spot-ingest/pipelines/proxy/collector.py | 6 +-
spot-ingest/worker.py | 6 +-
8 files changed, 588 insertions(+), 216 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/common/kafka_client.py
----------------------------------------------------------------------
diff --git a/spot-ingest/common/kafka_client.py b/spot-ingest/common/kafka_client.py
index 977cb92..15441b2 100755
--- a/spot-ingest/common/kafka_client.py
+++ b/spot-ingest/common/kafka_client.py
@@ -19,23 +19,23 @@
import logging
import os
+import sys
from common.utils import Util
-from kafka import KafkaProducer
-from kafka import KafkaConsumer as KC
-from kafka.partitioner.roundrobin import RoundRobinPartitioner
-from kafka.common import TopicPartition
+from confluent_kafka import Producer
+from confluent_kafka import Consumer
+import common.configurator as config
-class KafkaTopic(object):
+class KafkaProducer(object):
- def __init__(self,topic,server,port,zk_server,zk_port,partitions):
+ def __init__(self, topic, server, port, zk_server, zk_port, partitions):
- self._initialize_members(topic,server,port,zk_server,zk_port,partitions)
+ self._initialize_members(topic, server, port, zk_server, zk_port, partitions)
- def _initialize_members(self,topic,server,port,zk_server,zk_port,partitions):
+ def _initialize_members(self, topic, server, port, zk_server, zk_port, partitions):
# get logger instance
- self._logger = logging.getLogger("SPOT.INGEST.KAFKA")
+ self._logger = logging.getLogger("SPOT.INGEST.KafkaProducer")
# kafka requirements
self._server = server
@@ -46,42 +46,93 @@ class KafkaTopic(object):
self._num_of_partitions = partitions
self._partitions = []
self._partitioner = None
+ self._kafka_brokers = '{0}:{1}'.format(self._server, self._port)
# create topic with partitions
self._create_topic()
- def _create_topic(self):
-
- self._logger.info("Creating topic: {0} with {1} parititions".format(self._topic,self._num_of_partitions))
+ self._kafka_conf = self._producer_config(self._kafka_brokers)
+
+ self._p = Producer(**self._kafka_conf)
+
+ def _producer_config(self, server):
+ # type: (str) -> dict
+ """Returns a configuration dictionary containing optional values"""
+
+ connection_conf = {
+ 'bootstrap.servers': server,
+ }
+
+ if os.environ.get('KAFKA_DEBUG'):
+ connection_conf.update({'debug': 'all'})
+
+ if config.kerberos_enabled():
+ self._logger.info('Kerberos enabled')
+ principal, keytab, sasl_mech, security_proto = config.kerberos()
+ connection_conf.update({
+ 'sasl.mechanisms': sasl_mech,
+ 'security.protocol': security_proto,
+ 'sasl.kerberos.principal': principal,
+ 'sasl.kerberos.keytab': keytab,
+ 'sasl.kerberos.min.time.before.relogin': 6000
+ })
+
+ sn = os.environ.get('KAFKA_SERVICE_NAME')
+ if sn:
+ self._logger.info('Setting Kerberos service name: ' + sn)
+ connection_conf.update({'sasl.kerberos.service.name': sn})
+
+ kinit_cmd = os.environ.get('KAFKA_KINIT')
+ if kinit_cmd:
+ self._logger.info('using kinit command: ' + kinit_cmd)
+ connection_conf.update({'sasl.kerberos.kinit.cmd': kinit_cmd})
+ else:
+ # Using -S %{sasl.kerberos.service.name}/%{broker.name} causes the ticket cache to refresh
+ # resulting in authentication errors for other services
+ connection_conf.update({
+ 'sasl.kerberos.kinit.cmd': 'kinit -S "%{sasl.kerberos.service.name}/%{broker.name}" -k -t "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal}'
+ })
+
+ if config.ssl_enabled():
+ self._logger.info('Using SSL connection settings')
+ ssl_verify, ca_location, cert, key = config.ssl()
+ connection_conf.update({
+ 'ssl.certificate.location': cert,
+ 'ssl.ca.location': ca_location,
+ 'ssl.key.location': key
+ })
+
+ return connection_conf
- # Create partitions for the workers.
- self._partitions = [ TopicPartition(self._topic,p) for p in range(int(self._num_of_partitions))]
+ def _create_topic(self):
- # create partitioner
- self._partitioner = RoundRobinPartitioner(self._partitions)
+ self._logger.info("Creating topic: {0} with {1} parititions".format(self._topic, self._num_of_partitions))
# get script path
- zk_conf = "{0}:{1}".format(self._zk_server,self._zk_port)
- create_topic_cmd = "{0}/kafka_topic.sh create {1} {2} {3}".format(os.path.dirname(os.path.abspath(__file__)),self._topic,zk_conf,self._num_of_partitions)
+ zk_conf = "{0}:{1}".format(self._zk_server, self._zk_port)
+ create_topic_cmd = "{0}/kafka_topic.sh create {1} {2} {3}".format(
+ os.path.dirname(os.path.abspath(__file__)),
+ self._topic,
+ zk_conf,
+ self._num_of_partitions
+ )
# execute create topic cmd
- Util.execute_cmd(create_topic_cmd,self._logger)
+ Util.execute_cmd(create_topic_cmd, self._logger)
- def send_message(self,message,topic_partition):
+ def SendMessage(self, message, topic):
+ p = self._p
+ p.produce(topic, message.encode('utf-8'), callback=self._delivery_callback)
+ p.poll(0)
+ p.flush(timeout=3600000)
- self._logger.info("Sending message to: Topic: {0} Partition:{1}".format(self._topic,topic_partition))
- kafka_brokers = '{0}:{1}'.format(self._server,self._port)
- producer = KafkaProducer(bootstrap_servers=[kafka_brokers],api_version_auto_timeout_ms=3600000)
- future = producer.send(self._topic,message,partition=topic_partition)
- producer.flush(timeout=3600000)
- producer.close()
-
@classmethod
- def SendMessage(cls,message,kafka_servers,topic,partition=0):
- producer = KafkaProducer(bootstrap_servers=kafka_servers,api_version_auto_timeout_ms=3600000)
- future = producer.send(topic,message,partition=partition)
- producer.flush(timeout=3600000)
- producer.close()
+ def _delivery_callback(cls, err, msg):
+ if err:
+ sys.stderr.write('%% Message failed delivery: %s\n' % err)
+ else:
+ sys.stderr.write('%% Message delivered to %s [%d]\n' %
+ (msg.topic(), msg.partition()))
@property
def Topic(self):
@@ -93,22 +144,24 @@ class KafkaTopic(object):
@property
def Zookeeper(self):
- zk = "{0}:{1}".format(self._zk_server,self._zk_port)
+ zk = "{0}:{1}".format(self._zk_server, self._zk_port)
return zk
@property
def BootstrapServers(self):
- servers = "{0}:{1}".format(self._server,self._port)
+ servers = "{0}:{1}".format(self._server, self._port)
return servers
class KafkaConsumer(object):
- def __init__(self,topic,server,port,zk_server,zk_port,partition):
+ def __init__(self, topic, server, port, zk_server, zk_port, partition):
+
+ self._initialize_members(topic, server, port, zk_server, zk_port, partition)
- self._initialize_members(topic,server,port,zk_server,zk_port,partition)
+ def _initialize_members(self, topic, server, port, zk_server, zk_port, partition):
- def _initialize_members(self,topic,server,port,zk_server,zk_port,partition):
+ self._logger = logging.getLogger("SPOT.INGEST.KafkaConsumer")
self._topic = topic
self._server = server
@@ -116,14 +169,64 @@ class KafkaConsumer(object):
self._zk_server = zk_server
self._zk_port = zk_port
self._id = partition
+ self._kafka_brokers = '{0}:{1}'.format(self._server, self._port)
+ self._kafka_conf = self._consumer_config(self._id, self._kafka_brokers)
+
+ def _consumer_config(self, groupid, server):
+ # type: (str, str) -> dict
+ """Returns a configuration dictionary containing optional values"""
+
+ connection_conf = {
+ 'bootstrap.servers': server,
+ 'group.id': groupid,
+ }
+
+ if config.kerberos_enabled():
+ self._logger.info('Kerberos enabled')
+ principal, keytab, sasl_mech, security_proto = config.kerberos()
+ connection_conf.update({
+ 'sasl.mechanisms': sasl_mech,
+ 'security.protocol': security_proto,
+ 'sasl.kerberos.principal': principal,
+ 'sasl.kerberos.keytab': keytab,
+ 'sasl.kerberos.min.time.before.relogin': 6000,
+ 'default.topic.config': {
+ 'auto.commit.enable': 'true',
+ 'auto.commit.interval.ms': '60000',
+ 'auto.offset.reset': 'smallest'}
+ })
+
+ sn = os.environ.get('KAFKA_SERVICE_NAME')
+ if sn:
+ self._logger.info('Setting Kerberos service name: ' + sn)
+ connection_conf.update({'sasl.kerberos.service.name': sn})
+
+ kinit_cmd = os.environ.get('KAFKA_KINIT')
+ if kinit_cmd:
+ self._logger.info('using kinit command: ' + kinit_cmd)
+ connection_conf.update({'sasl.kerberos.kinit.cmd': kinit_cmd})
+ else:
+ # Using -S %{sasl.kerberos.service.name}/%{broker.name} causes the ticket cache to refresh
+ # resulting in authentication errors for other services
+ connection_conf.update({
+ 'sasl.kerberos.kinit.cmd': 'kinit -k -t "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal}'
+ })
+
+ if config.ssl_enabled():
+ self._logger.info('Using SSL connection settings')
+ ssl_verify, ca_location, cert, key = config.ssl()
+ connection_conf.update({
+ 'ssl.certificate.location': cert,
+ 'ssl.ca.location': ca_location,
+ 'ssl.key.location': key
+ })
+
+ return connection_conf
def start(self):
-
- kafka_brokers = '{0}:{1}'.format(self._server,self._port)
- consumer = KC(bootstrap_servers=[kafka_brokers],group_id=self._topic)
- partition = [TopicPartition(self._topic,int(self._id))]
- consumer.assign(partitions=partition)
- consumer.poll()
+
+ consumer = Consumer(**self._kafka_conf)
+ consumer.subscribe([self._topic])
return consumer
@property
@@ -132,6 +235,4 @@ class KafkaConsumer(object):
@property
def ZookeperServer(self):
- return "{0}:{1}".format(self._zk_server,self._zk_port)
-
-
+ return "{0}:{1}".format(self._zk_server, self._zk_port)
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/master_collector.py
----------------------------------------------------------------------
diff --git a/spot-ingest/master_collector.py b/spot-ingest/master_collector.py
index 6f6ff7c..23be9f4 100755
--- a/spot-ingest/master_collector.py
+++ b/spot-ingest/master_collector.py
@@ -24,14 +24,15 @@ import sys
import datetime
from common.utils import Util
from common.kerberos import Kerberos
-from common.kafka_client import KafkaTopic
-
+import common.configurator as Config
+from common.kafka_client import KafkaProducer
# get master configuration.
SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
CONF_FILE = "{0}/ingest_conf.json".format(SCRIPT_PATH)
MASTER_CONF = json.loads(open(CONF_FILE).read())
+
def main():
# input Parameters
@@ -49,6 +50,7 @@ def main():
# start collector based on data source type.
start_collector(args.type, args.workers_num, args.ingest_id)
+
def start_collector(type, workers_num, id=None):
# generate ingest id
@@ -68,7 +70,7 @@ def start_collector(type, workers_num, id=None):
sys.exit(1)
# validate if kerberos authentication is required.
- if os.getenv('KRB_AUTH'):
+ if Config.kerberos_enabled():
kb = Kerberos()
kb.authenticate()
@@ -80,17 +82,20 @@ def start_collector(type, workers_num, id=None):
# required zookeeper info.
zk_server = MASTER_CONF["kafka"]['zookeper_server']
zk_port = MASTER_CONF["kafka"]['zookeper_port']
-
- topic = "SPOT-INGEST-{0}_{1}".format(type, ingest_id) if not id else id
- kafka = KafkaTopic(topic, k_server, k_port, zk_server, zk_port, workers_num)
+
+ topic = "{0}".format(type,ingest_id) if not id else id
+ producer = KafkaProducer(topic, k_server, k_port, zk_server, zk_port, workers_num)
# create a collector instance based on data source type.
logger.info("Starting {0} ingest instance".format(topic))
- module = __import__("pipelines.{0}.collector".format(MASTER_CONF["pipelines"][type]["type"]), fromlist=['Collector'])
+ module = __import__("pipelines.{0}.collector".
+ format(MASTER_CONF["pipelines"][type]["type"]),
+ fromlist=['Collector'])
# start collector.
- ingest_collector = module.Collector(MASTER_CONF['hdfs_app_path'], kafka, type)
+ ingest_collector = module.Collector(MASTER_CONF['hdfs_app_path'], producer, type)
ingest_collector.start()
+
if __name__ == '__main__':
main()
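master_collector.py (and worker.py further down) now gates Kerberos authentication on Config.kerberos_enabled() instead of the KRB_AUTH environment variable. The configurator module itself is not included in this diff; a purely illustrative sketch of the interface these callers assume might look like the following, where the file name, section and option names are guesses:

    # common/configurator.py -- hypothetical sketch, not the committed module
    import ConfigParser
    import os

    _conf = ConfigParser.SafeConfigParser()
    _conf.read(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'ingest.cfg'))

    def kerberos_enabled():
        # true when the deployment is configured for Kerberos
        return _conf.has_section('kerberos')

    def kerberos():
        # (principal, keytab, sasl mechanism, security protocol)
        return (_conf.get('kerberos', 'principal'),
                _conf.get('kerberos', 'keytab'),
                _conf.get('kerberos', 'sasl_mech'),
                _conf.get('kerberos', 'security_protocol'))

    def ssl_enabled():
        return _conf.has_section('ssl')

    def ssl():
        # (verify, ca location, certificate, key)
        return (_conf.getboolean('ssl', 'verify'),
                _conf.get('ssl', 'ca_location'),
                _conf.get('ssl', 'cert'),
                _conf.get('ssl', 'key'))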
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/pipelines/dns/collector.py
----------------------------------------------------------------------
diff --git a/spot-ingest/pipelines/dns/collector.py b/spot-ingest/pipelines/dns/collector.py
index c421c47..97c5ed6 100755
--- a/spot-ingest/pipelines/dns/collector.py
+++ b/spot-ingest/pipelines/dns/collector.py
@@ -18,26 +18,29 @@
#
import time
+import logging
import os
-import subprocess
import json
-import logging
from multiprocessing import Process
from common.utils import Util
+from common import hdfs_client as hdfs
+from common.hdfs_client import HdfsException
from common.file_collector import FileWatcher
from multiprocessing import Pool
-from common.kafka_client import KafkaTopic
+
class Collector(object):
- def __init__(self, hdfs_app_path, kafka_topic, conf_type):
- self._initialize_members(hdfs_app_path, kafka_topic, conf_type)
+ def __init__(self, hdfs_app_path, kafkaproducer, conf_type):
+
+ self._initialize_members(hdfs_app_path, kafkaproducer, conf_type)
+
+ def _initialize_members(self, hdfs_app_path, kafkaproducer, conf_type):
- def _initialize_members(self, hdfs_app_path, kafka_topic, conf_type):
# getting parameters.
self._logger = logging.getLogger('SPOT.INGEST.DNS')
self._hdfs_app_path = hdfs_app_path
- self._kafka_topic = kafka_topic
+ self._producer = kafkaproducer
# get script path
self._script_path = os.path.dirname(os.path.abspath(__file__))
@@ -64,6 +67,8 @@ class Collector(object):
self._processes = conf["collector_processes"]
self._ingestion_interval = conf["ingestion_interval"]
self._pool = Pool(processes=self._processes)
+ # TODO: review re-use of hdfs.client
+ self._hdfs_client = hdfs.get_client()
def start(self):
@@ -74,74 +79,108 @@ class Collector(object):
while True:
self._ingest_files_pool()
time.sleep(self._ingestion_interval)
-
except KeyboardInterrupt:
self._logger.info("Stopping DNS collector...")
- Util.remove_kafka_topic(self._kafka_topic.Zookeeper, self._kafka_topic.Topic, self._logger)
+ Util.remove_kafka_topic(self._producer.Zookeeper, self._producer.Topic, self._logger)
self._watcher.stop()
self._pool.terminate()
self._pool.close()
self._pool.join()
SystemExit("Ingest finished...")
-
def _ingest_files_pool(self):
+
if self._watcher.HasFiles:
+
for x in range(0, self._processes):
- file = self._watcher.GetNextFile()
- resutl = self._pool.apply_async(ingest_file, args=(file, self._pkt_num, self._pcap_split_staging, self._kafka_topic.Partition, self._hdfs_root_path, self._kafka_topic.Topic, self._kafka_topic.BootstrapServers, ))
- #resutl.get() # to debug add try and catch.
- if not self._watcher.HasFiles: break
+ self._logger.info('processes: {0}'.format(self._processes))
+ new_file = self._watcher.GetNextFile()
+ if self._processes <= 1:
+ _ingest_file(
+ self._hdfs_client,
+ new_file,
+ self._pkt_num,
+ self._pcap_split_staging,
+ self._hdfs_root_path,
+ self._producer,
+ self._producer.Topic
+ )
+ else:
+ result = self._pool.apply_async(_ingest_file, args=(
+ self._hdfs_client,
+ new_file,
+ self._pkt_num,
+ self._pcap_split_staging,
+ self._hdfs_root_path,
+ self._producer,
+ self._producer.Topic
+ ))
+ # result.get() # to debug add try and catch.
+ if not self._watcher.HasFiles:
+ break
return True
-def ingest_file(file,pkt_num,pcap_split_staging, partition,hdfs_root_path,topic,kafka_servers):
+
+def _ingest_file(hdfs_client, new_file, pkt_num, pcap_split_staging, hdfs_root_path, producer, topic):
logger = logging.getLogger('SPOT.INGEST.DNS.{0}'.format(os.getpid()))
try:
# get file name and date.
- org_file = file
- file_name_parts = file.split('/')
+ org_file = new_file
+ file_name_parts = new_file.split('/')
file_name = file_name_parts[len(file_name_parts)-1]
# split file.
name = file_name.split('.')[0]
- split_cmd = "editcap -c {0} {1} {2}/{3}_spot.pcap".format(pkt_num,file,pcap_split_staging,name)
+ split_cmd = "editcap -c {0} {1} {2}/{3}_spot.pcap".format(pkt_num,
+ new_file,
+ pcap_split_staging,
+ name)
logger.info("Splitting file: {0}".format(split_cmd))
Util.execute_cmd(split_cmd,logger)
logger.info("Removing file: {0}".format(org_file))
rm_big_file = "rm {0}".format(org_file)
- Util.execute_cmd(rm_big_file,logger)
-
- for currdir,subdir,files in os.walk(pcap_split_staging):
- for file in files:
- if file.endswith(".pcap") and "{0}_spot".format(name) in file:
-
- # get timestamp from the file name to build hdfs path.
- file_date = file.split('.')[0]
- pcap_hour = file_date[-6:-4]
- pcap_date_path = file_date[-14:-6]
-
- # hdfs path with timestamp.
- hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path,pcap_date_path,pcap_hour)
-
- # create hdfs path.
- Util.creat_hdfs_folder(hdfs_path,logger)
-
- # load file to hdfs.
- hadoop_pcap_file = "{0}/{1}".format(hdfs_path,file)
- Util.load_to_hdfs(os.path.join(currdir,file),hadoop_pcap_file,logger)
-
- # create event for workers to process the file.
- logger.info( "Sending split file to worker number: {0}".format(partition))
- KafkaTopic.SendMessage(hadoop_pcap_file,kafka_servers,topic,partition)
- logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file,topic))
-
+ Util.execute_cmd(rm_big_file,logger)
-
except Exception as err:
-
- logger.error("There was a problem, please check the following error message:{0}".format(err.message))
+ logger.error("There was a problem splitting the file: {0}".format(err.message))
logger.error("Exception: {0}".format(err))
+ for currdir, subdir, files in os.walk(pcap_split_staging):
+ for file in files:
+ if file.endswith(".pcap") and "{0}_spot".format(name) in file:
+ # get timestamp from the file name to build hdfs path.
+ file_date = file.split('.')[0]
+ pcap_hour = file_date[-6:-4]
+ pcap_date_path = file_date[-14:-6]
+
+ # hdfs path with timestamp.
+ hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path, pcap_date_path, pcap_hour)
+
+ # create hdfs path.
+ try:
+ if len(hdfs.list_dir(hdfs_path, hdfs_client)) == 0:
+ logger.info('creating directory: ' + hdfs_path)
+ hdfs_client.mkdir(hdfs_path, hdfs_client)
+
+ # load file to hdfs.
+ hadoop_pcap_file = "{0}/{1}".format(hdfs_path,file)
+ result = hdfs_client.upload_file(hadoop_pcap_file, os.path.join(currdir,file))
+ if not result:
+ logger.error('File failed to upload: ' + hadoop_pcap_file)
+ raise HdfsException
+
+ # create event for workers to process the file.
+ logger.info( "Sending split file to Topic: {0}".format(topic))
+ producer.SendMessage(hadoop_pcap_file, topic)
+ logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file,topic))
+
+ except HdfsException as err:
+ logger.error('Exception: ' + err.exception)
+ logger.info('Check Hdfs Connection settings and server health')
+
+ except Exception as err:
+ logger.info("File {0} failed to be sent to Kafka Topic to: {1}".format(new_file,topic))
+ logger.error("Error: {0}".format(err))
\ No newline at end of file
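The DNS (and flow) collectors now talk to HDFS through the common.hdfs_client helpers (get_client, list_dir, mkdir, upload_file, HdfsException) instead of shelling out to hadoop fs. That module is not shown in this diff; a minimal sketch of the interface being assumed, built on the WebHDFS client from the "hdfs" Python package, could look like this (the namenode URL and user are placeholders, and the helper names simply mirror the calls made in the collectors and workers):

    # common/hdfs_client.py -- hypothetical sketch, not the committed module
    from hdfs import InsecureClient

    class HdfsException(Exception):
        pass

    _client = InsecureClient('http://namenode.example.com:50070', user='spot')

    def get_client(user=None):
        return _client

    def list_dir(hdfs_path, client=None):
        try:
            return (client or _client).list(hdfs_path)
        except Exception:
            return []

    def mkdir(hdfs_path, client=None):
        (client or _client).makedirs(hdfs_path)

    def upload_file(hdfs_path, local_path, client=None):
        try:
            (client or _client).upload(hdfs_path, local_path)
            return True
        except Exception:
            return False

    def file_exists(hdfs_path, file_name=None):
        path = hdfs_path if file_name is None else '{0}/{1}'.format(hdfs_path, file_name)
        return _client.status(path, strict=False) is not None

    def download_file(hdfs_path, local_path):
        _client.download(hdfs_path, local_path, overwrite=True)

    def delete_folder(hdfs_path):
        _client.delete(hdfs_path, recursive=True)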
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/pipelines/dns/worker.py
----------------------------------------------------------------------
diff --git a/spot-ingest/pipelines/dns/worker.py b/spot-ingest/pipelines/dns/worker.py
index 6f51f45..f23fa8f 100755
--- a/spot-ingest/pipelines/dns/worker.py
+++ b/spot-ingest/pipelines/dns/worker.py
@@ -21,18 +21,22 @@ import logging
import datetime
import subprocess
import json
+import sys
import os
from multiprocessing import Process
from common.utils import Util
+from common import hive_engine
+from common import hdfs_client as hdfs
+from confluent_kafka import KafkaError, KafkaException
class Worker(object):
- def __init__(self,db_name,hdfs_app_path,kafka_consumer,conf_type,processes=None):
+ def __init__(self, db_name, hdfs_app_path, kafka_consumer, conf_type, processes=None):
- self._initialize_members(db_name,hdfs_app_path,kafka_consumer,conf_type)
+ self._initialize_members(db_name,hdfs_app_path, kafka_consumer, conf_type)
- def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type):
+ def _initialize_members(self, db_name, hdfs_app_path, kafka_consumer, conf_type):
# get logger instance.
self._logger = Util.get_logger('SPOT.INGEST.WRK.DNS')
@@ -44,32 +48,58 @@ class Worker(object):
self._script_path = os.path.dirname(os.path.abspath(__file__))
conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path)))
conf = json.loads(open(conf_file).read())
- self._conf = conf["pipelines"][conf_type]
+ self._conf = conf["pipelines"][conf_type]
+ self._id = "spot-{0}-worker".format(conf_type)
self._process_opt = self._conf['process_opt']
self._local_staging = self._conf['local_staging']
self.kafka_consumer = kafka_consumer
+ self._cursor = hive_engine.create_connection()
+
def start(self):
self._logger.info("Listening topic:{0}".format(self.kafka_consumer.Topic))
- for message in self.kafka_consumer.start():
- self._new_file(message.value)
-
- def _new_file(self,file):
-
- self._logger.info("-------------------------------------- New File received --------------------------------------")
+ consumer = self.kafka_consumer.start()
+
+ try:
+ while True:
+ message = consumer.poll(timeout=1.0)
+ if message is None:
+ continue
+ if not message.error():
+ self._new_file(message.value().decode('utf-8'))
+ elif message.error():
+ if message.error().code() == KafkaError._PARTITION_EOF:
+ continue
+ elif message.error:
+ raise KafkaException(message.error())
+
+ except KeyboardInterrupt:
+ sys.stderr.write('%% Aborted by user\n')
+
+ consumer.close()
+
+ def _new_file(self, nf):
+
+ self._logger.info(
+ "-------------------------------------- New File received --------------------------------------"
+ )
self._logger.info("File: {0} ".format(file))
- p = Process(target=self._process_new_file, args=(file,))
+ p = Process(target=self._process_new_file, args=(nf, ))
p.start()
p.join()
- def _process_new_file(self,file):
+ def _process_new_file(self, nf):
+
# get file from hdfs
- get_file_cmd = "hadoop fs -get {0} {1}.".format(file,self._local_staging)
- self._logger.info("Getting file from hdfs: {0}".format(get_file_cmd))
- Util.execute_cmd(get_file_cmd,self._logger)
+ self._logger.info("Getting file from hdfs: {0}".format(nf))
+ if hdfs.file_exists(nf):
+ hdfs.download_file(nf, self._local_staging)
+ else:
+ self._logger.info("file: {0} not found".format(nf))
+ # TODO: error handling
# get file name and date
- file_name_parts = file.split('/')
+ file_name_parts = nf.split('/')
@@ -82,37 +112,86 @@ class Worker(object):
binary_day = binary_date_path[6:8]
# build process cmd.
- process_cmd = "tshark -r {0}{1} {2} > {0}{1}.csv".format(self._local_staging,file_name,self._process_opt)
+ process_cmd = "tshark -r {0}{1} {2} > {0}{1}.csv".format(self._local_staging, file_name, self._process_opt)
self._logger.info("Processing file: {0}".format(process_cmd))
- Util.execute_cmd(process_cmd,self._logger)
+ Util.execute_cmd(process_cmd, self._logger)
# create hdfs staging.
hdfs_path = "{0}/dns".format(self._hdfs_app_path)
staging_timestamp = datetime.datetime.now().strftime('%M%S%f')[:-4]
hdfs_staging_path = "{0}/stage/{1}".format(hdfs_path,staging_timestamp)
- create_staging_cmd = "hadoop fs -mkdir -p {0}".format(hdfs_staging_path)
- self._logger.info("Creating staging: {0}".format(create_staging_cmd))
- Util.execute_cmd(create_staging_cmd,self._logger)
+ self._logger.info("Creating staging: {0}".format(hdfs_staging_path))
+ hdfs.mkdir(hdfs_staging_path)
# move to stage.
- mv_to_staging ="hadoop fs -moveFromLocal {0}{1}.csv {2}/.".format(self._local_staging,file_name,hdfs_staging_path)
- self._logger.info("Moving data to staging: {0}".format(mv_to_staging))
- Util.execute_cmd(mv_to_staging,self._logger)
+ local_file = "{0}{1}.csv".format(self._local_staging, file_name)
+ self._logger.info("Moving data to staging: {0}".format(hdfs_staging_path))
+ hdfs.upload_file(hdfs_staging_path, local_file)
#load to avro
- load_to_avro_cmd = "hive -hiveconf dbname={0} -hiveconf y={1} -hiveconf m={2} -hiveconf d={3} -hiveconf h={4} -hiveconf data_location='{5}' -f pipelines/dns/load_dns_avro_parquet.hql".format(self._db_name,binary_year,binary_month,binary_day,binary_hour,hdfs_staging_path)
-
- self._logger.info("Loading data to hive: {0}".format(load_to_avro_cmd))
- Util.execute_cmd(load_to_avro_cmd,self._logger)
+ drop_table = 'DROP TABLE IF EXISTS {0}.dns_tmp'.format(self._db_name)
+ self._cursor.execute(drop_table)
+
+ # Create external table
+ create_external = ("\n"
+ "CREATE EXTERNAL TABLE {0}.dns_tmp (\n"
+ " frame_day STRING,\n"
+ " frame_time STRING,\n"
+ " unix_tstamp BIGINT,\n"
+ " frame_len INT,\n"
+ " ip_src STRING,\n"
+ " ip_dst STRING,\n"
+ " dns_qry_name STRING,\n"
+ " dns_qry_type INT,\n"
+ " dns_qry_class STRING,\n"
+ " dns_qry_rcode INT,\n"
+ " dns_a STRING \n"
+ " )\n"
+ " ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n"
+ " STORED AS TEXTFILE\n"
+ " LOCATION '{1}'\n"
+ " TBLPROPERTIES ('avro.schema.literal'='{{\n"
+ " \"type\": \"record\"\n"
+ " , \"name\": \"RawDnsRecord\"\n"
+ " , \"namespace\" : \"com.cloudera.accelerators.dns.avro\"\n"
+ " , \"fields\": [\n"
+ " {{\"name\": \"frame_day\", \"type\":[\"string\", \"null\"]}\n"
+ " , {{\"name\": \"frame_time\", \"type\":[\"string\", \"null\"]}\n"
+ " , {{\"name\": \"unix_tstamp\", \"type\":[\"bigint\", \"null\"]}\n"
+ " , {{\"name\": \"frame_len\", \"type\":[\"int\", \"null\"]}\n"
+ " , {{\"name\": \"ip_src\", \"type\":[\"string\", \"null\"]}\n"
+ " , {{\"name\": \"ip_dst\", \"type\":[\"string\", \"null\"]}\n"
+ " , {{\"name\": \"dns_qry_name\", \"type\":[\"string\", \"null\"]}\n"
+ " , {{\"name\": \"dns_qry_type\", \"type\":[\"int\", \"null\"]}\n"
+ " , {{\"name\": \"dns_qry_class\", \"type\":[\"string\", \"null\"]}\n"
+ " , {{\"name\": \"dns_qry_rcode\", \"type\":[\"int\", \"null\"]}\n"
+ " , {{\"name\": \"dns_a\", \"type\":[\"string\", \"null\"]}\n"
+ " ]\n"
+ "}')\n"
+ ).format(self._db_name, hdfs_staging_path)
+ self._logger.info( "Creating external table: {0}".format(create_external))
+ self._cursor.execute(create_external)
+
+ # Insert data
+ insert_into_table = """
+ INSERT INTO TABLE {0}.dns
+ PARTITION (y={1}, m={2}, d={3}, h={4})
+ SELECT CONCAT(frame_day , frame_time) as treceived, unix_tstamp, frame_len, ip_dst, ip_src, dns_qry_name,
+ dns_qry_class,dns_qry_type, dns_qry_rcode, dns_a
+ FROM {0}.dns_tmp
+ """.format(self._db_name,binary_year,binary_month,binary_day,binary_hour)
+ self._logger.info( "Loading data to {0}: {1}"
+ .format(self._db_name, insert_into_table)
+ )
+ self._cursor.execute(insert_into_table)
# remove from hdfs staging
- rm_hdfs_staging_cmd = "hadoop fs -rm -R -skipTrash {0}".format(hdfs_staging_path)
- self._logger.info("Removing staging path: {0}".format(rm_hdfs_staging_cmd))
- Util.execute_cmd(rm_hdfs_staging_cmd,self._logger)
+ self._logger.info("Removing staging path: {0}".format(hdfs_staging_path))
+ hdfs.delete_folder(hdfs_staging_path)
# remove from local staging.
rm_local_staging = "rm {0}{1}".format(self._local_staging,file_name)
self._logger.info("Removing files from local staging: {0}".format(rm_local_staging))
- Util.execute_cmd(rm_local_staging,self._logger)
+ Util.execute_cmd(rm_local_staging, self._logger)
self._logger.info("File {0} was successfully processed.".format(file_name))
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/pipelines/flow/collector.py
----------------------------------------------------------------------
diff --git a/spot-ingest/pipelines/flow/collector.py b/spot-ingest/pipelines/flow/collector.py
index b9a97f2..5e5cd49 100755
--- a/spot-ingest/pipelines/flow/collector.py
+++ b/spot-ingest/pipelines/flow/collector.py
@@ -23,22 +23,24 @@ import os
import json
from multiprocessing import Process
from common.utils import Util
+from common import hdfs_client as hdfs
+from common.hdfs_client import HdfsException
from common.file_collector import FileWatcher
from multiprocessing import Pool
-from common.kafka_client import KafkaTopic
+
class Collector(object):
- def __init__(self,hdfs_app_path,kafka_topic,conf_type):
+ def __init__(self, hdfs_app_path, kafkaproducer, conf_type):
- self._initialize_members(hdfs_app_path,kafka_topic,conf_type)
+ self._initialize_members(hdfs_app_path, kafkaproducer, conf_type)
+
+ def _initialize_members(self, hdfs_app_path, kafkaproducer, conf_type):
- def _initialize_members(self,hdfs_app_path,kafka_topic,conf_type):
-
# getting parameters.
self._logger = logging.getLogger('SPOT.INGEST.FLOW')
self._hdfs_app_path = hdfs_app_path
- self._kafka_topic = kafka_topic
+ self._producer = kafkaproducer
# get script path
self._script_path = os.path.dirname(os.path.abspath(__file__))
@@ -62,6 +64,8 @@ class Collector(object):
self._processes = conf["collector_processes"]
self._ingestion_interval = conf["ingestion_interval"]
self._pool = Pool(processes=self._processes)
+ # TODO: review re-use of hdfs.client
+ self._hdfs_client = hdfs.get_client()
def start(self):
@@ -74,54 +78,83 @@ class Collector(object):
time.sleep(self._ingestion_interval)
except KeyboardInterrupt:
self._logger.info("Stopping FLOW collector...")
- Util.remove_kafka_topic(self._kafka_topic.Zookeeper,self._kafka_topic.Topic,self._logger)
+ Util.remove_kafka_topic(self._producer.Zookeeper, self._producer.Topic, self._logger)
self._watcher.stop()
self._pool.terminate()
self._pool.close()
self._pool.join()
SystemExit("Ingest finished...")
-
def _ingest_files_pool(self):
if self._watcher.HasFiles:
- for x in range(0,self._processes):
- file = self._watcher.GetNextFile()
- resutl = self._pool.apply_async(ingest_file,args=(file,self._kafka_topic.Partition,self._hdfs_root_path ,self._kafka_topic.Topic,self._kafka_topic.BootstrapServers,))
- #resutl.get() # to debug add try and catch.
- if not self._watcher.HasFiles: break
+ for x in range(0, self._processes):
+ self._logger.info('processes: {0}'.format(self._processes))
+ new_file = self._watcher.GetNextFile()
+ if self._processes <= 1:
+ _ingest_file(
+ new_file,
+ self._hdfs_root_path,
+ self._producer,
+ self._producer.Topic
+ )
+ else:
+ result = self._pool.apply_async(_ingest_file, args=(
+ new_file,
+ self._hdfs_root_path,
+ self._producer,
+ self._producer.Topic
+ ))
+ # result.get() # to debug add try and catch.
+ if not self._watcher.HasFiles:
+ break
return True
-
-def ingest_file(file,partition,hdfs_root_path,topic,kafka_servers):
+def _ingest_file(new_file, hdfs_root_path, producer, topic):
- logger = logging.getLogger('SPOT.INGEST.FLOW.{0}'.format(os.getpid()))
-
- try:
+ logger = logging.getLogger('SPOT.INGEST.FLOW.{0}'.format(os.getpid()))
- # get file name and date.
- file_name_parts = file.split('/')
- file_name = file_name_parts[len(file_name_parts)-1]
- file_date = file_name.split('.')[1]
+ try:
- file_date_path = file_date[0:8]
- file_date_hour = file_date[8:10]
+ # get file name and date.
+ file_name_parts = new_file.split('/')
+ file_name = file_name_parts[len(file_name_parts)-1]
+ file_date = file_name.split('.')[1]
+ file_date_path = file_date[0:8]
+ file_date_hour = file_date[8:10]
- # hdfs path with timestamp.
- hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path,file_date_path,file_date_hour)
- Util.creat_hdfs_folder(hdfs_path,logger)
+ # hdfs path with timestamp.
+ hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path, file_date_path, file_date_hour)
+ hdfs_file = "{0}/{1}".format(hdfs_path, file_name)
- # load to hdfs.
- hdfs_file = "{0}/{1}".format(hdfs_path,file_name)
- Util.load_to_hdfs(file,hdfs_file,logger)
-
- # create event for workers to process the file.
- logger.info("Sending file to worker number: {0}".format(partition))
- KafkaTopic.SendMessage(hdfs_file,kafka_servers,topic,partition)
- logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file,topic))
-
- except Exception as err:
- logger.error("There was a problem, please check the following error message:{0}".format(err.message))
- logger.error("Exception: {0}".format(err))
+ try:
+ if len(hdfs.list_dir(hdfs_path)) == 0:
+ logger.info('creating directory: ' + hdfs_path)
+ hdfs.mkdir(hdfs_path)
+ logger.info('uploading file to hdfs: ' + hdfs_file)
+ result = hdfs.upload_file(hdfs_path, new_file)
+ if not result:
+ logger.error('File failed to upload: ' + hdfs_file)
+ raise HdfsException
+ else:
+ rm_file = "rm {0}".format(new_file)
+ logger.info("Removing files from local staging: {0}".format(rm_file))
+ Util.execute_cmd(rm_file, logger)
+
+ except HdfsException as err:
+ logger.error('Exception: ' + err.exception)
+ logger.info('Check Hdfs Connection settings and server health')
+
+ except Exception as err:
+ logger.error("There was a problem, Exception: {0}".format(err))
+
+ # create event for workers to process the file.
+ # logger.info("Sending file to worker number: {0}".format(partition))
+ try:
+ producer.SendMessage(hdfs_file, topic)
+ logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(hdfs_file, topic))
+ except Exception as err:
+ logger.info("File {0} failed to be sent to Kafka Topic to: {1}".format(hdfs_file, topic))
+ logger.error("Error: {0}".format(err))
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/pipelines/flow/worker.py
----------------------------------------------------------------------
diff --git a/spot-ingest/pipelines/flow/worker.py b/spot-ingest/pipelines/flow/worker.py
index 1630022..bb957a5 100755
--- a/spot-ingest/pipelines/flow/worker.py
+++ b/spot-ingest/pipelines/flow/worker.py
@@ -22,17 +22,20 @@ import subprocess
import datetime
import logging
import os
-import json
+import json
+import sys
from multiprocessing import Process
from common.utils import Util
+from common import hive_engine
+from common import hdfs_client as hdfs
+from confluent_kafka import KafkaError, KafkaException
class Worker(object):
- def __init__(self,db_name,hdfs_app_path,kafka_consumer,conf_type,processes=None):
- self._initialize_members(db_name,hdfs_app_path,kafka_consumer,conf_type)
+ def __init__(self, db_name, hdfs_app_path, kafka_consumer, conf_type, processes=None):
+ self._initialize_members(db_name, hdfs_app_path, kafka_consumer, conf_type)
- def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type):
+ def _initialize_members(self, db_name, hdfs_app_path, kafka_consumer, conf_type):
# get logger instance.
self._logger = Util.get_logger('SPOT.INGEST.WRK.FLOW')
@@ -45,76 +48,186 @@ class Worker(object):
conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path)))
conf = json.loads(open(conf_file).read())
self._conf = conf["pipelines"][conf_type]
+ self._id = "spot-{0}-worker".format(conf_type)
self._process_opt = self._conf['process_opt']
self._local_staging = self._conf['local_staging']
self.kafka_consumer = kafka_consumer
+ # self._cursor = hive_engine.create_connection()
+ self._cursor = hive_engine
+
def start(self):
self._logger.info("Listening topic:{0}".format(self.kafka_consumer.Topic))
- for message in self.kafka_consumer.start():
- self._new_file(message.value)
-
- def _new_file(self,file):
-
- self._logger.info("-------------------------------------- New File received --------------------------------------")
- self._logger.info("File: {0} ".format(file))
- p = Process(target=self._process_new_file, args=(file,))
+ consumer = self.kafka_consumer.start()
+ try:
+ while True:
+ message = consumer.poll(timeout=1.0)
+ if message is None:
+ continue
+ if not message.error():
+ self._new_file(message.value().decode('utf-8'))
+ elif message.error():
+ if message.error().code() == KafkaError._PARTITION_EOF:
+ continue
+ elif message.error:
+ raise KafkaException(message.error())
+
+ except KeyboardInterrupt:
+ sys.stderr.write('%% Aborted by user\n')
+
+ consumer.close()
+
+ def _new_file(self, nf):
+
+ self._logger.info(
+ "-------------------------------------- New File received --------------------------------------"
+ )
+ self._logger.info("File: {0} ".format(nf))
+
+ p = Process(target=self._process_new_file, args=(nf, ))
p.start()
p.join()
- def _process_new_file(self,file):
-
- # get file from hdfs
- get_file_cmd = "hadoop fs -get {0} {1}.".format(file,self._local_staging)
- self._logger.info("Getting file from hdfs: {0}".format(get_file_cmd))
- Util.execute_cmd(get_file_cmd,self._logger)
+ def _process_new_file(self, nf):
# get file name and date
- file_name_parts = file.split('/')
+ file_name_parts = nf.split('/')
file_name = file_name_parts[len(file_name_parts)-1]
-
+ nf_path = nf.rstrip(file_name)
flow_date = file_name.split('.')[1]
flow_year = flow_date[0:4]
flow_month = flow_date[4:6]
flow_day = flow_date[6:8]
flow_hour = flow_date[8:10]
+ # get file from hdfs
+ if hdfs.file_exists(nf_path, file_name):
+ self._logger.info("Getting file from hdfs: {0}".format(nf))
+ hdfs.download_file(nf, self._local_staging)
+ else:
+ self._logger.info("file: {0} not found".format(nf))
+ # TODO: error handling
+
# build process cmd.
- process_cmd = "nfdump -o csv -r {0}{1} {2} > {0}{1}.csv".format(self._local_staging,file_name,self._process_opt)
+ sf = "{0}{1}.csv".format(self._local_staging,file_name)
+ process_cmd = "nfdump -o csv -r {0}{1} {2} > {3}".format(self._local_staging, file_name, self._process_opt, sf)
self._logger.info("Processing file: {0}".format(process_cmd))
- Util.execute_cmd(process_cmd,self._logger)
+ Util.execute_cmd(process_cmd,self._logger)
# create hdfs staging.
hdfs_path = "{0}/flow".format(self._hdfs_app_path)
staging_timestamp = datetime.datetime.now().strftime('%M%S%f')[:-4]
- hdfs_staging_path = "{0}/stage/{1}".format(hdfs_path,staging_timestamp)
- create_staging_cmd = "hadoop fs -mkdir -p {0}".format(hdfs_staging_path)
- self._logger.info("Creating staging: {0}".format(create_staging_cmd))
- Util.execute_cmd(create_staging_cmd,self._logger)
+ hdfs_staging_path = "{0}/stage/{1}".format(hdfs_path,staging_timestamp)
+ self._logger.info("Creating staging: {0}".format(hdfs_staging_path))
+ hdfs.mkdir(hdfs_staging_path)
# move to stage.
- mv_to_staging ="hadoop fs -moveFromLocal {0}{1}.csv {2}/.".format(self._local_staging,file_name,hdfs_staging_path)
- self._logger.info("Moving data to staging: {0}".format(mv_to_staging))
- subprocess.call(mv_to_staging,shell=True)
-
- #load to avro
- load_to_avro_cmd = "hive -hiveconf dbname={0} -hiveconf y={1} -hiveconf m={2} -hiveconf d={3} -hiveconf h={4} -hiveconf data_location='{5}' -f pipelines/flow/load_flow_avro_parquet.hql".format(self._db_name,flow_year,flow_month,flow_day,flow_hour,hdfs_staging_path)
-
- self._logger.info( "Loading data to hive: {0}".format(load_to_avro_cmd))
- Util.execute_cmd(load_to_avro_cmd,self._logger)
+ local_file = "{0}{1}.csv".format(self._local_staging, file_name)
+ self._logger.info("Moving data to staging: {0}".format(hdfs_staging_path))
+ hdfs.upload_file(hdfs_staging_path, local_file)
+
+ # load with impyla
+ drop_table = "DROP TABLE IF EXISTS {0}.flow_tmp".format(self._db_name)
+ self._logger.info( "Dropping temp table: {0}".format(drop_table))
+ self._cursor.execute_query(drop_table)
+
+ create_external = ("\n"
+ "CREATE EXTERNAL TABLE {0}.flow_tmp (\n"
+ " treceived STRING,\n"
+ " tryear INT,\n"
+ " trmonth INT,\n"
+ " trday INT,\n"
+ " trhour INT,\n"
+ " trminute INT,\n"
+ " trsec INT,\n"
+ " tdur FLOAT,\n"
+ " sip STRING,\n"
+ " dip STRING,\n"
+ " sport INT,\n"
+ " dport INT,\n"
+ " proto STRING,\n"
+ " flag STRING,\n"
+ " fwd INT,\n"
+ " stos INT,\n"
+ " ipkt BIGINT,\n"
+ " ibyt BIGINT,\n"
+ " opkt BIGINT,\n"
+ " obyt BIGINT,\n"
+ " input INT,\n"
+ " output INT,\n"
+ " sas INT,\n"
+ " das INT,\n"
+ " dtos INT,\n"
+ " dir INT,\n"
+ " rip STRING\n"
+ " )\n"
+ " ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n"
+ " STORED AS TEXTFILE\n"
+ " LOCATION '{1}'\n"
+ " TBLPROPERTIES ('avro.schema.literal'='{{\n"
+ " \"type\": \"record\"\n"
+ " , \"name\": \"RawFlowRecord\"\n"
+ " , \"namespace\" : \"com.cloudera.accelerators.flows.avro\"\n"
+ " , \"fields\": [\n"
+ " {{\"name\": \"treceived\", \"type\":[\"string\", \"null\"]}}\n"
+ " , {{\"name\": \"tryear\", \"type\":[\"float\", \"null\"]}}\n"
+ " , {{\"name\": \"trmonth\", \"type\":[\"float\", \"null\"]}}\n"
+ " , {{\"name\": \"trday\", \"type\":[\"float\", \"null\"]}}\n"
+ " , {{\"name\": \"trhour\", \"type\":[\"float\", \"null\"]}}\n"
+ " , {{\"name\": \"trminute\", \"type\":[\"float\", \"null\"]}}\n"
+ " , {{\"name\": \"trsec\", \"type\":[\"float\", \"null\"]}}\n"
+ " , {{\"name\": \"tdur\", \"type\":[\"float\", \"null\"]}}\n"
+ " , {{\"name\": \"sip\", \"type\":[\"string\", \"null\"]}}\n"
+ " , {{\"name\": \"sport\", \"type\":[\"int\", \"null\"]}}\n"
+ " , {{\"name\": \"dip\", \"type\":[\"string\", \"null\"]}}\n"
+ " , {{\"name\": \"dport\", \"type\":[\"int\", \"null\"]}}\n"
+ " , {{\"name\": \"proto\", \"type\":[\"string\", \"null\"]}}\n"
+ " , {{\"name\": \"flag\", \"type\":[\"string\", \"null\"]}}\n"
+ " , {{\"name\": \"fwd\", \"type\":[\"int\", \"null\"]}}\n"
+ " , {{\"name\": \"stos\", \"type\":[\"int\", \"null\"]}}\n"
+ " , {{\"name\": \"ipkt\", \"type\":[\"bigint\", \"null\"]}}\n"
+ " , {{\"name\": \"ibytt\", \"type\":[\"bigint\", \"null\"]}}\n"
+ " , {{\"name\": \"opkt\", \"type\":[\"bigint\", \"null\"]}}\n"
+ " , {{\"name\": \"obyt\", \"type\":[\"bigint\", \"null\"]}}\n"
+ " , {{\"name\": \"input\", \"type\":[\"int\", \"null\"]}}\n"
+ " , {{\"name\": \"output\", \"type\":[\"int\", \"null\"]}}\n"
+ " , {{\"name\": \"sas\", \"type\":[\"int\", \"null\"]}}\n"
+ " , {{\"name\": \"das\", \"type\":[\"int\", \"null\"]}}\n"
+ " , {{\"name\": \"dtos\", \"type\":[\"int\", \"null\"]}}\n"
+ " , {{\"name\": \"dir\", \"type\":[\"int\", \"null\"]}}\n"
+ " , {{\"name\": \"rip\", \"type\":[\"string\", \"null\"]}}\n"
+ " ]\n"
+ "}}')\n"
+ ).format(self._db_name, hdfs_staging_path)
+ self._logger.info( "Creating external table: {0}".format(create_external))
+ self._cursor.execute_query(create_external)
+
+ insert_into_table = """
+ INSERT INTO TABLE {0}.flow
+ PARTITION (y={1}, m={2}, d={3}, h={4})
+ SELECT treceived, unix_timestamp(treceived) AS unix_tstamp, tryear, trmonth, trday, trhour, trminute, trsec,
+ tdur, sip, dip, sport, dport, proto, flag, fwd, stos, ipkt, ibyt, opkt, obyt, input, output,
+ sas, das, dtos, dir, rip
+ FROM {0}.flow_tmp
+ """.format(self._db_name, flow_year, flow_month, flow_day, flow_hour)
+ self._logger.info( "Loading data to {0}: {1}"
+ .format(self._db_name, insert_into_table)
+ )
+ self._cursor.execute_query(insert_into_table)
# remove from hdfs staging
- rm_hdfs_staging_cmd = "hadoop fs -rm -R -skipTrash {0}".format(hdfs_staging_path)
- self._logger.info("Removing staging path: {0}".format(rm_hdfs_staging_cmd))
- Util.execute_cmd(rm_hdfs_staging_cmd,self._logger)
+ self._logger.info("Removing staging path: {0}".format(hdfs_staging_path))
+ hdfs.delete_folder(hdfs_staging_path)
# remove from local staging.
rm_local_staging = "rm {0}{1}".format(self._local_staging,file_name)
self._logger.info("Removing files from local staging: {0}".format(rm_local_staging))
Util.execute_cmd(rm_local_staging,self._logger)
- self._logger.info("File {0} was successfully processed.".format(file_name))
-
+ rm_local_staging = "rm {0}".format(sf)
+ self._logger.info("Removing files from local staging: {0}".format(rm_local_staging))
+ Util.execute_cmd(rm_local_staging,self._logger)
+ self._logger.info("File {0} was successfully processed.".format(file_name))
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/pipelines/proxy/collector.py
----------------------------------------------------------------------
diff --git a/spot-ingest/pipelines/proxy/collector.py b/spot-ingest/pipelines/proxy/collector.py
index 69d708c..008b568 100644
--- a/spot-ingest/pipelines/proxy/collector.py
+++ b/spot-ingest/pipelines/proxy/collector.py
@@ -23,7 +23,7 @@ import os
import sys
import copy
from common.utils import Util, NewFileEvent
-from common.kafka_client import KafkaTopic
+from common.kafka_client import KafkaProducer
from multiprocessing import Pool
from common.file_collector import FileWatcher
import time
@@ -106,10 +106,10 @@ def ingest_file(file,message_size,topic,kafka_servers):
for line in f:
message += line
if len(message) > message_size:
- KafkaTopic.SendMessage(message,kafka_servers,topic,0)
+ KafkaProducer.SendMessage(message, kafka_servers, topic, 0)
message = ""
#send the last package.
- KafkaTopic.SendMessage(message,kafka_servers,topic,0)
+ KafkaProducer.SendMessage(message, kafka_servers, topic, 0)
rm_file = "rm {0}".format(file)
Util.execute_cmd(rm_file,logger)
logger.info("File {0} has been successfully sent to Kafka Topic: {1}".format(file,topic))
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/worker.py
----------------------------------------------------------------------
diff --git a/spot-ingest/worker.py b/spot-ingest/worker.py
index 5c29148..ce758c5 100755
--- a/spot-ingest/worker.py
+++ b/spot-ingest/worker.py
@@ -24,11 +24,13 @@ import sys
from common.utils import Util
from common.kerberos import Kerberos
from common.kafka_client import KafkaConsumer
+import common.configurator as Config
SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
CONF_FILE = "{0}/ingest_conf.json".format(SCRIPT_PATH)
WORKER_CONF = json.loads(open(CONF_FILE).read())
+
def main():
# input parameters
@@ -63,8 +65,8 @@ def start_worker(type, topic, id, processes=None):
logger.error("The provided data source {0} is not valid".format(type))
sys.exit(1)
- # validate if kerberos authentication is requiered.
- if os.getenv('KRB_AUTH'):
+ # validate if kerberos authentication is required.
+ if Config.kerberos_enabled():
kb = Kerberos()
kb.authenticate()