You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spot.apache.org by ev...@apache.org on 2017/03/29 16:51:34 UTC
[10/50] [abbrv] incubator-spot git commit: Removed csv files from OA
and ipython notebooks
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1904f2b4/spot-oa/oa/flow/ipynb_templates/Threat_Investigation_master.ipynb
----------------------------------------------------------------------
diff --git a/spot-oa/oa/flow/ipynb_templates/Threat_Investigation_master.ipynb b/spot-oa/oa/flow/ipynb_templates/Threat_Investigation_master.ipynb
index a0a7d26..761a434 100644
--- a/spot-oa/oa/flow/ipynb_templates/Threat_Investigation_master.ipynb
+++ b/spot-oa/oa/flow/ipynb_templates/Threat_Investigation_master.ipynb
@@ -8,6 +8,7 @@
},
"outputs": [],
"source": [
+ "import datetime\n",
"import struct, socket\n",
"import numpy as np\n",
"import linecache, bisect\n",
@@ -16,37 +17,24 @@
"import json\n",
"import os\n",
"import pandas as pd\n",
+ "\n",
"try:\n",
" import ipywidgets as widgets # For jupyter/ipython >= 1.4\n",
"except ImportError:\n",
" from IPython.html import widgets\n",
"from IPython.display import display, Javascript, clear_output\n",
"\n",
- "with open('/etc/spot.conf') as conf:\n",
- " for line in conf.readlines(): \n",
- " if \"DBNAME=\" in line: DBNAME = line.split(\"=\")[1].strip('\\n').replace(\"'\",\"\"); \n",
- " elif \"IMPALA_DEM=\" in line: IMPALA_DEM = line.split(\"=\")[1].strip('\\n').replace(\"'\",\"\"); \n",
- "\n",
"spath = os.getcwd()\n",
"path = spath.split(\"/\") \n",
"date = path[len(path)-1] \n",
"dpath = '/'.join(['data' if var == 'ipynb' else var for var in path]) + '/'\n",
"cpath = '/'.join(['context' if var == 'ipynb' else var for var in path][:len(path)-2]) + '/'\n",
"\n",
- "sconnect = dpath + 'flow_scores.csv'\n",
- "threats_file = dpath + 'threats.csv'\n",
- "iploc = cpath + 'iploc.csv'\n",
- "nwloc = cpath + 'networkcontext_1.csv'\n",
"anchor = ''\n",
"ir_f = ''\n",
"threat_name = ''\n",
"iplist = ''\n",
- "top_results = 20\n",
- "details_limit = 1000\n",
- "if os.path.isfile(iploc):\n",
- " iplist = np.loadtxt(iploc,dtype=np.uint32,delimiter=',',usecols={0}, converters={0: lambda s: np.uint32(s.replace('\"',''))})\n",
- "else:\n",
- " print \"No iploc.csv file was found, Map View map won't be created\""
+ "top_results = 20"
]
},
{
@@ -123,7 +111,7 @@
"cell_type": "code",
"execution_count": null,
"metadata": {
- "collapsed": true
+ "collapsed": false
},
"outputs": [],
"source": [
@@ -143,37 +131,50 @@
"def start_investigation(): \n",
" display(Javascript(\"$('.widget-area > .widget-subarea > *').remove();\")) \n",
" external_ips = []\n",
- " c_ips=[]\n",
- " clear_output() \n",
+ " clear_output() \n",
" \n",
- " if os.path.isfile(threats_file) and not file_is_empty(threats_file):\n",
- " with open(threats_file, 'r') as th:\n",
- " t_read = csv.DictReader(th, delimiter='|') \n",
- " for row in t_read: \n",
- " if row['ip'] != '' : c_ips.append(row['ip']) \n",
- "\n",
- " with open(sconnect, 'r') as f:\n",
- " reader = csv.DictReader(f, delimiter=',') \n",
- " #Internal Netflows use case:\n",
- " for row in reader: \n",
- " if row['sev'] == '1':\n",
+ " response = GraphQLClient.request(\n",
+ " query=\"\"\"query($date:SpotDateType!) {\n",
+ " flow{\n",
+ " threats{\n",
+ " list(date:$date) {\n",
+ " srcIp\n",
+ " dstPort\n",
+ " dstIp\n",
+ " srcPort\n",
+ " score \n",
+ " }\n",
+ " }\n",
+ " }\n",
+ " }\"\"\",\n",
+ " variables={\n",
+ " 'date': datetime.datetime.strptime(date, '%Y%m%d').strftime('%Y-%m-%d')\n",
+ " }\n",
+ " ) \n",
+ " \n",
+ " if not 'errors' in response : \n",
+ " for row in response['data']['flow']['threats']['list']:\n",
+ " if row['score'] == 1: \n",
" srcIP = ''\n",
" dstIP = '' \n",
- " if row['srcIP'] not in external_ips and row['srcIP'] not in c_ips: \n",
- " external_ips.append(row['srcIP'])\n",
- " if row['dstIP'] not in external_ips and row['dstIP'] not in c_ips: \n",
- " external_ips.append(row['dstIP'])\n",
- "\n",
- " if len(external_ips) == 0:\n",
- " display(widgets.Box((widgets.HTML(value=\"There are no connections scored as High risk.\\\n",
- " You can score some connections at the 'Suspicious' panel.\", width='100%'),)))\n",
- " else: \n",
- " sorted_dict = sorted(external_ips, key=operator.itemgetter(0)) \n",
- " display_controls(sorted_dict) \n",
+ " if row['srcIp'] not in external_ips: \n",
+ " external_ips.append(row['srcIp'])\n",
+ " if row['dstIp'] not in external_ips: \n",
+ " external_ips.append(row['dstIp'])\n",
+ "\n",
+ " if len(external_ips) == 0:\n",
+ " display(widgets.Box((widgets.HTML(value=\"There are no connections scored as High risk.\\\n",
+ " You can score some connections at the 'Suspicious' panel.\", width='100%'),)))\n",
+ " else: \n",
+ " sorted_dict = sorted(external_ips, key=operator.itemgetter(0)) \n",
+ " display_controls(sorted_dict) \n",
+ " else: \n",
" \n",
+ " display(widgets.Box((widgets.HTML(value=\"An error occurred while trying to get the results:\" \n",
+ " + response['errors'][0]['message'], width='100%'),)))\n",
" \n",
"\n",
- "def display_controls(threat_list): \n",
+ "def display_controls(threat_list):\n",
" threat_title.value =\"<h4>Suspicious Connections</h4>\" \n",
" susp_select.options = threat_list\n",
" susp_select.height=150\n",
@@ -186,36 +187,49 @@
" def search_ip(b):\n",
" global anchor \n",
" global top_inbound_b\n",
+ " global expanded_results\n",
" anchor = susp_select.value \n",
" if anchor != \"\":\n",
" clear_output() \n",
" removeWidget(1)\n",
- " print \"Searching for ip: \" + anchor\n",
- " global ir_f \n",
- " ir_f = dpath + \"ir-\" + anchor + \".tsv\"\n",
- "\n",
- " if not os.path.isfile(ir_f) or (os.path.isfile(ir_f) and file_is_empty(ir_f)): \n",
- " imp_query = (\" \\\"SELECT min(treceived) as firstSeen, max(treceived) as lastSeen, sip as srcIP, dip as dstIP, \" + \n",
- " \"sport as SPort, dport AS Dport, count(sip) as conns, max(ipkt) as maxPkts, avg(ipkt) \" + \n",
- " \"as avgPkts, max(ibyt) as maxBytes, avg(ibyt) as avgBytes FROM \"+DBNAME+\".flow WHERE \" + \n",
- " \"y=\"+ date[0:4] +\" AND m=\"+ date[4:6] +\" AND d=\"+ date[6:] +\" \" + \n",
- " \" AND (sip =\\'\" + anchor + \"\\' OR dip=\\'\" + anchor + \"\\') GROUP BY sip, dip,sport,dport\\\" \") \n",
- " !impala-shell -i $IMPALA_DEM --quiet -q \"INVALIDATE METADATA\"\n",
- " !impala-shell -i $IMPALA_DEM --quiet --print_header -B --output_delimiter='\\t' -q $imp_query -o $ir_f\n",
- " clear_output()\n",
" \n",
- " if not file_is_empty(ir_f): \n",
+ " expanded_results = GraphQLClient.request(\n",
+ " query=\"\"\"query($date:SpotDateType!,$ip:SpotIpType!){\n",
+ " flow{\n",
+ " threat{\n",
+ " details(date:$date,ip:$ip) {\n",
+ " srcIp\n",
+ " maxBytes\n",
+ " connections\n",
+ " maxPkts\n",
+ " avgPkts\n",
+ " lastSeen\n",
+ " srcPort\n",
+ " firstSeen \n",
+ " dstIp\n",
+ " avgBytes\n",
+ " dstPort\n",
+ " }\n",
+ " }\n",
+ " } \n",
+ " }\n",
+ " \"\"\",\n",
+ " variables={\n",
+ " 'date': datetime.datetime.strptime(date, '%Y%m%d').strftime('%Y-%m-%d'),\n",
+ " 'ip': anchor\n",
+ " }\n",
+ " ) \n",
+ " \n",
+ " if not 'errors' in expanded_results : \n",
" print \"\\n Looking for additional details...\"\n",
" display_threat_box(anchor)\n",
"\n",
" get_in_out_and_twoway_conns() \n",
- " add_geospatial_info()\n",
- " add_network_context() \n",
- " \n",
+ " \n",
" display(bottomBox) \n",
- " else:\n",
+ " else: \n",
" display(widgets.Box((widgets.HTML(value=\"Something went wrong. \\\n",
- " The expanded search couldn't be performed\", width='100%'),)))\n",
+ " The expanded search couldn't be performed\" + expanded_results['errors'][0]['message'], width='100%'),)))\n",
" \n",
" search_btn.on_click(search_ip)\n",
"\n",
@@ -241,10 +255,7 @@
" clear_output() \n",
" removeWidget(1) \n",
" response = \"\"\n",
- " response += generate_attack_map_file(anchor, top_inbound_b, top_outbound_b, top_twoway_b)\n",
- " response += generate_stats(anchor, inbound, outbound, twoway, threat_name)\n",
- " response += generate_dendro(anchor, inbound, outbound, twoway, date)\n",
- " response += details_inbound(anchor, top_inbound_b, top_outbound_b, top_twoway_b)\n",
+ " \n",
" response += add_threat(anchor, tc_txt_title.value, tc_txa_summary.value.replace('\\n', '\\\\n'))\n",
" response += \"Story board successfully created for {0}\".format(anchor)\n",
" start_investigation()\n",
@@ -262,234 +273,46 @@
},
"outputs": [],
"source": [
- "def details_inbound(anchor, inbound, outbound, twoway):\n",
- " top_keys = []\n",
- " if len(twoway) > 0: top_keys.extend(twoway.keys())\n",
- " if len(outbound) > 0: top_keys.extend(outbound.keys()) \n",
- " if len(inbound) > 0: top_keys.extend(inbound.keys())\n",
- " sbdet_f = dpath + \"sbdet-\" + anchor + \".tsv\"\n",
- " if not os.path.isfile(sbdet_f):\n",
- " imp_query = (\"\\\"SELECT min(treceived) as tstart, max(treceived) as tend, sip as srcIP, \"\n",
- " + \"dip as dstIP, proto as Proto, sport as SPort, dport AS Dport,ipkt as \"\n",
- " + \"Pkts, ibyt as Bytes FROM \"+DBNAME+\".flow WHERE \"\n",
- " + \"y=\"+ date[0:4] +\" AND m=\"+ date[4:6] +\" AND d=\"+ date[6:]\n",
- " + \" AND ((dip IN({0}) AND sip ='{1}') OR \"\n",
- " + \"(sip IN({0}) \"\n",
- " + \"AND dip ='{1}')) GROUP BY sip, dip, proto, sport, dport, ipkt, ibyt ORDER BY tstart \"\n",
- " + \"LIMIT {2}\\\" \") \n",
- " ips = \"'\" + \"','\".join(top_keys) + \"'\"\n",
- " imp_query = imp_query.format(ips,anchor,details_limit)\n",
- " !impala-shell -i $IMPALA_DEM --quiet -q \"INVALIDATE METADATA\"\n",
- " !impala-shell -i $IMPALA_DEM --quiet --print_header -B --output_delimiter='\\t' -q $imp_query -o $sbdet_f\n",
- "\n",
- " clear_output()\n",
- " return \"Timeline successfully created <br/>\"\n",
- " else:\n",
- " return \"Timeline file already existed <br/>\"\n",
- "\n",
- " \n",
- "def generate_dendro(ip, inbound, outbound, twoway, date): \n",
- " dendro_fpath = dpath + 'threat-dendro-' + anchor + \".json\"\n",
- " \n",
- " obj = {\n",
- " 'name':ip,\n",
- " 'children': [],\n",
- " 'time': date \n",
- " }\n",
- " \n",
- " #----- Add Inbound Connections-------#\n",
- " if len(inbound) > 0:\n",
- " obj[\"children\"].append({'name': 'Inbound Only', 'children': [], 'impact': 0}) \n",
- " in_ctxs = {}\n",
- " for ip in inbound:\n",
- " if 'nwloc' in inbound[ip] and len(inbound[ip]['nwloc']) > 0:\n",
- " ctx = inbound[ip]['nwloc'][2] # get the machine type Only for vast Data\n",
- " if ctx not in in_ctxs:\n",
- " in_ctxs[ctx] = 1\n",
- " else:\n",
- " in_ctxs[ctx] += 1\n",
- " for ctx in in_ctxs:\n",
- " obj[\"children\"][0]['children'].append({\n",
- " 'name': ctx,\n",
- " 'impact': in_ctxs[ctx]\n",
- " }) \n",
- " \n",
- " #------ Add Outbound ----------------#\n",
- " if len(outbound) > 0:\n",
- " obj[\"children\"].append({'name': 'Outbound Only', 'children': [], 'impact': 0})\n",
- " out_ctxs = {}\n",
- " for ip in outbound: \n",
- " if 'nwloc' in outbound[ip] and len(outbound[ip]['nwloc']) > 0:\n",
- " ctx = outbound[ip]['nwloc'][2] # get the machine type Only for vast Data\n",
- " if ctx not in out_ctxs:\n",
- " out_ctxs[ctx] = 1\n",
- " else:\n",
- " out_ctxs[ctx] += 1\n",
- " for ctx in out_ctxs:\n",
- " obj[\"children\"][1]['children'].append({\n",
- " 'name': ctx,\n",
- " 'impact': out_ctxs[ctx]\n",
- " }) \n",
- " \n",
- " #------ Add TwoWay ----------------#\n",
- " if len(twoway) > 0:\n",
- " obj[\"children\"].append({'name': 'two way', 'children': [], 'impact': 0})\n",
- " tw_ctxs = {}\n",
- " for ip in twoway:\n",
- " if 'nwloc' in twoway[ip] and len(twoway[ip]['nwloc']) > 0:\n",
- " ctx = twoway[ip]['nwloc'][2] # get the machine type Only for vast Data\n",
- " if ctx not in tw_ctxs:\n",
- " tw_ctxs[ctx] = 1\n",
- " else:\n",
- " tw_ctxs[ctx] += 1\n",
- "\n",
- " for ctx in tw_ctxs:\n",
- " obj[\"children\"][2]['children'].append({\n",
- " 'name': ctx,\n",
- " 'impact': tw_ctxs[ctx]\n",
- " })\n",
- " \n",
- " with open(dendro_fpath, 'w') as dendro_f:\n",
- " dendro_f.write(json.dumps(obj))\n",
- " return \"Incident progression successfully created <br/>\"\n",
- "\n",
- " \n",
- "def generate_stats(ip, inbound, outbound, twoway, threat_name):\n",
- " stats_fpath = dpath + 'stats-' + anchor + \".json\"\n",
- " \n",
- " obj = {\n",
- " 'name':threat_name,\n",
- " 'children': [],\n",
- " 'size': len(inbound) + len(outbound) + len(twoway)\n",
- " }\n",
- " \n",
- " #----- Add Inbound Connections-------#\n",
- " obj[\"children\"].append({'name': 'Inbound Only', 'children': [], 'size': len(inbound)}) \n",
- " in_ctxs = {}\n",
- " for ip in inbound:\n",
- " full_ctx = ''\n",
- " if 'nwloc' in inbound[ip] and len(inbound[ip]['nwloc']) > 0:\n",
- " full_ctx = inbound[ip]['nwloc'][2].split('.')[0]\n",
- " ctx = get_ctx_name(full_ctx) # get the machine type Only for vast Data\n",
- " if ctx not in in_ctxs:\n",
- " in_ctxs[ctx] = 1\n",
- " else:\n",
- " in_ctxs[ctx] += 1\n",
- " for ctx in in_ctxs:\n",
- " obj[\"children\"][0]['children'].append({\n",
- " 'name': ctx,\n",
- " 'size': in_ctxs[ctx]\n",
- " }) \n",
- " \n",
- " \n",
- " #------ Add Outbound ----------------#\n",
- " obj[\"children\"].append({'name': 'Outbound Only', 'children': [], 'size': len(outbound)})\n",
- " out_ctxs = {}\n",
- " for ip in outbound:\n",
- " full_ctx = ''\n",
- " if 'nwloc' in outbound[ip] and len(outbound[ip]['nwloc']) > 0:\n",
- " full_ctx = outbound[ip]['nwloc'][2].split('.')[0]\n",
- " ctx = get_ctx_name(full_ctx) # get the machine type Only for vast Data\n",
- " if ctx not in out_ctxs:\n",
- " out_ctxs[ctx] = 1\n",
- " else:\n",
- " out_ctxs[ctx] += 1\n",
- " for ctx in out_ctxs:\n",
- " obj[\"children\"][1]['children'].append({\n",
- " 'name': ctx,\n",
- " 'size': out_ctxs[ctx]\n",
- " }) \n",
- " \n",
- " #------ Add Twoway ----------------#\n",
- " obj[\"children\"].append({'name': 'two way', 'children': [], 'size': len(twoway)})\n",
- " tw_ctxs = {}\n",
- " for ip in twoway:\n",
- " full_ctx = ''\n",
- " if 'nwloc' in twoway[ip] and len(twoway[ip]['nwloc']) > 0:\n",
- " full_ctx = twoway[ip]['nwloc'][2].split('.')[0]\n",
- " ctx = get_ctx_name(full_ctx) # get the machine type Only for vast Data\n",
- " if ctx not in tw_ctxs:\n",
- " tw_ctxs[ctx] = 1\n",
- " else:\n",
- " tw_ctxs[ctx] += 1\n",
- " \n",
- " for ctx in tw_ctxs:\n",
- " obj[\"children\"][2]['children'].append({\n",
- " 'name': ctx,\n",
- " 'size': tw_ctxs[ctx]\n",
- " })\n",
- " \n",
- " json_str = json.dumps(obj)\n",
- " with open(stats_fpath, 'w') as stats_f:\n",
- " stats_f.write(json_str)\n",
- " return \"Stats file successfully created <br/>\"\n",
- "\n",
- " \n",
- "def get_ctx_name(full_context): \n",
- " ctx= 'DMZ'\n",
- " if \"VPN\" in full_context:\n",
- " ctx = \"VPN\" \n",
- " elif \"DMZ\" in full_context:\n",
- " ctx = \"DMZ\"\n",
- " elif \"Proxy\" in full_context:\n",
- " ctx = \"Proxy\" \n",
- " elif \"FW\" in full_context:\n",
- " ctx = \"FW\" \n",
- " return ctx\n",
"\n",
- "\n",
- "# calculate number of inbound only, two-way, and outbound only\n",
- "# build dict of IP addresses\n",
- "# firstSeen,lastSeen,srcIP, dstIP, sport,dport,conns, maxPkts, avgPkts,maxBytes, avgBytes\n",
"def get_in_out_and_twoway_conns():\n",
- " global inbound\n",
- " inbound = {}\n",
- " global outbound\n",
- " outbound = {}\n",
- " global twoway\n",
- " twoway = {}\n",
" srcdict = {}\n",
" dstdict = {}\n",
" conns_dict= {} \n",
- " rowct = 0\n",
- " if os.path.isfile(ir_f): \n",
- " df = pd.read_csv(ir_f,sep='\\t') \n",
- " with open(ir_f, 'r') as f:\n",
- " reader = csv.reader(f,delimiter='\\t')\n",
- " reader.next() #skip headers \n",
- " for row in reader: \n",
- " if row != []:\n",
- " srcdict[row[2]] = {\n",
- " 'ip_int': struct.unpack(\"!L\", socket.inet_aton(row[2]))[0],\n",
- " 'dst_ip': row[3],\n",
- " 'dst_ip_int': struct.unpack(\"!L\", socket.inet_aton(row[3]))[0],\n",
- " 'conns': int(row[6]),\n",
- " 'maxbytes': int(row[9])\n",
- " }\n",
- " dstdict[row[3]] = {\n",
- " 'ip_int': struct.unpack(\"!L\", socket.inet_aton(row[3]))[0],\n",
- " 'src_ip': row[2],\n",
- " 'src_ip_int': struct.unpack(\"!L\", socket.inet_aton(row[2]))[0],\n",
- " 'conns': int(row[6]),\n",
- " 'maxbytes': int(row[9])\n",
- " } \n",
- " rowct +=1 \n",
+ " rowct = 0 \n",
+ " if not 'errors' in expanded_results : \n",
+ " df = pd.DataFrame(expanded_results['data']['flow']['threat']['details']) \n",
+ " for row in expanded_results['data']['flow']['threat']['details']: \n",
+ " srcdict[row['srcIp']] = {\n",
+ " 'ip_int': struct.unpack(\"!L\", socket.inet_aton(str(row['srcIp'])))[0],\n",
+ " 'dst_ip': row['dstIp'],\n",
+ " 'dst_ip_int': struct.unpack(\"!L\", socket.inet_aton(str(row['dstIp'])))[0],\n",
+ " 'conns': int(row['connections']),\n",
+ " 'maxbytes': int(row['maxBytes'])\n",
+ " }\n",
+ " dstdict[row['dstIp']] = {\n",
+ " 'ip_int': struct.unpack(\"!L\", socket.inet_aton(str(row['dstIp'])))[0],\n",
+ " 'src_ip': row['srcIp'],\n",
+ " 'src_ip_int': struct.unpack(\"!L\", socket.inet_aton(str(row['srcIp'])))[0],\n",
+ " 'conns': int(row['connections']),\n",
+ " 'maxbytes': int(row['maxBytes'])\n",
+ " } \n",
+ " rowct +=1 \n",
" \n",
- " src = df.loc[df['dstip'] == anchor]\n",
- " src_per_conns = src.sort_values('conns',0,False)\n",
- " src_per_bytes = src.sort_values('maxbytes',0,False)\n",
- " dst = df.loc[df['srcip'] == anchor]\n",
- " dst_per_conns = dst.sort_values('conns',0,False)\n",
- " dst_per_bytes = dst.sort_values('maxbytes',0,False)\n",
+ " src = df.loc[df['dstIp'] == anchor]\n",
+ " src_per_conns = src.sort_values('connections',0,False)\n",
+ " src_per_bytes = src.sort_values('maxBytes',0,False)\n",
+ " dst = df.loc[df['srcIp'] == anchor]\n",
+ " dst_per_conns = dst.sort_values('connections',0,False)\n",
+ " dst_per_bytes = dst.sort_values('maxBytes',0,False)\n",
"\n",
" children = []\n",
- " children += (display_results(['srcip','conns','sport','dport'], src_per_conns, \n",
+ " children += (display_results(['srcIp','connections','srcPort','dstPort'], src_per_conns, \n",
" top_results),)\n",
- " children += (display_results(['dstip','conns','sport','dport'], dst_per_conns, \n",
+ " children += (display_results(['dstIp','connections','srcPort','dstPort'], dst_per_conns, \n",
" top_results),)\n",
- " children += (display_results(['srcip','maxbytes','sport','dport'], src_per_bytes, \n",
+ " children += (display_results(['srcIp','maxBytes','srcPort','dstPort'], src_per_bytes, \n",
" top_results),)\n",
- " children += (display_results(['dstip','maxbytes','sport','dport'], dst_per_bytes, \n",
+ " children += (display_results(['dstIp','maxBytes','srcPort','dstPort'], dst_per_bytes, \n",
" top_results),)\n",
"\n",
" result_tabs = widgets.Accordion(children=children, width='100%', selected_index=-1)\n",
@@ -505,245 +328,51 @@
"\n",
" resultTableBox.children = [result_tabs,]\n",
" \n",
- " if rowct > 0:\n",
- " for result in srcdict:\n",
- " if result in dstdict:\n",
- " twoway[result] = srcdict[result]\n",
- " else:\n",
- " outbound[result] = srcdict[result]\n",
- "\n",
- " for result in dstdict:\n",
- " if result not in srcdict:\n",
- " inbound[result] = dstdict[result] \n",
- " \n",
- " global top_inbound_b\n",
- " global top_outbound_b\n",
- " global top_twoway_b\n",
- " if len(inbound) > 0:\n",
- " top_inbound_b = get_top_bytes(inbound,top_results)\n",
- " top_inbound_conns = get_top_conns(inbound,top_results)\n",
- " top_inbound_b.update(top_inbound_conns) # merge the two dictionaries\n",
- " if len(outbound) > 0:\n",
- " top_outbound_b = get_top_bytes(outbound,top_results)\n",
- " top_outbound_conns = get_top_conns(outbound,top_results)\n",
- " top_outbound_b.update(top_outbound_conns) # merge the two dictionaries\n",
- " if len(twoway) > 0: \n",
- " top_twoway_b = get_top_bytes(twoway,top_results)\n",
- " top_twoway_conns = get_top_conns(twoway,top_results)\n",
- " top_twoway_b.update(top_twoway_conns) # merge the two dictionaries\n",
"\n",
"def display_results(cols, dataframe, top): \n",
" table = dataframe[:top].to_html(classes='table table-striped table-bordered table-hover', columns=cols, index=False)\n",
" return widgets.HTML(value=table, width='100%')\n",
- " \n",
- "#=========== Adds GEO IP information to the outbound, inbound and twoway connections==============================# \n",
- "def add_geospatial_info():\n",
- " # get geospatial info, only when iplocation file is available\n",
- " if iplist != '':\n",
- " for srcip in outbound:\n",
- " reader = csv.reader([linecache.getline(iploc, bisect.bisect(iplist,outbound[srcip]['ip_int'])).replace('\\n','')])\n",
- " outbound[srcip]['geo'] = reader.next()\n",
- " reader = csv.reader([linecache.getline(iploc, bisect.bisect(iplist,outbound[srcip]['dst_ip_int'])).replace('\\n','')])\n",
- " outbound[srcip]['geo_dst'] = reader.next()\n",
- "\n",
- " for dstip in twoway:\n",
- " reader = csv.reader([linecache.getline(iploc, bisect.bisect(iplist,twoway[dstip]['ip_int'])).replace('\\n','')])\n",
- " twoway[dstip]['geo'] = reader.next()\n",
- "\n",
- " for srcip in inbound:\n",
- " reader = csv.reader([linecache.getline(iploc, bisect.bisect(iplist,inbound[srcip]['ip_int'])).replace('\\n','')])\n",
- " inbound[srcip]['geo'] = reader.next()\n",
- " reader = csv.reader([linecache.getline(iploc, bisect.bisect(iplist,inbound[srcip]['src_ip_int'])).replace('\\n','')])\n",
- " inbound[srcip]['geo_src'] = reader.next()\n",
- " \n",
- " \n",
- " \n",
- "# need some way to combine timelines of outbound and two-way with big picture inbound only\n",
- "# get network context - get start and end ranges\n",
- "def add_network_context():\n",
- " nwdict = {}\n",
- " if os.path.isfile(nwloc) : \n",
- " with open(nwloc, 'r') as f:\n",
- " reader = csv.reader(f,delimiter=',')\n",
- " reader.next()\n",
- " #address range, description\n",
- " for row in reader:\n",
"\n",
- " if '/' in row[0]: \n",
- " #Range in subnet\n",
- " iprange = row[0].split('/')\n",
- " if len(iprange) < 2:\n",
- " ipend = 0\n",
- " else:\n",
- " ipend = int(iprange[1])\n",
- " nwdict[row[0]] = [struct.unpack(\"!L\", socket.inet_aton(iprange[0]))[0],\n",
- " struct.unpack(\"!L\", socket.inet_aton(iprange[0]))[0]+2**(32-ipend)-1, row[1]] \n",
- " elif '-' in row[0]: \n",
- " #IP Range \n",
- " iprange = row[0].split('-') \n",
- " nwdict[row[0]] = [struct.unpack(\"!L\", socket.inet_aton(iprange[0].replace(\" \", \"\")))[0],\n",
- " struct.unpack(\"!L\", socket.inet_aton(iprange[1].replace(\" \", \"\")))[0], row[1]]\n",
- " else:\n",
- " #Exact match \n",
- " nwdict[row[0]] = [struct.unpack(\"!L\", socket.inet_aton(row[0]))[0],\n",
- " struct.unpack(\"!L\", socket.inet_aton(row[0]))[0], row[1]] \n",
- "\n",
- " for srcip in outbound: \n",
- " temp_ip = struct.unpack(\"!L\", socket.inet_aton(srcip))[0]\n",
- " if srcip in nwdict:\n",
- " inbound[srcip]['nwloc'] = nwdict[srcip]\n",
- " else:\n",
- " matchingVals = [x for x in nwdict if nwdict[x][1] >= temp_ip and nwdict[x][0] <= temp_ip]\n",
- " outbound[srcip]['nwloc'] = nwdict[matchingVals[0]] if len(matchingVals) > 0 else '' \n",
- "\n",
- " for dstip in twoway: \n",
- " temp_ip = struct.unpack(\"!L\", socket.inet_aton(dstip))[0]\n",
- " if dstip in nwdict:\n",
- " twoway[dstip]['nwloc'] = nwdict[dstip]\n",
- " else:\n",
- " matchingVals = [x for x in nwdict if nwdict[x][1] >= temp_ip and nwdict[x][0] <= temp_ip]\n",
- " twoway[dstip]['nwloc'] = nwdict[matchingVals[0]] if len(matchingVals) > 0 else ''\n",
- "\n",
- " for srcip in inbound:\n",
- " temp_ip = struct.unpack(\"!L\", socket.inet_aton(srcip))[0]\n",
- " if srcip in nwdict:\n",
- " inbound[srcip]['nwloc'] = nwdict[srcip]\n",
- " else:\n",
- " matchingVals = [x for x in nwdict if nwdict[x][1] >= temp_ip and nwdict[x][0] <= temp_ip]\n",
- " inbound[srcip]['nwloc'] = nwdict[matchingVals[0]] if len(matchingVals) > 0 else ''\n",
- "\n",
- " \n",
- "def generate_attack_map_file(ip, inbound, outbound, twoway): \n",
- " if iplist != '':\n",
- " globe_fpath = dpath + 'globe-' + ip + \".json\"\n",
- " globe_json = {}\n",
- " globe_json['type'] = \"FeatureCollection\"\n",
- " globe_json['sourceips'] = []\n",
- " globe_json['destips'] = [] \n",
- " for srcip in twoway:\n",
- " try:\n",
- " row = twoway[srcip]['geo'] \n",
- " globe_json['destips'].append({\n",
- " 'type': 'Feature',\n",
- " 'properties': {\n",
- " 'location':row[8],\n",
- " 'ip':srcip,\n",
- " 'type':1\n",
- " },\n",
- " 'geometry': {\n",
- " 'type': 'Point',\n",
- " 'coordinates': [float(row[7]), float(row[6])]\n",
- " }\n",
- " })\n",
- " except ValueError:\n",
- " pass\n",
- "\n",
- "\n",
- " for dstip in outbound:\n",
- " try:\n",
- " row = outbound[dstip]['geo']\n",
- " dst_geo = outbound[dstip]['geo_dst']\n",
- " globe_json['sourceips'].append({\n",
- " 'type': 'Feature',\n",
- " 'properties': {\n",
- " 'location':row[8],\n",
- " 'ip':dstip,\n",
- " 'type':3\n",
- " },\n",
- " 'geometry': {\n",
- " 'type': 'Point',\n",
- " 'coordinates': [float(row[7]), float(row[6])]\n",
- " }\n",
- " })\n",
- " globe_json['destips'].append({\n",
- " 'type': 'Feature',\n",
- " 'properties': {\n",
- " 'location':row[8],\n",
- " 'ip':outbound[dstip]['dst_ip'],\n",
- " 'type':3\n",
- " },\n",
- " 'geometry': {\n",
- " 'type': 'Point',\n",
- " 'coordinates': [float(dst_geo[7]), float(dst_geo[6])]\n",
- " }\n",
- " }) \n",
- "\n",
- " except ValueError:\n",
- " pass\n",
- "\n",
- " for dstip in inbound:\n",
- " try:\n",
- " row = inbound[dstip]['geo']\n",
- " dst_geo = inbound[dstip]['geo_src']\n",
- " globe_json['sourceips'].append({\n",
- " 'type': 'Feature',\n",
- " 'properties': {\n",
- " 'location':row[8],\n",
- " 'ip':dstip,\n",
- " 'type':2\n",
- " },\n",
- " 'geometry': {\n",
- " 'type': 'Point',\n",
- " 'coordinates': [float(row[7]), float(row[6])]\n",
- " }\n",
- " })\n",
- " globe_json['destips'].append({\n",
- " 'type': 'Feature',\n",
- " 'properties': {\n",
- " 'location':row[8],\n",
- " 'ip':inbound[dstip]['src_ip'],\n",
- " 'type':2\n",
- " },\n",
- " 'geometry': {\n",
- " 'type': 'Point',\n",
- " 'coordinates': [float(dst_geo[7]), float(dst_geo[6])]\n",
- " }\n",
- " })\n",
- " except ValueError:\n",
- " pass\n",
- "\n",
- " json_str = json.dumps(globe_json)\n",
- " with open(globe_fpath, 'w') as globe_f:\n",
- " globe_f.write(json_str)\n",
- " response = \"Geolocation map successfully created <br/>\"\n",
- " else:\n",
- " response = \"The map can't be created without an iploc file <br/>\" \n",
- " \n",
- " return response\n",
"\n",
" \n",
"def add_threat(ip,threat_title, threat_comment):\n",
- " content = ''\n",
- " try:\n",
- " threat_f = open(threats_file, 'r')\n",
- " content = threat_f.read()\n",
- " if '{0}|{1}|{2}\\n'.format(ip,threat_title,threat_comment) not in content:\n",
- " content += '{0}|{1}|{2}\\n'.format(ip,threat_title,threat_comment)\n",
- " threat_f.close() \n",
- " except:\n",
- " content = 'ip|title|summary\\n'\n",
- " content += '{0}|{1}|{2}\\n'.format(ip,threat_title,threat_comment)\n",
- " \n",
- " threat_fw = open(threats_file, 'w')\n",
- " threat_fw.write(content)\n",
- " threat_fw.close()\n",
- " return \"\"\n",
- "\n",
+ " \n",
+ " mutation=\"\"\"mutation(\n",
+ " $date: SpotDateType, \n",
+ " $ip: SpotIpType!, \n",
+ " $text: String!, \n",
+ " $title: String!,\n",
+ " $threatDetails: [NetflowThreatDetailsInputType]!,\n",
+ " $topResults:Int) \n",
+ " {\n",
+ " flow {\n",
+ " createStoryboard(input:{\n",
+ " threatDetails: $threatDetails,\n",
+ " date: $date, \n",
+ " ip: $ip, \n",
+ " title: $title, \n",
+ " text: $text,\n",
+ " first:$topResults})\n",
+ " {success}\n",
+ " }\n",
+ " }\"\"\"\n",
+ " \n",
+ " variables={\n",
+ " 'date': datetime.datetime.strptime(date, '%Y%m%d').strftime('%Y-%m-%d'),\n",
+ " 'ip': ip,\n",
+ " 'title': threat_title,\n",
+ " 'text': threat_comment,\n",
+ " 'threatDetails': expanded_results['data']['flow']['threat']['details'],\n",
+ " 'first':top_results\n",
+ "\n",
+ " }\n",
" \n",
- "def get_top_bytes(conns_dict, top):\n",
- " topbytes = sorted(conns_dict.iteritems(), key=lambda (x,y): y['maxbytes'], reverse=True)\n",
- " topbytes = topbytes[0:top]\n",
- " return dict(topbytes)\n",
- "\n",
- "\n",
- "def get_top_conns(conns_dict, top): \n",
- " topconns = sorted(conns_dict.iteritems(), key=lambda (x,y): y['conns'], reverse=True)\n",
- " topconns = topconns[0:top]\n",
- " return dict(topconns)\n",
+ " response = GraphQLClient.request(mutation,variables)\n",
+ " if not 'errors' in response:\n",
+ " return \"Story board successfully created\"\n",
+ " else:\n",
+ " return response['errors'][0]['message']\n",
" \n",
- "def file_is_empty(path):\n",
- " return os.stat(path).st_size==0\n",
- "\n",
"def removeWidget(index):\n",
" js_command = \"$('.widget-area > .widget-subarea > .widget-box:eq({0})').remove();\".format(index) \n",
" display(Javascript(js_command))"
@@ -777,7 +406,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
- "version": "2.7.10"
+ "version": "2.7.5"
}
},
"nbformat": 4,
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1904f2b4/spot-oa/oa/proxy/ipynb_templates/Edge_Investigation_master.ipynb
----------------------------------------------------------------------
diff --git a/spot-oa/oa/proxy/ipynb_templates/Edge_Investigation_master.ipynb b/spot-oa/oa/proxy/ipynb_templates/Edge_Investigation_master.ipynb
index 0cfc28b..8eedc53 100644
--- a/spot-oa/oa/proxy/ipynb_templates/Edge_Investigation_master.ipynb
+++ b/spot-oa/oa/proxy/ipynb_templates/Edge_Investigation_master.ipynb
@@ -18,18 +18,14 @@
"import urllib2\n",
"import json\n",
"import os\n",
+ "import datetime\n",
"import csv \n",
"\n",
"# getting date from the parent path. \n",
"path = os.getcwd().split(\"/\") \n",
"date = path[len(path)-1] \n",
"dsource = path[len(path)-2] \n",
- "dpath = '/'.join(['data' if var == 'ipynb' else var for var in path]) + '/'\n",
- "\n",
- "sconnect = dpath + 'proxy_scores.tsv'\n",
- "sconnectbu = dpath + 'proxy_scores_bu.tsv'\n",
- "score_tmp = dpath + 'proxy_tmp.tsv' \n",
- "score_fbk = dpath + 'proxy_scores_fb.csv'"
+ "score_values = []"
]
},
{
@@ -47,7 +43,7 @@
"from IPython.display import display, HTML, clear_output, Javascript \n",
"\n",
"def fill_list(list_control,source):\n",
- " options_list = ['--Select--'] \n",
+ " options_list = ['- Select -'] \n",
" options_list.extend([s for s in source])\n",
" list_control.options = options_list\n",
"\n",
@@ -90,16 +86,28 @@
"\n",
"def data_loader(): \n",
" us_uris = []\n",
- "\n",
- " with open(sconnect, 'r') as f:\n",
- " reader = csv.DictReader(f, delimiter='\\t')\n",
+ " \n",
+ " response = GraphQLClient.request(\n",
+ " query=\"\"\"query($date:SpotDateType!) {\n",
+ " proxy{\n",
+ " suspicious(date:$date){\n",
+ " uri\n",
+ " }\n",
+ " }\n",
+ " }\"\"\",\n",
+ " variables={\n",
+ " 'date': datetime.datetime.strptime(date, '%Y%m%d').strftime('%Y-%m-%d')\n",
+ " }\n",
+ " )\n",
+ " \n",
+ " if not 'errors' in response:\n",
+ " for row in response['data']['proxy']['suspicious']:\n",
+ " us_uris.append(row['uri'])\n",
+ " else:\n",
+ " print 'An error occured : '+ response['errors'][0]['message']\n",
" \n",
- " for row in reader: \n",
- " if row['fulluri'] not in us_uris and row['uri_sev'] == '0': \n",
- " us_uris.append(row['fulluri'])\n",
- "\n",
" fill_list(uri_select,us_uris)\n",
- " uri_select.value = \"--Select--\" \n",
+ " uri_select.value = \"- Select -\" \n",
"\n",
"display(Javascript(\"$('.widget-area > .widget-subarea > *').remove();\"))\n",
"data_loader()\n",
@@ -125,83 +133,61 @@
"import datetime\n",
"import subprocess \n",
"\n",
- "def assign_score(b):\n",
- " scored_threats = []\n",
+ "def assign_score(b): \n",
" clear_output()\n",
" uri = quick_text.value or uri_select.value\n",
- " uri_sev = int(rating_btn.selected_label) if not \"--Select--\" in uri_select.value else \"\"\n",
- "\n",
- " with open(sconnect, 'r') as f:\n",
- " reader = csv.DictReader(f, delimiter='\\t')\n",
- " rowct = 0\n",
- " with open(score_tmp, 'w') as score:\n",
- " wr = csv.DictWriter(score, delimiter='\\t', quoting=csv.QUOTE_NONE, fieldnames=reader.fieldnames) \n",
- " wr.writeheader()\n",
- " \n",
- " for row in reader:\n",
- " if row['fulluri'] == uri:\n",
- " row['uri_sev'] = uri_sev\n",
- " scored_threats.append(row) \n",
- " rowct += 1\n",
- " try:\n",
- " wr.writerow(row)\n",
- " except:\n",
- " print str(row)\n",
- " \n",
- " #works on the feedback tab-separated file\n",
- " if not os.path.exists(score_fbk): \n",
- " with open(score_fbk, 'w') as feedback:\n",
- " wr = csv.DictWriter(feedback, delimiter='\\t', quoting=csv.QUOTE_NONE, fieldnames=reader.fieldnames) \n",
- " wr.writeheader()\n",
- " \n",
- " with open(score_fbk, 'a') as feedback:\n",
- " for row in scored_threats:\n",
- " wr = csv.DictWriter(feedback, delimiter='\\t', quoting=csv.QUOTE_NONE, fieldnames=reader.fieldnames) \n",
- " wr.writerow(row)\n",
- " \n",
+ " uri_sev = int(rating_btn.selected_label) if not \"- Select -\" in uri_select.value else \"\"\n",
+ " \n",
+ " clear_output()\n",
+ " #Gets input values\n",
+ " global score_values\n",
+ " \n",
+ " score_values.append((uri, uri_sev))\n",
+ " \n",
+ " if uri_select.value != \"- Select -\":\n",
+ " display(Javascript(\"$(\\\"option[data-value='\" + uri_select.value +\"']\\\").remove();\"))\n",
+ " \n",
" clear_output()\n",
- " print \"{0} matching requests scored\".format(rowct)\n",
- " !mv $score_tmp $sconnect\n",
" data_loader()\n",
- " uri_select.value = \"--Select--\"\n",
+ " uri_select.value = \"- Select -\"\n",
" quick_text.value = \"\"\n",
"\n",
"\n",
"def save(b): \n",
- " clear_output()\n",
- " display(Javascript(\"$('.widget-area > .widget-subarea > *').remove();\"))\n",
- " data_loader()\n",
- " display(scoring_form)\n",
- " display(Javascript('reloadParentData();')) \n",
- " ml_feedback()\n",
- " print \"Suspicious requests successfully updated\"\n",
- "\n",
+ " variables=[]\n",
+ " global score_values\n",
+ " mutation=\"\"\"mutation($input:[ProxyScoreInputType!]!)\n",
+ " {\n",
+ " proxy{\n",
+ " score(input:$input)\n",
+ " {success}\n",
+ " }\n",
+ " }\"\"\" \n",
+ " \n",
+ " for row in score_values:\n",
+ " variables.append({\n",
+ " 'date': datetime.datetime.strptime(date, '%Y%m%d').strftime('%Y-%m-%d'),\n",
+ " 'uri': row[0] if row[0] != \"\" else None,\n",
+ " 'score': row[1] if row[1] != \"\" else None \n",
+ " })\n",
+ "\n",
+ " var = {'input':variables}\n",
+ " response = GraphQLClient.request(mutation,var)\n",
+ " \n",
+ " score_values = []\n",
+ " if not 'errors' in response:\n",
+ " clear_output() \n",
+ " display(Javascript(\"$('.widget-area > .widget-subarea > *').remove();\"))\n",
+ " data_loader() \n",
+ " display(scoring_form)\n",
+ " display(Javascript('reloadParentData();')) \n",
+ " print \"Suspicious connects successfully updated\"\n",
+ " else:\n",
+ " print \"An error occurred: \" + response['errors'][0]['message']\n",
"\n",
- "assign_btn.on_click(assign_score)\n",
- "save_btn.on_click(save)\n",
" \n",
- "\n",
- "def ml_feedback():\n",
- " dst_name = os.path.basename(sconnect)\n",
- " str_fb=\"DSOURCE={0} &&\\\n",
- " FDATE={1} &&\\\n",
- " source /etc/spot.conf &&\\\n",
- " usr=$(echo $LUSER | cut -f3 -d'/') &&\\\n",
- " mlnode=$MLNODE &&\\\n",
- " lpath=$LPATH &&\\\n",
- " scp {2} $usr@$mlnode:$lpath/{3}\".format(dsource,date,score_fbk,dst_name)\n",
- " subprocess.call(str_fb, shell=True)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {
- "collapsed": true
- },
- "outputs": [],
- "source": [
- "# !cp $sconnectbu $sconnect"
+ "assign_btn.on_click(assign_score)\n",
+ "save_btn.on_click(save)"
]
}
],
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1904f2b4/spot-oa/oa/proxy/ipynb_templates/Threat_Investigation_master.ipynb
----------------------------------------------------------------------
diff --git a/spot-oa/oa/proxy/ipynb_templates/Threat_Investigation_master.ipynb b/spot-oa/oa/proxy/ipynb_templates/Threat_Investigation_master.ipynb
index 5cd89db..f68f5c9 100644
--- a/spot-oa/oa/proxy/ipynb_templates/Threat_Investigation_master.ipynb
+++ b/spot-oa/oa/proxy/ipynb_templates/Threat_Investigation_master.ipynb
@@ -26,17 +26,10 @@
"except ImportError:\n",
" from IPython.html import widgets\n",
"from IPython.display import display, HTML, clear_output, Javascript \n",
- "\n",
- "with open('/etc/spot.conf') as conf:\n",
- " for line in conf.readlines():\n",
- " if \"DBNAME=\" in line: DBNAME = line.split(\"=\")[1].strip('\\n').replace(\"'\",\"\"); \n",
- " elif \"IMPALA_DEM=\" in line: IMPALA_DEM = line.split(\"=\")[1].strip('\\n').replace(\"'\",\"\"); \n",
" \n",
"path = os.getcwd().split(\"/\") \n",
"date = path[len(path)-1] \n",
"dpath = '/'.join(['data' if var == 'ipynb' else var for var in path]) + '/'\n",
- "sconnect = dpath + 'proxy_scores.tsv' \n",
- "threat_f = dpath + \"threats.csv\"\n",
"anchor = ''\n",
"anchor_hash = ''\n",
"clientips = defaultdict(int)\n",
@@ -148,28 +141,31 @@
" clear_output() \n",
" c_uri = []\n",
" uri_sev=[]\n",
- "\n",
- " #discards threats already commented\n",
- " if os.path.isfile(threat_f) and not file_is_empty(threat_f):\n",
- " with open(threat_f, 'r') as th:\n",
- " t_read = csv.reader(th, delimiter='|')\n",
- " t_read.next()\n",
- " for row in t_read: \n",
- " if row[0] != '' : c_uri.append(row[0])\n",
- " \n",
- " with open(sconnect, 'r') as f:\n",
- " reader = csv.reader(f, delimiter='\\t')\n",
- " reader.next()\n",
- " for row in reader:\n",
- " # \"p_date\":0 , \"p_time\":1, \"clientip\":2 , \"host\":3, \"reqmethod\":4 , \"useragent\":5 , \"resconttype\":6\n",
- " # , \"duration\":7, \"username\":8 , \"webcat\":9, \"referer\":10, \"respcode\":11, \"uriport\":12, \"uripath\":13\n",
- " # , \"uriquery\":14, \"serverip\":15, \"scbytes\":16 , \"csbytes\":17, \"fulluri\":18, \"word\":19\n",
- " #Forms a hash out of the anchor to use it as the file name\n",
- " if row[22] == '1': \n",
- " row_hash = md5.new(str(row[18])).hexdigest()\n",
- " if row[18] not in uri_sev and row_hash not in c_uri:\n",
- " uri_sev.append(row[18])\n",
- "\n",
+ " \n",
+ " response = GraphQLClient.request(\n",
+ " query=\"\"\"query($date:SpotDateType!) {\n",
+ " proxy{\n",
+ " threats{\n",
+ " list(date:$date) {\n",
+ " score\n",
+ " uri\n",
+ " datetime\n",
+ " }\n",
+ " }\n",
+ " }\n",
+ " }\"\"\",\n",
+ " variables={\n",
+ " 'date': datetime.datetime.strptime(date, '%Y%m%d').strftime('%Y-%m-%d')\n",
+ " }\n",
+ " ) \n",
+ " \n",
+ " if not 'errors' in response: \n",
+ " for row in response['data']['proxy']['threats']['list']: \n",
+ " if row['uri'] not in uri_sev and row['score'] == 1: \n",
+ " uri_sev.append(row['uri'])\n",
+ " else:\n",
+ " print \"An error occurred: \" + response[\"errors\"][0][\"message\"]\n",
+ " \n",
" threat_title.value =\"<h4>Suspicious URI</h4>\"\n",
" \n",
" if len(uri_sev) == 0:\n",
@@ -191,92 +187,87 @@
" display(topBox) \n",
" \n",
" def search_ip(b): \n",
- " global anchor \n",
- " global anchor_hash\n",
- " global ir_f\n",
+ " global anchor\n",
+ " global expanded_results\n",
" anchor='' \n",
- " anchor_hash = ''\n",
" anchor = susp_select.value \n",
- " anchor_hash = md5.new(str(anchor)).hexdigest()\n",
" removeWidget(3)\n",
" removeWidget(2)\n",
" removeWidget(1) \n",
- " height=80 \n",
- " ir_f = dpath + 'es-' + anchor_hash + \".csv\" \n",
- " table = \"<table><th>TIME</th><th>CLIENT IP</th><th>USERNAME</th><th>DURATION</th> \\\n",
- " <th>FULL URI</th><th>WEB CATEGORY</th><th>RESPONSE CODE</th><th>REQUEST METHOD</th><th>USER AGENT</th> \\\n",
- " <th>MIME TYPE</th><th>REFERER</th><th>URI PORT</th><th>PROXY IP</th><th>SERVER BYTES</th><th>CLIENT BYTES</th>\"\n",
+ " height=80 \n",
" \n",
- " if not os.path.isfile(ir_f) or (os.path.isfile(ir_f) and file_is_empty(ir_f)):\n",
- " # time:0, clientip:1, username:2, duration:3, fullURI:4, webcat:5, respcode:6, reqmethod:7\n",
- " # useragent:8, resconttype: 9, referer: 10, uriport:11, serverip:12, scbytes:13, csbytes:14\n",
- " imp_query = (\"\\\"SELECT p_time, clientip, username, duration, fulluri, webcat, respcode, reqmethod,\\\n",
- " useragent, resconttype, referer, uriport, serverip, scbytes, csbytes FROM {0}.proxy\\\n",
- " WHERE y='{1}' AND m='{2}' AND d='{3}' AND (fulluri='{4}' OR referer ='{4}') ORDER BY p_time\\\"\") \n",
- " \n",
- " imp_query = imp_query.format(DBNAME,yy,mm,dd,anchor) \n",
- " !impala-shell -i $IMPALA_DEM --quiet -q \"INVALIDATE METADATA\"\n",
- " !impala-shell -i $IMPALA_DEM --quiet --print_header -B --output_delimiter='\\t' -q $imp_query -o $ir_f\n",
- " \n",
- " clear_output() \n",
- " req_method = {}\n",
+ " expanded_results = GraphQLClient.request(\n",
+ " query=\"\"\"query($date:SpotDateType!,$uri:String!){\n",
+ " proxy{\n",
+ " threat{\n",
+ " details(date:$date,uri:$uri) {\n",
+ " username\n",
+ " webCategory\n",
+ " responseContentType\n",
+ " datetime\n",
+ " referer\n",
+ " clientToServerBytes\n",
+ " duration\n",
+ " userAgent\n",
+ " uri\n",
+ " serverIp\n",
+ " requestMethod\n",
+ " responseCode\n",
+ " uriPort\n",
+ " clientIp\n",
+ " serverToClientBytes\n",
+ " }\n",
+ " }\n",
+ " } \n",
+ " }\n",
+ " \"\"\",\n",
+ " variables={\n",
+ " 'date': datetime.datetime.strptime(date, '%Y%m%d').strftime('%Y-%m-%d'),\n",
+ " 'uri': anchor\n",
+ " }\n",
+ " )\n",
+ " \n",
+ " \n",
+ " if not 'errors' in expanded_results: \n",
+ " i = 0\n",
+ " table = \"<table><th>TIME</th><th>CLIENT IP</th><th>USERNAME</th><th>DURATION</th> \\\n",
+ " <th>FULL URI</th><th>WEB CATEGORY</th><th>RESPONSE CODE</th><th>REQUEST METHOD</th><th>USER AGENT</th> \\\n",
+ " <th>MIME TYPE</th><th>REFERER</th><th>URI PORT</th><th>PROXY IP</th><th>SERVER BYTES</th><th>CLIENT BYTES</th>\"\n",
+ " for row in expanded_results['data']['proxy']['threat']['details']:\n",
+ " if i < top_results:\n",
+ " table += \"<tr><td>\"+ str(row['datetime'])+\"</td><td>\"+str(row['clientIp'])+\"</td>\\\n",
+ " <td><div class='spot-text-wrapper' data-toggle='tooltip'>\"+str(row['username'])+\"\\\n",
+ " </div></td><td>\"+str(row['duration'])+\"</td>\\\n",
+ " <td><div class='spot-text-wrapper' data-toggle='tooltip'>\"+str(row['uri'])+\"</div>\\\n",
+ " </td><td>\"+str(row['webCategory'])+\"</td>\\\n",
+ " <td>\"+str(row['responseCode'])+\"</td><td>\"+str(row['requestMethod'])+\"</td>\\\n",
+ " <td><div class='spot-text-wrapper' data-toggle='tooltip'>\"+str(row['userAgent'])+\"</div></td>\\\n",
+ " <td><div class='spot-text-wrapper' data-toggle='tooltip'>\"+str(row['responseContentType'])+\"</div></td>\\\n",
+ " <td><div class='spot-text-wrapper' data-toggle='tooltip'>\"+str(row['referer'])+\"</div></td>\\\n",
+ " <td>\"+str(row['uriPort'])+\"</td><td>\"+str(row['serverIp'])+\"</td><td>\\\n",
+ " \"+str(row['serverToClientBytes'])+\"</td><td>\"+str(row['clientToServerBytes'])+\"</td></tr>\"\n",
+ "\n",
+ " height += 20\n",
+ " i+=1\n",
+ " table += \"</table>\" \n",
+ " result_html_title.value='<h4>Displaying top {0} search results</h4>'.format(top_results)\n",
+ " else:\n",
+ " table = \"<table></table>\"\n",
+ " result_html_title.value='<h4>No results were found.</h4>'\n",
"\n",
- " with open(ir_f, 'r') as f:\n",
- " #Creates default dictionaries\n",
- " global reqmethods\n",
- " global rescontype\n",
- " global referers\n",
- " global refered\n",
- " global requests\n",
- " global clientips\n",
- " clientips = defaultdict(int)\n",
- " reqmethods = defaultdict(int)\n",
- " rescontype = defaultdict(int)\n",
- " referers = defaultdict(int)\n",
- " refered = defaultdict(int)\n",
- " try:\n",
- " reader = csv.reader(f, delimiter='\\t')\n",
- " reader.next() # Skip headers\n",
- " i=0 \n",
- " for row in reader:\n",
- " clientips[row[1]]+=1\n",
- " reqmethods[row[7]]+=1\n",
- " rescontype[row[9]]+=1\n",
- " if row[10] != anchor:\n",
- " #Source URI's that refered the user to the threat\n",
- " referers[row[10]]+=1\n",
- " if({'clientip':row[1],'referer':row[10],'reqmethod':row[7],'resconttype':row[9]}) not in requests:\n",
- " requests.append({'clientip':row[1],'referer':row[10],'reqmethod':row[7],'resconttype':row[9]})\n",
- " if i < top_results:\n",
- " table += \"<tr><td>\"+row[0]+\"</td><td>\"+row[1]+\"</td>\\\n",
- " <td><div class='spot-text-wrapper' data-toggle='tooltip'>\"+row[2]+\"</div></td><td>\"+row[3]+\"</td>\\\n",
- " <td><div class='spot-text-wrapper' data-toggle='tooltip'>\"+row[4]+\"</div></td><td>\"+row[5]+\"</td>\\\n",
- " <td>\"+row[6]+\"</td><td>\"+row[7]+\"</td>\\\n",
- " <td><div class='spot-text-wrapper' data-toggle='tooltip'>\"+row[8]+\"</div></td>\\\n",
- " <td><div class='spot-text-wrapper' data-toggle='tooltip'>\"+row[9]+\"</div></td>\\\n",
- " <td><div class='spot-text-wrapper' data-toggle='tooltip'>\"+row[10]+\"</div></td>\\\n",
- " <td>\"+row[11]+\"</td><td>\"+row[12]+\"</td><td>\"+row[13]+\"</td><td>\"+row[14]+\"</td></tr>\"\n",
- " else:\n",
- " #Destination URI's refered by the threat\n",
- " refered[row[4]]+=1\n",
- " height += 20\n",
- " i+=1\n",
- " table += \"</table>\" \n",
- " result_html_title.value='<h4>Displaying top {0} search results</h4>'.format(top_results)\n",
- " except:\n",
- " table = \"<table></table>\"\n",
- " result_html_title.value='<h4>No results were found.</h4>'\n",
- " \n",
- " result_html.value=table\n",
- " result_html_box.children = [result_html]\n",
+ " result_html.value=table\n",
+ " result_html_box.children = [result_html]\n",
" \n",
- " display_threat_box(anchor)\n",
- " resultTableBox.children = [result_html_title, result_html_box]\n",
- " display(bottomBox)\n",
+ " display_threat_box(anchor)\n",
+ " resultTableBox.children = [result_html_title, result_html_box]\n",
+ " display(bottomBox)\n",
+ " \n",
+ " \n",
" search_btn.on_click(search_ip)\n",
"\n",
" \n",
- "def display_threat_box(ip): \n",
+ "def display_threat_box(ip): \n",
+ " global expanded_results\n",
" result_title.value=\"<h4 class='spot-text-wrapper spot-text-xlg' data-toggle='tooltip'>Threat summary for \" + anchor +\"</h4>\"\n",
" tc_txt_title = widgets.Text(value='', placeholder='Threat Title', width='100%')\n",
" tc_txa_summary = widgets.Textarea(value='', height=100, width='95%')\n",
@@ -293,75 +284,56 @@
" resultSummaryBox.children = [result_title,result_summary_box]\n",
" \n",
" def save_threat_summary(b):\n",
- " global anchor \n",
- " global anchor_hash \n",
- " if anchor != '': \n",
- " global threat_f\n",
- " if not os.path.exists(threat_f): \n",
- " with open(threat_f, 'w') as comment:\n",
- " comment.write('hash|title|summary\\n')\n",
- " \n",
- " with open(threat_f, 'a') as comment:\n",
- " comment.write(anchor_hash + '|' + tc_txt_title.value + '|' +\n",
- " tc_txa_summary.value.replace('\\n', '\\\\n') + '\\n') \n",
- " \n",
- " display(Javascript(\"$(\\\"option[data-value='\" + anchor +\"']\\\").remove();\")) \n",
- " display(Javascript(\"$('.widget-area > .widget-subarea > .widget-box:gt(0)').remove();\"))\n",
- " \n",
- " response = \"Summary successfully saved\"\n",
- " incident_progression(anchor, anchor_hash)\n",
- " timeline(anchor, anchor_hash)\n",
+ " result_msg = \"\"\n",
+ " threat_title = tc_txt_title.value \n",
+ " threat_comment = tc_txa_summary.value\n",
+ "\n",
+ " \n",
+ " if anchor != '': \n",
+ " mutation=\"\"\"mutation(\n",
+ " $date: SpotDateType, \n",
+ " $uri: String!, \n",
+ " $text: String!, \n",
+ " $title: String!,\n",
+ " $threatDetails: [ProxyThreatDetailsInputType!]!,\n",
+ " $first:Int) \n",
+ " {\n",
+ " proxy {\n",
+ " createStoryboard(input:{\n",
+ " threatDetails: $threatDetails,\n",
+ " date: $date, \n",
+ " uri: $uri, \n",
+ " title: $title, \n",
+ " text: $text,\n",
+ " first:$first})\n",
+ " {success}\n",
+ " }\n",
+ " }\"\"\"\n",
+ "\n",
+ " variables={\n",
+ " 'date': datetime.datetime.strptime(date, '%Y%m%d').strftime('%Y-%m-%d'),\n",
+ " 'uri': anchor,\n",
+ " 'title': threat_title,\n",
+ " 'text': threat_comment,\n",
+ " 'threatDetails': expanded_results['data']['proxy']['threat']['details'],\n",
+ " 'first':top_results\n",
+ " }\n",
+ "\n",
+ " response = GraphQLClient.request(mutation,variables)\n",
+ " if not 'errors' in response:\n",
+ " start_investigation()\n",
+ " result_msg = \"Story board successfully created\"\n",
+ " else:\n",
+ " result_msg = response['errors'][0]['message'] \n",
" else:\n",
- " response = \"No data selected\" \n",
- " \n",
+ " result_msg = \"No data selected\" \n",
+ "\n",
" susp_select.selected_label = susp_select.options[0]\n",
- " display(widgets.Box((widgets.HTML(value=response, width='100%'),)))\n",
+ " display(widgets.Box((widgets.HTML(value=result_msg, width='100%'),)))\n",
" \n",
" tc_btn_save.on_click(save_threat_summary)\n",
- " \n",
- " \n",
- "def incident_progression(anchor, anchor_hash):\n",
- " file_name = dpath + 'incident-progression-'+anchor_hash+'.json'\n",
- " jsonstring = json.dumps({'fulluri':anchor, 'requests':requests,'referer_for':referers.keys()})\n",
- " if not os.path.exists(file_name):\n",
- " with open(file_name, 'w') as f:\n",
- " f.write(jsonstring) \n",
- " response = \"Incident progression successfuly created\"\n",
- " display(widgets.Box((widgets.HTML(value=response, width='100%'),)))\n",
- " \n",
- " \n",
- "def timeline(anchor, anchor_hash): \n",
- " response = \"\"\n",
- " susp_ips = []\n",
- " if clientips:\n",
- " srtlist = sorted(list(clientips.items()), key=lambda x: x[1], reverse=True)\n",
- " for val in srtlist[:top_results]:\n",
- " susp_ips.append(val[0]) \n",
- " \n",
- " if anchor != \"\":\n",
- " sbdet_f = dpath + \"timeline-\"+anchor_hash+\".tsv\"\n",
- " if not os.path.isfile(sbdet_f) or (os.path.isfile(sbdet_f) and file_is_empty(sbdet_f)): \n",
- " imp_query = \"\\\"SELECT concat(cast(p_date as string), ' ', cast(MIN(p_time) as string)) AS tstart,\\\n",
- " concat(cast(p_date as string), ' ', cast(MAX(p_time) as string)) AS tend, SUM(duration) AS duration,\\\n",
- " clientip, respcode from {0}.proxy WHERE fulluri='{1}' AND clientip IN ({5}) \\\n",
- " AND y='{2}' AND m='{3}' AND d='{4}' GROUP BY clientip, p_time, respcode, p_date ORDER BY clientip\\\n",
- " LIMIT {6}\\\"\"\n",
"\n",
- " imp_query=imp_query.format(DBNAME,anchor,yy,mm,dd,(\"'\" + \"','\".join(susp_ips) + \"'\"), details_limit) \n",
- " !impala-shell -i $IMPALA_DEM --quiet -q \"INVALIDATE METADATA\"\n",
- " !impala-shell -i $IMPALA_DEM --quiet --print_header -B --output_delimiter='\\t' -q $imp_query -o $sbdet_f\n",
- " clear_output()\n",
- " \n",
- " response = \"Timeline successfully saved\"\n",
- " else:\n",
- " response = \"Timeline couldn't be created\"\n",
" \n",
- " display(widgets.Box((widgets.HTML(value=response, width='100%'),)))\n",
- " data_loader()\n",
- " \n",
- "def file_is_empty(path):\n",
- " return os.stat(path).st_size==0\n",
- "\n",
"def removeWidget(index):\n",
" js_command = \"$('.widget-area > .widget-subarea > .widget-box:eq({0})').remove();\".format(index) \n",
" display(Javascript(js_command)) "
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1904f2b4/spot-oa/oa/proxy/proxy_conf.json
----------------------------------------------------------------------
diff --git a/spot-oa/oa/proxy/proxy_conf.json b/spot-oa/oa/proxy/proxy_conf.json
index 5378939..2228e80 100644
--- a/spot-oa/oa/proxy/proxy_conf.json
+++ b/spot-oa/oa/proxy/proxy_conf.json
@@ -42,14 +42,12 @@
, "serverip":15
, "scbytes":16
, "csbytes":17
- , "fulluri":18
+ , "fulluri":18
, "word":19
- , "score":20
+ , "ml_score":20
, "uri_rep":21
- , "uri_sev":22
- , "respcode_name":23
- , "network_context":24
- , "hash":25
+ , "respcode_name":22
+ , "network_context":23
},
"add_reputation":{
"fulluri":18
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1904f2b4/spot-oa/oa/proxy/proxy_oa.py
----------------------------------------------------------------------
diff --git a/spot-oa/oa/proxy/proxy_oa.py b/spot-oa/oa/proxy/proxy_oa.py
index 1324f1a..02a9297 100644
--- a/spot-oa/oa/proxy/proxy_oa.py
+++ b/spot-oa/oa/proxy/proxy_oa.py
@@ -27,8 +27,12 @@ from utils import Util
from components.data.data import Data
from components.iana.iana_transform import IanaTransform
from components.nc.network_context import NetworkContext
+
+import api.resources.hdfs_client as HDFSClient
+import api.resources.impala_engine as impala
from multiprocessing import Process
import pandas as pd
+from impala.util import as_pandas
import time
import md5
@@ -78,13 +82,12 @@ class OA(object):
####################
self._create_folder_structure()
+ self._clear_previous_executions()
self._add_ipynb()
self._get_proxy_results()
- self._add_reputation()
- self._add_severity()
+ self._add_reputation()
self._add_iana()
- self._add_network_context()
- self._add_hash()
+ self._add_network_context()
self._create_proxy_scores_csv()
self._get_oa_details()
self._ingest_summary()
@@ -102,6 +105,27 @@ class OA(object):
self._data_path,self._ingest_summary_path,self._ipynb_path = Util.create_oa_folders("proxy",self._date)
+ def _clear_previous_executions(self):
+
+ self._logger.info("Cleaning data from previous executions for the day")
+ yr = self._date[:4]
+ mn = self._date[4:6]
+ dy = self._date[6:]
+ table_schema = []
+ HUSER = self._spot_conf.get('conf', 'HUSER').replace("'", "").replace('"', '')
+ table_schema=['suspicious', 'edge','threat_investigation', 'timeline', 'storyboard', 'summary' ]
+
+ for path in table_schema:
+ HDFSClient.delete_folder("{0}/{1}/hive/oa/{2}/y={3}/m={4}/d={5}".format(HUSER,self._table_name,path,yr,mn,dy),user="impala")
+
+ HDFSClient.delete_folder("{0}/{1}/hive/oa/{2}/y={3}/m={4}".format(HUSER,self._table_name,"summary",yr,mn),user="impala")
+ #removes Feedback file
+ HDFSClient.delete_folder("{0}/{1}/scored_results/{2}{3}{4}/feedback/ml_feedback.csv".format(HUSER,self._table_name,yr,mn,dy))
+ #removes json files from the storyboard
+ HDFSClient.delete_folder("{0}/{1}/oa/{2}/{3}/{4}/{5}".format(HUSER,self._table_name,"storyboard",yr,mn,dy))
+
+
+
def _add_ipynb(self):
if os.path.isdir(self._ipynb_path):
@@ -140,23 +164,23 @@ class OA(object):
self._logger.error("There was an error getting ML results from HDFS")
sys.exit(1)
- # add headers.
- self._logger.info("Adding headers")
- self._proxy_scores_headers = [ str(key) for (key,value) in self._conf['proxy_score_fields'].items() ]
-
self._proxy_scores = self._proxy_results[:]
def _create_proxy_scores_csv(self):
-
- proxy_scores_csv = "{0}/proxy_scores.tsv".format(self._data_path)
- proxy_scores_final = self._proxy_scores[:];
- proxy_scores_final.insert(0,self._proxy_scores_headers)
- Util.create_csv_file(proxy_scores_csv,proxy_scores_final, self._results_delimiter)
-
- # create bk file
- proxy_scores_bu_csv = "{0}/proxy_scores_bu.tsv".format(self._data_path)
- Util.create_csv_file(proxy_scores_bu_csv,proxy_scores_final, self._results_delimiter)
+ # get date parameters.
+ yr = self._date[:4]
+ mn = self._date[4:6]
+ dy = self._date[6:]
+ value_string = ""
+
+ for row in self._proxy_scores:
+ value_string += str(tuple(Util.cast_val(item) for item in row)) + ","
+
+ load_into_impala = ("""
+ INSERT INTO {0}.proxy_scores partition(y={2}, m={3}, d={4}) VALUES {1}
+ """).format(self._db, value_string[:-1], yr, mn, dy)
+ impala.execute_query(load_into_impala)
def _add_reputation(self):
@@ -200,12 +224,6 @@ class OA(object):
self._proxy_scores = [ conn + [""] for conn in self._proxy_scores ]
-
- def _add_severity(self):
- # Add severity column
- self._proxy_scores = [conn + [0] for conn in self._proxy_scores]
-
-
def _add_iana(self):
iana_conf_file = "{0}/components/iana/iana_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -225,96 +243,72 @@ class OA(object):
nc_conf = json.loads(open(nc_conf_file).read())["NC"]
proxy_nc = NetworkContext(nc_conf,self._logger)
ip_dst_index = self._conf["proxy_score_fields"]["clientip"]
- self._proxy_scores = [ conn + [proxy_nc.get_nc(conn[ip_dst_index])] for conn in self._proxy_scores ]
-
+ self._proxy_scores = [ conn + [proxy_nc.get_nc(conn[ip_dst_index])] for conn in self._proxy_scores ]
else:
self._proxy_scores = [ conn + [""] for conn in self._proxy_scores ]
- def _add_hash(self):
- #A hash string is generated to be used as the file name for the edge files.
- #These fields are used for the hash creation, so this combination of values is treated as
- #a 'unique' connection
- cip_index = self._conf["proxy_score_fields"]["clientip"]
- uri_index = self._conf["proxy_score_fields"]["fulluri"]
- tme_index = self._conf["proxy_score_fields"]["p_time"]
-
- self._proxy_scores = [conn + [str( md5.new(str(conn[cip_index]) + str(conn[uri_index])).hexdigest() + str((conn[tme_index].split(":"))[0]) )] for conn in self._proxy_scores]
-
-
def _get_oa_details(self):
self._logger.info("Getting OA Proxy suspicious details")
# start suspicious connects details process.
p_sp = Process(target=self._get_suspicious_details)
p_sp.start()
-
- # p_sp.join()
+
def _get_suspicious_details(self):
- hash_list = []
+ uri_list = []
iana_conf_file = "{0}/components/iana/iana_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if os.path.isfile(iana_conf_file):
iana_config = json.loads(open(iana_conf_file).read())
proxy_iana = IanaTransform(iana_config["IANA"])
for conn in self._proxy_scores:
- conn_hash = conn[self._conf["proxy_score_fields"]["hash"]]
- if conn_hash not in hash_list:
- hash_list.append(conn_hash)
- clientip = conn[self._conf["proxy_score_fields"]["clientip"]]
- fulluri = conn[self._conf["proxy_score_fields"]["fulluri"]]
- date=conn[self._conf["proxy_score_fields"]["p_date"]].split('-')
- if len(date) == 3:
- year=date[0]
- month=date[1].zfill(2)
- day=date[2].zfill(2)
- hh=(conn[self._conf["proxy_score_fields"]["p_time"]].split(":"))[0]
- self._get_proxy_details(fulluri,clientip,conn_hash,year,month,day,hh,proxy_iana)
-
-
- def _get_proxy_details(self,fulluri,clientip,conn_hash,year,month,day,hh,proxy_iana):
-
- limit = 250
- output_delimiter = '\t'
- edge_file ="{0}/edge-{1}-{2}.tsv".format(self._data_path,clientip,conn_hash)
- edge_tmp ="{0}/edge-{1}-{2}.tmp".format(self._data_path,clientip,conn_hash)
-
- if not os.path.isfile(edge_file):
- proxy_qry = ("SELECT p_date, p_time, clientip, host, webcat, respcode, reqmethod, useragent, resconttype, \
- referer, uriport, serverip, scbytes, csbytes, fulluri FROM {0}.{1} WHERE y=\'{2}\' AND m=\'{3}\' AND d=\'{4}\' AND \
- h=\'{5}\' AND fulluri =\'{6}\' AND clientip = \'{7}\' LIMIT {8};").format(self._db,self._table_name, year,month,day,hh,fulluri,clientip,limit)
-
- # execute query
- self._engine.query(proxy_qry,edge_tmp,output_delimiter)
- # add IANA to results.
+ clientip = conn[self._conf["proxy_score_fields"]["clientip"]]
+ fulluri = conn[self._conf["proxy_score_fields"]["fulluri"]]
+ date=conn[self._conf["proxy_score_fields"]["p_date"]].split('-')
+ if len(date) == 3:
+ year=date[0]
+ month=date[1].zfill(2)
+ day=date[2].zfill(2)
+ hh=(conn[self._conf["proxy_score_fields"]["p_time"]].split(":"))[0]
+ self._get_proxy_details(fulluri,clientip,year,month,day,hh,proxy_iana)
+
+
+
+ def _get_proxy_details(self,fulluri,clientip,year,month,day,hh,proxy_iana):
+ limit = 250
+ value_string = ""
+
+ query_to_load =("""
+ SELECT p_date, p_time, clientip, host, webcat, respcode, reqmethod, useragent, resconttype,
+ referer, uriport, serverip, scbytes, csbytes, fulluri, {5} as hh
+ FROM {0}.{1} WHERE y='{2}' AND m='{3}' AND d='{4}' AND
+ h='{5}' AND fulluri='{6}' AND clientip='{7}' LIMIT {8};
+ """).format(self._db,self._table_name, year,month,day,hh,fulluri,clientip,limit)
+
+ detail_results = impala.execute_query(query_to_load)
+
+ if proxy_iana:
+ # add IANA to results.
self._logger.info("Adding IANA translation to details results")
- with open(edge_tmp) as proxy_details_csv:
- rows = csv.reader(proxy_details_csv, delimiter=output_delimiter,quotechar='"')
- next(proxy_details_csv)
- update_rows = [[conn[0]] + [conn[1]] + [conn[2]] + [conn[3]] + [conn[4]] + [proxy_iana.get_name(conn[5],"proxy_http_rcode") if proxy_iana else conn[5]] + [conn[6]] + [conn[7]] + [conn[8]] + [conn[9]] + [conn[10]] + [conn[11]] + [conn[12]] + [conn[13]] + [conn[14]] if len(conn) > 0 else [] for conn in rows]
- update_rows = filter(None, update_rows)
- header = ["p_date","p_time","clientip","host","webcat","respcode","reqmethod","useragent","resconttype","referer","uriport","serverip","scbytes","csbytes","fulluri"]
- update_rows.insert(0,header)
-
- # due an issue with the output of the query.
- update_rows = [ [ w.replace('"','') for w in l ] for l in update_rows ]
-
-
- # create edge file.
- self._logger.info("Creating edge file:{0}".format(edge_file))
- with open(edge_file,'wb') as proxy_details_edge:
- writer = csv.writer(proxy_details_edge, quoting=csv.QUOTE_NONE, delimiter=output_delimiter)
- if update_rows:
- writer.writerows(update_rows)
- else:
- shutil.copy(edge_tmp,edge_file)
-
- try:
- os.remove(edge_tmp)
- except OSError:
- pass
+
+ updated_rows = [conn + (proxy_iana.get_name(conn[5],"proxy_http_rcode"),) for conn in detail_results]
+ updated_rows = filter(None, updated_rows)
+ else:
+ updated_rows = [conn + ("",) for conn in detail_results ]
+
+ for row in updated_rows:
+ value_string += str(tuple(item for item in row)) + ","
+
+ if value_string != "":
+ query_to_insert=("""
+ INSERT INTO {0}.proxy_edge PARTITION (y={1}, m={2}, d={3}) VALUES {4};
+ """).format(self._db,year, month, day, value_string[:-1])
+
+ impala.execute_query(query_to_insert)
+
def _ingest_summary(self):
# get date parameters.
@@ -328,44 +322,36 @@ class OA(object):
result_rows = []
df_filtered = pd.DataFrame()
- ingest_summary_file = "{0}/is_{1}{2}.csv".format(self._ingest_summary_path,yr,mn)
- ingest_summary_tmp = "{0}.tmp".format(ingest_summary_file)
-
- if os.path.isfile(ingest_summary_file):
- df = pd.read_csv(ingest_summary_file, delimiter=',')
- #discards previous rows from the same date
- df_filtered = df[df['date'].str.contains("{0}-{1}-{2}".format(yr, mn, dy)) == False]
- else:
- df = pd.DataFrame()
-
# get ingest summary.
- ingest_summary_qry = ("SELECT p_date, p_time, COUNT(*) as total "
- " FROM {0}.{1}"
- " WHERE y='{2}' AND m='{3}' AND d='{4}' "
- " AND p_date IS NOT NULL AND p_time IS NOT NULL "
- " AND clientip IS NOT NULL AND p_time != '' "
- " AND host IS NOT NULL AND fulluri IS NOT NULL "
- " GROUP BY p_date, p_time;")
-
- ingest_summary_qry = ingest_summary_qry.format(self._db,self._table_name, yr, mn, dy)
- results_file = "{0}/results_{1}.csv".format(self._ingest_summary_path,self._date)
- self._engine.query(ingest_summary_qry,output_file=results_file,delimiter=",")
+
+ query_to_load=("""
+ SELECT p_date, p_time, COUNT(*) as total
+ FROM {0}.{1} WHERE y='{2}' AND m='{3}' AND d='{4}'
+ AND p_date IS NOT NULL AND p_time IS NOT NULL
+ AND clientip IS NOT NULL AND p_time != ''
+ AND host IS NOT NULL AND fulluri IS NOT NULL
+ GROUP BY p_date, p_time;
+ """).format(self._db,self._table_name, yr, mn, dy)
- if os.path.isfile(results_file):
- df_results = pd.read_csv(results_file, delimiter=',')
-
+ results = impala.execute_query(query_to_load)
+
+ if results:
+ df_results = as_pandas(results)
#Forms a new dataframe splitting the minutes from the time column/
df_new = pd.DataFrame([["{0} {1}:{2}".format(val['p_date'], val['p_time'].split(":")[0].zfill(2), val['p_time'].split(":")[1].zfill(2)), int(val['total']) if not math.isnan(val['total']) else 0 ] for key,val in df_results.iterrows()],columns = ingest_summary_cols)
-
+ value_string = ''
#Groups the data by minute
sf = df_new.groupby(by=['date'])['total'].sum()
df_per_min = pd.DataFrame({'date':sf.index, 'total':sf.values})
- df_final = df_filtered.append(df_per_min, ignore_index=True)
- df_final.to_csv(ingest_summary_tmp,sep=',', index=False)
-
- os.remove(results_file)
- os.rename(ingest_summary_tmp,ingest_summary_file)
+ df_final = df_filtered.append(df_per_min, ignore_index=True).to_records(False,False)
+ if len(df_final) > 0:
+ query_to_insert=("""
+ INSERT INTO {0}.proxy_ingest_summary PARTITION (y={1}, m={2}) VALUES {3};
+ """).format(self._db, yr, mn, tuple(df_final))
+
+ impala.execute_query(query_to_insert)
+
else:
self._logger.info("No data found for the ingest summary")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1904f2b4/spot-oa/oa/utils.py
----------------------------------------------------------------------
diff --git a/spot-oa/oa/utils.py b/spot-oa/oa/utils.py
index 99b006e..2bed10e 100644
--- a/spot-oa/oa/utils.py
+++ b/spot-oa/oa/utils.py
@@ -121,6 +121,18 @@ class Util(object):
writer.writerows(content)
+ @classmethod
+ def cast_val(self,value):
+ try:
+ val = int(value)
+ except:
+ try:
+ val = float(value)
+ except:
+ val = str(value)
+ return val
+
+
class SecHead(object):
def __init__(self, fp):
self.fp = fp