Posted to commits@spot.apache.org by na...@apache.org on 2017/09/26 22:41:08 UTC
[05/50] [abbrv] incubator-spot git commit: Fix merge issues with master
Fix merge issues with master
Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/731279f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/731279f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/731279f3
Branch: refs/heads/SPOT-181_ODM
Commit: 731279f33c2ca8557773a6770c07da1134c7243e
Parents: b08211b 0fa4e5c
Author: Everardo Lopez Sandoval (Intel) <el...@elopezsa-mac02.zpn.intel.com>
Authored: Thu Jul 13 13:39:31 2017 -0500
Committer: Everardo Lopez Sandoval (Intel) <el...@elopezsa-mac02.zpn.intel.com>
Committed: Thu Jul 13 13:39:31 2017 -0500
----------------------------------------------------------------------
spot-ml/build.sbt | 18 +-
.../dns/model/DNSSuspiciousConnectsModel.scala | 28 -
.../org/apache/spot/utilities/TopDomains.scala | 3 +-
spot-ml/top-1m.csv | 1000000 ---------------
spot-oa/oa/dns/dns_oa.py | 106 +-
5 files changed, 72 insertions(+), 1000083 deletions(-)
----------------------------------------------------------------------
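In short, the hunks below re-apply the migration of dns_oa.py from file-based output (CSV score/edge/dendro files written through the Data engine) to direct writes into Impala tables (dns_scores, dns_edge, dns_dendro, dns_ingest_summary), plus cleanup of the previous run's HDFS output for the day. A minimal sketch of the cleanup pattern used in _clear_previous_executions (the impala_engine and hdfs_client helpers are the ones imported in the diff; the HUSER path, table name, and date are illustrative placeholders):

    import api.resources.impala_engine as impala
    import api.resources.hdfs_client as HDFSClient

    # Placeholder values; in the real code these come from spot.conf and self._date.
    huser, table_name, yr, mn, dy = "/user/spot", "dns", "2017", "07", "13"

    # Drop each OA partition written for the day, then refresh Impala's
    # view of HDFS so the dropped partitions disappear from query results.
    for path in ['suspicious', 'edge', 'dendro', 'threat_dendro',
                 'threat_investigation', 'storyboard', 'summary']:
        HDFSClient.delete_folder("{0}/{1}/hive/oa/{2}/y={3}/m={4}/d={5}".format(
            huser, table_name, path, yr, int(mn), int(dy)), user="impala")
    impala.execute_query("invalidate metadata")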
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/731279f3/spot-oa/oa/dns/dns_oa.py
----------------------------------------------------------------------
diff --cc spot-oa/oa/dns/dns_oa.py
index 6c823f4,c329d63..5982e8b
--- a/spot-oa/oa/dns/dns_oa.py
+++ b/spot-oa/oa/dns/dns_oa.py
@@@ -22,10 -22,8 +22,9 @@@ import shutil
import sys
import datetime
import csv, math
- import re
from tld import get_tld
-
+import api.resources.impala_engine as impala
+import api.resources.hdfs_client as HDFSClient
from collections import OrderedDict
from utils import Util
from components.data.data import Data
@@@ -68,7 -67,8 +68,7 @@@ class OA(object)
# initialize data engine
self._db = self._spot_conf.get('conf', 'DBNAME').replace("'", "").replace('"', '')
-
- self._engine = Data(self._db,self._table_name ,self._logger)
+
def start(self):
@@@ -94,29 -93,8 +94,29 @@@
print(end - start)
##################
+
+ def _clear_previous_executions(self):
-
- self._logger.info("Cleaning data from previous executions for the day")
++
++ self._logger.info("Cleaning data from previous executions for the day")
+ yr = self._date[:4]
+ mn = self._date[4:6]
- dy = self._date[6:]
++ dy = self._date[6:]
+ table_schema = []
+ HUSER = self._spot_conf.get('conf', 'HUSER').replace("'", "").replace('"', '')
+ table_schema=['suspicious', 'edge', 'dendro', 'threat_dendro', 'threat_investigation', 'storyboard', 'summary' ]
+
+ for path in table_schema:
+ HDFSClient.delete_folder("{0}/{1}/hive/oa/{2}/y={3}/m={4}/d={5}".format(HUSER,self._table_name,path,yr,int(mn),int(dy)),user="impala")
+ impala.execute_query("invalidate metadata")
+
+ #removes Feedback file
+ HDFSClient.delete_folder("{0}/{1}/scored_results/{2}{3}{4}/feedback/ml_feedback.csv".format(HUSER,self._table_name,yr,mn,dy))
+ #removes json files from the storyboard
+ HDFSClient.delete_folder("{0}/{1}/oa/{2}/{3}/{4}/{5}".format(HUSER,self._table_name,"storyboard",yr,mn,dy))
+
+
def _create_folder_structure(self):
-
+
# create date folder structure if it does not exist.
self._logger.info("Creating folder structure for OA (data and ipynb)")
self._data_path,self._ingest_summary_path,self._ipynb_path = Util.create_oa_folders("dns",self._date)
@@@ -148,9 -125,10 +148,9 @@@
# get results file from hdfs.
get_command = Util.get_ml_results_form_hdfs(hdfs_path,self._data_path)
self._logger.info("{0}".format(get_command))
-
+
- # validate files exists
if os.path.isfile(dns_results):
-
+
# read number of results based in the limit specified.
self._logger.info("Reading {0} dns results file: {1}".format(self._date,dns_results))
self._dns_results = Util.read_results(dns_results,self._limit,self._results_delimiter)[:]
@@@ -158,50 -136,48 +158,50 @@@
else:
self._logger.error("There was an error getting ML results from HDFS")
- sys.exit(1)
+ sys.exit(1)
- # add headers.
- self._logger.info("Adding headers")
- self._dns_scores_headers = [ str(key) for (key,value) in self._conf['dns_score_fields'].items() ]
-
# add dns content.
- self._dns_scores = [ conn[:] for conn in self._dns_results][:]
- self._dns_scores = [ conn[:] for conn in self._dns_results][:]
++ self._dns_scores = [ conn[:] for conn in self._dns_results][:]
+
- def _move_time_stamp(self,dns_data):
+ def _move_time_stamp(self,dns_data):
- for dns in dns_data:
- time_stamp = dns[1]
- dns.remove(time_stamp)
- dns.append(time_stamp)
-
+ # return dns_data_ordered
return dns_data
- def _create_dns_scores_csv(self):
+
+ def _create_dns_scores(self):
- dns_scores_csv = "{0}/dns_scores.csv".format(self._data_path)
- dns_scores_final = self._move_time_stamp(self._dns_scores)
- dns_scores_final.insert(0,self._dns_scores_headers)
- Util.create_csv_file(dns_scores_csv,dns_scores_final)
+ # get date parameters.
+ yr = self._date[:4]
+ mn = self._date[4:6]
- dy = self._date[6:]
++ dy = self._date[6:]
+ value_string = ""
+
+ dns_scores_final = self._move_time_stamp(self._dns_scores)
+ self._dns_scores = dns_scores_final
-
+
- # create bk file
- dns_scores_bu_csv = "{0}/dns_scores_bu.csv".format(self._data_path)
- Util.create_csv_file(dns_scores_bu_csv,dns_scores_final)
+ for row in dns_scores_final:
- value_string += str(tuple(Util.cast_val(item) for item in row)) + ","
-
++ value_string += str(tuple(Util.cast_val(item) for item in row)) + ","
++
+ load_into_impala = ("""
+ INSERT INTO {0}.dns_scores partition(y={2}, m={3}, d={4}) VALUES {1}
- """).format(self._db, value_string[:-1], yr, mn, dy)
++ """).format(self._db, value_string[:-1], yr, mn, dy)
+ impala.execute_query(load_into_impala)
def _add_tld_column(self):
qry_name_col = self._conf['dns_results_fields']['dns_qry_name']
- self._dns_scores = [conn + [ get_tld("http://" + str(conn[qry_name_col]), fail_silently=True) if "http://" not in str(conn[qry_name_col]) else get_tld(str(conn[qry_name_col]), fail_silently=True)] for conn in self._dns_scores ]
-
+ self._dns_scores = [conn + [ get_tld("http://" + str(conn[qry_name_col]), fail_silently=True) if "http://" not in str(conn[qry_name_col]) else get_tld(str(conn[qry_name_col]), fail_silently=True)] for conn in self._dns_scores ]
-
++
+
def _add_reputation(self):
# read configuration.
reputation_conf_file = "{0}/components/reputation/reputation_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
self._logger.info("Reading reputation configuration file: {0}".format(reputation_conf_file))
rep_conf = json.loads(open(reputation_conf_file).read())
-
-
-
++
# initialize reputation services.
self._rep_services = []
self._logger.info("Initializing reputation services.")
@@@ -230,11 -207,8 +230,11 @@@
rep_results = {}
for result in rep_services_results:
rep_results = {k: "{0}::{1}".format(rep_results.get(k, ""), result.get(k, "")).strip('::') for k in set(rep_results) | set(result)}
-
+
- self._dns_scores = [ conn + [ rep_results[conn[key]] ] for conn in self._dns_scores ]
+ if rep_results:
+ self._dns_scores = [ conn + [ rep_results[conn[key]] ] for conn in self._dns_scores ]
+ else:
+ self._dns_scores = [ conn + [""] for conn in self._dns_scores ]
else:
self._dns_scores = [ conn + [""] for conn in self._dns_scores ]
@@@ -247,15 -222,16 +247,15 @@@
def _add_iana(self):
-
iana_conf_file = "{0}/components/iana/iana_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- if os.path.isfile(iana_conf_file):
+ if os.path.isfile(iana_conf_file):
iana_config = json.loads(open(iana_conf_file).read())
dns_iana = IanaTransform(iana_config["IANA"])
dns_qry_class_index = self._conf["dns_results_fields"]["dns_qry_class"]
dns_qry_type_index = self._conf["dns_results_fields"]["dns_qry_type"]
- dns_qry_rcode_index = self._conf["dns_results_fields"]["dns_qry_rcode"]
+ dns_qry_rcode_index = self._conf["dns_results_fields"]["dns_qry_rcode"]
- self._dns_scores = [ conn + [ dns_iana.get_name(conn[dns_qry_class_index],"dns_qry_class")] + [dns_iana.get_name(conn[dns_qry_type_index],"dns_qry_type")] + [ dns_iana.get_name(conn[dns_qry_rcode_index],"dns_qry_rcode") ] for conn in self._dns_scores ]
+ self._dns_scores = [ conn + [ dns_iana.get_name(conn[dns_qry_class_index],"dns_qry_class")] + [dns_iana.get_name(conn[dns_qry_type_index],"dns_qry_type")] + [dns_iana.get_name(conn[dns_qry_rcode_index],"dns_qry_rcode")] for conn in self._dns_scores ]
else:
self._dns_scores = [ conn + ["","",""] for conn in self._dns_scores ]
@@@ -274,10 -250,10 +274,10 @@@
def _get_oa_details(self):
- self._logger.info("Getting OA DNS suspicious details/dendro diagram")
- self._logger.info("Getting OA DNS suspicious details/chord diagram")
++ self._logger.info("Getting OA DNS suspicious details/dendro diagram")
# start suspicious connects details process.
p_sp = Process(target=self._get_suspicious_details)
- p_sp.start()
+ p_sp.start()
# start chord diagram process.
p_dn = Process(target=self._get_dns_dendrogram)
@@@ -294,88 -269,86 +294,88 @@@
iana_config = json.loads(open(iana_conf_file).read())
dns_iana = IanaTransform(iana_config["IANA"])
- for conn in self._dns_scores:
-
+ for conn in self._dns_scores:
- # get data to query
- date=conn[self._conf["dns_score_fields"]["frame_time"]].split(" ")
- date = filter(None,date)
-
- if len(date) == 5:
- year=date[2]
- month=datetime.datetime.strptime(date[0], '%b').strftime('%m')
- day=date[1]
- hh=conn[self._conf["dns_score_fields"]["hh"]]
- dns_qry_name = conn[self._conf["dns_score_fields"]["dns_qry_name"]]
- self._get_dns_details(dns_qry_name,year,month,day,hh,dns_iana)
++
+ timestamp = conn[self._conf["dns_score_fields"]["unix_tstamp"]]
+ full_date = datetime.datetime.utcfromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H:%M:%S')
-
++
+ date = full_date.split(" ")[0].split("-")
+ # get date parameters.
+ yr = date[0]
+ mn = date[1]
- dy = date[2]
++ dy = date[2]
+ time = full_date.split(" ")[1].split(":")
+ hh = int(time[0])
+
+ dns_qry_name = conn[self._conf["dns_score_fields"]["dns_qry_name"]]
+ self._get_dns_details(dns_qry_name,yr,mn,dy,hh,dns_iana)
+
def _get_dns_details(self,dns_qry_name,year,month,day,hh,dns_iana):
-
- limit = self._details_limit
- edge_file ="{0}/edge-{1}_{2}_00.csv".format(self._data_path,dns_qry_name.replace("/","-"),hh)
- edge_tmp ="{0}/edge-{1}_{2}_00.tmp".format(self._data_path,dns_qry_name.replace("/","-"),hh)
+ value_string = ""
+ query_to_load =("""
+ SELECT unix_tstamp,frame_len,ip_dst,ip_src,dns_qry_name,dns_qry_class,dns_qry_type,dns_qry_rcode,dns_a,h as hh
+ FROM {0}.{1} WHERE y={2} AND m={3} AND d={4} AND dns_qry_name LIKE '%{5}%' AND h={6} LIMIT {7};
+ """).format(self._db,self._table_name,year,month,day,dns_qry_name,hh,self._details_limit)
-
- try:
- dns_details = impala.execute_query(query_to_load)
++
++ try:
++ dns_details = impala.execute_query(query_to_load)
+ except:
+ self._logger.info("WARNING. Details couldn't be retreived for {0}, skipping this step".format(dns_qry_name))
+ else:
- # add IANA to results.
++ # add IANA to results.
+ update_rows = []
+ if dns_iana:
- self._logger.info("Adding IANA translation to details results")
-
++ self._logger.info("Adding IANA translation to details results")
+
- if not os.path.isfile(edge_file):
-
- dns_qry = ("SELECT frame_time,frame_len,ip_dst,ip_src,dns_qry_name,dns_qry_class,dns_qry_type,dns_qry_rcode,dns_a FROM {0}.{1} WHERE y={2} AND m={3} AND d={4} AND dns_qry_name LIKE '%{5}%' AND h={6} LIMIT {7};").format(self._db,self._table_name,year,month,day,dns_qry_name,hh,limit)
-
- # execute query
- try:
- self._engine.query(dns_qry,edge_tmp)
- except:
- self._logger.error("ERROR. Edge file couldn't be created for {0}, skipping this step".format(dns_qry_name))
+ dns_details = [ conn + (dns_iana.get_name(str(conn[5]),"dns_qry_class"),dns_iana.get_name(str(conn[6]),"dns_qry_type"),dns_iana.get_name(str(conn[7]),"dns_qry_rcode")) for conn in dns_details ]
- else:
++ else:
+ self._logger.info("WARNING: NO IANA configured.")
+ dns_details = [ conn + ("","","") for conn in dns_details ]
+
+ nc_conf_file = "{0}/components/nc/nc_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+ if os.path.isfile(nc_conf_file):
+ nc_conf = json.loads(open(nc_conf_file).read())["NC"]
- dns_nc = NetworkContext(nc_conf,self._logger)
++ dns_nc = NetworkContext(nc_conf,self._logger)
+ dns_details = [ conn + (dns_nc.get_nc(conn[2]),) for conn in dns_details ]
else:
- # add IANA to results.
- if dns_iana:
- update_rows = []
- self._logger.info("Adding IANA translation to details results")
- with open(edge_tmp) as dns_details_csv:
- rows = csv.reader(dns_details_csv, delimiter=',', quotechar='|')
- try:
- next(rows)
- update_rows = [[conn[0]] + [conn[1]] + [conn[2]] + [conn[3]] + [conn[4]] + [dns_iana.get_name(conn[5],"dns_qry_class")] + [dns_iana.get_name(conn[6],"dns_qry_type")] + [dns_iana.get_name(conn[7],"dns_qry_rcode")] + [conn[8]] for conn in rows]
- update_rows = filter(None, update_rows)
- header = [ "frame_time", "frame_len", "ip_dst","ip_src","dns_qry_name","dns_qry_class_name","dns_qry_type_name","dns_qry_rcode_name","dns_a" ]
- update_rows.insert(0,header)
- except IndexError:
- pass
+ dns_details = [ conn + (0,) for conn in dns_details ]
-
+
- else:
- self._logger.info("WARNING: NO IANA configured.")
-
- # create edge file.
- self._logger.info("Creating edge file:{0}".format(edge_file))
- with open(edge_file,'wb') as dns_details_edge:
- writer = csv.writer(dns_details_edge, quoting=csv.QUOTE_ALL)
- if update_rows:
- writer.writerows(update_rows)
- else:
- shutil.copy(edge_tmp,edge_file)
+ for row in dns_details:
+ value_string += str(tuple(item for item in row)) + ","
+
- if value_string != "":
++ if value_string != "":
- os.remove(edge_tmp)
+ query_to_insert=("""
+ INSERT INTO {0}.dns_edge PARTITION (y={1}, m={2}, d={3}) VALUES ({4});
+ """).format(self._db,year, month, day, value_string[:-1])
+
- impala.execute_query(query_to_insert)
-
++ impala.execute_query(query_to_insert)
- def _get_dns_dendrogram(self):
- for conn in self._dns_scores:
+ def _get_dns_dendrogram(self):
- limit = self._details_limit
- for conn in self._dns_scores:
- date=conn[self._conf["dns_score_fields"]["frame_time"]].split(" ")
- date = filter(None,date)
+
- if len(date) == 5:
- year=date[2]
- month=datetime.datetime.strptime(date[0], '%b').strftime('%m')
- day=date[1]
- ip_dst=conn[self._conf["dns_score_fields"]["ip_dst"]]
- self._get_dendro(self._db,self._table_name,ip_dst,year,month,day, limit)
++ for conn in self._dns_scores:
+ timestamp = conn[self._conf["dns_score_fields"]["unix_tstamp"]]
+ full_date = datetime.datetime.utcfromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H:%M:%S')
+
+ date = full_date.split(" ")[0].split("-")
+ # get date parameters.
-
+
+ yr = date[0]
+ mn = date[1]
+ dy = date[2]
+ ip_dst=conn[self._conf["dns_score_fields"]["ip_dst"]]
- def _get_dendro(self,db,table,ip_dst,year,month,day,limit):
+ query_to_load = ("""
+ INSERT INTO TABLE {0}.dns_dendro PARTITION (y={2}, m={3},d={4})
- SELECT unix_tstamp, dns_a, dns_qry_name, ip_dst
++ SELECT unix_tstamp, dns_a, dns_qry_name, ip_dst
+ FROM (SELECT unix_tstamp, susp.ip_dst, susp.dns_qry_name, susp.dns_a
- FROM {0}.{1} as susp WHERE susp.y={2} AND susp.m={3} AND susp.d={4} AND susp.ip_dst='{5}'
++ FROM {0}.{1} as susp WHERE susp.y={2} AND susp.m={3} AND susp.d={4} AND susp.ip_dst='{5}'
+ LIMIT {6}) AS tmp GROUP BY dns_a, dns_qry_name, ip_dst, unix_tstamp
+ """).format(self._db,self._table_name,yr,mn,dy,ip_dst,self._details_limit)
-
+
- dendro_file = "{0}/dendro-{1}.csv".format(self._data_path,ip_dst)
- if not os.path.isfile(dendro_file):
- dndro_qry = ("SELECT dns_a, dns_qry_name, ip_dst FROM (SELECT susp.ip_dst, susp.dns_qry_name, susp.dns_a FROM {0}.{1} as susp WHERE susp.y={2} AND susp.m={3} AND susp.d={4} AND susp.ip_dst='{5}' LIMIT {6}) AS tmp GROUP BY dns_a, dns_qry_name, ip_dst").format(db,table,year,month,day,ip_dst,limit)
- # execute query
- self._engine.query(dndro_qry,dendro_file)
+ impala.execute_query(query_to_load)
def _ingest_summary(self):
@@@ -390,32 -363,51 +390,32 @@@
result_rows = []
df_filtered = pd.DataFrame()
- ingest_summary_file = "{0}/is_{1}{2}.csv".format(self._ingest_summary_path,yr,mn)
- ingest_summary_tmp = "{0}.tmp".format(ingest_summary_file)
-
- if os.path.isfile(ingest_summary_file):
- df = pd.read_csv(ingest_summary_file, delimiter=',')
- #discards previous rows from the same date
- df_filtered = df[df['date'].str.contains("{0}-{1}-{2}".format(yr, mn, dy)) == False]
- else:
- df = pd.DataFrame()
-
- # get ingest summary.
- ingest_summary_qry = ("SELECT frame_time, COUNT(*) as total "
- " FROM {0}.{1}"
- " WHERE y={2} AND m={3} AND d={4} "
- " AND unix_tstamp IS NOT NULL AND frame_time IS NOT NULL"
- " AND frame_len IS NOT NULL AND dns_qry_name IS NOT NULL"
- " AND ip_src IS NOT NULL "
- " AND (dns_qry_class IS NOT NULL AND dns_qry_type IS NOT NULL AND dns_qry_rcode IS NOT NULL ) "
- " GROUP BY frame_time;")
-
- ingest_summary_qry = ingest_summary_qry.format(self._db,self._table_name, yr, mn, dy)
-
- results_file = "{0}/results_{1}.csv".format(self._ingest_summary_path,self._date)
- self._engine.query(ingest_summary_qry,output_file=results_file,delimiter=",")
-
+ query_to_load = ("""
+ SELECT frame_time, COUNT(*) as total FROM {0}.{1}
- WHERE y={2} AND m={3} AND d={4} AND unix_tstamp IS NOT NULL
- AND frame_time IS NOT NULL AND frame_len IS NOT NULL
- AND dns_qry_name IS NOT NULL AND ip_src IS NOT NULL
- AND (dns_qry_class IS NOT NULL AND dns_qry_type IS NOT NULL
++ WHERE y={2} AND m={3} AND d={4} AND unix_tstamp IS NOT NULL
++ AND frame_time IS NOT NULL AND frame_len IS NOT NULL
++ AND dns_qry_name IS NOT NULL AND ip_src IS NOT NULL
++ AND (dns_qry_class IS NOT NULL AND dns_qry_type IS NOT NULL
+ AND dns_qry_rcode IS NOT NULL ) GROUP BY frame_time;
+ """).format(self._db,self._table_name, yr, mn, dy)
- if os.path.isfile(results_file):
- df_results = pd.read_csv(results_file, delimiter=',')
+ results = impala.execute_query_as_list(query_to_load)
+ df = pd.DataFrame(results)
- # Forms a new dataframe splitting the minutes from the time column
- df_new = pd.DataFrame([["{0}-{1}-{2} {3}:{4}".format(yr, mn, dy,\
+ # Forms a new dataframe splitting the minutes from the time column
+ df_new = pd.DataFrame([["{0}-{1}-{2} {3}:{4}".format(yr, mn, dy,\
val['frame_time'].replace(" "," ").split(" ")[3].split(":")[0].zfill(2),\
val['frame_time'].replace(" "," ").split(" ")[3].split(":")[1].zfill(2)),\
- int(val['total']) if not math.isnan(val['total']) else 0 ] for key,val in df_results.iterrows()],columns = ingest_summary_cols)
-
- #Groups the data by minute
- sf = df_new.groupby(by=['date'])['total'].sum()
-
- df_per_min = pd.DataFrame({'date':sf.index, 'total':sf.values})
-
- df_final = df_filtered.append(df_per_min, ignore_index=True)
- df_final.to_csv(ingest_summary_tmp,sep=',', index=False)
+ int(val['total']) if not math.isnan(val['total']) else 0 ] for key,val in df.iterrows()],columns = ingest_summary_cols)
- #Groups the data by minute
- os.remove(results_file)
- os.rename(ingest_summary_tmp,ingest_summary_file)
- else:
- self._logger.info("No data found for the ingest summary")
-
++ #Groups the data by minute
+ sf = df_new.groupby(by=['date'])['total'].sum()
+ df_per_min = pd.DataFrame({'date':sf.index, 'total':sf.values})
+
- df_final = df_filtered.append(df_per_min, ignore_index=True).to_records(False,False)
++ df_final = df_filtered.append(df_per_min, ignore_index=True).to_records(False,False)
+
+ if len(df_final) > 0:
+ query_to_insert=("""
+ INSERT INTO {0}.dns_ingest_summary PARTITION (y={1}, m={2}, d={3}) VALUES {4};
- """).format(self._db, yr, mn, dy, tuple(df_final))
++ """).format(self._db, yr, mn, dy, tuple(df_final))
+ impala.execute_query(query_to_insert)
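Note: every write path in this patch follows the same batched-insert pattern: accumulate one VALUES tuple per row into a string, trim the trailing comma, and issue a single partitioned INSERT through impala.execute_query. A minimal sketch of that pattern under the same assumptions (the spotdb name and sample rows are hypothetical, and the real dns_scores row has more columns; the helper is the impala_engine module imported at the top of the diff, which in the patch also casts each value with Util.cast_val):

    import api.resources.impala_engine as impala

    # Hypothetical scored rows: (unix_tstamp, ip_dst, dns_qry_name, score).
    rows = [(1499967571, "10.0.0.5", "example.com", 0.0001),
            (1499967632, "10.0.0.9", "example.org", 0.0002)]

    # One tuple literal per row, joined with commas; same effect as the
    # value_string accumulation plus the value_string[:-1] trim in the diff.
    value_string = ",".join(str(tuple(row)) for row in rows)

    query = ("INSERT INTO {0}.dns_scores PARTITION (y={1}, m={2}, d={3}) "
             "VALUES {4}").format("spotdb", 2017, 7, 13, value_string)
    impala.execute_query(query)

Batching all rows into one statement keeps the load to a single round trip per table and partition, instead of one INSERT per scored connection.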