You are viewing a plain-text version of this content. The canonical link to it ("here" in the original page) was not preserved in this plain-text copy.
Posted to commits@spot.apache.org by ev...@apache.org on 2017/03/29 16:51:56 UTC
[32/50] [abbrv] incubator-spot git commit: Improved ingest summary query
Improved ingest summary query
Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/3386849d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/3386849d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/3386849d
Branch: refs/heads/SPOT-35_graphql_api
Commit: 3386849d4a628ef6c02fc818ba342d58a0f95f59
Parents: c526716
Author: LedaLima <le...@apache.org>
Authored: Mon Mar 6 18:30:19 2017 -0600
Committer: Diego Ortiz Huerta <di...@intel.com>
Committed: Wed Mar 15 11:49:48 2017 -0700
----------------------------------------------------------------------
spot-oa/oa/flow/flow_oa.py | 70 ++++++++++++++++++++++++++++-------------
1 file changed, 49 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3386849d/spot-oa/oa/flow/flow_oa.py
----------------------------------------------------------------------
diff --git a/spot-oa/oa/flow/flow_oa.py b/spot-oa/oa/flow/flow_oa.py
index 63de068..bf2d301 100644
--- a/spot-oa/oa/flow/flow_oa.py
+++ b/spot-oa/oa/flow/flow_oa.py
@@ -22,7 +22,7 @@ import sys
import json
import numpy as np
import linecache, bisect
-import csv
+import csv, math
import pandas as pd
import subprocess
import numbers
@@ -35,6 +35,7 @@ from utils import Util, ProgressBar
from components.data.data import Data
from components.geoloc.geoloc import GeoLocalization
from components.reputation.gti import gti
+from impala.util import as_pandas
import time
@@ -407,32 +408,59 @@ class OA(object):
impala.execute_query(query_to_load)
-
- def _ingest_summary(self):
+
+ def _ingest_summary(self):
# get date parameters.
yr = self._date[:4]
mn = self._date[4:6]
dy = self._date[6:]
- self._logger.info("Getting ingest summary data for the day")
-
- query_to_load = ("""
- INSERT INTO TABLE {0}.flow_ingest_summary PARTITION (y={2}, m={3})
- SELECT treceived,tryear, trmonth, trday, trhour, trminute, COUNT(*) total
- FROM {0}.{1}
- WHERE y={2} AND m={3} AND d={4}
- AND unix_tstamp IS NOT NULL
- AND sip IS NOT NULL
- AND sport IS NOT NULL
- AND dip IS NOT NULL
- AND dport IS NOT NULL
- AND ibyt IS NOT NULL
- AND ipkt IS NOT NULL
- AND cast(treceived as timestamp) IS NOT NULL
- GROUP BY treceived,tryear, trmonth, trday, trhour, trminute;
- """).format(self._db,self._table_name,yr,mn,dy)
+ self._logger.info("Getting ingest summary data for the day")
+
+ ingest_summary_cols = ["date","total"]
+ result_rows = []
+ df_filtered = pd.DataFrame()
+
+ # get ingest summary.
+
+ query_to_load=("""
+ SELECT tryear, trmonth, trday, trhour, trminute, COUNT(*) as total
+ FROM {0}.{1} WHERE y={2} AND m={3} AND d={4}
+ AND unix_tstamp IS NOT NULL
+ AND sip IS NOT NULL
+ AND sport IS NOT NULL
+ AND dip IS NOT NULL
+ AND dport IS NOT NULL
+ AND ibyt IS NOT NULL
+ AND ipkt IS NOT NULL
+ AND tryear={2}
+ AND cast(treceived as timestamp) IS NOT NULL
+ GROUP BY tryear, trmonth, trday, trhour, trminute;
+ """).format(self._db,self._table_name, yr, mn, dy)
- impala.execute_query(query_to_load)
+ results = impala.execute_query(query_to_load)
+
+ if results:
+ df_results = as_pandas(results)
+
+ #Forms a new dataframe splitting the minutes from the time column
+ df_new = pd.DataFrame([["{0}-{1}-{2} {3}:{4}".format(val['tryear'],val['trmonth'],val['trday'], val['trhour'], val['trminute']), int(val['total']) if not math.isnan(val['total']) else 0 ] for key,val in df_results.iterrows()],columns = ingest_summary_cols)
+ value_string = ''
+ #Groups the data by minute
+
+ sf = df_new.groupby(by=['date'])['total'].sum()
+ df_per_min = pd.DataFrame({'date':sf.index, 'total':sf.values})
+
+ df_final = df_filtered.append(df_per_min, ignore_index=True).to_records(False,False)
+ if len(df_final) > 0:
+ query_to_insert=("""
+ INSERT INTO {0}.flow_ingest_summary PARTITION (y={1}, m={2}) VALUES {3};
+ """).format(self._db, yr, mn, tuple(df_final))
+
+ impala.execute_query(query_to_insert)
+
+ else:
+ self._logger.info("No data found for the ingest summary")