Posted to commits@spot.apache.org by na...@apache.org on 2018/03/19 19:28:18 UTC
[09/42] incubator-spot git commit: PEP8 fixes
PEP8 fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/ce70f882
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/ce70f882
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/ce70f882
Branch: refs/heads/SPOT-181_ODM
Commit: ce70f882b09189ec62d1ad60f4ff2411acb05c2a
Parents: dbf6f51
Author: tpltnt <tp...@dropcut.net>
Authored: Fri Dec 29 16:54:14 2017 +0100
Committer: tpltnt <tp...@dropcut.net>
Committed: Fri Dec 29 17:32:39 2017 +0100
----------------------------------------------------------------------
spot-ingest/master_collector.py | 60 ++++++++++++++++++++----------------
spot-ingest/worker.py           | 56 ++++++++++++++++++---------------
2 files changed, 64 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
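For anyone who wants to check this kind of cleanup locally, the reference
PEP 8 checker pycodestyle (formerly named pep8) can be pointed at the two
touched files. A minimal sketch in Python, assuming pycodestyle is installed
and the paths match a local incubator-spot checkout:

    # Report remaining PEP8 violations in the two files this commit touches.
    # Assumes `pip install pycodestyle` and a checkout rooted at the cwd.
    import pycodestyle

    # Relaxed line limit: several lines in this commit still exceed 79 chars.
    style = pycodestyle.StyleGuide(max_line_length=120)
    report = style.check_files(['spot-ingest/master_collector.py',
                                'spot-ingest/worker.py'])
    print("remaining violations:", report.total_errors)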
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/ce70f882/spot-ingest/master_collector.py
----------------------------------------------------------------------
diff --git a/spot-ingest/master_collector.py b/spot-ingest/master_collector.py
index 9cd91ea..6f6ff7c 100755
--- a/spot-ingest/master_collector.py
+++ b/spot-ingest/master_collector.py
@@ -21,70 +21,76 @@ import argparse
import os
import json
import sys
+import datetime
from common.utils import Util
from common.kerberos import Kerberos
from common.kafka_client import KafkaTopic
-import datetime
+
# get master configuration.
-script_path = os.path.dirname(os.path.abspath(__file__))
-conf_file = "{0}/ingest_conf.json".format(script_path)
-master_conf = json.loads(open (conf_file).read())
+SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
+CONF_FILE = "{0}/ingest_conf.json".format(SCRIPT_PATH)
+MASTER_CONF = json.loads(open(CONF_FILE).read())
def main():
# input Parameters
parser = argparse.ArgumentParser(description="Master Collector Ingest Daemon")
- parser.add_argument('-t','--type',dest='type',required=True,help='Type of data that will be ingested (Pipeline Configuration)',metavar='')
- parser.add_argument('-w','--workers',dest='workers_num',required=True,help='Number of workers for the ingest process',metavar='')
- parser.add_argument('-id','--ingestId',dest='ingest_id',required=False,help='Ingest ID',metavar='')
+ parser.add_argument('-t', '--type', dest='type', required=True,
+ help='Type of data that will be ingested (Pipeline Configuration)',
+ metavar='')
+ parser.add_argument('-w', '--workers', dest='workers_num',
+ required=True, help='Number of workers for the ingest process',
+ metavar='')
+ parser.add_argument('-id', '--ingestId', dest='ingest_id',
+ required=False, help='Ingest ID', metavar='')
args = parser.parse_args()
# start collector based on data source type.
- start_collector(args.type,args.workers_num,args.ingest_id)
+ start_collector(args.type, args.workers_num, args.ingest_id)
-def start_collector(type,workers_num,id=None):
+def start_collector(type, workers_num, id=None):
# generate ingest id
- ingest_id = str(datetime.datetime.time(datetime.datetime.now())).replace(":","_").replace(".","_")
-
+ ingest_id = str(datetime.datetime.time(datetime.datetime.now())).replace(":", "_").replace(".", "_")
+
# create logger.
logger = Util.get_logger("SPOT.INGEST")
# validate the given configuration exists in ingest_conf.json.
- if not type in master_conf["pipelines"]:
- logger.error("'{0}' type is not a valid configuration.".format(type));
+ if not type in MASTER_CONF["pipelines"]:
+ logger.error("'{0}' type is not a valid configuration.".format(type))
sys.exit(1)
# validate the type is a valid module.
- if not Util.validate_data_source(master_conf["pipelines"][type]["type"]):
- logger.error("'{0}' type is not configured. Please check you ingest conf file".format(master_conf["pipelines"][type]["type"]));
+ if not Util.validate_data_source(MASTER_CONF["pipelines"][type]["type"]):
+ logger.error("'{0}' type is not configured. Please check you ingest conf file".format(MASTER_CONF["pipelines"][type]["type"]))
sys.exit(1)
-
+
# validate if kerberos authentication is required.
if os.getenv('KRB_AUTH'):
kb = Kerberos()
kb.authenticate()
-
+
# kafka server info.
logger.info("Initializing kafka instance")
- k_server = master_conf["kafka"]['kafka_server']
- k_port = master_conf["kafka"]['kafka_port']
+ k_server = MASTER_CONF["kafka"]['kafka_server']
+ k_port = MASTER_CONF["kafka"]['kafka_port']
# required zookeeper info.
- zk_server = master_conf["kafka"]['zookeper_server']
- zk_port = master_conf["kafka"]['zookeper_port']
-
- topic = "SPOT-INGEST-{0}_{1}".format(type,ingest_id) if not id else id
- kafka = KafkaTopic(topic,k_server,k_port,zk_server,zk_port,workers_num)
+ zk_server = MASTER_CONF["kafka"]['zookeper_server']
+ zk_port = MASTER_CONF["kafka"]['zookeper_port']
+
+ topic = "SPOT-INGEST-{0}_{1}".format(type, ingest_id) if not id else id
+ kafka = KafkaTopic(topic, k_server, k_port, zk_server, zk_port, workers_num)
# create a collector instance based on data source type.
logger.info("Starting {0} ingest instance".format(topic))
- module = __import__("pipelines.{0}.collector".format(master_conf["pipelines"][type]["type"]),fromlist=['Collector'])
+ module = __import__("pipelines.{0}.collector".format(MASTER_CONF["pipelines"][type]["type"]), fromlist=['Collector'])
# start collector.
- ingest_collector = module.Collector(master_conf['hdfs_app_path'],kafka,type)
+ ingest_collector = module.Collector(MASTER_CONF['hdfs_app_path'], kafka, type)
ingest_collector.start()
-if __name__=='__main__':
+if __name__ == '__main__':
main()
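One cleanup the commit leaves for later: json.loads(open(CONF_FILE).read())
never closes the file handle. A context manager is the more idiomatic form;
a minimal sketch, reusing the same CONF_FILE construction as the code above:

    # Handle-safe variant of the config loading above; behavior is unchanged,
    # but the file is closed deterministically when the block exits.
    import json
    import os

    SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
    CONF_FILE = "{0}/ingest_conf.json".format(SCRIPT_PATH)
    with open(CONF_FILE) as conf:
        MASTER_CONF = json.load(conf)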
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/ce70f882/spot-ingest/worker.py
----------------------------------------------------------------------
diff --git a/spot-ingest/worker.py b/spot-ingest/worker.py
index db51def..5c29148 100755
--- a/spot-ingest/worker.py
+++ b/spot-ingest/worker.py
@@ -20,42 +20,48 @@
import argparse
import os
import json
-import logging
import sys
from common.utils import Util
from common.kerberos import Kerberos
from common.kafka_client import KafkaConsumer
-script_path = os.path.dirname(os.path.abspath(__file__))
-conf_file = "{0}/ingest_conf.json".format(script_path)
-worker_conf = json.loads(open (conf_file).read())
+SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
+CONF_FILE = "{0}/ingest_conf.json".format(SCRIPT_PATH)
+WORKER_CONF = json.loads(open(CONF_FILE).read())
def main():
# input parameters
parser = argparse.ArgumentParser(description="Worker Ingest Framework")
- parser.add_argument('-t','--type',dest='type',required=True,help='Type of data that will be ingested (Pipeline Configuration)',metavar='')
- parser.add_argument('-i','--id',dest='id',required=True,help='Worker Id, this is needed to sync Kafka and Ingest framework (Partition Number)',metavar='')
- parser.add_argument('-top','--topic',dest='topic',required=True,help='Topic to read from.',metavar="")
- parser.add_argument('-p','--processingParallelism',dest='processes',required=False,help='Processing Parallelism',metavar="")
+ parser.add_argument('-t', '--type', dest='type', required=True,
+ help='Type of data that will be ingested (Pipeline Configuration)',
+ metavar='')
+ parser.add_argument('-i', '--id', dest='id', required=True,
+ help='Worker Id, this is needed to sync Kafka and Ingest framework (Partition Number)',
+ metavar='')
+ parser.add_argument('-top', '--topic', dest='topic', required=True,
+ help='Topic to read from.', metavar="")
+ parser.add_argument('-p', '--processingParallelism', dest='processes',
+ required=False, help='Processing Parallelism', metavar="")
args = parser.parse_args()
# start worker based on the type.
- start_worker(args.type,args.topic,args.id,args.processes)
+ start_worker(args.type, args.topic, args.id, args.processes)
-def start_worker(type,topic,id,processes=None):
+def start_worker(type, topic, id, processes=None):
logger = Util.get_logger("SPOT.INGEST.WORKER")
# validate the given configuration exists in ingest_conf.json.
- if not type in worker_conf["pipelines"]:
- logger.error("'{0}' type is not a valid configuration.".format(type));
+ if not type in WORKER_CONF["pipelines"]:
+ logger.error("'{0}' type is not a valid configuration.".format(type))
sys.exit(1)
# validate the type is a valid module.
- if not Util.validate_data_source(worker_conf["pipelines"][type]["type"]):
- logger.error("The provided data source {0} is not valid".format(type));sys.exit(1)
+ if not Util.validate_data_source(WORKER_CONF["pipelines"][type]["type"]):
+ logger.error("The provided data source {0} is not valid".format(type))
+ sys.exit(1)
# validate if kerberos authentication is requiered.
if os.getenv('KRB_AUTH'):
@@ -63,27 +69,27 @@ def start_worker(type,topic,id,processes=None):
kb.authenticate()
# create a worker instance based on the data source type.
- module = __import__("pipelines.{0}.worker".format(worker_conf["pipelines"][type]["type"]),fromlist=['Worker'])
+ module = __import__("pipelines.{0}.worker".format(WORKER_CONF["pipelines"][type]["type"]),
+ fromlist=['Worker'])
# kafka server info.
logger.info("Initializing kafka instance")
- k_server = worker_conf["kafka"]['kafka_server']
- k_port = worker_conf["kafka"]['kafka_port']
+ k_server = WORKER_CONF["kafka"]['kafka_server']
+ k_port = WORKER_CONF["kafka"]['kafka_port']
# required zookeeper info.
- zk_server = worker_conf["kafka"]['zookeper_server']
- zk_port = worker_conf["kafka"]['zookeper_port']
+ zk_server = WORKER_CONF["kafka"]['zookeper_server']
+ zk_port = WORKER_CONF["kafka"]['zookeper_port']
topic = topic
# create kafka consumer.
- kafka_consumer = KafkaConsumer(topic,k_server,k_port,zk_server,zk_port,id)
+ kafka_consumer = KafkaConsumer(topic, k_server, k_port, zk_server, zk_port, id)
# start worker.
- db_name = worker_conf['dbname']
- app_path = worker_conf['hdfs_app_path']
- ingest_worker = module.Worker(db_name,app_path,kafka_consumer,type,processes)
+ db_name = WORKER_CONF['dbname']
+ app_path = WORKER_CONF['hdfs_app_path']
+ ingest_worker = module.Worker(db_name, app_path, kafka_consumer, type, processes)
ingest_worker.start()
-if __name__=='__main__':
+if __name__ == '__main__':
main()
-
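Both files keep the __import__(..., fromlist=['Collector']) / fromlist=['Worker']
pattern for loading pipeline modules by name. importlib.import_module is the
usual modern spelling of the same thing; a sketch of the equivalent call, with
a hypothetical pipeline type standing in for the value read from ingest_conf.json:

    # importlib equivalent of the __import__(..., fromlist=[...]) calls above.
    import importlib

    pipeline_type = "flow"  # hypothetical MASTER_CONF["pipelines"][type]["type"]
    module = importlib.import_module("pipelines.{0}.collector".format(pipeline_type))
    collector_cls = module.Collector  # same attribute the collector code relies on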