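You are viewing a plain-text version of this content. The canonical version is the original message in the Apache mailing-list archive; its hyperlink was lost in this plain-text rendering.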
Posted to commits@spot.apache.org by na...@apache.org on 2018/03/19 19:28:45 UTC

[36/42] incubator-spot git commit: fixes for save_data()

fixes for save_data()


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/2ea6b4ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/2ea6b4ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/2ea6b4ea

Branch: refs/heads/SPOT-181_ODM
Commit: 2ea6b4eac7c11ef6084955179eb211b696737e9e
Parents: b9befd7
Author: tpltnt <tp...@dropcut.net>
Authored: Thu Jan 25 12:01:31 2018 +0100
Committer: tpltnt <tp...@dropcut.net>
Committed: Thu Jan 25 12:01:31 2018 +0100

----------------------------------------------------------------------
 spot-ingest/pipelines/proxy/bluecoat.py | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/2ea6b4ea/spot-ingest/pipelines/proxy/bluecoat.py
----------------------------------------------------------------------
diff --git a/spot-ingest/pipelines/proxy/bluecoat.py b/spot-ingest/pipelines/proxy/bluecoat.py
index c2ddb04..597c13c 100644
--- a/spot-ingest/pipelines/proxy/bluecoat.py
+++ b/spot-ingest/pipelines/proxy/bluecoat.py
@@ -148,21 +148,28 @@ def proxy_parser(proxy_fields):
     return proxy_parsed_data
 
 
-def save_data(rdd,sqc,db,db_table,topic):
+def save_data(rdd, sqc, db, db_table, topic):
     """
     Create and save a data frame with the given data.
+
+    :param rdd: collection of objects (Resilient Distributed Dataset) to store
+    :param sqc: Apache Hive context
+    :param db: Apache Hive database to save into
+    :param db_table: table of `db` to save into
+    :param topic: Apache Kafka topic to listen for (if `rdd` is empty)
     """
     if not rdd.isEmpty():
 
-        df = sqc.createDataFrame(rdd,proxy_schema)        
+        df = sqc.createDataFrame(rdd, proxy_schema)
         sqc.setConf("hive.exec.dynamic.partition", "true")
         sqc.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
-        hive_table = "{0}.{1}".format(db,db_table)
+        hive_table = "{0}.{1}".format(db, db_table)
         df.write.format("parquet").mode("append").insertInto(hive_table)
 
     else:
         print("------------------------LISTENING KAFKA TOPIC:{0}------------------------".format(topic))
 
+
 def bluecoat_parse(zk,topic,db,db_table,num_of_workers,batch_size):
     
     app_name = topic