Posted to commits@spot.apache.org by ev...@apache.org on 2017/03/29 16:51:35 UTC

[11/50] [abbrv] incubator-spot git commit: Removed csv files from OA and ipython notebooks

Removed csv files from OA and ipython notebooks


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/1904f2b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/1904f2b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/1904f2b4

Branch: refs/heads/SPOT-35_graphql_api
Commit: 1904f2b44cec2bd8cc7c6a22efad0009ddce1d3c
Parents: 2c951e9
Author: LedaLima <le...@apache.org>
Authored: Sun Mar 5 18:20:05 2017 -0600
Committer: Diego Ortiz Huerta <di...@intel.com>
Committed: Wed Mar 15 11:49:47 2017 -0700

----------------------------------------------------------------------
 spot-oa/oa/dns/dns_conf.json                    |  36 +-
 spot-oa/oa/dns/dns_oa.py                        | 318 ++++-----
 .../Edge_Investigation_master.ipynb             | 183 +++---
 .../Threat_Investigation_master.ipynb           | 186 +++---
 spot-oa/oa/flow/flow_conf.json                  |  27 +-
 spot-oa/oa/flow/flow_oa.py                      | 245 ++++---
 .../Edge_Investigation_master.ipynb             | 302 ++-------
 .../Threat_Investigation_master.ipynb           | 651 ++++---------------
 .../Edge_Investigation_master.ipynb             | 150 ++---
 .../Threat_Investigation_master.ipynb           | 306 ++++-----
 spot-oa/oa/proxy/proxy_conf.json                |  10 +-
 spot-oa/oa/proxy/proxy_oa.py                    | 232 ++++---
 spot-oa/oa/utils.py                             |  12 +
 13 files changed, 1035 insertions(+), 1623 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1904f2b4/spot-oa/oa/dns/dns_conf.json
----------------------------------------------------------------------
diff --git a/spot-oa/oa/dns/dns_conf.json b/spot-oa/oa/dns/dns_conf.json
index aadd92c..0e23a38 100644
--- a/spot-oa/oa/dns/dns_conf.json
+++ b/spot-oa/oa/dns/dns_conf.json
@@ -11,27 +11,25 @@
         , "score" : 8  
     },
     "dns_score_fields": 
-    {
+    { 
         "frame_time" : 0
-        , "frame_len" : 1 
-        , "ip_dst": 2
-        , "dns_qry_name" : 3
-        , "dns_qry_class" : 4
-        , "dns_qry_type" : 5
-        , "dns_qry_rcode" : 6
-        , "score" : 7
-        , "tld" : 8
-        , "query_rep" : 9 
-        , "hh": 10
-        , "ip_sev" : 11
-        , "dns_sev" : 12
-        , "dns_qry_class_name" : 13
-        , "dns_qry_type_name" : 14
-        , "dns_qry_rcode_name" : 15
-        , "network_context" : 16
-        , "unix_tstamp": 17
+        ,"unix_tstamp" : 1
+        , "frame_len" : 2
+        , "ip_dst": 3
+        , "dns_qry_name" : 4
+        , "dns_qry_class" : 5
+        , "dns_qry_type" : 6
+        , "dns_qry_rcode" : 7
+        , "score" : 8
+        , "tld" : 9
+        , "query_rep" : 10
+        , "hh": 11
+        , "dns_qry_class_name" : 12
+        , "dns_qry_type_name" : 13
+        , "dns_qry_rcode_name" : 14
+        , "network_context" : 15
     },
     "add_reputation":{
         "query_rep":4   
     }
-}
\ No newline at end of file
+} 
\ No newline at end of file
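
For context on the reordered block above: the OA code consumes dns_score_fields as a name-to-column-index map into each scored row (e.g. conn[self._conf["dns_score_fields"]["unix_tstamp"]] further down in dns_oa.py). A minimal sketch of that lookup, assuming dns_conf.json sits in the working directory; the sample row values are made up for illustration:

    # Sketch only: dns_score_fields as a column-index map (sample values are hypothetical).
    import json
    from collections import OrderedDict

    conf = json.loads(open("dns_conf.json").read(), object_pairs_hook=OrderedDict)
    fields = conf["dns_score_fields"]             # {"frame_time": 0, "unix_tstamp": 1, ...}

    row = ["Mar  5 2017 18:20:05", 1488759605, 74, "10.0.0.5", "example.com",
           1, 1, 0, 0.0003, "com", "", 18, "IN", "A", "NOERROR", ""]

    print row[fields["unix_tstamp"]]              # 1488759605
    print row[fields["dns_qry_name"]]             # example.com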

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1904f2b4/spot-oa/oa/dns/dns_oa.py
----------------------------------------------------------------------
diff --git a/spot-oa/oa/dns/dns_oa.py b/spot-oa/oa/dns/dns_oa.py
index 2033c89..8d3ce80 100644
--- a/spot-oa/oa/dns/dns_oa.py
+++ b/spot-oa/oa/dns/dns_oa.py
@@ -23,7 +23,8 @@ import sys
 import datetime
 import csv, math
 from tld import get_tld
-
+import api.resources.impala_engine as impala
+import api.resources.hdfs_client as HDFSClient
 from collections import OrderedDict
 from utils import Util
 from components.data.data import Data
@@ -53,22 +54,20 @@ class OA(object):
         self._data_path = None
         self._ipynb_path = None
         self._ingest_summary_path = None
-        self._dns_scores = []
-        self._dns_scores_headers = []
+        self._dns_scores = [] 
         self._results_delimiter = '\t'
         self._details_limit = 250
 
         # get app configuration.
         self._spot_conf = Util.get_spot_conf()
 
-        # get scores fields conf
+        # # get scores fields conf
         conf_file = "{0}/dns_conf.json".format(self._scrtip_path)
         self._conf = json.loads(open (conf_file).read(),object_pairs_hook=OrderedDict)
 
         # initialize data engine
         self._db = self._spot_conf.get('conf', 'DBNAME').replace("'", "").replace('"', '')
-        self._engine = Data(self._db,self._table_name ,self._logger) 
-
+        
 
     def start(self):
 
@@ -76,15 +75,16 @@ class OA(object):
         start = time.time()
         ####################
 
+        self._clear_previous_executions()
         self._create_folder_structure()
         self._add_ipynb()
         self._get_dns_results()
         self._add_tld_column()
         self._add_reputation()
-        self._add_hh_and_severity()
+        self._add_hh_column()
         self._add_iana()
         self._add_network_context()
-        self._create_dns_scores_csv()
+        self._create_dns_scores()
         self._get_oa_details()
         self._ingest_summary()
 
@@ -93,12 +93,33 @@ class OA(object):
         print(end - start)
         ##################
 
-    def _create_folder_structure(self):
 
+    def _clear_previous_executions(self):
+        
+        self._logger.info("Cleaning data from previous executions for the day")       
+        yr = self._date[:4]
+        mn = self._date[4:6]
+        dy = self._date[6:]  
+        table_schema = []
+        HUSER = self._spot_conf.get('conf', 'HUSER').replace("'", "").replace('"', '')
+        table_schema=['suspicious', 'edge', 'dendro', 'threat_dendro', 'threat_investigation', 'storyboard' ]
+
+        for path in table_schema:
+            HDFSClient.delete_folder("{0}/{1}/hive/oa/{2}/y={3}/m={4}/d={5}".format(HUSER,self._table_name,path,yr,mn,dy),user="impala")
+        HDFSClient.delete_folder("{0}/{1}/hive/oa/{2}/y={3}/m={4}".format(HUSER,self._table_name,"summary",yr,mn),user="impala")
+        #removes Feedback file
+        HDFSClient.delete_folder("{0}/{1}/scored_results/{2}{3}{4}/feedback/ml_feedback.csv".format(HUSER,self._table_name,yr,mn,dy))
+        #removes json files from the storyboard
+        HDFSClient.delete_folder("{0}/{1}/oa/{2}/{3}/{4}/{5}".format(HUSER,self._table_name,"storyboard",yr,mn,dy))
+
+
+    def _create_folder_structure(self):
+        
         # create date folder structure if it does not exist.
         self._logger.info("Creating folder structure for OA (data and ipynb)")       
         self._data_path,self._ingest_summary_path,self._ipynb_path = Util.create_oa_folders("dns",self._date)
     
+
     def _add_ipynb(self):
 
         if os.path.isdir(self._ipynb_path):
@@ -125,59 +146,59 @@ class OA(object):
         # get results file from hdfs.
         get_command = Util.get_ml_results_form_hdfs(hdfs_path,self._data_path)
         self._logger.info("{0}".format(get_command))
-
-         # validate files exists
+ 
         if os.path.isfile(dns_results):
-
+    
             # read number of results based in the limit specified.
             self._logger.info("Reading {0} dns results file: {1}".format(self._date,dns_results))
             self._dns_results = Util.read_results(dns_results,self._limit,self._results_delimiter)[:]
-            if len(self._dns_results) == 0: self._logger.error("There are not flow results.");sys.exit(1)
+            if len(self._dns_results) == 0: self._logger.error("There are no DNS results.");sys.exit(1)
 
         else:
             self._logger.error("There was an error getting ML results from HDFS")
-            sys.exit(1)
-
-        # add headers.        
-        self._logger.info("Adding headers")
-        self._dns_scores_headers = [  str(key) for (key,value) in self._conf['dns_score_fields'].items() ]
+            sys.exit(1) 
 
         # add dns content.
-        self._dns_scores = [ conn[:]  for conn in self._dns_results][:]       
+        self._dns_scores = [ conn[:]  for conn in self._dns_results][:]    
 
-    def _move_time_stamp(self,dns_data):
-        
-        for dns in dns_data:
-            time_stamp = dns[1]
-            dns.remove(time_stamp)
-            dns.append(time_stamp)
+
+    def _move_time_stamp(self,dns_data): 
         
+        # return dns_data_ordered
         return dns_data        
 
-    def _create_dns_scores_csv(self):
+
+    def _create_dns_scores(self):
         
-        dns_scores_csv = "{0}/dns_scores.csv".format(self._data_path)
-        dns_scores_final =  self._move_time_stamp(self._dns_scores)
-        dns_scores_final.insert(0,self._dns_scores_headers)
-        Util.create_csv_file(dns_scores_csv,dns_scores_final)   
+        # get date parameters.
+        yr = self._date[:4]
+        mn = self._date[4:6]
+        dy = self._date[6:] 
+        value_string = ""
 
-        # create bk file
-        dns_scores_bu_csv = "{0}/dns_scores_bu.csv".format(self._data_path)
-        Util.create_csv_file(dns_scores_bu_csv,dns_scores_final)     
+        dns_scores_final = self._move_time_stamp(self._dns_scores)
+        self._dns_scores = dns_scores_final
+        for row in dns_scores_final:
+            value_string += str(tuple(Util.cast_val(item) for item in row)) + ","              
+    
+        load_into_impala = ("""
+             INSERT INTO {0}.dns_scores partition(y={2}, m={3}, d={4}) VALUES {1}
+        """).format(self._db, value_string[:-1], yr, mn, dy) 
+        impala.execute_query(load_into_impala)
 
 
     def _add_tld_column(self):
         qry_name_col = self._conf['dns_results_fields']['dns_qry_name'] 
-        self._dns_scores = [conn + [ get_tld("http://" + str(conn[qry_name_col]), fail_silently=True) if "http://" not in str(conn[qry_name_col]) else get_tld(str(conn[qry_name_col]), fail_silently=True)] for conn in self._dns_scores ] 
-  
+        self._dns_scores = [conn + [ get_tld("http://" + str(conn[qry_name_col]), fail_silently=True) if "http://" not in str(conn[qry_name_col]) else get_tld(str(conn[qry_name_col]), fail_silently=True)] for conn in self._dns_scores ]
+        
+
     def _add_reputation(self):
 
         # read configuration.
         reputation_conf_file = "{0}/components/reputation/reputation_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
         self._logger.info("Reading reputation configuration file: {0}".format(reputation_conf_file))
         rep_conf = json.loads(open(reputation_conf_file).read())
-        
-        
+                
         # initialize reputation services.
         self._rep_services = []
         self._logger.info("Initializing reputation services.")
@@ -199,7 +220,6 @@ class OA(object):
         # get reputation per column.
         self._logger.info("Getting reputation for each service in config")        
         rep_services_results = []
-
  
         if self._rep_services :
             for key,value in rep_cols.items():
@@ -213,12 +233,11 @@ class OA(object):
             self._dns_scores = [ conn + [""]   for conn in self._dns_scores  ]
 
 
+    def _add_hh_column(self):
 
-    def _add_hh_and_severity(self):
-
-        # add hh value and sev columns.
+        # add hh value column.
         dns_date_index = self._conf["dns_results_fields"]["frame_time"]
-        self._dns_scores = [conn + [ filter(None,conn[dns_date_index].split(" "))[3].split(":")[0]] + [0] + [0] for conn in self._dns_scores  ]
+        self._dns_scores = [conn + [ filter(None,conn[dns_date_index].split(" "))[3].split(":")[0]] for conn in self._dns_scores  ]
 
 
     def _add_iana(self):
@@ -236,8 +255,8 @@ class OA(object):
         else:            
             self._dns_scores = [ conn + ["","",""] for conn in self._dns_scores ] 
 
-    def _add_network_context(self):
 
+    def _add_network_context(self):
         nc_conf_file = "{0}/components/nc/nc_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
         if os.path.isfile(nc_conf_file):
             nc_conf = json.loads(open(nc_conf_file).read())["NC"]
@@ -245,15 +264,15 @@ class OA(object):
             ip_dst_index = self._conf["dns_results_fields"]["ip_dst"]
             self._dns_scores = [ conn + [dns_nc.get_nc(conn[ip_dst_index])] for conn in self._dns_scores ]
         else:
-            self._dns_scores = [ conn + [""] for conn in self._dns_scores ]
+            self._dns_scores = [ conn + [0] for conn in self._dns_scores ]
 
 
     def _get_oa_details(self):
         
-        self._logger.info("Getting OA DNS suspicious details/chord diagram")       
+        self._logger.info("Getting OA DNS suspicious details/dendro diagram")       
         # start suspicious connects details process.
         p_sp = Process(target=self._get_suspicious_details)
-        p_sp.start()        
+        p_sp.start()
 
         # start chord diagram process.            
         p_dn = Process(target=self._get_dns_dendrogram)
@@ -262,6 +281,7 @@ class OA(object):
         p_sp.join()
         p_dn.join()
 
+
     def _get_suspicious_details(self):
 
         iana_conf_file = "{0}/components/iana/iana_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -269,86 +289,87 @@ class OA(object):
             iana_config  = json.loads(open(iana_conf_file).read())
             dns_iana = IanaTransform(iana_config["IANA"])
         
-        for conn in self._dns_scores:
-            # get data to query
-            date=conn[self._conf["dns_score_fields"]["frame_time"]].split(" ")
-            date = filter(None,date)
-
-            if len(date) == 5:
-                year=date[2]
-                month=datetime.datetime.strptime(date[0], '%b').strftime('%m')
-                day=date[1]                
-                hh=conn[self._conf["dns_score_fields"]["hh"]]
-                dns_qry_name = conn[self._conf["dns_score_fields"]["dns_qry_name"]]
-                self._get_dns_details(dns_qry_name,year,month,day,hh,dns_iana)
+        for conn in self._dns_scores:       
+  
+            timestamp = conn[self._conf["dns_score_fields"]["unix_tstamp"]]
+            full_date = datetime.datetime.utcfromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H:%M:%S')
+  
+            date = full_date.split(" ")[0].split("-")
+            # get date parameters.
+            yr = date[0]
+            mn = date[1]
+            dy = date[2] 
+            time = full_date.split(" ")[1].split(":")
+            hh = int(time[0])
+
+            dns_qry_name = conn[self._conf["dns_score_fields"]["dns_qry_name"]]
+            self._get_dns_details(dns_qry_name,yr,mn,dy,hh,dns_iana)
 
     def _get_dns_details(self,dns_qry_name,year,month,day,hh,dns_iana):
+        value_string = ""
+        query_to_load =("""
+            SELECT unix_tstamp,frame_len,ip_dst,ip_src,dns_qry_name,dns_qry_class,dns_qry_type,dns_qry_rcode,dns_a,h as hh
+            FROM {0}.{1} WHERE y={2} AND m={3} AND d={4} AND dns_qry_name LIKE '%{5}%' AND h={6} LIMIT {7};
+        """).format(self._db,self._table_name,year,month,day,dns_qry_name,hh,self._details_limit)
+        
+        try: 
+             dns_details = impala.execute_query(query_to_load) 
+        except:
+            self._logger.error("ERROR. Details couldn't be retrieved for {0}, skipping this step".format(dns_qry_name))
+        else:
+        # add IANA to results. 
+            update_rows = []
+            if dns_iana:
+                self._logger.info("Adding IANA translation to details results") 
                     
-        limit = self._details_limit
-        edge_file ="{0}/edge-{1}_{2}_00.csv".format(self._data_path,dns_qry_name.replace("/","-"),hh)
-        edge_tmp  ="{0}/edge-{1}_{2}_00.tmp".format(self._data_path,dns_qry_name.replace("/","-"),hh)
-
-        if not os.path.isfile(edge_file):
-    
-            dns_qry = ("SELECT frame_time,frame_len,ip_dst,ip_src,dns_qry_name,dns_qry_class,dns_qry_type,dns_qry_rcode,dns_a FROM {0}.{1} WHERE y={2} AND m={3} AND d={4} AND dns_qry_name LIKE '%{5}%' AND h={6} LIMIT {7};").format(self._db,self._table_name,year,month,day,dns_qry_name,hh,limit)
-            
-            # execute query
-	    try:
-                self._engine.query(dns_qry,edge_tmp)
-            except:
-		self._logger.error("ERROR. Edge file couldn't be created for {0}, skipping this step".format(dns_qry_name))
+                dns_details = [ conn + (str(dns_iana.get_name(conn[5],"dns_qry_class")),str(dns_iana.get_name(conn[6],"dns_qry_type")),str(dns_iana.get_name(conn[7],"dns_qry_rcode"))) for conn in dns_details ]
+            else: 
+                self._logger.info("WARNING: NO IANA configured.")
+                dns_details = [ conn + ("","","") for conn in dns_details ]
+
+            nc_conf_file = "{0}/components/nc/nc_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+            if os.path.isfile(nc_conf_file):
+                nc_conf = json.loads(open(nc_conf_file).read())["NC"]
+                dns_nc = NetworkContext(nc_conf,self._logger) 
+                dns_details = [ conn + (dns_nc.get_nc(conn[2]),) for conn in dns_details ]
             else:
-            # add IANA to results.
-                if dns_iana:
-                    update_rows = []
-                    self._logger.info("Adding IANA translation to details results")
-                    with open(edge_tmp) as dns_details_csv:
-                        rows = csv.reader(dns_details_csv, delimiter=',', quotechar='|')
-                        try:
-                            next(rows)
-                            update_rows = [[conn[0]] + [conn[1]] + [conn[2]] + [conn[3]] + [conn[4]] + [dns_iana.get_name(conn[5],"dns_qry_class")] + [dns_iana.get_name(conn[6],"dns_qry_type")] + [dns_iana.get_name(conn[7],"dns_qry_rcode")] + [conn[8]] for conn in rows]
-                            update_rows = filter(None, update_rows)
-                            header = [ "frame_time", "frame_len", "ip_dst","ip_src","dns_qry_name","dns_qry_class_name","dns_qry_type_name","dns_qry_rcode_name","dns_a" ]
-                            update_rows.insert(0,header)
-                        except IndexError:
-                            pass
-
-                else:
-                    self._logger.info("WARNING: NO IANA configured.")
-
-                    # create edge file.
-                self._logger.info("Creating edge file:{0}".format(edge_file))
-                with open(edge_file,'wb') as dns_details_edge:
-                    writer = csv.writer(dns_details_edge, quoting=csv.QUOTE_ALL)
-                    if update_rows:
-                        writer.writerows(update_rows)
-                    else:            
-                        shutil.copy(edge_tmp,edge_file)           
-                
-                os.remove(edge_tmp)
-
+                dns_details = [ conn + (0,) for conn in dns_details ]
+                         
+            for row in dns_details:
+                value_string += str(tuple(item for item in row)) + ","   
 
-    def _get_dns_dendrogram(self):
-        limit = self._details_limit
-        for conn in self._dns_scores:            
-            date=conn[self._conf["dns_score_fields"]["frame_time"]].split(" ")
-            date = filter(None,date)
-
-            if len(date) == 5:
-                year=date[2]
-                month=datetime.datetime.strptime(date[0], '%b').strftime('%m')
-                day=date[1]
-                ip_dst=conn[self._conf["dns_score_fields"]["ip_dst"]]
-                self._get_dendro(self._db,self._table_name,ip_dst,year,month,day, limit)
+            if value_string != "": 
+                
+                query_to_insert=("""
+                    INSERT INTO {0}.dns_edge PARTITION (y={1}, m={2}, d={3}) VALUES ({4});
+                """).format(self._db,year, month, day,  value_string[:-1])
 
+                impala.execute_query(query_to_insert) 
+ 
 
-    def _get_dendro(self,db,table,ip_dst,year,month,day,limit):
+    def _get_dns_dendrogram(self): 
 
-        dendro_file = "{0}/dendro-{1}.csv".format(self._data_path,ip_dst)
-        if not os.path.isfile(dendro_file):
-            dndro_qry = ("SELECT dns_a, dns_qry_name, ip_dst FROM (SELECT susp.ip_dst, susp.dns_qry_name, susp.dns_a FROM {0}.{1} as susp WHERE susp.y={2} AND susp.m={3} AND susp.d={4} AND susp.ip_dst='{5}' LIMIT {6}) AS tmp GROUP BY dns_a, dns_qry_name, ip_dst").format(db,table,year,month,day,ip_dst,limit)
-            # execute query
-            self._engine.query(dndro_qry,dendro_file)
+        for conn in self._dns_scores:   
+            timestamp = conn[self._conf["dns_score_fields"]["unix_tstamp"]]
+            
+            full_date = datetime.datetime.utcfromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H:%M:%S')
+            date = full_date.split(" ")[0].split("-")
+            # get date parameters.
+            
+            yr = date[0]
+            mn = date[1]
+            dy = date[2]
+            ip_dst=conn[self._conf["dns_score_fields"]["ip_dst"]]
+
+            query_to_load = ("""
+                INSERT INTO TABLE {0}.dns_dendro PARTITION (y={2}, m={3},d={4})
+                SELECT unix_tstamp, dns_a, dns_qry_name, ip_dst 
+                FROM (SELECT unix_tstamp, susp.ip_dst, susp.dns_qry_name, susp.dns_a
+                    FROM {0}.{1} as susp WHERE susp.y={2} AND susp.m={3} AND susp.d={4} AND susp.ip_dst='{5}' 
+                LIMIT {6}) AS tmp GROUP BY dns_a, dns_qry_name, ip_dst, unix_tstamp
+            """).format(self._db,self._table_name,yr,mn,dy,ip_dst,self._details_limit)
+           
+            impala.execute_query(query_to_load)
 
         
     def _ingest_summary(self):
@@ -363,48 +384,31 @@ class OA(object):
         result_rows = []        
         df_filtered =  pd.DataFrame()
 
-        ingest_summary_file = "{0}/is_{1}{2}.csv".format(self._ingest_summary_path,yr,mn)			
-        ingest_summary_tmp = "{0}.tmp".format(ingest_summary_file)
+        query_to_load = ("""
+            SELECT frame_time, COUNT(*) as total FROM {0}.{1}
+            WHERE y={2} AND m={3} AND d={4} AND unix_tstamp IS NOT NULL 
+            AND frame_time IS NOT NULL AND frame_len IS NOT NULL 
+            AND dns_qry_name IS NOT NULL AND ip_src IS NOT NULL 
+            AND (dns_qry_class IS NOT NULL AND dns_qry_type IS NOT NULL 
+            AND dns_qry_rcode IS NOT NULL ) GROUP BY frame_time;
+        """).format(self._db,self._table_name, yr, mn, dy)
 
-        if os.path.isfile(ingest_summary_file):
-        	df = pd.read_csv(ingest_summary_file, delimiter=',')
-            #discards previous rows from the same date
-        	df_filtered = df[df['date'].str.contains("{0}-{1}-{2}".format(yr, mn, dy)) == False] 
-        else:
-        	df = pd.DataFrame()
-            
-        # get ingest summary.
-        ingest_summary_qry = ("SELECT frame_time, COUNT(*) as total "
-                                    " FROM {0}.{1}"
-                                    " WHERE y={2} AND m={3} AND d={4} "
-                                    " AND unix_tstamp IS NOT NULL AND frame_time IS NOT NULL"
-                                    " AND frame_len IS NOT NULL AND dns_qry_name IS NOT NULL"
-                                    " AND ip_src IS NOT NULL " 
-                                    " AND (dns_qry_class IS NOT NULL AND dns_qry_type IS NOT NULL AND dns_qry_rcode IS NOT NULL ) "
-                                    " GROUP BY frame_time;") 
-
-        ingest_summary_qry = ingest_summary_qry.format(self._db,self._table_name, yr, mn, dy)
-        
-        results_file = "{0}/results_{1}.csv".format(self._ingest_summary_path,self._date)
-        self._engine.query(ingest_summary_qry,output_file=results_file,delimiter=",")
+        results = impala.execute_query_as_list(query_to_load)
+        df = pd.DataFrame(results)
 
+        # Forms a new dataframe splitting the minutes from the time column
+        df_new = pd.DataFrame([["{0}-{1}-{2} {3}:{4}".format(yr, mn, dy,val['frame_time'].split(" ")[3].split(":")[0].zfill(2),val['frame_time'].split(" ")[3].split(":")[1].zfill(2)), int(val['total']) if not math.isnan(val['total']) else 0 ] for key,val in df.iterrows()],columns = ingest_summary_cols)
 
-        if os.path.isfile(results_file):        
-            df_results = pd.read_csv(results_file, delimiter=',') 
+        #Groups the data by minute 
+        sf = df_new.groupby(by=['date'])['total'].sum()
+        df_per_min = pd.DataFrame({'date':sf.index, 'total':sf.values})
 
-            # Forms a new dataframe splitting the minutes from the time column
-            df_new = pd.DataFrame([["{0}-{1}-{2} {3}:{4}".format(yr, mn, dy,val['frame_time'].split(" ")[3].split(":")[0].zfill(2),val['frame_time'].split(" ")[3].split(":")[1].zfill(2)), int(val['total']) if not math.isnan(val['total']) else 0 ] for key,val in df_results.iterrows()],columns = ingest_summary_cols)
-    
-            #Groups the data by minute 
-            sf = df_new.groupby(by=['date'])['total'].sum()
-        
-            df_per_min = pd.DataFrame({'date':sf.index, 'total':sf.values})
-            
-            df_final = df_filtered.append(df_per_min, ignore_index=True)
-            df_final.to_csv(ingest_summary_tmp,sep=',', index=False)
+        df_final = df_filtered.append(df_per_min, ignore_index=True).to_records(False,False) 
 
-            os.remove(results_file)
-            os.rename(ingest_summary_tmp,ingest_summary_file)
-        else:
-            self._logger.info("No data found for the ingest summary")
+        if len(df_final) > 0:
+            query_to_insert=("""
+                INSERT INTO {0}.dns_ingest_summary PARTITION (y={1}, m={2}) VALUES {3};
+            """).format(self._db, yr, mn, tuple(df_final))
+            
+            impala.execute_query(query_to_insert)  
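
A pattern worth calling out, since it repeats in _create_dns_scores, _get_dns_details and _ingest_summary above: each in-memory row is serialized as a Python tuple literal and the tuples are concatenated into a single multi-row INSERT against a partitioned Impala table, executed through the api.resources.impala_engine client imported at the top of the file. A condensed sketch of that pattern; the database name, date and rows below are placeholders, and the real code also passes each item through Util.cast_val before serializing:

    # Sketch: build one partitioned multi-row INSERT from in-memory rows (values are made up,
    # and real dns_scores rows carry the full dns_score_fields column set).
    rows = [
        (1488759605, 74, "10.0.0.5", "example.com", 0.0003),
        (1488759670, 80, "10.0.0.6", "example.org", 0.0004),
    ]

    value_string = ",".join(str(tuple(item for item in row)) for row in rows)

    insert_stmt = ("""
        INSERT INTO {0}.dns_scores PARTITION (y={1}, m={2}, d={3}) VALUES {4}
    """).format("spotdb", 2017, 3, 5, value_string)

    # impala.execute_query(insert_stmt)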
         

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1904f2b4/spot-oa/oa/dns/ipynb_templates/Edge_Investigation_master.ipynb
----------------------------------------------------------------------
diff --git a/spot-oa/oa/dns/ipynb_templates/Edge_Investigation_master.ipynb b/spot-oa/oa/dns/ipynb_templates/Edge_Investigation_master.ipynb
index 5573c9a..88f047e 100644
--- a/spot-oa/oa/dns/ipynb_templates/Edge_Investigation_master.ipynb
+++ b/spot-oa/oa/dns/ipynb_templates/Edge_Investigation_master.ipynb
@@ -17,19 +17,14 @@
    "source": [
     "import urllib2\n",
     "import json\n",
-    "import os\n",
-    "import csv\n",
+    "import os \n",
+    "import datetime\n",
     "\n",
     "# getting date from the parent path. \n",
     "path = os.getcwd().split(\"/\") \n",
     "date = path[len(path)-1]   \n",
     "dsource = path[len(path)-2]  \n",
-    "dpath = '/'.join(['data' if var == 'ipynb' else var for var in path]) + '/'\n",
-    "\n",
-    "sconnect = dpath + 'dns_scores.csv'\n",
-    "sconnectbu = dpath + 'dns_scores_bu.csv'\n",
-    "score_tmp = dpath + 'score_tmp.csv'  \n",
-    "score_fbk = dpath + 'dns_scores_fb.csv'  "
+    "dpath = '/'.join(['data' if var == 'ipynb' else var for var in path]) + '/' "
    ]
   },
   {
@@ -56,7 +51,7 @@
     "from IPython.display import display, HTML, clear_output, Javascript \n",
     "\n",
     "def fill_list(list_control,source):\n",
-    "    options_list = ['--Select--'] \n",
+    "    options_list = ['- Select -'] \n",
     "    options_list.extend([s for s in source])\n",
     "    list_control.options = options_list\n",
     "\n",
@@ -101,18 +96,29 @@
     "    us_ips = []\n",
     "    us_dns = []\n",
     "\n",
-    "    with open(sconnect, 'r') as f:\n",
-    "        reader = csv.DictReader(f, delimiter=',')\n",
-    "        for row in reader:           \n",
-    "            if row['ip_dst'] not in us_ips and row['ip_sev'] == '0': \n",
-    "                us_ips.append(row['ip_dst'])\n",
-    "            if row['dns_qry_name'] not in us_dns and row['dns_sev'] == '0':\n",
-    "                us_dns.append(row['dns_qry_name']) \n",
-    "\n",
+    "    query=\"\"\"query($date:SpotDateType!) {\n",
+    "            dns{\n",
+    "                suspicious(date:$date){\n",
+    "                dnsQuery\n",
+    "                clientIp\n",
+    "            }\n",
+    "        }\n",
+    "    }\"\"\"\n",
+    "    variables={\n",
+    "        'date': datetime.datetime.strptime(date, '%Y%m%d').strftime('%Y-%m-%d')\n",
+    "    }\n",
+    "    response = GraphQLClient.request(query, variables)\n",
+    "  \n",
+    "    for row in response['data']['dns']['suspicious']:           \n",
+    "        if row['clientIp'] not in us_ips: \n",
+    "            us_ips.append(row['clientIp'])\n",
+    "        if row['dnsQuery'] not in us_dns:\n",
+    "            us_dns.append(row['dnsQuery'])  \n",
+    "            \n",
     "    fill_list(client_select,us_ips)\n",
     "    fill_list(query_select,us_dns)\n",
-    "    client_select.value = \"--Select--\"\n",
-    "    query_select.value = \"--Select--\"    \n",
+    "    client_select.value = \"- Select -\"\n",
+    "    query_select.value = \"- Select -\"    \n",
     "\n",
     "\n",
     "display(Javascript(\"$('.widget-area > .widget-subarea > *').remove();\"))\n",
@@ -138,110 +144,75 @@
     "import csv\n",
     "import datetime\n",
     "import subprocess \n",
+    "global score_values\n",
+    "score_values = []\n",
+    "\n",
     "\n",
     "def assign_score(b):\n",
-    "    score_values = []\n",
-    "    scored_threats = []\n",
-    "    ip_sev = int(rating_btn.selected_label) if not \"--Select--\" in client_select.value else \"\"\n",
-    "    dns_sev = int(rating_btn.selected_label) if not \"--Select--\" in query_select.value else \"\"    \n",
     "\n",
+    "    sev = int(rating_btn.selected_label) \n",
+    "    \n",
     "    if quick_text.value: \n",
     "        ip = \"\"\n",
     "        dns = quick_text.value\n",
-    "        dns_sev = int(rating_btn.selected_label) \n",
-    "        # Loop over every element in query_select widget\n",
-    "        score_values = []\n",
+    "        dns_sev = int(rating_btn.selected_label)  \n",
     "        for query in query_select.options:\n",
-    "            if query.endswith(dns):\n",
-    "                # Matching element, create one row\n",
-    "                score_values.append((ip,query,ip_sev,dns_sev))\n",
+    "            if query.endswith(dns): \n",
+    "                score_values.append((ip,query,sev))\n",
     "    else: \n",
-    "        ip = client_select.value if not \"--Select--\" in client_select.value else \"\"\n",
-    "        dns = query_select.value if not \"--Select--\" in query_select.value else \"\"\n",
-    "        score_values.append((ip,dns,ip_sev,dns_sev))\n",
-    "\n",
-    "    with open(sconnect, 'r') as f:\n",
-    "        reader = csv.DictReader(f, delimiter=',')\n",
-    "        rowct = 0\n",
-    "        with open(score_tmp, 'w') as score:\n",
-    "            wr = csv.DictWriter(score, delimiter=',', quoting=csv.QUOTE_NONE, fieldnames=reader.fieldnames)            \n",
-    "            wr.writeheader()\n",
-    "            for row in reader:   \n",
-    "                for value in score_values: \n",
-    "                    if row['ip_dst'] == value[0]:  \n",
-    "                        row['ip_sev'] = value[2]       \n",
-    "                        scored_threats.append(row)  \n",
-    "                        rowct += 1                  \n",
-    "                        break\n",
-    "                    if row['dns_qry_name'] == value[1]:  \n",
-    "                        row['dns_sev'] = value[3]                        \n",
-    "                        scored_threats.append(row)  \n",
-    "                        rowct += 1\n",
-    "                        break\n",
-    "                wr.writerow(row)     \n",
-    "                    \n",
-    "        if not os.path.exists(score_fbk):  \n",
-    "            with open(score_fbk, 'w') as feedback:\n",
-    "                wr = csv.DictWriter(feedback, delimiter='\\t', quoting=csv.QUOTE_NONE, fieldnames=reader.fieldnames)            \n",
-    "                wr.writeheader()\n",
-    "\n",
-    "        with open(score_fbk, 'a') as feedback:\n",
-    "            for row in scored_threats:\n",
-    "                wr = csv.DictWriter(feedback, delimiter='\\t', quoting=csv.QUOTE_NONE, fieldnames=reader.fieldnames)            \n",
-    "                wr.writerow(row)\n",
-    "\n",
-    "    clear_output()\n",
-    "    print \"{0} matching connections scored\".format(rowct)\n",
-    "    !mv $score_tmp $sconnect \n",
-    "\n",
-    "    if ip != \"--Select--\":\n",
+    "        ip = client_select.value if not \"- Select -\" in client_select.value else \"\"\n",
+    "        dns = query_select.value if not \"- Select -\" in query_select.value else \"\"\n",
+    "        score_values.append((ip,dns,sev))\n",
+    "        clear_output()\n",
+    "   \n",
+    "    if ip != \"- Select -\":\n",
     "        display(Javascript(\"$(\\\"option[data-value='\" + ip +\"']\\\").remove();\"))\n",
     "    if quick_text.value:\n",
     "        display(Javascript(\"$(\\\"option[data-value$='\" + quick_text.value +\"']\\\").remove();\"))\n",
-    "    elif dns != \"--Select--\":\n",
+    "    elif dns != \"- Select -\":\n",
     "        display(Javascript(\"$(\\\"option[data-value='\" + dns +\"']\\\").remove();\"))\n",
     "\n",
-    "    client_select.value = \"--Select--\"\n",
-    "    query_select.value = \"--Select--\"\n",
+    "    client_select.value = \"- Select -\"\n",
+    "    query_select.value = \"- Select -\"\n",
     "    quick_text.value = \"\"\n",
     "\n",
     "\n",
-    "def save(b):   \n",
-    "    clear_output()    \n",
-    "    display(Javascript(\"$('.widget-area > .widget-subarea > *').remove();\"))\n",
-    "    data_loader() \n",
-    "    display(scoring_form)\n",
-    "    display(Javascript('reloadParentData();'))\n",
-    "    ml_feedback() \n",
-    "    print \"Suspicious connects successfully updated\"\n",
-    "\n",
-    "\n",
-    "assign_btn.on_click(assign_score)\n",
-    "save_btn.on_click(save)\n",
+    "def save(b):    \n",
+    "    variables=[]\n",
+    "    global score_values\n",
+    "    mutation=\"\"\"mutation($input:[DnsScoreType!]!)\n",
+    "                {\n",
+    "                  dns{\n",
+    "                    score(input:$input)\n",
+    "                        {success}\n",
+    "                  }\n",
+    "                }\"\"\" \n",
+    "    \n",
+    "    for row in score_values:\n",
+    "        variables.append({\n",
+    "            'date': datetime.datetime.strptime(date, '%Y%m%d').strftime('%Y-%m-%d'),\n",
+    "            'clientIp': row[0] if row[0] != \"\" else None,\n",
+    "            'dnsQuery': row[1] if row[1] != \"\" else None,\n",
+    "            'score': row[2] if row[2] != \"\" else None \n",
+    "            })\n",
+    "\n",
+    "    var = {'input':variables}\n",
+    "    response = GraphQLClient.request(mutation,var)\n",
+    "    \n",
+    "    score_values = []\n",
+    "    if not 'errors' in response:\n",
+    "        clear_output()    \n",
+    "        display(Javascript(\"$('.widget-area > .widget-subarea > *').remove();\"))\n",
+    "        data_loader() \n",
+    "        display(scoring_form)\n",
+    "        display(Javascript('reloadParentData();')) \n",
+    "        print \"Suspicious connects successfully updated\"\n",
+    "    else:\n",
+    "        print \"An error occurred: \" + response['errors'][0]['message']\n",
     "        \n",
-    "\n",
-    "def ml_feedback():\n",
-    "    dst_name = os.path.basename(sconnect)\n",
-    "    str_fb=\"DSOURCE={0} &&\\\n",
-    "        FDATE={1} &&\\\n",
-    "        source /etc/spot.conf &&\\\n",
-    "        usr=$(echo $LUSER | cut -f3 -d'/') &&\\\n",
-    "        mlnode=$MLNODE &&\\\n",
-    "        lpath=$LPATH &&\\\n",
-    "        scp {2} $usr@$mlnode:$lpath/{3}\".format(dsource,date,score_fbk,dst_name) \n",
-    "\n",
-    "    subprocess.call(str_fb, shell=True)"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": true
-   },
-   "outputs": [],
-   "source": [
-    "# !cp $sconnectbu $sconnect"
+    "        \n",
+    "assign_btn.on_click(assign_score)\n",
+    "save_btn.on_click(save) "
    ]
   }
  ],
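
The rewritten cells above rely on a GraphQLClient helper (provided elsewhere in spot-oa) for both the suspicious-DNS query and the score mutation. For a quick standalone check of the same query outside the notebook, a plain HTTP POST works as well; a minimal sketch, assuming the GraphQL endpoint accepts a standard JSON POST, and noting that the URL and port here are assumptions, not part of this commit:

    # Hypothetical standalone POST of the suspicious-DNS query used by the notebook.
    import json
    import urllib2

    query = """query($date:SpotDateType!) {
        dns { suspicious(date:$date) { dnsQuery clientIp } }
    }"""
    payload = json.dumps({'query': query, 'variables': {'date': '2017-03-05'}})

    req = urllib2.Request('http://localhost:8889/graphql', payload,
                          {'Content-Type': 'application/json'})
    response = json.loads(urllib2.urlopen(req).read())

    if 'errors' not in response:
        for row in response['data']['dns']['suspicious']:
            print row['clientIp'], row['dnsQuery']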

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1904f2b4/spot-oa/oa/dns/ipynb_templates/Threat_Investigation_master.ipynb
----------------------------------------------------------------------
diff --git a/spot-oa/oa/dns/ipynb_templates/Threat_Investigation_master.ipynb b/spot-oa/oa/dns/ipynb_templates/Threat_Investigation_master.ipynb
index 6eb9bad..cbaa2b7 100644
--- a/spot-oa/oa/dns/ipynb_templates/Threat_Investigation_master.ipynb
+++ b/spot-oa/oa/dns/ipynb_templates/Threat_Investigation_master.ipynb
@@ -11,7 +11,7 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": true
+    "collapsed": false
    },
    "outputs": [],
    "source": [
@@ -30,17 +30,9 @@
     "    from IPython.html import widgets\n",
     "from IPython.display import display, HTML, clear_output, Javascript \n",
     "\n",
-    "with open('/etc/spot.conf') as conf:\n",
-    "    for line in conf.readlines():\n",
-    "        if \"DBNAME=\" in line: DBNAME = line.split(\"=\")[1].strip('\\n').replace(\"'\",\"\");      \n",
-    "        elif \"IMPALA_DEM=\" in line: IMPALA_DEM = line.split(\"=\")[1].strip('\\n').replace(\"'\",\"\"); \n",
-    "\n",
     "path = os.getcwd().split(\"/\") \n",
     "t_date = path[len(path)-1]   \n",
-    "dpath = '/'.join(['data' if var == 'ipynb' else var for var in path]) + '/'\n",
-    "t_date = path[len(path)-1] \n",
-    "sconnect = dpath + 'dns_scores.csv' \n",
-    "threat_f = dpath + \"threats.csv\"\n",
+    "\n",
     "anchor = ''\n",
     "anchor_type = ''\n",
     "top_results = 20\n",
@@ -51,7 +43,7 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": true
+    "collapsed": false
    },
    "outputs": [],
    "source": [
@@ -127,7 +119,7 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": true
+    "collapsed": false
    },
    "outputs": [],
    "source": [
@@ -144,29 +136,39 @@
     "    ips_query = {} \n",
     "    ip_sev={}\n",
     "    dns_sev={}\n",
-    "    c_ips=[]\n",
-    "    c_dns=[]\n",
-    "\n",
-    "    if os.path.isfile(threat_f) and not file_is_empty(threat_f):\n",
-    "        with open(threat_f, 'r') as th:\n",
-    "            t_read = csv.reader(th, delimiter='|')\n",
-    "            t_read.next()\n",
-    "            for row in t_read: \n",
-    "                if row[0] != '' : c_ips.append(row[0])\n",
-    "                if row[1] != '' : c_dns.append(row[1])\n",
-    "            \n",
-    "    with open(sconnect, 'r') as f:\n",
-    "        reader = csv.DictReader(f, delimiter=',')\n",
-    "        for row in reader:\n",
-    "            if row['ip_dst'] not in ips_query and row['ip_dst'] not in c_ips and row['ip_sev'] == '1': \n",
-    "                    ips_query[row['ip_dst']]='i'\n",
-    "            if row['dns_qry_name'] not in ips_query and row['dns_qry_name'] not in c_dns and row['dns_sev'] == '1':\n",
-    "                    ips_query[row['dns_qry_name']]='q' \n",
+    "      \n",
+    "    response = GraphQLClient.request(\n",
+    "        query=\"\"\"query($date:SpotDateType!) {\n",
+    "                dns{\n",
+    "                    threats{\n",
+    "                        list(date:$date) {\n",
+    "                            dnsScore\n",
+    "                            clientIpScore\n",
+    "                            clientIp\n",
+    "                            dnsQuery\n",
+    "                        }\n",
+    "                    }\n",
+    "            }\n",
+    "        }\"\"\",\n",
+    "        variables={\n",
+    "            'date': datetime.datetime.strptime(t_date, '%Y%m%d').strftime('%Y-%m-%d')\n",
+    "        }\n",
+    "    )  \n",
+    "    \n",
+    "    if not 'errors' in response: \n",
+    "        for row in response['data']['dns']['threats']['list']:        \n",
+    "            if row['clientIp'] not in ips_query and row['clientIpScore'] == 1: \n",
+    "                    ips_query[row['clientIp']]='i'\n",
+    "            if row['dnsQuery'] not in ips_query and row['dnsScore'] == 1: \n",
+    "                    ips_query[row['dnsQuery']]='q' \n",
     "            \n",
-    "            if row['ip_dst'] not in ip_sev: \n",
-    "                ip_sev[row['ip_dst']] = row['score']\n",
-    "            if row['dns_qry_name'] not in dns_sev: \n",
-    "                dns_sev[row['dns_qry_name']] =row['score']\n",
+    "            if row['clientIp'] not in ip_sev: \n",
+    "                ip_sev[row['clientIp']] = row['clientIpScore']\n",
+    "            if row['dnsQuery'] not in dns_sev: \n",
+    "                dns_sev[row['dnsQuery']] =row['dnsScore']\n",
+    "    else:\n",
+    "        print \"An error occurred: \" + response[\"errors\"][0][\"message\"]\n",
+    " \n",
     "                \n",
     "    threat_title.value =\"<h4>Suspicious DNS</h4>\"\n",
     "                       \n",
@@ -192,7 +194,8 @@
     "  \n",
     "    def search_ip(b):  \n",
     "        global anchor \n",
-    "        global anchor_type\n",
+    "        global anchor_type \n",
+    "        global expanded_results\n",
     "        anchor = ''\n",
     "        anchor_type = ''\n",
     "        anchor = susp_select.selected_label  \n",
@@ -200,42 +203,40 @@
     "        removeWidget(2)\n",
     "        removeWidget(1) \n",
     "        clear_output()\n",
-    "        \n",
-    "        global ir_f\n",
-    "        ir_f = dpath + 'threat-dendro-' + anchor + \".csv\"             \n",
-    "        table = \"<table><th>IP</th><th>QUERY</th><th>TOTAL</th>\"\n",
-    "        \n",
-    "        if not os.path.isfile(ir_f) or (os.path.isfile(ir_f) and file_is_empty(ir_f)):\n",
-    "            if anchor_type == 'i':\n",
-    "                imp_query = \"\\\" SELECT COUNT(dns_qry_name) as total, dns_qry_name, ip_dst, 0 as sev FROM {0}.dns \\\n",
-    "                        WHERE y={1} AND m={2} AND d={3} AND ip_dst='{4}' GROUP BY dns_qry_name, ip_dst \\\n",
-    "                        ORDER BY total DESC LIMIT {5}\\\"\"\n",
-    "            elif anchor_type == 'q':\n",
-    "                imp_query = \"\\\" SELECT COUNT(ip_dst) as total, dns_qry_name, ip_dst, 0 as sev FROM {0}.dns \\\n",
-    "                        WHERE y={1} AND m={2} AND d={3} AND dns_qry_name='{4}'\\\n",
-    "                        GROUP BY ip_dst, dns_qry_name ORDER BY total DESC LIMIT {5}\\\"\"\n",
     "\n",
-    "            imp_query=imp_query.format(DBNAME, yy, mm, dd, anchor, details_limit)\n",
-    "            !impala-shell -i $IMPALA_DEM --quiet -q \"INVALIDATE METADATA\"\n",
-    "            !impala-shell -i $IMPALA_DEM --quiet --print_header -B --output_delimiter=',' -q $imp_query -o $ir_f\n",
+    "        \n",
+    "        expanded_results = GraphQLClient.request(\n",
+    "            query=\"\"\"query($date:SpotDateType,$dnsQuery:String, $clientIp:SpotIpType){\n",
+    "                      dns{\n",
+    "                        threat{details(date:$date,dnsQuery:$dnsQuery,clientIp:$clientIp){\n",
+    "                          total \n",
+    "                          clientIp\n",
+    "                          dnsQuery \n",
+    "                        }}\n",
+    "                      }\n",
     "\n",
+    "                }\"\"\",\n",
+    "            variables={\n",
+    "                'date': datetime.datetime.strptime(t_date, '%Y%m%d').strftime('%Y-%m-%d'),\n",
+    "                'dnsQuery': anchor if anchor_type == 'q' else None,\n",
+    "                'clientIp': anchor if anchor_type == 'i' else None  \n",
+    "            }\n",
+    "        )  \n",
+    "        \n",
     "        clear_output() \n",
-    "#       total, dns_qry_name, ip_dst, sev\n",
-    "        with open(ir_f, 'r') as f:\n",
-    "            try:\n",
-    "                reader = itertools.islice(csv.reader(f, delimiter=','), top_results) \n",
-    "                if reader!= '':\n",
-    "                    reader.next()\n",
-    "                    for row in reader:  \n",
-    "                        table += \"<tr><td class='spot-text-wrapper' data-toggle='tooltip'>\"+row[2]+\"</td>\\\n",
-    "                            <td class='spot-text-wrapper' data-toggle='tooltip'>\"+row[1]+\"</td>\\\n",
-    "                            <td align='center'>\"+str(row[0])+\"</td></tr>\"  \n",
+    "        \n",
+    "        if not 'errors' in expanded_results:\n",
+    "            table = \"<table><th>IP</th><th>QUERY</th><th>TOTAL</th>\"\n",
+    "        \n",
+    "            for row in expanded_results[\"data\"][\"dns\"][\"threat\"][\"details\"]: \n",
+    "                table += \"<tr><td class='spot-text-wrapper' data-toggle='tooltip'>\"+row[\"clientIp\"]+\"</td>\\\n",
+    "                    <td class='spot-text-wrapper' data-toggle='tooltip'>\"+row[\"dnsQuery\"]+\"</td>\\\n",
+    "                            <td align='center'>\"+str(row[\"total\"])+\"</td></tr>\"  \n",
     "\n",
-    "                table += \"</table>\"                  \n",
-    "                result_html_title.value='<h4>Displaying top {0} search results</h4>'.format(top_results)      \n",
-    "            except:\n",
-    "                table = \"<table></table>\"\n",
-    "                result_html_title.value='<h4>No results were found.</h4>'\n",
+    "            table += \"</table>\"                  \n",
+    "            result_html_title.value='<h4>Displaying top {0} search results</h4>'.format(top_results)  \n",
+    "        else:\n",
+    "            print \"An error occurred: \" + expanded_results[\"errors\"][0][\"message\"]\n",
     "\n",
     "        result_html.value=table\n",
     "        result_html_box.children = [result_html]\n",
@@ -263,24 +264,49 @@
     "    result_summary_box.children = [result_summary_container, result_button_container]\n",
     "    resultSummaryBox.children = [result_title,result_summary_box]\n",
     "    \n",
-    "    \n",
+    "\n",
     "    def save_threat_summary(b):\n",
     "        global anchor\n",
     "        anchor_ip =''\n",
     "        anchor_dns ='' \n",
-    "        if anchor != '':             \n",
-    "            if anchor_type == 'i':\n",
-    "                anchor_ip = anchor\n",
-    "            elif anchor_type == 'q':\n",
-    "                anchor_dns = anchor\n",
-    "                \n",
-    "            if not os.path.exists(threat_f):  \n",
-    "                with open(threat_f, 'w') as comment:\n",
-    "                    comment.write('ip_dst|dns_qry_name|title|summary\\n')\n",
+    "        \n",
+    "        if anchor_type == 'i':\n",
+    "            anchor_ip = anchor\n",
+    "        elif anchor_type == 'q':\n",
+    "            anchor_dns = anchor\n",
     "            \n",
-    "            with open(threat_f, 'a') as comment:\n",
-    "                comment.write(anchor_ip + '|' + anchor_dns + '|' + tc_txt_title.value + '|' +\n",
-    "                                  tc_txa_summary.value.replace('\\n', '\\\\n') + '\\n') \n",
+    "        if anchor != '':\n",
+    "            mutation=\"\"\"mutation(\n",
+    "                        $date: SpotDateType, \n",
+    "                        $dnsQuery:String, \n",
+    "                        $clientIp:SpotIpType,\n",
+    "                        $text: String!, \n",
+    "                        $title: String!,\n",
+    "                        $threatDetails: [DnsThreatDetailsInputType!]!) \n",
+    "                        {\n",
+    "                          dns{\n",
+    "                            createStoryboard(input:{\n",
+    "                                threatDetails: $threatDetails,\n",
+    "                                date: $date, \n",
+    "                                dnsQuery: $dnsQuery, \n",
+    "                                clientIp: $clientIp,\n",
+    "                                title: $title, \n",
+    "                                text: $text\n",
+    "                                })\n",
+    "                            {success}\n",
+    "                          }\n",
+    "                        }\"\"\"\n",
+    "\n",
+    "            variables={\n",
+    "                'date': datetime.datetime.strptime(t_date, '%Y%m%d').strftime('%Y-%m-%d'),\n",
+    "                'dnsQuery': anchor_dns if anchor_type == 'q' else None, \n",
+    "                'clientIp': anchor_ip if anchor_type == 'i' else None, \n",
+    "                'title': tc_txt_title.value,\n",
+    "                'text': tc_txa_summary.value.replace('\\n', '\\\\n'),\n",
+    "                'threatDetails': expanded_results['data']['dns']['threat']['details']  \n",
+    "            }\n",
+    "\n",
+    "            response = GraphQLClient.request(mutation, variables)\n",
     "\n",
     "            display(Javascript(\"$(\\\"option[data-value='\" + anchor +\"']\\\").remove();\"))   \n",
     "            display(Javascript(\"$('.widget-area > .widget-subarea > .widget-box:gt(0)').remove();\"))\n",

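Both notebook templates now follow the same convention for GraphQL responses: check for an 'errors' key before walking the 'data' payload. A small hypothetical helper (not part of this commit) that captures the check used in the cells above:

    # Hypothetical convenience wrapper mirroring the notebooks' error handling.
    def graphql_data(response, *path):
        # Return the nested payload under 'data', or None after printing the first error.
        if 'errors' in response:
            print "An error occurred: " + response['errors'][0]['message']
            return None
        node = response['data']
        for key in path:
            node = node[key]
        return node

    # e.g. details = graphql_data(expanded_results, 'dns', 'threat', 'details')
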
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1904f2b4/spot-oa/oa/flow/flow_conf.json
----------------------------------------------------------------------
diff --git a/spot-oa/oa/flow/flow_conf.json b/spot-oa/oa/flow/flow_conf.json
index 18e76a7..cd96b08 100644
--- a/spot-oa/oa/flow/flow_conf.json
+++ b/spot-oa/oa/flow/flow_conf.json
@@ -21,20 +21,19 @@
 	    ,"score":17
     },
 	"column_indexes_filter": [0,8,9,10,11,12,13,14,15,16,17],	
-	"flow_score_fields": {
-		"sev": 0
-		,"tstart": 1
-		,"srcIP": 2
-		,"dstIP":3
-		,"sport":4
-		,"dport":5
-		,"proto":6
-		,"ipkt":7
-		,"ibyt":8
-		,"opkt":9
-		,"obyt":10
-		,"score":11
-		,"rank":12
+	"flow_score_fields": { 
+		"tstart": 0
+		,"srcIP": 1
+		,"dstIP":2
+		,"sport":3
+		,"dport":4
+		,"proto":5
+		,"ipkt":6
+		,"ibyt":7
+		,"opkt":8
+		,"obyt":9
+		,"score":10
+		,"rank":11
 	},
 	"flow_feedback_fields": {
 		"sev":0

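With the leading sev column dropped, flow_score_fields now lines up with the rows flow_oa.py builds: the columns picked by column_indexes_filter plus a trailing rank (see the _get_flow_results hunk further down). A brief sketch of that row shape; the raw ML result layout shown here is illustrative only:

    # Sketch: filtered flow row plus rank, matching the new flow_score_fields order
    # (raw column layout below is hypothetical).
    column_indexes_filter = [0, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]

    flow_results = [
        ["2017-03-05 18:20:05", "-", "-", "-", "-", "-", "-", "-",
         "10.0.0.5", "10.0.0.9", "1234", "80", "TCP", "10", "840", "8", "620", "0.0001"],
    ]

    flow_scores = [[conn[i] for i in column_indexes_filter] + [rank]
                   for rank, conn in enumerate(flow_results)]
    # flow_scores[0] -> [tstart, srcIP, dstIP, sport, dport, proto,
    #                    ipkt, ibyt, opkt, obyt, score, rank]
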
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1904f2b4/spot-oa/oa/flow/flow_oa.py
----------------------------------------------------------------------
diff --git a/spot-oa/oa/flow/flow_oa.py b/spot-oa/oa/flow/flow_oa.py
index 027b54f..26e224b 100644
--- a/spot-oa/oa/flow/flow_oa.py
+++ b/spot-oa/oa/flow/flow_oa.py
@@ -24,25 +24,27 @@ import numpy as np
 import linecache, bisect
 import csv
 import pandas as pd
+import subprocess
+import numbers
+import api.resources.hdfs_client as HDFSClient
+import api.resources.impala_engine as impala
 
 from collections import OrderedDict
 from multiprocessing import Process
-from utils import Util,ProgressBar
+from utils import Util, ProgressBar
 from components.data.data import Data
 from components.geoloc.geoloc import GeoLocalization
 from components.reputation.gti import gti
-
 import time
 
 
 class OA(object):
 
-    def __init__(self,date,limit=500,logger=None):       
-       
-       self._initialize_members(date,limit,logger)
-       
-    def _initialize_members(self,date,limit,logger):
-        
+    def __init__(self,date,limit=500,logger=None):
+        self._initialize_members(date,limit,logger)
+
+    def _initialize_members(self,date,limit,logger): 
+
         # get logger if exists. if not, create new instance.
         self._logger = logging.getLogger('OA.Flow') if logger else Util.get_logger('OA.Flow',create_file=False)
 
@@ -57,31 +59,35 @@ class OA(object):
         self._ingest_summary_path = None
         self._flow_scores = []
         self._results_delimiter = '\t'
+        
 
         # get app configuration.
         self._spot_conf = Util.get_spot_conf()
 
-        # get scores fields conf
+        # # get scores fields conf
         conf_file = "{0}/flow_conf.json".format(self._scrtip_path)
-        self._conf = json.loads(open (conf_file).read(),object_pairs_hook=OrderedDict)     
- 
+        self._conf = json.loads(open (conf_file).read(),object_pairs_hook=OrderedDict)
+
         # initialize data engine
         self._db = self._spot_conf.get('conf', 'DBNAME').replace("'", "").replace('"', '')
         self._engine = Data(self._db, self._table_name,self._logger)
-                      
+        
+        
+                
     def start(self):       
         
         ####################
         start = time.time()
-        ####################
+        ####################         
 
         self._create_folder_structure()
+        self._clear_previous_executions()        
         self._add_ipynb()  
         self._get_flow_results()
         self._add_network_context()
         self._add_geo_localization()
         self._add_reputation()        
-        self._create_flow_scores_csv()
+        self._create_flow_scores()
         self._get_oa_details()
         self._ingest_summary()
 
@@ -89,12 +95,34 @@ class OA(object):
         end = time.time()
         print(end - start)
         ##################
-       
-    def _create_folder_structure(self):
+        
+
+    def _clear_previous_executions(self):
+        
+        self._logger.info("Cleaning data from previous executions for the day")       
+        yr = self._date[:4]
+        mn = self._date[4:6]
+        dy = self._date[6:]  
+        table_schema = []
+        HUSER = self._spot_conf.get('conf', 'HUSER').replace("'", "").replace('"', '')
+        table_schema=['suspicious', 'edge','chords','threat_investigation', 'timeline', 'storyboard', 'summary' ] 
+
+        for path in table_schema:
+            HDFSClient.delete_folder("{0}/flow/hive/oa/{1}/y={2}/m={3}/d={4}".format(HUSER,path,yr,mn,dy),user="impala")
+        HDFSClient.delete_folder("{0}/flow/hive/oa/{1}/y={2}/m={3}".format(HUSER,"",yr,mn),user="impala")
+        #removes Feedback file
+        HDFSClient.delete_folder("{0}/{1}/scored_results/{2}{3}{4}/feedback/ml_feedback.csv".format(HUSER,self._table_name,yr,mn,dy))
+        #removes json files from the storyboard
+        HDFSClient.delete_folder("{0}/{1}/oa/{2}/{3}/{4}/{5}".format(HUSER,self._table_name,"storyboard",yr,mn,dy))
+
+        
+
+
+    def _create_folder_structure(self):   
 
-        # create date folder structure if it does not exist.
         self._logger.info("Creating folder structure for OA (data and ipynb)")       
         self._data_path,self._ingest_summary_path,self._ipynb_path = Util.create_oa_folders("flow",self._date)
+ 
 
     def _add_ipynb(self):     
 
@@ -109,6 +137,7 @@ class OA(object):
         else:
             self._logger.error("There was a problem adding the IPython Notebooks, please check the directory exists.")
             
+            
     def _get_flow_results(self):
                
         self._logger.info("Getting {0} Machine Learning Results from HDFS".format(self._date))
@@ -117,8 +146,8 @@ class OA(object):
         # get hdfs path from conf file 
         HUSER = self._spot_conf.get('conf', 'HUSER').replace("'", "").replace('"', '')
         hdfs_path = "{0}/flow/scored_results/{1}/scores/flow_results.csv".format(HUSER,self._date)
-               
-        # get results file from hdfs
+        
+         # get results file from hdfs
         get_command = Util.get_ml_results_form_hdfs(hdfs_path,self._data_path)
         self._logger.info("{0}".format(get_command))
 
@@ -134,36 +163,36 @@ class OA(object):
             self._logger.error("There was an error getting ML results from HDFS")
             sys.exit(1)
 
-        # add headers.        
-        self._logger.info("Adding headers based on configuration file: score_fields.json")
-        self._flow_scores = [ [ str(key) for (key,value) in self._conf['flow_score_fields'].items()] ]
-
-        # filter results add sev and rank.
+        # filter results add rank.
         self._logger.info("Filtering required columns based on configuration")
-        self._flow_scores.extend([ [0] +  [ conn[i] for i in self._conf['column_indexes_filter'] ] + [n] for n, conn in enumerate(self._flow_results) ])
+
+        self._flow_scores.extend([ [ conn[i] for i in self._conf['column_indexes_filter'] ] + [n] for n, conn in enumerate(self._flow_results) ])
      
-    def _create_flow_scores_csv(self):
 
-        flow_scores_csv = "{0}/flow_scores.csv".format(self._data_path)
-        Util.create_csv_file(flow_scores_csv,self._flow_scores)
+    def _create_flow_scores(self):
+
+        # get date parameters.
+        yr = self._date[:4]
+        mn = self._date[4:6]
+        dy = self._date[6:] 
+        value_string = ""
 
-        # create bk file
-        flow_scores_bu_csv = "{0}/flow_scores_bu.csv".format(self._data_path)
-        Util.create_csv_file(flow_scores_bu_csv,self._flow_scores)  
+        for row in self._flow_scores:
+            value_string += str(tuple(Util.cast_val(item) for item in row)) + ","              
+    
+        load_into_impala = ("""
+             INSERT INTO {0}.flow_scores partition(y={2}, m={3}, d={4}) VALUES {1}
+        """).format(self._db, value_string[:-1], yr, mn, dy) 
+        impala.execute_query(load_into_impala)
+ 
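
    For context, the VALUES string built above relies on Util.cast_val, which is
    not shown in this hunk. A minimal sketch, assuming the helper simply coerces
    each field to int or float where possible and falls back to str so that only
    text columns end up quoted in the generated tuple:

        # Sketch only -- illustrates the assumed contract, not the shipped helper.
        def cast_val(value):
            try:
                return int(value)
            except (TypeError, ValueError):
                try:
                    return float(value)
                except (TypeError, ValueError):
                    return str(value)

        row = ['2016-05-25 00:42:08', '10.0.2.15', '10.0.2.2', 'UDP', '0.0001']
        print str(tuple(cast_val(item) for item in row))
        # ('2016-05-25 00:42:08', '10.0.2.15', '10.0.2.2', 'UDP', 0.0001)
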
 
     def _add_network_context(self):
 
         # use ipranges to see if the IPs are internals.         
         ip_ranges_file = "{0}/context/ipranges.csv".format(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
 
-        # add new headers (srcIpInternal/destIpInternal).
-        self._logger.info("Adding network context headers")
-        flow_headers = self._flow_scores[0]
-        flow_headers.extend(["srcIpInternal","destIpInternal"])
-
         # add values to srcIpInternal and destIpInternal.
         flow_scores = iter(self._flow_scores)
-        next(flow_scores)
 
         if os.path.isfile(ip_ranges_file):
 
@@ -184,11 +213,9 @@ class OA(object):
             self._flow_scores = [ conn + [ self._is_ip_internal(conn[src_ip_index],ip_internal_ranges)]+[ self._is_ip_internal(conn[dst_ip_index],ip_internal_ranges)] for conn in flow_scores]
            
         else:
-
-            self._flow_scores = [ conn + ["",""] for conn in flow_scores ]            
+            self._flow_scores = [ conn + [0,0] for conn in flow_scores ]            
             self._logger.info("WARNING: Network context was not added because the file ipranges.csv does not exist.")
         
-        self._flow_scores.insert(0,flow_headers)
 
     def _is_ip_internal(self,ip, ranges):
         result = 0
@@ -204,14 +231,10 @@ class OA(object):
         # use iploc.csv to look up geo localization for the IPs.
         iploc_file = "{0}/context/iploc.csv".format(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
 
-        # add new headers (srcIpInternal/destIpInternal).     
         self._logger.info("Adding geo localization headers")
-        flow_headers = self._flow_scores[0]
-        flow_headers.extend(["srcGeo","dstGeo","srcDomain","dstDomain"]) 
 
         # add srcGeo/dstGeo and srcDomain/dstDomain values.
         flow_scores = iter(self._flow_scores)
-        next(flow_scores)
 
         if os.path.isfile(iploc_file):
 
@@ -241,17 +264,11 @@ class OA(object):
             self._flow_scores = [ conn + ["","","",""] for conn in flow_scores ]   
             self._logger.info("WARNING: IP location was not added because the file {0} does not exist.".format(iploc_file))
 
-        self._flow_scores.insert(0,flow_headers)       
-
+        
     def _add_reputation(self):
         
         reputation_conf_file = "{0}/components/reputation/reputation_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
         
-        # add new headers (gtiSrcRep/gtiDstRep).
-        self._logger.info("Adding reputation headers")
-        flow_headers_rep = self._flow_scores[0]
-        flow_headers_rep.extend(["srcIP_rep","dstIP_rep"])
-        
         # read configuration.
         self._logger.info("Reading reputation configuration file: {0}".format(reputation_conf_file))
         rep_conf = json.loads(open(reputation_conf_file).read())
@@ -268,7 +285,6 @@ class OA(object):
 
             self._logger.info("Getting GTI reputation for src IPs")
             flow_scores_src = iter(self._flow_scores)
-            next(flow_scores_src)
 
             # getting reputation for src IPs
             src_ips = [ conn[src_ip_index] for conn in flow_scores_src ]            
@@ -276,30 +292,25 @@ class OA(object):
 
             self._logger.info("Getting GTI reputation for dst IPs")
             flow_scores_dst = iter(self._flow_scores)
-            next(flow_scores_dst)
 
             # getting reputation for dst IPs            
             dst_ips = [  conn[dst_ip_index] for conn in flow_scores_dst ]
             dst_rep_results = flow_gti.check(dst_ips)
 
             flow_scores_final = iter(self._flow_scores)
-            next(flow_scores_final)
 
             self._flow_scores = []
-            flow_scores = [conn + [src_rep_results[conn[src_ip_index]]] + [dst_rep_results[conn[dst_ip_index]]]  for conn in  flow_scores_final ]
+            flow_scores = [conn + [src_rep_results[conn[src_ip_index]]] + [dst_rep_results[conn[dst_ip_index]]] for conn in flow_scores_final ]
             self._flow_scores = flow_scores           
             
         else:
             # add values to gtiSrcRep and gtiDstRep.
             flow_scores = iter(self._flow_scores)
-            next(flow_scores)
 
             self._flow_scores = [ conn + ["",""] for conn in flow_scores ]   
             self._logger.info("WARNING: IP reputation was not added. No refclient configured")  
 
 
-        self._flow_scores.insert(0,flow_headers_rep)       
-
     def _get_oa_details(self):
 
         self._logger.info("Getting OA Flow suspicious details/chord diagram")
@@ -319,8 +330,6 @@ class OA(object):
         
         # scored connections no longer include a header row
         sp_connections = iter(self._flow_scores)
-        next(sp_connections)
-      
         # loop connections.
         connections_added = [] 
         for conn in sp_connections:
@@ -330,7 +339,7 @@ class OA(object):
                 continue
             else:
                 connections_added.append(conn)
-           
+            
             src_ip_index = self._conf["flow_score_fields"]["srcIP"]
             dst_ip_index = self._conf["flow_score_fields"]["dstIP"]
 
@@ -340,34 +349,32 @@ class OA(object):
             dip = conn[dst_ip_index]
 
             # get hour and date  (i.e. 2014-07-08 10:10:40)
-            date_array = conn[1].split(' ')
+            
+            date_array = conn[0].split(' ')
             date_array_1 = date_array[0].split('-')
             date_array_2 = date_array[1].split(':')
-
+
             yr = date_array_1[0]                   
             dy = date_array_1[2]
             mh = date_array_1[1]
 
             hr = date_array_2[0]
             mm = date_array_2[1]
-        
-            # connection details query.
-            sp_query = ("SELECT treceived as tstart,sip as srcip,dip as dstip,sport as sport,dport as dport,proto as proto,flag as flags,stos as TOS,ibyt as ibytes,ipkt as ipkts,input as input, output as output,rip as rip, obyt as obytes, opkt as opkts from {0}.{1} where ((sip='{2}' AND dip='{3}') or (sip='{3}' AND dip='{2}')) AND y={8} AND m={4} AND d={5} AND h={6} AND trminute={7} order by tstart limit 100")
-                 
-            # sp query.
-            sp_query = sp_query.format(self._db,self._table_name,sip,dip,mh,dy,hr,mm,yr)
-
-            # output file.
-            edge_file = "{0}/edge-{1}-{2}-{3}-{4}.tsv".format(self._data_path,sip.replace(".","_"),dip.replace(".","_"),hr,mm)
+            
+            query_to_load = ("""
+                INSERT INTO TABLE {0}.flow_edge PARTITION (y={2}, m={3}, d={4})
+                SELECT treceived as tstart,sip as srcip,dip as dstip,sport as sport,dport as dport,proto as proto,flag as flags,
+                stos as tos,ibyt as ibyt,ipkt as ipkt, input as input, output as output,rip as rip, obyt as obyt, 
+                opkt as opkt, h as hh, trminute as mn from {0}.{1} where ((sip='{7}' AND dip='{8}') or (sip='{8}' AND dip='{7}')) 
+                AND y={2} AND m={3} AND d={4} AND h={5} AND trminute={6};
+                """).format(self._db,self._table_name,yr, mh, dy, hr, mm, sip,dip)
+            impala.execute_query(query_to_load)
+            
 
-            # execute query
-            self._engine.query(sp_query,output_file=edge_file,delimiter="\\t")
-    
     def _get_chord_details(self,bar=None):
 
         # scored connections no longer include a header row
         sp_connections = iter(self._flow_scores)
-        next(sp_connections) 
 
         src_ip_index = self._conf["flow_score_fields"]["srcIP"]
         dst_ip_index = self._conf["flow_score_fields"]["dstIP"] 
@@ -389,69 +396,51 @@ class OA(object):
             if n > 1:
                 ip_list = []                
                 sp_connections = iter(self._flow_scores)
-                next(sp_connections)
                 for row in sp_connections:                    
-                    if ip == row[2] : ip_list.append(row[3])
-                    if ip == row[3] :ip_list.append(row[2])    
+                    if ip == row[1] : ip_list.append(row[2])
+                    if ip == row[2] :ip_list.append(row[1])    
                 ips = list(set(ip_list))
              
                 if len(ips) > 1:
                     ips_filter = (",".join(str("'{0}'".format(ip)) for ip in ips))
-                    chord_file = "{0}/chord-{1}.tsv".format(self._data_path,ip.replace(".","_"))                     
-                    ch_query = ("SELECT sip as srcip, dip as dstip, SUM(ibyt) as ibytes, SUM(ipkt) as ipkts from {0}.{1} where y={2} and m={3} \
-                        and d={4} and ( (sip='{5}' and dip IN({6})) or (sip IN({6}) and dip='{5}') ) group by sip,dip")
-                    self._engine.query(ch_query.format(self._db,self._table_name,yr,mn,dy,ip,ips_filter),chord_file,delimiter="\\t")
+ 
+                    query_to_load = ("""
+                        INSERT INTO TABLE {0}.flow_chords PARTITION (y={2}, m={3}, d={4})
+                        SELECT '{5}' as ip_threat, sip as srcip, dip as dstip, SUM(ibyt) as ibyt, SUM(ipkt) as ipkt from {0}.{1} where y={2} and m={3}
+                        and d={4} and ((sip='{5}' and dip IN({6})) or (sip IN({6}) and dip='{5}')) group by sip,dip,m,d;
+                        """).format(self._db,self._table_name,yr,mn,dy,ip,ips_filter)
+
+                    impala.execute_query(query_to_load)
+ 
 
-     
     def _ingest_summary(self): 
         # get date parameters.
         yr = self._date[:4]
         mn = self._date[4:6]
         dy = self._date[6:]
 
-        self._logger.info("Getting ingest summary data for the day")
+        self._logger.info("Getting ingest summary data for the day") 
+
+        query_to_load = ("""
+            INSERT INTO TABLE {0}.flow_ingest_summary PARTITION (y={2}, m={3})
+            SELECT treceived,tryear, trmonth, trday, trhour, trminute, COUNT(*) total
+            FROM {0}.{1}
+            WHERE y={2} AND m={3} AND d={4}
+            AND unix_tstamp IS NOT NULL
+            AND sip IS NOT NULL
+            AND sport IS NOT NULL
+            AND dip IS NOT NULL
+            AND dport IS NOT NULL
+            AND ibyt IS NOT NULL
+            AND ipkt IS NOT NULL
+            AND cast(treceived as timestamp) IS NOT NULL
+            GROUP BY treceived,tryear, trmonth, trday, trhour, trminute;
+            """).format(self._db,self._table_name,yr,mn,dy) 
         
-        ingest_summary_cols = ["date","total"]		
-        result_rows = []       
-        df_filtered =  pd.DataFrame() 
-
-        ingest_summary_file = "{0}/is_{1}{2}.csv".format(self._ingest_summary_path,yr,mn)			
-        ingest_summary_tmp = "{0}.tmp".format(ingest_summary_file)
-        if os.path.isfile(ingest_summary_file):
-            df = pd.read_csv(ingest_summary_file, delimiter=',',names=ingest_summary_cols, skiprows=1)
-            df_filtered = df[df['date'].str.contains("{0}-{1}-{2}".format(yr, mn, dy)) == False] 
-        else:
-            df = pd.DataFrame()
-        
-        # get ingest summary.           
-        ingest_summary_qry = ("SELECT tryear, trmonth, trday, trhour, trminute, COUNT(*) total"
-                            " FROM {0}.{1} "
-                            " WHERE "
-                            " y={2} "
-                            " AND m={3} "
-                            " AND d={4} "
-                            " AND unix_tstamp IS NOT NULL AND sip IS NOT NULL "
-                            " AND sport IS NOT NULL AND dip IS NOT NULL "
-                            " AND dport IS NOT NULL AND ibyt IS NOT NULL "
-                            " AND ipkt IS NOT NULL "
-                            " GROUP BY tryear, trmonth, trday, trhour, trminute;")
-
-
-        ingest_summary_qry = ingest_summary_qry.format(self._db,self._table_name, yr, mn, dy)
-        results_file = "{0}/results_{1}.csv".format(self._ingest_summary_path,self._date)
-        self._engine.query(ingest_summary_qry,output_file=results_file,delimiter=",")
-
-        if os.path.isfile(results_file):
-            result_rows = pd.read_csv(results_file, delimiter=',') 
-
-            df_new = pd.DataFrame([["{0}-{1}-{2} {3}:{4}".format(yr, mn, dy, str(val['trhour']).zfill(2), str(val['trminute']).zfill(2)), int(val[5])] for key,val in result_rows.iterrows()],columns = ingest_summary_cols)						
-
-            df_filtered = df_filtered.append(df_new, ignore_index=True)
-            df_filtered.to_csv(ingest_summary_tmp,sep=',', index=False)
-
-            os.remove(results_file)
-            os.rename(ingest_summary_tmp,ingest_summary_file)
-        else:
-            self._logger.info("No data found for the ingest summary")
+        impala.execute_query(query_to_load)
+
+
 
-        
\ No newline at end of file
+ 
+
+        
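With these changes flow_oa.py stops writing csv/tsv output and instead routes everything
through impala.execute_query into partitioned tables (flow_scores, flow_edge, flow_chords,
flow_ingest_summary). The execute_query helper is not part of this hunk; a rough sketch of
what such a wrapper could look like with impyla -- host, port and database below are
placeholders, not Spot's actual configuration -- is:

    # Hypothetical wrapper; the real helper reads its connection settings from
    # spot.conf rather than hard-coded values.
    from impala.dbapi import connect

    def execute_query(query, host='impala-daemon.example.com', port=21050, db='spotdb'):
        conn = connect(host=host, port=port, database=db)
        cursor = conn.cursor()
        cursor.execute(query)   # e.g. one of the INSERT ... PARTITION statements above
        return cursor

    # Reading a day back out of the new storage model, e.g.:
    # execute_query("SELECT * FROM spotdb.flow_scores WHERE y=2017 AND m=3 AND d=5 LIMIT 10").fetchall()
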

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1904f2b4/spot-oa/oa/flow/ipynb_templates/Edge_Investigation_master.ipynb
----------------------------------------------------------------------
diff --git a/spot-oa/oa/flow/ipynb_templates/Edge_Investigation_master.ipynb b/spot-oa/oa/flow/ipynb_templates/Edge_Investigation_master.ipynb
index f4536a3..ab41963 100644
--- a/spot-oa/oa/flow/ipynb_templates/Edge_Investigation_master.ipynb
+++ b/spot-oa/oa/flow/ipynb_templates/Edge_Investigation_master.ipynb
@@ -40,21 +40,16 @@
     "dsource = path[len(path)-2]  \n",
     "dpath = '/'.join(['data' if var == 'ipynb' else var for var in path]) + '/'\n",
     "cpath = '/'.join(['context' if var == 'ipynb' else var for var in path][:len(path)-2]) + '/'\n",
-    "opath = '/'.join(['oa' if var == 'ipynb' else var for var in path][:len(path)-1]) + '/'\n",
-    "sconnect = dpath + 'flow_scores.csv' \n",
-    "sconnectbu = dpath + 'flow_scores_bu.csv'\n",
-    "score_fbk = dpath + 'flow_scores_fb.csv'\n",
-    "tmpconnect = sconnect +'.tmp'\n",
-    "stemp = sconnect + '.new'\n",
-    "file_schemas = opath + dsource + '_conf.json'\n",
-    "#gets feedback columns from config file\n",
-    "feedback_cols = json.loads(open (file_schemas).read(),object_pairs_hook=OrderedDict)['flow_feedback_fields']\n",
+    "opath = '/'.join(['oa' if var == 'ipynb' else var for var in path][:len(path)-1]) + '/'  \n",
+    "\n",
     "coff = 250;\n",
     "nwloc = cpath + 'networkcontext.csv' \n",
     "srcdict,srclist = {},[]\n",
     "dstdict,dstlist = {},[]\n",
     "sportdict,sportlist = {},[]\n",
-    "dportdict,dportlist = {},[]"
+    "dportdict,dportlist = {},[]\n",
+    "global svals\n",
+    "svals = []"
    ]
   },
   {
@@ -87,8 +82,7 @@
     "    srclist.append('- Select -')\n",
     "    dstlist.append('- Select -')\n",
     "    sportlist.append('- Select -')\n",
-    "    dportlist.append('- Select -')\n",
-    "    set_rules()\n",
+    "    dportlist.append('- Select -')\n", 
     "    \n",
     "    response = GraphQLClient.request(\n",
     "        query=\"\"\"query($date:SpotDateType!) {\n",
@@ -98,7 +92,6 @@
     "                    sport: srcPort\n",
     "                    dstIP: dstIp\n",
     "                    dport: dstPort\n",
-    "                    sev\n",
     "                }\n",
     "            }\n",
     "        }\"\"\",\n",
@@ -108,22 +101,24 @@
     "    )\n",
     "\n",
     "    rowct = 1\n",
-    "    for row in response['data']['flow']['suspicious']:\n",
-    "        if row['srcIP'] not in srcdict and row['sev'] == 0:\n",
-    "            srclist.append(row['srcIP'])\n",
-    "            srcdict[row['srcIP']] = struct.unpack(\"!L\", socket.inet_aton(row['srcIP']))[0]\n",
-    "        if row['dstIP'] not in dstdict and row['sev'] == 0:\n",
-    "            dstlist.append(row['dstIP'])\n",
-    "            dstdict[row['dstIP']] = struct.unpack(\"!L\", socket.inet_aton(row['dstIP']))[0]\n",
-    "        if row['sport'] not in sportdict and row['sev'] == 0:\n",
-    "            sportlist.append(row['sport'])\n",
-    "            sportdict[row['sport']] = row['sport']\n",
-    "        if row['dport'] not in dportdict and row['sev'] == 0:\n",
-    "            dportlist.append(row['dport'])\n",
-    "            dportdict[row['dport']] = row['dport']\n",
-    "        if rowct == coff:\n",
-    "            break;\n",
-    "        rowct += 1\n",
+    "    if not 'errors' in response:\n",
+    "        for row in response['data']['flow']['suspicious']:\n",
+    "            if row['srcIP'] not in srcdict:\n",
+    "                srclist.append(row['srcIP'])\n",
+    "                srcdict[row['srcIP']] = struct.unpack(\"!L\", socket.inet_aton(row['srcIP']))[0]\n",
+    "            if row['dstIP'] not in dstdict:\n",
+    "                dstlist.append(row['dstIP'])\n",
+    "                dstdict[row['dstIP']] = struct.unpack(\"!L\", socket.inet_aton(row['dstIP']))[0]\n",
+    "            if row['sport'] not in sportdict:\n",
+    "                sportlist.append(str(row['sport']))\n",
+    "                sportdict[row['sport']] = row['sport']\n",
+    "            if row['dport'] not in dportdict:\n",
+    "                dportlist.append(str(row['dport']))\n",
+    "                dportdict[row['dport']] = row['dport']\n",
+    "            if rowct == coff:\n",
+    "                break;\n",
+    "            rowct += 1\n",
+    "     \n",
     "    \n",
     "    # Source IP box\n",
     "    scrIpLalbel = widgets.HTML(value=\"Source IP:\", height='10%', width='100%')\n",
@@ -178,217 +173,64 @@
     "    \n",
     "    def update_sconnects(b):\n",
     "        clear_output()\n",
-    "        time.sleep(.25)\n",
-    "        dvals,svals = [], [] \n",
-    "        scored_threats =[]\n",
-    "        #define logic based on combo of input\n",
     "        #Gets input values\n",
+    "        global svals\n",
     "        if srctext.value != '':\n",
-    "            svals = [srctext.value,dstselect.value,sportselect.value,dportselect.value]\n",
-    "            dvals = [srcselect.value,srctext.value,sportselect.value,dportselect.value] \n",
+    "            svals.append([srctext.value,dstselect.value,sportselect.value,dportselect.value, ratingbut.value])\n",
+    "            svals.append([srcselect.value,srctext.value,sportselect.value,dportselect.value, ratingbut.value])\n",
     "        else:\n",
-    "            svals = [srcselect.value,dstselect.value,sportselect.value,dportselect.value]\n",
-    "            dvals = [] \n",
-    "        risk = ratingbut.value \n",
-    "        shash, dhash = 0, 0\n",
-    "        fhash = ['srcIP','dstIP','sport','dport'] \n",
-    "        \n",
-    "        for k in xrange(len(svals)):\n",
-    "            if svals[k] == '- Select -': svals[k] = ''\n",
-    "            if svals[k] != '': shash += 2**k    \n",
-    "            if len(dvals) > 0:\n",
-    "                if dvals[k] == '- Select -': dvals[k] = ''\n",
-    "                if dvals[k] != '': dhash += 2**k    \n",
-    "        \n",
-    "        rowct = 0\n",
-    "        threat = []\n",
-    "        if shash > 0 or dhash > 0:            \n",
-    "            with open(tmpconnect,'w') as g:\n",
-    "                with open(sconnect, 'r') as f:\n",
-    "                    reader = csv.DictReader(f,delimiter=',')\n",
-    "                    riter = csv.DictWriter(g,delimiter=',', fieldnames=reader.fieldnames)\n",
-    "                    riter.writeheader()\n",
-    "                    \n",
-    "                    for row in reader: \n",
-    "                        result, resultd = 0,0\n",
-    "                        for n in xrange(0,len(svals)):\n",
-    "                            if (2**n & shash) > 0:  \n",
-    "                                if row[fhash[n]] == svals[n]:\n",
-    "                                    result += 2**n \n",
-    "                        if result == shash:\n",
-    "                            row['sev'] = risk \n",
-    "                            scored_threats.append({col:row[col] for col in feedback_cols.keys()})\n",
-    "                            rowct += 1\n",
-    "\n",
-    "                        if len(dvals) > 0:\n",
-    "                            for n in xrange(0,len(dvals)):\n",
-    "                                if (2**n & dhash) > 0:  \n",
-    "                                    if row[fhash[n]] == dvals[n]:\n",
-    "                                        resultd += 2**n \n",
-    "                            if resultd == dhash:\n",
-    "                                row['sev'] = risk\n",
-    "                                scored_threats.append({col:row[col] for col in feedback_cols.keys()})\n",
-    "                                rowct += 1\n",
-    "                                \n",
-    "                        riter.writerow(row) \n",
+    "            svals.append([srcselect.value,dstselect.value,sportselect.value,dportselect.value, ratingbut.value])\n",
+    "     \n",
+    "        if srcselect.value != \"- Select -\":\n",
+    "            display(Javascript(\"$(\\\"option[data-value='\" + srcselect.value +\"']\\\").remove();\"))\n",
+    "        if dstselect.value != \"- Select -\":\n",
+    "            display(Javascript(\"$(\\\"option[data-value='\" + srcselect.value +\"']\\\").remove();\"))\n",
+    "        if sportselect.value != \"- Select -\":\n",
+    "            display(Javascript(\"$(\\\"option[data-value='\" + srcselect.value +\"']\\\").remove();\"))\n",
+    "        if dportselect.value != \"- Select -\":\n",
+    "            display(Javascript(\"$(\\\"option[data-value='\" + dportselect.value +\"']\\\").remove();\"))\n",
     "\n",
-    "            create_feedback_file(scored_threats)\n",
-    "            shutil.copyfile(tmpconnect,sconnect)\n",
-    "            \n",
-    "        print \"{0} matching connections scored\".format(rowct)\n",
     "        \n",
     "            \n",
     "    def savesort(b):\n",
+    "        global svals\n",
     "        clear_output()\n",
-    "        with open(stemp,'w') as g:\n",
-    "            reader = csv.DictReader(open(sconnect), delimiter=\",\")\n",
-    "            riter = csv.DictWriter(g,fieldnames=reader.fieldnames, delimiter=',')\n",
-    "            srtlist = sorted(reader, key=lambda x: (int(x[\"sev\"]), float(x[\"score\"])))\n",
-    "            riter.writeheader()\n",
-    "            riter.writerows(srtlist)\n",
-    "                \n",
-    "        shutil.copyfile(stemp,sconnect)\n",
-    "        print \"Suspicious connects successfully updated\"        \n",
-    "        display(Javascript('reloadParentData();')) \n",
-    "        bigBox.close()\n",
-    "        # Rebuild widgets form\n",
-    "        displaythis()\n",
-    "        ml_feedback()\n",
+    "        variables = []\n",
+    "        mutation=\"\"\"mutation($input:[NetflowScoreInputType!]!)\n",
+    "                {\n",
+    "                  flow{\n",
+    "                    score(input:$input)\n",
+    "                        {success}\n",
+    "                  }\n",
+    "                }\"\"\"\n",
+    "        \n",
+    "        for row in svals:\n",
+    "            variables.append({\n",
+    "                'date': datetime.datetime.strptime(date, '%Y%m%d').strftime('%Y-%m-%d'),\n",
+    "                'score': row[4],\n",
+    "                'srcIp': row[0] if row[0] != '- Select -' else None,\n",
+    "                'dstIp': row[1] if row[1] != '- Select -' else None,\n",
+    "                'srcPort': row[2] if row[2] != '- Select -' else None,\n",
+    "                'dstPort': row[3]  if row[3] != '- Select -' else None\n",
+    "                })\n",
+    "        \n",
+    "        var = {'input':variables}\n",
+    "        response = GraphQLClient.request(mutation,var)\n",
+    "         \n",
+    "        svals = []\n",
+    "        if not 'errors' in response :\n",
+    "            print \"Suspicious connects successfully updated\"        \n",
+    "            display(Javascript('reloadParentData();')) \n",
+    "            bigBox.close()\n",
+    "            # Rebuild widgets form\n",
+    "            displaythis() \n",
+    "        else:\n",
+    "            print \"An error ocurred whith the scoring process\"\n",
+    "            print response['errors'][0]['message']\n",
+    "        \n",
     "    assignbut.on_click(update_sconnects)\n",
     "    updatebut.on_click(savesort)\n",
-    "\n",
-    "    \n",
-    "def create_feedback_file(scored_rows):\n",
-    "#     #works on the feedback tab-separated file\n",
-    "    if not os.path.exists(score_fbk):  \n",
-    "        with open(score_fbk, 'w') as feedback:\n",
-    "            wr = csv.DictWriter(feedback, fieldnames=feedback_cols, delimiter='\\t', quoting=csv.QUOTE_NONE)   \n",
-    "            wr.writeheader()\n",
-    "\n",
-    "    wr = csv.DictWriter(open(score_fbk, 'a'), delimiter='\\t', fieldnames=feedback_cols, quoting=csv.QUOTE_NONE)\n",
-    "    for row in scored_rows:\n",
-    "        wr.writerow(row)\n",
-    "\n",
-    "\n",
-    "def set_rules():\n",
-    "    rops = ['leq','leq','leq','leq','leq','leq']\n",
-    "    rvals = ['','','',1024,'',54]\n",
-    "    risk = 2\n",
-    "    apply_rules(rops,rvals,risk)\n",
-    "    rops = ['leq','leq','leq','leq','eq','eq']\n",
-    "    rvals = ['','','',1024,3,152]\n",
-    "    risk = 2\n",
-    "    apply_rules(rops,rvals,risk)\n",
-    "    rops = ['leq','leq','leq','leq','eq','eq']\n",
-    "    rvals = ['','','',1024,2,104]\n",
-    "    risk = 2\n",
-    "    rops = ['leq','leq','eq','leq','leq','leq']\n",
-    "    rvals = ['','',0,1023,'','']\n",
-    "    risk = 2\n",
-    "    apply_rules(rops,rvals,risk)\n",
-    "\n",
-    "    \n",
-    "    \n",
-    "def apply_rules(rops,rvals,risk):\n",
-    "    #define logic based on combo of input\n",
-    "    rhash = 0\n",
-    "    rfhash = ['srcIP','dstIP','sport','dport', 'ipkt', 'ibyt']\n",
-    "    scored_threats=[]\n",
-    "    \n",
-    "    for k in xrange(len(rvals)):\n",
-    "        if rvals[k] != '':                \n",
-    "            rhash += 2**k\n",
-    "            \n",
-    "    with open(sconnect, 'r') as f:\n",
-    "        with open(tmpconnect,'w') as g:\n",
-    "            reader = csv.DictReader(f,delimiter=',')\n",
-    "            riter = csv.DictWriter(g,fieldnames=reader.fieldnames,delimiter=',')\n",
-    "            riter.writeheader()\n",
-    "            for row in reader: \n",
-    "                result = 0\n",
-    "                for n in xrange(0,len(rvals)):\n",
-    "                    if (2**n & rhash) > 0:\n",
-    "                        if rops[n] == 'leq':\n",
-    "                            if int(row[rfhash[n]]) <= int(rvals[n]):\n",
-    "                                result += 2**n                           \n",
-    "                        if rops[n] == 'eq':\n",
-    "                            if int(row[rfhash[n]]) == int(rvals[n]):\n",
-    "                                result += 2**n                           \n",
-    "                if result == rhash:\n",
-    "                    row['sev'] = risk\n",
-    "                    scored_threats.append({col:row[col] for col in feedback_cols.keys()})\n",
-    "                riter.writerow(row)  \n",
-    "                \n",
-    "    create_feedback_file(scored_threats)\n",
-    "    shutil.copyfile(tmpconnect,sconnect)\n",
-    "    \n",
-    "    \n",
-    "def attack_heuristics():\n",
-    "    with open(sconnect, 'rb') as f:\n",
-    "        reader = csv.DictReader(f,delimiter=',') \n",
-    "        reader.next();\n",
-    "        rowct = 1\n",
-    "        for row in reader:\n",
-    "            if row['srcIP'] not in srcdict:\n",
-    "                srcdict[row['srcIP']] = row['srcIP']\n",
-    "            if row['dstIP'] not in dstdict:\n",
-    "                 dstdict[row['dstIP']] = row['dstIP']\n",
-    "            if row['sport'] not in sportdict:\n",
-    "                sportdict[row['sport']] = row['sport']\n",
-    "            if row['dport'] not in dportdict:\n",
-    "                dportdict[row['dport']] = row['dport']\n",
-    "\n",
-    "    df = pd.read_csv(sconnect)   \n",
-    "    gb = df.groupby([u'srcIP'])      \n",
-    "  \n",
-    "    for srcip in srcdict:\n",
-    "        try:\n",
-    "            if len(gb.get_group(srcip)) > 20:\n",
-    "                print srcip,'connects:',len(gb.get_group(srcip))\n",
-    "        except:\n",
-    "            print \"Key Error for ip: \" + srcip\n",
-    "               \n",
-    "            \n",
-    "def ml_feedback():\n",
-    "    dst_name = os.path.basename(sconnect)\n",
-    "    str_fb=\"DSOURCE={0} &&\\\n",
-    "        FDATE={1} &&\\\n",
-    "        source /etc/spot.conf &&\\\n",
-    "        usr=$(echo $LUSER | cut -f3 -d'/') &&\\\n",
-    "        mlnode=$MLNODE &&\\\n",
-    "        lpath=$LPATH &&\\\n",
-    "        scp {2} $usr@$mlnode:$lpath/{3}\".format(dsource,date,score_fbk,dst_name)  \n",
-    "    \n",
-    "    subprocess.call(str_fb, shell=True)"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "Run attack heuristics."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": true
-   },
-   "outputs": [],
-   "source": [
-    "# set_rules()"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": true
-   },
-   "outputs": [],
-   "source": [
-    "# attack_heuristics()"
+    "    "
    ]
   },
   {