Posted to commits@spot.apache.org by na...@apache.org on 2018/03/19 19:28:46 UTC

[37/42] incubator-spot git commit: fixed bluecoat_parse()

fixed bluecoat_parse()


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/6b79abbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/6b79abbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/6b79abbb

Branch: refs/heads/SPOT-181_ODM
Commit: 6b79abbb079d99d283664382fba131864049f1fa
Parents: 2ea6b4e
Author: tpltnt <tp...@dropcut.net>
Authored: Thu Jan 25 12:40:38 2018 +0100
Committer: tpltnt <tp...@dropcut.net>
Committed: Thu Jan 25 12:42:22 2018 +0100

----------------------------------------------------------------------
 spot-ingest/pipelines/proxy/bluecoat.py | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/6b79abbb/spot-ingest/pipelines/proxy/bluecoat.py
----------------------------------------------------------------------
diff --git a/spot-ingest/pipelines/proxy/bluecoat.py b/spot-ingest/pipelines/proxy/bluecoat.py
index 597c13c..2f5da0d 100644
--- a/spot-ingest/pipelines/proxy/bluecoat.py
+++ b/spot-ingest/pipelines/proxy/bluecoat.py
@@ -170,21 +170,30 @@ def save_data(rdd, sqc, db, db_table, topic):
         print("------------------------LISTENING KAFKA TOPIC:{0}------------------------".format(topic))
 
 
-def bluecoat_parse(zk,topic,db,db_table,num_of_workers,batch_size):
-    
+def bluecoat_parse(zk, topic, db, db_table, num_of_workers, batch_size):
+    """
+    Parse and save bluecoat logs.
+
+    :param zk: Apache ZooKeeper quorum
+    :param topic: Apache Kafka topic (application name)
+    :param db: Apache Hive database to save into
+    :param db_table: table of `db` to save into
+    :param num_of_workers: number of Apache Kafka workers
+    :param batch_size: batch size for Apache Spark streaming context
+    """
     app_name = topic
     wrks = int(num_of_workers)
 
     # create spark context
     sc = SparkContext(appName=app_name)
-    ssc = StreamingContext(sc,int(batch_size))
+    ssc = StreamingContext(sc, int(batch_size))
     sqc = HiveContext(sc)
 
     tp_stream = KafkaUtils.createStream(ssc, zk, app_name, {topic: wrks}, keyDecoder=spot_decoder, valueDecoder=spot_decoder)
 
-    proxy_data = tp_stream.map(lambda row: row[1]).flatMap(lambda row: row.split("\n")).filter(lambda row: rex_date.match(row)).map(lambda row: row.strip("\n").strip("\r").replace("\t", " ").replace("  ", " ")).map(lambda row:  split_log_entry(row)).map(lambda row: proxy_parser(row))
-    saved_data = proxy_data.foreachRDD(lambda row: save_data(row,sqc,db,db_table,topic))
-    ssc.start();
+    proxy_data = tp_stream.map(lambda row: row[1]).flatMap(lambda row: row.split("\n")).filter(lambda row: rex_date.match(row)).map(lambda row: row.strip("\n").strip("\r").replace("\t", " ").replace("  ", " ")).map(lambda row: split_log_entry(row)).map(lambda row: proxy_parser(row))
+    saved_data = proxy_data.foreachRDD(lambda row: save_data(row, sqc, db, db_table, topic))
+    ssc.start()
     ssc.awaitTermination()
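
----------------------------------------------------------------------
Editor's note: for readers skimming the diff, the reworked DStream pipeline is
easier to follow one transformation per step. Below is a minimal readability
sketch of the same chain, assuming the module-level helpers rex_date,
split_log_entry, proxy_parser, spot_decoder, and save_data defined elsewhere in
bluecoat.py; it is not part of the commit. Since DStream.foreachRDD() returns
None, the saved_data assignment from the diff is dropped here.

    # inside bluecoat_parse(), after tp_stream is created:

    # take only the Kafka message values (row[1]) and split multi-line payloads
    lines = tp_stream.map(lambda row: row[1]) \
                     .flatMap(lambda row: row.split("\n"))

    # keep lines that start with a timestamp, then normalize whitespace
    cleaned = lines.filter(lambda row: rex_date.match(row)) \
                   .map(lambda row: row.strip("\n").strip("\r")
                                       .replace("\t", " ")
                                       .replace("  ", " "))

    # tokenize each log entry and map the tokens onto the proxy schema
    proxy_data = cleaned.map(split_log_entry).map(proxy_parser)

    # persist every micro-batch into Hive
    proxy_data.foreachRDD(lambda rdd: save_data(rdd, sqc, db, db_table, topic))

One caveat worth noting: str.replace("  ", " ") collapses runs of spaces by
only one level per call (four spaces become two, not one); if the intent is to
squeeze arbitrary whitespace runs, re.sub(r"\s+", " ", row) would do it in a
single pass.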
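For context, a hypothetical invocation of the fixed function follows. Every
argument value is illustrative only and not taken from any real Spot
deployment; string values are used for num_of_workers and batch_size because
the function casts both via int(), suggesting they arrive from command-line
arguments.

    if __name__ == "__main__":
        # illustrative values only; real ingest workers pass these in
        # from configuration or command-line arguments
        bluecoat_parse(
            zk="localhost:2181",      # ZooKeeper quorum for Kafka
            topic="proxy",            # Kafka topic, doubles as the Spark app name
            db="spotdb",              # target Hive database
            db_table="proxy_logs",    # target table inside `db`
            num_of_workers="1",       # Kafka consumer threads (cast via int())
            batch_size="30",          # streaming batch interval in seconds
        )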