Posted to commits@spot.apache.org by na...@apache.org on 2018/03/19 19:28:18 UTC

[09/42] incubator-spot git commit: PEP8 fixes

PEP8 fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/ce70f882
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/ce70f882
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/ce70f882

Branch: refs/heads/SPOT-181_ODM
Commit: ce70f882b09189ec62d1ad60f4ff2411acb05c2a
Parents: dbf6f51
Author: tpltnt <tp...@dropcut.net>
Authored: Fri Dec 29 16:54:14 2017 +0100
Committer: tpltnt <tp...@dropcut.net>
Committed: Fri Dec 29 17:32:39 2017 +0100

----------------------------------------------------------------------
 spot-ingest/master_collector.py | 60 ++++++++++++++++++++----------------
 spot-ingest/worker.py           | 56 ++++++++++++++++++---------------
 2 files changed, 64 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
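For readers skimming the diff: the changes are mechanical PEP8 cleanups -- spaces after commas, module-level constants renamed to UPPER_CASE, trailing semicolons dropped, and long add_argument() calls wrapped across lines. A minimal, runnable sketch of the post-change argparse style (argument names taken from master_collector.py below; this is an illustration, not the complete script):

    import argparse

    def parse_args():
        # Long add_argument() calls are wrapped and comma-spaced, matching the new style.
        parser = argparse.ArgumentParser(description="Master Collector Ingest Daemon")
        parser.add_argument('-t', '--type', dest='type', required=True,
                            help='Type of data that will be ingested (Pipeline Configuration)',
                            metavar='')
        parser.add_argument('-w', '--workers', dest='workers_num', required=True,
                            help='Number of workers for the ingest process', metavar='')
        return parser.parse_args()

    if __name__ == '__main__':
        print(parse_args())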


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/ce70f882/spot-ingest/master_collector.py
----------------------------------------------------------------------
diff --git a/spot-ingest/master_collector.py b/spot-ingest/master_collector.py
index 9cd91ea..6f6ff7c 100755
--- a/spot-ingest/master_collector.py
+++ b/spot-ingest/master_collector.py
@@ -21,70 +21,76 @@ import argparse
 import os
 import json
 import sys
+import datetime
 from common.utils import Util
 from common.kerberos import Kerberos
 from common.kafka_client import KafkaTopic
-import datetime 
+
 
 # get master configuration.
-script_path = os.path.dirname(os.path.abspath(__file__))
-conf_file = "{0}/ingest_conf.json".format(script_path)
-master_conf = json.loads(open (conf_file).read())
+SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
+CONF_FILE = "{0}/ingest_conf.json".format(SCRIPT_PATH)
+MASTER_CONF = json.loads(open(CONF_FILE).read())
 
 def main():
 
     # input Parameters
     parser = argparse.ArgumentParser(description="Master Collector Ingest Daemon")
-    parser.add_argument('-t','--type',dest='type',required=True,help='Type of data that will be ingested (Pipeline Configuration)',metavar='')
-    parser.add_argument('-w','--workers',dest='workers_num',required=True,help='Number of workers for the ingest process',metavar='')
-    parser.add_argument('-id','--ingestId',dest='ingest_id',required=False,help='Ingest ID',metavar='')
+    parser.add_argument('-t', '--type', dest='type', required=True,
+                        help='Type of data that will be ingested (Pipeline Configuration)',
+                        metavar='')
+    parser.add_argument('-w', '--workers', dest='workers_num',
+                        required=True, help='Number of workers for the ingest process',
+                        metavar='')
+    parser.add_argument('-id', '--ingestId', dest='ingest_id',
+                        required=False, help='Ingest ID', metavar='')
     args = parser.parse_args()
 
     # start collector based on data source type.
-    start_collector(args.type,args.workers_num,args.ingest_id)
+    start_collector(args.type, args.workers_num, args.ingest_id)
 
-def start_collector(type,workers_num,id=None):
+def start_collector(type, workers_num, id=None):
 
     # generate ingest id
-    ingest_id = str(datetime.datetime.time(datetime.datetime.now())).replace(":","_").replace(".","_")
-    
+    ingest_id = str(datetime.datetime.time(datetime.datetime.now())).replace(":", "_").replace(".", "_")
+
     # create logger.
     logger = Util.get_logger("SPOT.INGEST")
 
     # validate the given configuration exists in ingest_conf.json.
-    if not type in master_conf["pipelines"]:
-        logger.error("'{0}' type is not a valid configuration.".format(type));
+    if not type in MASTER_CONF["pipelines"]:
+        logger.error("'{0}' type is not a valid configuration.".format(type))
         sys.exit(1)
 
     # validate the type is a valid module.
-    if not Util.validate_data_source(master_conf["pipelines"][type]["type"]):
-        logger.error("'{0}' type is not configured. Please check you ingest conf file".format(master_conf["pipelines"][type]["type"]));
+    if not Util.validate_data_source(MASTER_CONF["pipelines"][type]["type"]):
+        logger.error("'{0}' type is not configured. Please check you ingest conf file".format(MASTER_CONF["pipelines"][type]["type"]))
         sys.exit(1)
-    
+
     # validate if kerberos authentication is required.
     if os.getenv('KRB_AUTH'):
         kb = Kerberos()
         kb.authenticate()
-    
+
     # kafka server info.
     logger.info("Initializing kafka instance")
-    k_server = master_conf["kafka"]['kafka_server']
-    k_port = master_conf["kafka"]['kafka_port']
+    k_server = MASTER_CONF["kafka"]['kafka_server']
+    k_port = MASTER_CONF["kafka"]['kafka_port']
 
     # required zookeeper info.
-    zk_server = master_conf["kafka"]['zookeper_server']
-    zk_port = master_conf["kafka"]['zookeper_port']
-         
-    topic = "SPOT-INGEST-{0}_{1}".format(type,ingest_id) if not id else id
-    kafka = KafkaTopic(topic,k_server,k_port,zk_server,zk_port,workers_num)
+    zk_server = MASTER_CONF["kafka"]['zookeper_server']
+    zk_port = MASTER_CONF["kafka"]['zookeper_port']
+
+    topic = "SPOT-INGEST-{0}_{1}".format(type, ingest_id) if not id else id
+    kafka = KafkaTopic(topic, k_server, k_port, zk_server, zk_port, workers_num)
 
     # create a collector instance based on data source type.
     logger.info("Starting {0} ingest instance".format(topic))
-    module = __import__("pipelines.{0}.collector".format(master_conf["pipelines"][type]["type"]),fromlist=['Collector'])
+    module = __import__("pipelines.{0}.collector".format(MASTER_CONF["pipelines"][type]["type"]), fromlist=['Collector'])
 
     # start collector.
-    ingest_collector = module.Collector(master_conf['hdfs_app_path'],kafka,type)
+    ingest_collector = module.Collector(MASTER_CONF['hdfs_app_path'], kafka, type)
     ingest_collector.start()
 
-if __name__=='__main__':
+if __name__ == '__main__':
     main()

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/ce70f882/spot-ingest/worker.py
----------------------------------------------------------------------
diff --git a/spot-ingest/worker.py b/spot-ingest/worker.py
index db51def..5c29148 100755
--- a/spot-ingest/worker.py
+++ b/spot-ingest/worker.py
@@ -20,42 +20,48 @@
 import argparse
 import os
 import json
-import logging
 import sys
 from common.utils import Util
 from common.kerberos import Kerberos
 from common.kafka_client import KafkaConsumer
 
-script_path = os.path.dirname(os.path.abspath(__file__))
-conf_file = "{0}/ingest_conf.json".format(script_path)
-worker_conf = json.loads(open (conf_file).read())
+SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
+CONF_FILE = "{0}/ingest_conf.json".format(SCRIPT_PATH)
+WORKER_CONF = json.loads(open(CONF_FILE).read())
 
 def main():
 
     # input parameters
     parser = argparse.ArgumentParser(description="Worker Ingest Framework")
-    parser.add_argument('-t','--type',dest='type',required=True,help='Type of data that will be ingested (Pipeline Configuration)',metavar='')
-    parser.add_argument('-i','--id',dest='id',required=True,help='Worker Id, this is needed to sync Kafka and Ingest framework (Partition Number)',metavar='')
-    parser.add_argument('-top','--topic',dest='topic',required=True,help='Topic to read from.',metavar="")
-    parser.add_argument('-p','--processingParallelism',dest='processes',required=False,help='Processing Parallelism',metavar="")
+    parser.add_argument('-t', '--type', dest='type', required=True,
+                        help='Type of data that will be ingested (Pipeline Configuration)',
+                        metavar='')
+    parser.add_argument('-i', '--id', dest='id', required=True,
+                        help='Worker Id, this is needed to sync Kafka and Ingest framework (Partition Number)',
+                        metavar='')
+    parser.add_argument('-top', '--topic', dest='topic', required=True,
+                        help='Topic to read from.', metavar="")
+    parser.add_argument('-p', '--processingParallelism', dest='processes',
+                        required=False, help='Processing Parallelism', metavar="")
     args = parser.parse_args()
 
     # start worker based on the type.
-    start_worker(args.type,args.topic,args.id,args.processes)
+    start_worker(args.type, args.topic, args.id, args.processes)
 
 
-def start_worker(type,topic,id,processes=None):
+def start_worker(type, topic, id, processes=None):
 
     logger = Util.get_logger("SPOT.INGEST.WORKER")
 
     # validate the given configuration exists in ingest_conf.json.
-    if not type in worker_conf["pipelines"]:
-        logger.error("'{0}' type is not a valid configuration.".format(type));
+    if not type in WORKER_CONF["pipelines"]:
+        logger.error("'{0}' type is not a valid configuration.".format(type))
         sys.exit(1)
 
     # validate the type is a valid module.
-    if not Util.validate_data_source(worker_conf["pipelines"][type]["type"]):
-        logger.error("The provided data source {0} is not valid".format(type));sys.exit(1)
+    if not Util.validate_data_source(WORKER_CONF["pipelines"][type]["type"]):
+        logger.error("The provided data source {0} is not valid".format(type))
+        sys.exit(1)
 
     # validate if kerberos authentication is requiered.
     if os.getenv('KRB_AUTH'):
@@ -63,27 +69,27 @@ def start_worker(type,topic,id,processes=None):
         kb.authenticate()
 
     # create a worker instance based on the data source type.
-    module = __import__("pipelines.{0}.worker".format(worker_conf["pipelines"][type]["type"]),fromlist=['Worker'])
+    module = __import__("pipelines.{0}.worker".format(WORKER_CONF["pipelines"][type]["type"]),
+                        fromlist=['Worker'])
 
     # kafka server info.
     logger.info("Initializing kafka instance")
-    k_server = worker_conf["kafka"]['kafka_server']
-    k_port = worker_conf["kafka"]['kafka_port']
+    k_server = WORKER_CONF["kafka"]['kafka_server']
+    k_port = WORKER_CONF["kafka"]['kafka_port']
 
     # required zookeeper info.
-    zk_server = worker_conf["kafka"]['zookeper_server']
-    zk_port = worker_conf["kafka"]['zookeper_port']
+    zk_server = WORKER_CONF["kafka"]['zookeper_server']
+    zk_port = WORKER_CONF["kafka"]['zookeper_port']
     topic = topic
 
     # create kafka consumer.
-    kafka_consumer = KafkaConsumer(topic,k_server,k_port,zk_server,zk_port,id)
+    kafka_consumer = KafkaConsumer(topic, k_server, k_port, zk_server, zk_port, id)
 
     # start worker.
-    db_name = worker_conf['dbname']
-    app_path = worker_conf['hdfs_app_path']
-    ingest_worker = module.Worker(db_name,app_path,kafka_consumer,type,processes)
+    db_name = WORKER_CONF['dbname']
+    app_path = WORKER_CONF['hdfs_app_path']
+    ingest_worker = module.Worker(db_name, app_path, kafka_consumer, type, processes)
     ingest_worker.start()
 
-if __name__=='__main__':
+if __name__ == '__main__':
     main()
-