You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spot.apache.org by ev...@apache.org on 2017/03/29 16:51:56 UTC

[32/50] [abbrv] incubator-spot git commit: Improved ingest summary query

Improved ingest summary query


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/3386849d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/3386849d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/3386849d

Branch: refs/heads/SPOT-35_graphql_api
Commit: 3386849d4a628ef6c02fc818ba342d58a0f95f59
Parents: c526716
Author: LedaLima <le...@apache.org>
Authored: Mon Mar 6 18:30:19 2017 -0600
Committer: Diego Ortiz Huerta <di...@intel.com>
Committed: Wed Mar 15 11:49:48 2017 -0700

----------------------------------------------------------------------
 spot-oa/oa/flow/flow_oa.py | 70 ++++++++++++++++++++++++++++-------------
 1 file changed, 49 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3386849d/spot-oa/oa/flow/flow_oa.py
----------------------------------------------------------------------
diff --git a/spot-oa/oa/flow/flow_oa.py b/spot-oa/oa/flow/flow_oa.py
index 63de068..bf2d301 100644
--- a/spot-oa/oa/flow/flow_oa.py
+++ b/spot-oa/oa/flow/flow_oa.py
@@ -22,7 +22,7 @@ import sys
 import json
 import numpy as np
 import linecache, bisect
-import csv
+import csv, math
 import pandas as pd
 import subprocess
 import numbers
@@ -35,6 +35,7 @@ from utils import Util, ProgressBar
 from components.data.data import Data
 from components.geoloc.geoloc import GeoLocalization
 from components.reputation.gti import gti
+from impala.util import as_pandas
 import time
 
 
@@ -407,32 +408,59 @@ class OA(object):
 
                     impala.execute_query(query_to_load)
  
-
-    def _ingest_summary(self): 
+ 
+    def _ingest_summary(self):
         # get date parameters.
         yr = self._date[:4]
         mn = self._date[4:6]
         dy = self._date[6:]
 
-        self._logger.info("Getting ingest summary data for the day") 
-
-        query_to_load = ("""
-            INSERT INTO TABLE {0}.flow_ingest_summary PARTITION (y={2}, m={3})
-            SELECT treceived,tryear, trmonth, trday, trhour, trminute, COUNT(*) total
-            FROM {0}.{1}
-            WHERE y={2} AND m={3} AND d={4}
-            AND unix_tstamp IS NOT NULL
-            AND sip IS NOT NULL
-            AND sport IS NOT NULL
-            AND dip IS NOT NULL
-            AND dport IS NOT NULL
-            AND ibyt IS NOT NULL
-            AND ipkt IS NOT NULL
-            AND cast(treceived as timestamp) IS NOT NULL
-            GROUP BY treceived,tryear, trmonth, trday, trhour, trminute;
-            """).format(self._db,self._table_name,yr,mn,dy) 
+        self._logger.info("Getting ingest summary data for the day")
+        
+        ingest_summary_cols = ["date","total"]		
+        result_rows = []        
+        df_filtered =  pd.DataFrame()
+
+        # get ingest summary.
+
+        query_to_load=("""
+                SELECT tryear, trmonth, trday, trhour, trminute, COUNT(*) as total
+                FROM {0}.{1} WHERE y={2} AND m={3} AND d={4}
+                AND unix_tstamp IS NOT NULL
+                AND sip IS NOT NULL
+                AND sport IS NOT NULL
+                AND dip IS NOT NULL
+                AND dport IS NOT NULL
+                AND ibyt IS NOT NULL
+                AND ipkt IS NOT NULL
+                AND tryear={2}
+                AND cast(treceived as timestamp) IS NOT NULL
+                GROUP BY tryear, trmonth, trday, trhour, trminute;
+        """).format(self._db,self._table_name, yr, mn, dy)
         
-        impala.execute_query(query_to_load)
+        results = impala.execute_query(query_to_load) 
+ 
+        if results:
+            df_results = as_pandas(results) 
+            
+            #Forms a new dataframe splitting the minutes from the time column
+            df_new = pd.DataFrame([["{0}-{1}-{2} {3}:{4}".format(val['tryear'],val['trmonth'],val['trday'], val['trhour'], val['trminute']), int(val['total']) if not math.isnan(val['total']) else 0 ] for key,val in df_results.iterrows()],columns = ingest_summary_cols)
+            value_string = ''
+            #Groups the data by minute 
+
+            sf = df_new.groupby(by=['date'])['total'].sum()
+            df_per_min = pd.DataFrame({'date':sf.index, 'total':sf.values})
+            
+            df_final = df_filtered.append(df_per_min, ignore_index=True).to_records(False,False) 
+            if len(df_final) > 0:
+                query_to_insert=("""
+                    INSERT INTO {0}.flow_ingest_summary PARTITION (y={1}, m={2}) VALUES {3};
+                """).format(self._db, yr, mn, tuple(df_final))
+
+                impala.execute_query(query_to_insert)
+                
+        else:
+            self._logger.info("No data found for the ingest summary")