Posted to commits@spot.apache.org by na...@apache.org on 2017/09/26 22:41:08 UTC

[05/50] [abbrv] incubator-spot git commit: Fix merge issues with master

Fix merge issues with master


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/731279f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/731279f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/731279f3

Branch: refs/heads/SPOT-181_ODM
Commit: 731279f33c2ca8557773a6770c07da1134c7243e
Parents: b08211b 0fa4e5c
Author: Everardo Lopez Sandoval (Intel) <el...@elopezsa-mac02.zpn.intel.com>
Authored: Thu Jul 13 13:39:31 2017 -0500
Committer: Everardo Lopez Sandoval (Intel) <el...@elopezsa-mac02.zpn.intel.com>
Committed: Thu Jul 13 13:39:31 2017 -0500

----------------------------------------------------------------------
 spot-ml/build.sbt                               |      18 +-
 .../dns/model/DNSSuspiciousConnectsModel.scala  |      28 -
 .../org/apache/spot/utilities/TopDomains.scala  |       3 +-
 spot-ml/top-1m.csv                              | 1000000 ---------------
 spot-oa/oa/dns/dns_oa.py                        |     106 +-
 5 files changed, 72 insertions(+), 1000083 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/731279f3/spot-oa/oa/dns/dns_oa.py
----------------------------------------------------------------------
diff --cc spot-oa/oa/dns/dns_oa.py
index 6c823f4,c329d63..5982e8b
--- a/spot-oa/oa/dns/dns_oa.py
+++ b/spot-oa/oa/dns/dns_oa.py
@@@ -22,10 -22,8 +22,9 @@@ import shuti
  import sys
  import datetime
  import csv, math
- import re
  from tld import get_tld
 -
 +import api.resources.impala_engine as impala
 +import api.resources.hdfs_client as HDFSClient
  from collections import OrderedDict
  from utils import Util
  from components.data.data import Data
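
The import hunk above drops re and pulls in the two modules the rest of this
patch is built on: api.resources.impala_engine (queries) and
api.resources.hdfs_client (HDFS path deletion). A minimal sketch of the calling
pattern, limited to the calls that appear in the hunks below; the path is
illustrative, not a real deployment path:

    import api.resources.impala_engine as impala
    import api.resources.hdfs_client as HDFSClient

    # run SQL directly against Impala instead of writing CSV output files
    impala.execute_query("invalidate metadata")

    # remove a day's partition folder from HDFS; the user argument is
    # optional (the patch passes user="impala" only for hive partitions)
    HDFSClient.delete_folder("/user/spot/dns/hive/oa/suspicious/y=2017/m=7/d=13",
                             user="impala")
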
@@@ -68,7 -67,8 +68,7 @@@ class OA(object)
  
          # initialize data engine
          self._db = self._spot_conf.get('conf', 'DBNAME').replace("'", "").replace('"', '')
-         
 -        self._engine = Data(self._db,self._table_name ,self._logger) 
+ 
  
      def start(self):
  
@@@ -94,29 -93,8 +94,29 @@@
          print(end - start)
          ##################
  
 +
 +    def _clear_previous_executions(self):
-         
-         self._logger.info("Cleaning data from previous executions for the day")       
++
++        self._logger.info("Cleaning data from previous executions for the day")
 +        yr = self._date[:4]
 +        mn = self._date[4:6]
-         dy = self._date[6:]  
++        dy = self._date[6:]
 +        table_schema = []
 +        HUSER = self._spot_conf.get('conf', 'HUSER').replace("'", "").replace('"', '')
 +        table_schema=['suspicious', 'edge', 'dendro', 'threat_dendro', 'threat_investigation', 'storyboard', 'summary' ]
 +
 +        for path in table_schema:
 +            HDFSClient.delete_folder("{0}/{1}/hive/oa/{2}/y={3}/m={4}/d={5}".format(HUSER,self._table_name,path,yr,int(mn),int(dy)),user="impala")
 +        impala.execute_query("invalidate metadata")
 +
 +        #removes Feedback file
 +        HDFSClient.delete_folder("{0}/{1}/scored_results/{2}{3}{4}/feedback/ml_feedback.csv".format(HUSER,self._table_name,yr,mn,dy))
 +        #removes json files from the storyboard
 +        HDFSClient.delete_folder("{0}/{1}/oa/{2}/{3}/{4}/{5}".format(HUSER,self._table_name,"storyboard",yr,mn,dy))
 +
 +
      def _create_folder_structure(self):
-         
+ 
          # create date folder structure if it does not exist.
          self._logger.info("Creating folder structure for OA (data and ipynb)")       
          self._data_path,self._ingest_summary_path,self._ipynb_path = Util.create_oa_folders("dns",self._date)
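
The new _clear_previous_executions above wipes the day's OA output before a
re-run: one hive partition per table schema, an "invalidate metadata" so Impala
stops serving the deleted files from its cache, then the ML feedback file and
the storyboard JSON. Note the asymmetry in how the date is formatted; a sketch
assuming self._date is a YYYYMMDD string and HUSER is /user/spot:

    date = "20170713"
    yr, mn, dy = date[:4], date[4:6], date[6:]

    # hive partition paths drop the zero padding (y=2017/m=7/d=13) ...
    hive_path = "{0}/{1}/hive/oa/{2}/y={3}/m={4}/d={5}".format(
        "/user/spot", "dns", "suspicious", yr, int(mn), int(dy))

    # ... while the feedback path keeps it (.../20170713/feedback/...)
    feedback_path = "{0}/{1}/scored_results/{2}{3}{4}/feedback/ml_feedback.csv".format(
        "/user/spot", "dns", yr, mn, dy)
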
@@@ -148,9 -125,10 +148,9 @@@
          # get results file from hdfs.
          get_command = Util.get_ml_results_form_hdfs(hdfs_path,self._data_path)
          self._logger.info("{0}".format(get_command))
-  
+ 
 -         # validate files exists
          if os.path.isfile(dns_results):
-     
+ 
              # read number of results based in the limit specified.
              self._logger.info("Reading {0} dns results file: {1}".format(self._date,dns_results))
              self._dns_results = Util.read_results(dns_results,self._limit,self._results_delimiter)[:]
@@@ -158,50 -136,48 +158,50 @@@
  
          else:
              self._logger.error("There was an error getting ML results from HDFS")
-             sys.exit(1) 
+             sys.exit(1)
  
 -        # add headers.        
 -        self._logger.info("Adding headers")
 -        self._dns_scores_headers = [  str(key) for (key,value) in self._conf['dns_score_fields'].items() ]
 -
          # add dns content.
-         self._dns_scores = [ conn[:]  for conn in self._dns_results][:]    
 -        self._dns_scores = [ conn[:]  for conn in self._dns_results][:]       
++        self._dns_scores = [ conn[:]  for conn in self._dns_results][:]
 +
  
-     def _move_time_stamp(self,dns_data): 
+     def _move_time_stamp(self,dns_data):
          
 -        for dns in dns_data:
 -            time_stamp = dns[1]
 -            dns.remove(time_stamp)
 -            dns.append(time_stamp)
 -        
 +        # return dns_data_ordered
          return dns_data        
  
 -    def _create_dns_scores_csv(self):
 +
 +    def _create_dns_scores(self):
          
 -        dns_scores_csv = "{0}/dns_scores.csv".format(self._data_path)
 -        dns_scores_final =  self._move_time_stamp(self._dns_scores)
 -        dns_scores_final.insert(0,self._dns_scores_headers)
 -        Util.create_csv_file(dns_scores_csv,dns_scores_final)   
 +        # get date parameters.
 +        yr = self._date[:4]
 +        mn = self._date[4:6]
-         dy = self._date[6:] 
++        dy = self._date[6:]
 +        value_string = ""
 +
 +        dns_scores_final = self._move_time_stamp(self._dns_scores)
 +        self._dns_scores = dns_scores_final
-     
+ 
 -        # create bk file
 -        dns_scores_bu_csv = "{0}/dns_scores_bu.csv".format(self._data_path)
 -        Util.create_csv_file(dns_scores_bu_csv,dns_scores_final)     
 +        for row in dns_scores_final:
-             value_string += str(tuple(Util.cast_val(item) for item in row)) + ","              
-     
++            value_string += str(tuple(Util.cast_val(item) for item in row)) + ","
++
 +        load_into_impala = ("""
 +             INSERT INTO {0}.dns_scores partition(y={2}, m={3}, d={4}) VALUES {1}
-         """).format(self._db, value_string[:-1], yr, mn, dy) 
++        """).format(self._db, value_string[:-1], yr, mn, dy)
 +        impala.execute_query(load_into_impala)
  
  
      def _add_tld_column(self):
          qry_name_col = self._conf['dns_results_fields']['dns_qry_name'] 
 -        self._dns_scores = [conn + [ get_tld("http://" + str(conn[qry_name_col]), fail_silently=True) if "http://" not in str(conn[qry_name_col]) else get_tld(str(conn[qry_name_col]), fail_silently=True)] for conn in self._dns_scores ] 
 -  
 +        self._dns_scores = [conn + [ get_tld("http://" + str(conn[qry_name_col]), fail_silently=True) if "http://" not in str(conn[qry_name_col]) else get_tld(str(conn[qry_name_col]), fail_silently=True)] for conn in self._dns_scores ]
-         
++
 +
      def _add_reputation(self):
  
          # read configuration.
          reputation_conf_file = "{0}/components/reputation/reputation_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
          self._logger.info("Reading reputation configuration file: {0}".format(reputation_conf_file))
          rep_conf = json.loads(open(reputation_conf_file).read())
-                 
 -        
 -        
++
          # initialize reputation services.
          self._rep_services = []
          self._logger.info("Initializing reputation services.")
@@@ -230,11 -207,8 +230,11 @@@
                  rep_results = {}            
                  for result in rep_services_results:            
                      rep_results = {k: "{0}::{1}".format(rep_results.get(k, ""), result.get(k, "")).strip('::') for k in set(rep_results) | set(result)}
-                 
+ 
 -                self._dns_scores = [ conn + [ rep_results[conn[key]] ]   for conn in self._dns_scores  ]
 +                if rep_results:
 +                    self._dns_scores = [ conn + [ rep_results[conn[key]] ]   for conn in self._dns_scores  ]
 +                else:
 +                    self._dns_scores = [ conn + [""]   for conn in self._dns_scores  ]
          else:
              self._dns_scores = [ conn + [""]   for conn in self._dns_scores  ]
  
@@@ -247,15 -222,16 +247,15 @@@
  
  
      def _add_iana(self):
 -
          iana_conf_file = "{0}/components/iana/iana_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-         if os.path.isfile(iana_conf_file):            
+         if os.path.isfile(iana_conf_file):
              iana_config  = json.loads(open(iana_conf_file).read())
              dns_iana = IanaTransform(iana_config["IANA"])
  
              dns_qry_class_index = self._conf["dns_results_fields"]["dns_qry_class"]
              dns_qry_type_index = self._conf["dns_results_fields"]["dns_qry_type"]
-             dns_qry_rcode_index = self._conf["dns_results_fields"]["dns_qry_rcode"]            
+             dns_qry_rcode_index = self._conf["dns_results_fields"]["dns_qry_rcode"]
 -            self._dns_scores = [ conn + [ dns_iana.get_name(conn[dns_qry_class_index],"dns_qry_class")] + [dns_iana.get_name(conn[dns_qry_type_index],"dns_qry_type")] + [ dns_iana.get_name(conn[dns_qry_rcode_index],"dns_qry_rcode") ] for conn in self._dns_scores ]
 +            self._dns_scores = [ conn + [ dns_iana.get_name(conn[dns_qry_class_index],"dns_qry_class")] + [dns_iana.get_name(conn[dns_qry_type_index],"dns_qry_type")] + [dns_iana.get_name(conn[dns_qry_rcode_index],"dns_qry_rcode")] for conn in self._dns_scores ]
              
          else:            
              self._dns_scores = [ conn + ["","",""] for conn in self._dns_scores ] 
@@@ -274,10 -250,10 +274,10 @@@
  
      def _get_oa_details(self):
          
-         self._logger.info("Getting OA DNS suspicious details/dendro diagram")       
 -        self._logger.info("Getting OA DNS suspicious details/chord diagram")       
++        self._logger.info("Getting OA DNS suspicious details/dendro diagram")
          # start suspicious connects details process.
          p_sp = Process(target=self._get_suspicious_details)
 -        p_sp.start()        
 +        p_sp.start()
  
          # start chord diagram process.            
          p_dn = Process(target=self._get_dns_dendrogram)
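
_get_oa_details keeps its shape: details and dendrogram generation still run
concurrently in separate processes. A minimal self-contained sketch of that
fan-out (the real targets are bound methods on the OA instance, and the patch
shows only the start() calls, not where the processes are joined):

    from multiprocessing import Process

    def get_suspicious_details():
        print("building suspicious-connects details")

    def get_dns_dendrogram():
        print("building dendrogram data")

    p_sp = Process(target=get_suspicious_details)
    p_dn = Process(target=get_dns_dendrogram)
    p_sp.start()
    p_dn.start()
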
@@@ -294,88 -269,86 +294,88 @@@
              iana_config  = json.loads(open(iana_conf_file).read())
              dns_iana = IanaTransform(iana_config["IANA"])
          
-         for conn in self._dns_scores:       
-   
+         for conn in self._dns_scores:
 -            # get data to query
 -            date=conn[self._conf["dns_score_fields"]["frame_time"]].split(" ")
 -            date = filter(None,date)
 -
 -            if len(date) == 5:
 -                year=date[2]
 -                month=datetime.datetime.strptime(date[0], '%b').strftime('%m')
 -                day=date[1]                
 -                hh=conn[self._conf["dns_score_fields"]["hh"]]
 -                dns_qry_name = conn[self._conf["dns_score_fields"]["dns_qry_name"]]
 -                self._get_dns_details(dns_qry_name,year,month,day,hh,dns_iana)
++
 +            timestamp = conn[self._conf["dns_score_fields"]["unix_tstamp"]]
 +            full_date = datetime.datetime.utcfromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H:%M:%S')
-   
++
 +            date = full_date.split(" ")[0].split("-")
 +            # get date parameters.
 +            yr = date[0]
 +            mn = date[1]
-             dy = date[2] 
++            dy = date[2]
 +            time = full_date.split(" ")[1].split(":")
 +            hh = int(time[0])
 +
 +            dns_qry_name = conn[self._conf["dns_score_fields"]["dns_qry_name"]]
 +            self._get_dns_details(dns_qry_name,yr,mn,dy,hh,dns_iana)
 +
  
      def _get_dns_details(self,dns_qry_name,year,month,day,hh,dns_iana):
 -                    
 -        limit = self._details_limit
 -        edge_file ="{0}/edge-{1}_{2}_00.csv".format(self._data_path,dns_qry_name.replace("/","-"),hh)
 -        edge_tmp  ="{0}/edge-{1}_{2}_00.tmp".format(self._data_path,dns_qry_name.replace("/","-"),hh)
 +        value_string = ""
 +        query_to_load =("""
 +            SELECT unix_tstamp,frame_len,ip_dst,ip_src,dns_qry_name,dns_qry_class,dns_qry_type,dns_qry_rcode,dns_a,h as hh
 +            FROM {0}.{1} WHERE y={2} AND m={3} AND d={4} AND dns_qry_name LIKE '%{5}%' AND h={6} LIMIT {7};
 +        """).format(self._db,self._table_name,year,month,day,dns_qry_name,hh,self._details_limit)
-         
-         try: 
-              dns_details = impala.execute_query(query_to_load) 
++
++        try:
++             dns_details = impala.execute_query(query_to_load)
 +        except:
 +            self._logger.info("WARNING. Details couldn't be retrieved for {0}, skipping this step".format(dns_qry_name))
 +        else:
-         # add IANA to results. 
++        # add IANA to results.
 +            update_rows = []
 +            if dns_iana:
-                 self._logger.info("Adding IANA translation to details results") 
-                     
++                self._logger.info("Adding IANA translation to details results")
+ 
 -        if not os.path.isfile(edge_file):
 -    
 -            dns_qry = ("SELECT frame_time,frame_len,ip_dst,ip_src,dns_qry_name,dns_qry_class,dns_qry_type,dns_qry_rcode,dns_a FROM {0}.{1} WHERE y={2} AND m={3} AND d={4} AND dns_qry_name LIKE '%{5}%' AND h={6} LIMIT {7};").format(self._db,self._table_name,year,month,day,dns_qry_name,hh,limit)
 -            
 -            # execute query
 -	    try:
 -                self._engine.query(dns_qry,edge_tmp)
 -            except:
 -		self._logger.error("ERROR. Edge file couldn't be created for {0}, skipping this step".format(dns_qry_name))
 +                dns_details = [ conn + (dns_iana.get_name(str(conn[5]),"dns_qry_class"),dns_iana.get_name(str(conn[6]),"dns_qry_type"),dns_iana.get_name(str(conn[7]),"dns_qry_rcode")) for conn in dns_details ]
-             else: 
++            else:
 +                self._logger.info("WARNING: NO IANA configured.")
 +                dns_details = [ conn + ("","","") for conn in dns_details ]
 +
 +            nc_conf_file = "{0}/components/nc/nc_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 +            if os.path.isfile(nc_conf_file):
 +                nc_conf = json.loads(open(nc_conf_file).read())["NC"]
-                 dns_nc = NetworkContext(nc_conf,self._logger) 
++                dns_nc = NetworkContext(nc_conf,self._logger)
 +                dns_details = [ conn + (dns_nc.get_nc(conn[2]),) for conn in dns_details ]
              else:
 -            # add IANA to results.
 -                if dns_iana:
 -                    update_rows = []
 -                    self._logger.info("Adding IANA translation to details results")
 -                    with open(edge_tmp) as dns_details_csv:
 -                        rows = csv.reader(dns_details_csv, delimiter=',', quotechar='|')
 -                        try:
 -                            next(rows)
 -                            update_rows = [[conn[0]] + [conn[1]] + [conn[2]] + [conn[3]] + [conn[4]] + [dns_iana.get_name(conn[5],"dns_qry_class")] + [dns_iana.get_name(conn[6],"dns_qry_type")] + [dns_iana.get_name(conn[7],"dns_qry_rcode")] + [conn[8]] for conn in rows]
 -                            update_rows = filter(None, update_rows)
 -                            header = [ "frame_time", "frame_len", "ip_dst","ip_src","dns_qry_name","dns_qry_class_name","dns_qry_type_name","dns_qry_rcode_name","dns_a" ]
 -                            update_rows.insert(0,header)
 -                        except IndexError:
 -                            pass
 +                dns_details = [ conn + (0,) for conn in dns_details ]
-                           
+ 
 -                else:
 -                    self._logger.info("WARNING: NO IANA configured.")
 -
 -                    # create edge file.
 -                self._logger.info("Creating edge file:{0}".format(edge_file))
 -                with open(edge_file,'wb') as dns_details_edge:
 -                    writer = csv.writer(dns_details_edge, quoting=csv.QUOTE_ALL)
 -                    if update_rows:
 -                        writer.writerows(update_rows)
 -                    else:            
 -                        shutil.copy(edge_tmp,edge_file)           
 +            for row in dns_details:
 +                value_string += str(tuple(item for item in row)) + ","
 +
-             if value_string != "": 
++            if value_string != "":
                  
 -                os.remove(edge_tmp)
 +                query_to_insert=("""
 +                    INSERT INTO {0}.dns_edge PARTITION (y={1}, m={2}, d={3}) VALUES ({4});
 +                """).format(self._db,year, month, day,  value_string[:-1])
 +
-                 impala.execute_query(query_to_insert) 
-  
++                impala.execute_query(query_to_insert)
  
-     def _get_dns_dendrogram(self): 
  
-         for conn in self._dns_scores:   
+     def _get_dns_dendrogram(self):
 -        limit = self._details_limit
 -        for conn in self._dns_scores:            
 -            date=conn[self._conf["dns_score_fields"]["frame_time"]].split(" ")
 -            date = filter(None,date)
+ 
 -            if len(date) == 5:
 -                year=date[2]
 -                month=datetime.datetime.strptime(date[0], '%b').strftime('%m')
 -                day=date[1]
 -                ip_dst=conn[self._conf["dns_score_fields"]["ip_dst"]]
 -                self._get_dendro(self._db,self._table_name,ip_dst,year,month,day, limit)
++        for conn in self._dns_scores:
 +            timestamp = conn[self._conf["dns_score_fields"]["unix_tstamp"]]
 +            full_date = datetime.datetime.utcfromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H:%M:%S')
 +
 +            date = full_date.split(" ")[0].split("-")
 +            # get date parameters.
-             
+ 
 +            yr = date[0]
 +            mn = date[1]
 +            dy = date[2]
 +            ip_dst=conn[self._conf["dns_score_fields"]["ip_dst"]]
  
 -    def _get_dendro(self,db,table,ip_dst,year,month,day,limit):
 +            query_to_load = ("""
 +                INSERT INTO TABLE {0}.dns_dendro PARTITION (y={2}, m={3},d={4})
-                 SELECT unix_tstamp, dns_a, dns_qry_name, ip_dst 
++                SELECT unix_tstamp, dns_a, dns_qry_name, ip_dst
 +                FROM (SELECT unix_tstamp, susp.ip_dst, susp.dns_qry_name, susp.dns_a
-                     FROM {0}.{1} as susp WHERE susp.y={2} AND susp.m={3} AND susp.d={4} AND susp.ip_dst='{5}' 
++                    FROM {0}.{1} as susp WHERE susp.y={2} AND susp.m={3} AND susp.d={4} AND susp.ip_dst='{5}'
 +                LIMIT {6}) AS tmp GROUP BY dns_a, dns_qry_name, ip_dst, unix_tstamp
 +            """).format(self._db,self._table_name,yr,mn,dy,ip_dst,self._details_limit)
-            
+ 
 -        dendro_file = "{0}/dendro-{1}.csv".format(self._data_path,ip_dst)
 -        if not os.path.isfile(dendro_file):
 -            dndro_qry = ("SELECT dns_a, dns_qry_name, ip_dst FROM (SELECT susp.ip_dst, susp.dns_qry_name, susp.dns_a FROM {0}.{1} as susp WHERE susp.y={2} AND susp.m={3} AND susp.d={4} AND susp.ip_dst='{5}' LIMIT {6}) AS tmp GROUP BY dns_a, dns_qry_name, ip_dst").format(db,table,year,month,day,ip_dst,limit)
 -            # execute query
 -            self._engine.query(dndro_qry,dendro_file)
 +            impala.execute_query(query_to_load)
  
          
      def _ingest_summary(self):
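
Both loops above replace the old frame_time string parsing with a direct
conversion from unix_tstamp, so the partition keys no longer depend on a
particular text layout. The conversion, with a sample epoch value:

    import datetime

    timestamp = 1499970000  # sample epoch seconds: 2017-07-13 18:20:00 UTC
    full_date = datetime.datetime.utcfromtimestamp(
        int(timestamp)).strftime('%Y-%m-%d %H:%M:%S')

    yr, mn, dy = full_date.split(" ")[0].split("-")  # '2017', '07', '13'
    hh = int(full_date.split(" ")[1].split(":")[0])  # 18
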
@@@ -390,32 -363,51 +390,32 @@@
          result_rows = []        
          df_filtered =  pd.DataFrame()
  
 -        ingest_summary_file = "{0}/is_{1}{2}.csv".format(self._ingest_summary_path,yr,mn)			
 -        ingest_summary_tmp = "{0}.tmp".format(ingest_summary_file)
 -
 -        if os.path.isfile(ingest_summary_file):
 -        	df = pd.read_csv(ingest_summary_file, delimiter=',')
 -            #discards previous rows from the same date
 -        	df_filtered = df[df['date'].str.contains("{0}-{1}-{2}".format(yr, mn, dy)) == False] 
 -        else:
 -        	df = pd.DataFrame()
 -            
 -        # get ingest summary.
 -        ingest_summary_qry = ("SELECT frame_time, COUNT(*) as total "
 -                                    " FROM {0}.{1}"
 -                                    " WHERE y={2} AND m={3} AND d={4} "
 -                                    " AND unix_tstamp IS NOT NULL AND frame_time IS NOT NULL"
 -                                    " AND frame_len IS NOT NULL AND dns_qry_name IS NOT NULL"
 -                                    " AND ip_src IS NOT NULL " 
 -                                    " AND (dns_qry_class IS NOT NULL AND dns_qry_type IS NOT NULL AND dns_qry_rcode IS NOT NULL ) "
 -                                    " GROUP BY frame_time;") 
 -
 -        ingest_summary_qry = ingest_summary_qry.format(self._db,self._table_name, yr, mn, dy)
 -        
 -        results_file = "{0}/results_{1}.csv".format(self._ingest_summary_path,self._date)
 -        self._engine.query(ingest_summary_qry,output_file=results_file,delimiter=",")
 -
 +        query_to_load = ("""
 +            SELECT frame_time, COUNT(*) as total FROM {0}.{1}
-             WHERE y={2} AND m={3} AND d={4} AND unix_tstamp IS NOT NULL 
-             AND frame_time IS NOT NULL AND frame_len IS NOT NULL 
-             AND dns_qry_name IS NOT NULL AND ip_src IS NOT NULL 
-             AND (dns_qry_class IS NOT NULL AND dns_qry_type IS NOT NULL 
++            WHERE y={2} AND m={3} AND d={4} AND unix_tstamp IS NOT NULL
++            AND frame_time IS NOT NULL AND frame_len IS NOT NULL
++            AND dns_qry_name IS NOT NULL AND ip_src IS NOT NULL
++            AND (dns_qry_class IS NOT NULL AND dns_qry_type IS NOT NULL
 +            AND dns_qry_rcode IS NOT NULL ) GROUP BY frame_time;
 +        """).format(self._db,self._table_name, yr, mn, dy)
  
 -        if os.path.isfile(results_file):        
 -            df_results = pd.read_csv(results_file, delimiter=',') 
 +        results = impala.execute_query_as_list(query_to_load)
 +        df = pd.DataFrame(results)
  
 -            # Forms a new dataframe splitting the minutes from the time column
 -            df_new = pd.DataFrame([["{0}-{1}-{2} {3}:{4}".format(yr, mn, dy,\
 +        # Forms a new dataframe splitting the minutes from the time column
 +        df_new = pd.DataFrame([["{0}-{1}-{2} {3}:{4}".format(yr, mn, dy,\
              val['frame_time'].replace("  "," ").split(" ")[3].split(":")[0].zfill(2),\
              val['frame_time'].replace("  "," ").split(" ")[3].split(":")[1].zfill(2)),\
 -            int(val['total']) if not math.isnan(val['total']) else 0 ] for key,val in df_results.iterrows()],columns = ingest_summary_cols)
 -    
 -            #Groups the data by minute 
 -            sf = df_new.groupby(by=['date'])['total'].sum()
 -        
 -            df_per_min = pd.DataFrame({'date':sf.index, 'total':sf.values})
 -            
 -            df_final = df_filtered.append(df_per_min, ignore_index=True)
 -            df_final.to_csv(ingest_summary_tmp,sep=',', index=False)
 +            int(val['total']) if not math.isnan(val['total']) else 0 ] for key,val in df.iterrows()],columns = ingest_summary_cols)
  
-         #Groups the data by minute 
 -            os.remove(results_file)
 -            os.rename(ingest_summary_tmp,ingest_summary_file)
 -        else:
 -            self._logger.info("No data found for the ingest summary")
 -        
++        #Groups the data by minute
 +        sf = df_new.groupby(by=['date'])['total'].sum()
 +        df_per_min = pd.DataFrame({'date':sf.index, 'total':sf.values})
 +
-         df_final = df_filtered.append(df_per_min, ignore_index=True).to_records(False,False) 
++        df_final = df_filtered.append(df_per_min, ignore_index=True).to_records(False,False)
 +
 +        if len(df_final) > 0:
 +            query_to_insert=("""
 +                INSERT INTO {0}.dns_ingest_summary PARTITION (y={1}, m={2}, d={3}) VALUES {4};
-             """).format(self._db, yr, mn, dy, tuple(df_final))            
++            """).format(self._db, yr, mn, dy, tuple(df_final))
 +            impala.execute_query(query_to_insert)
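
The rewritten _ingest_summary follows the same pattern as the rest of the
patch: counts per frame_time come back from Impala as a list, get bucketed to
the minute with a pandas groupby, and are written back with a partitioned
INSERT rather than to a CSV on disk. A small sketch of the per-minute
aggregation step with an invented frame (columns match ingest_summary_cols):

    import pandas as pd

    df_new = pd.DataFrame([["2017-07-13 18:20", 10],
                           ["2017-07-13 18:20", 5],
                           ["2017-07-13 18:21", 7]],
                          columns=['date', 'total'])

    sf = df_new.groupby(by=['date'])['total'].sum()
    df_per_min = pd.DataFrame({'date': sf.index, 'total': sf.values})
    # df_per_min:  2017-07-13 18:20 -> 15,  2017-07-13 18:21 -> 7
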