You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spot.apache.org by na...@apache.org on 2018/03/19 19:28:46 UTC
[37/42] incubator-spot git commit: fixed bluecoat_parse()
fixed bluecoat_parse()
Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/6b79abbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/6b79abbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/6b79abbb
Branch: refs/heads/SPOT-181_ODM
Commit: 6b79abbb079d99d283664382fba131864049f1fa
Parents: 2ea6b4e
Author: tpltnt <tp...@dropcut.net>
Authored: Thu Jan 25 12:40:38 2018 +0100
Committer: tpltnt <tp...@dropcut.net>
Committed: Thu Jan 25 12:42:22 2018 +0100
----------------------------------------------------------------------
spot-ingest/pipelines/proxy/bluecoat.py | 21 +++++++++++++++------
1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/6b79abbb/spot-ingest/pipelines/proxy/bluecoat.py
----------------------------------------------------------------------
diff --git a/spot-ingest/pipelines/proxy/bluecoat.py b/spot-ingest/pipelines/proxy/bluecoat.py
index 597c13c..2f5da0d 100644
--- a/spot-ingest/pipelines/proxy/bluecoat.py
+++ b/spot-ingest/pipelines/proxy/bluecoat.py
@@ -170,21 +170,30 @@ def save_data(rdd, sqc, db, db_table, topic):
print("------------------------LISTENING KAFKA TOPIC:{0}------------------------".format(topic))
-def bluecoat_parse(zk,topic,db,db_table,num_of_workers,batch_size):
-
+def bluecoat_parse(zk, topic, db, db_table, num_of_workers, batch_size):
+ """
+ Parse and save bluecoat logs.
+
+ :param zk: Apache ZooKeeper quorum
+ :param topic: Apache Kafka topic (application name)
+ :param db: Apache Hive database to save into
+ :param db_table: table of `db` to save into
+ :param num_of_workers: number of Apache Kafka workers
+ :param batch_size: batch size for Apache Spark streaming context
+ """
app_name = topic
wrks = int(num_of_workers)
# create spark context
sc = SparkContext(appName=app_name)
- ssc = StreamingContext(sc,int(batch_size))
+ ssc = StreamingContext(sc, int(batch_size))
sqc = HiveContext(sc)
tp_stream = KafkaUtils.createStream(ssc, zk, app_name, {topic: wrks}, keyDecoder=spot_decoder, valueDecoder=spot_decoder)
- proxy_data = tp_stream.map(lambda row: row[1]).flatMap(lambda row: row.split("\n")).filter(lambda row: rex_date.match(row)).map(lambda row: row.strip("\n").strip("\r").replace("\t", " ").replace("  ", " ")).map(lambda row: split_log_entry(row)).map(lambda row: proxy_parser(row))
- saved_data = proxy_data.foreachRDD(lambda row: save_data(row,sqc,db,db_table,topic))
- ssc.start();
+ proxy_data = tp_stream.map(lambda row: row[1]).flatMap(lambda row: row.split("\n")).filter(lambda row: rex_date.match(row)).map(lambda row: row.strip("\n").strip("\r").replace("\t", " ").replace("  ", " ")).map(lambda row: split_log_entry(row)).map(lambda row: proxy_parser(row))
+ saved_data = proxy_data.foreachRDD(lambda row: save_data(row, sqc, db, db_table, topic))
+ ssc.start()
ssc.awaitTermination()