You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/08 00:25:07 UTC

[1/3] incubator-metron git commit: METRON-58 Remediate Deployment Integration Testing Issues (dlyle65535 via cestella) closes apache/incubator-metron#36

Repository: incubator-metron
Updated Branches:
  refs/heads/master 6638a71ad -> 2e9f2c6ce


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/SampleIndexed/YafIndexed
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/SampleIndexed/YafIndexed b/metron-streaming/Metron-Topologies/src/main/resources/SampleIndexed/YafIndexed
index 27b3589..1c38406 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/SampleIndexed/YafIndexed
+++ b/metron-streaming/Metron-Topologies/src/main/resources/SampleIndexed/YafIndexed
@@ -1,10 +1,10 @@
-{enrichments.geo.dip.longitude=test longitude, iflags=AS, enrichments.geo.dip.location_point=test longitude,test latitude, uflags=0, isn=22efa001, dip=10.0.2.15, dp=39468, threatintels.ip.dip=, enrichments.geo.sip.postalCode=test postalCode, duration=0.000, rpkt=0, enrichments.geo.dip.country=test country, original_string=2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|      AS|       0|       0|       0|22efa001|00000000|000|000|       1|      44|       0|       0|    0|idle, enrichments.geo.dip.locID=1, enrichments.geo.sip.city=test city, enrichments.geo.dip.latitude=test latitude, enrichments.geo.sip.country=test country, enrichments.geo.dip.city=test city, enrichments.geo.sip.dmaCode=test dmaCode, pkt=1, enrichments.geo.sip.location_point=test longitude,test latitude, ruflags=0, roct=0, sip=216.21.170.221, tag=0, enrichments.geo.dip.dmaCode=test dmaCode, rtag=0, sp
 =80, enrichments.geo.sip.longitude=test longitude, enrichments.geo.sip.latitude=test latitude, timestamp=1453994988512, app=0, threatintels.ip.sip=, oct=44, end_reason=idle, enrichments.geo.sip.locID=1, risn=0, enrichments.host.dip.known_info.type=printer, end_time=1453994988512, enrichments.host.dip.known_info.asset_value=important, enrichments.geo.dip.postalCode=test postalCode, source.type=yaf, enrichments.host.sip=, start_time=1453994988512, riflags=0, rtt=0.000, proto=6, enrichments.host.dip.known_info.local=YES}
-{enrichments.geo.dip.longitude=test longitude, iflags=A, enrichments.geo.dip.location_point=test longitude,test latitude, uflags=0, enrichments.host.sip.known_info.asset_value=important, isn=10000000, dip=10.0.2.3, enrichments.host.sip.known_info.local=YES, dp=53, enrichments.geo.sip.postalCode=test postalCode, duration=0.000, rpkt=0, enrichments.geo.dip.country=test country, original_string=2016-01-28 15:29:48.502|2016-01-28 15:29:48.502|   0.000|   0.000| 17|                               10.0.2.15|37299|                                10.0.2.3|   53|       A|       0|       0|       0|10000000|00000000|000|000|       1|      56|       0|       0|    0|idle, enrichments.geo.dip.locID=1, enrichments.geo.sip.city=test city, enrichments.host.sip.known_info.type=printer, enrichments.geo.dip.latitude=test latitude, enrichments.geo.sip.country=test country, enrichments.geo.dip.city=test city, enrichments.geo.sip.dmaCode=test dmaCode, pkt=1, enrichments.geo.sip.location_point=test longit
 ude,test latitude, ruflags=0, roct=0, sip=10.0.2.15, tag=0, enrichments.geo.dip.dmaCode=test dmaCode, rtag=0, sp=37299, enrichments.geo.sip.longitude=test longitude, enrichments.geo.sip.latitude=test latitude, timestamp=1453994988502, app=0, threatintels.ip.sip=, enrichments.host.dip=, oct=56, end_reason=idle, enrichments.geo.sip.locID=1, risn=0, end_time=1453994988502, enrichments.geo.dip.postalCode=test postalCode, source.type=yaf, start_time=1453994988502, riflags=0, rtt=0.000, threatintels.ip.dip.threat_source=ip_threat_intel, proto=17}
-{enrichments.geo.dip.longitude=test longitude, iflags=A, enrichments.geo.dip.location_point=test longitude,test latitude, uflags=0, isn=0, dip=10.0.2.15, dp=37299, threatintels.ip.dip=, enrichments.geo.sip.postalCode=test postalCode, duration=0.000, rpkt=0, enrichments.geo.dip.country=test country, original_string=2016-01-28 15:29:48.504|2016-01-28 15:29:48.504|   0.000|   0.000| 17|                                10.0.2.3|   53|                               10.0.2.15|37299|       A|       0|       0|       0|00000000|00000000|000|000|       1|     312|       0|       0|    0|idle, enrichments.geo.dip.locID=1, enrichments.geo.sip.city=test city, enrichments.geo.dip.latitude=test latitude, enrichments.geo.sip.country=test country, enrichments.geo.dip.city=test city, enrichments.geo.sip.dmaCode=test dmaCode, pkt=1, enrichments.geo.sip.location_point=test longitude,test latitude, ruflags=0, roct=0, sip=10.0.2.3, tag=0, enrichments.geo.dip.dmaCode=test dmaCode, rtag=0, sp=53, enrichmen
 ts.geo.sip.longitude=test longitude, enrichments.geo.sip.latitude=test latitude, timestamp=1453994988504, app=0, oct=312, end_reason=idle, enrichments.geo.sip.locID=1, risn=0, enrichments.host.dip.known_info.type=printer, end_time=1453994988504, enrichments.host.dip.known_info.asset_value=important, enrichments.geo.dip.postalCode=test postalCode, source.type=yaf, enrichments.host.sip=, start_time=1453994988504, threatintels.ip.sip.threat_source=ip_threat_intel, riflags=0, rtt=0.000, proto=17, enrichments.host.dip.known_info.local=YES}
-{enrichments.geo.dip.longitude=test longitude, iflags=A, enrichments.geo.dip.location_point=test longitude,test latitude, uflags=0, enrichments.host.sip.known_info.asset_value=important, isn=0, dip=10.0.2.3, enrichments.host.sip.known_info.local=YES, dp=53, enrichments.geo.sip.postalCode=test postalCode, duration=0.000, rpkt=0, enrichments.geo.dip.country=test country, original_string=2016-01-28 15:29:48.504|2016-01-28 15:29:48.504|   0.000|   0.000| 17|                               10.0.2.15|56303|                                10.0.2.3|   53|       A|       0|       0|       0|00000000|00000000|000|000|       1|      56|       0|       0|    0|idle, enrichments.geo.dip.locID=1, enrichments.geo.sip.city=test city, enrichments.host.sip.known_info.type=printer, enrichments.geo.dip.latitude=test latitude, enrichments.geo.sip.country=test country, enrichments.geo.dip.city=test city, enrichments.geo.sip.dmaCode=test dmaCode, pkt=1, enrichments.geo.sip.location_point=test longitude,tes
 t latitude, ruflags=0, roct=0, sip=10.0.2.15, tag=0, enrichments.geo.dip.dmaCode=test dmaCode, rtag=0, sp=56303, enrichments.geo.sip.longitude=test longitude, enrichments.geo.sip.latitude=test latitude, timestamp=1453994988504, app=0, threatintels.ip.sip=, enrichments.host.dip=, oct=56, end_reason=idle, enrichments.geo.sip.locID=1, risn=0, end_time=1453994988504, enrichments.geo.dip.postalCode=test postalCode, source.type=yaf, start_time=1453994988504, riflags=0, rtt=0.000, threatintels.ip.dip.threat_source=ip_threat_intel, proto=17}
-{enrichments.geo.dip.longitude=test longitude, iflags=A, enrichments.geo.dip.location_point=test longitude,test latitude, uflags=0, isn=0, dip=10.0.2.15, dp=56303, threatintels.ip.dip=, enrichments.geo.sip.postalCode=test postalCode, duration=0.000, rpkt=0, enrichments.geo.dip.country=test country, original_string=2016-01-28 15:29:48.506|2016-01-28 15:29:48.506|   0.000|   0.000| 17|                                10.0.2.3|   53|                               10.0.2.15|56303|       A|       0|       0|       0|00000000|00000000|000|000|       1|      84|       0|       0|    0|idle, enrichments.geo.dip.locID=1, enrichments.geo.sip.city=test city, enrichments.geo.dip.latitude=test latitude, enrichments.geo.sip.country=test country, enrichments.geo.dip.city=test city, enrichments.geo.sip.dmaCode=test dmaCode, pkt=1, enrichments.geo.sip.location_point=test longitude,test latitude, ruflags=0, roct=0, sip=10.0.2.3, tag=0, enrichments.geo.dip.dmaCode=test dmaCode, rtag=0, sp=53, enrichmen
 ts.geo.sip.longitude=test longitude, enrichments.geo.sip.latitude=test latitude, timestamp=1453994988506, app=0, oct=84, end_reason=idle, enrichments.geo.sip.locID=1, risn=0, enrichments.host.dip.known_info.type=printer, end_time=1453994988506, enrichments.host.dip.known_info.asset_value=important, enrichments.geo.dip.postalCode=test postalCode, source.type=yaf, enrichments.host.sip=, start_time=1453994988506, threatintels.ip.sip.threat_source=ip_threat_intel, riflags=0, rtt=0.000, proto=17, enrichments.host.dip.known_info.local=YES}
-{enrichments.geo.dip.longitude=test longitude, iflags=S, enrichments.geo.dip.location_point=test longitude,test latitude, uflags=0, enrichments.host.sip.known_info.asset_value=important, isn=58c52fca, dip=216.21.170.221, enrichments.host.sip.known_info.local=YES, dp=80, threatintels.ip.dip=, enrichments.geo.sip.postalCode=test postalCode, duration=0.000, rpkt=0, enrichments.geo.dip.country=test country, original_string=2016-01-28 15:29:48.508|2016-01-28 15:29:48.508|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|       S|       0|       0|       0|58c52fca|00000000|000|000|       1|      60|       0|       0|    0|idle, enrichments.geo.dip.locID=1, enrichments.geo.sip.city=test city, enrichments.host.sip.known_info.type=printer, enrichments.geo.dip.latitude=test latitude, enrichments.geo.sip.country=test country, enrichments.geo.dip.city=test city, enrichments.geo.sip.dmaCode=test dmaCode, pkt=1, enrichments.geo.si
 p.location_point=test longitude,test latitude, ruflags=0, roct=0, sip=10.0.2.15, tag=0, enrichments.geo.dip.dmaCode=test dmaCode, rtag=0, sp=39468, enrichments.geo.sip.longitude=test longitude, enrichments.geo.sip.latitude=test latitude, timestamp=1453994988508, app=0, threatintels.ip.sip=, enrichments.host.dip=, oct=60, end_reason=idle, enrichments.geo.sip.locID=1, risn=0, end_time=1453994988508, enrichments.geo.dip.postalCode=test postalCode, source.type=yaf, start_time=1453994988508, riflags=0, rtt=0.000, proto=6}
-{enrichments.geo.dip.longitude=test longitude, iflags=A, enrichments.geo.dip.location_point=test longitude,test latitude, uflags=0, enrichments.host.sip.known_info.asset_value=important, isn=58c52fcb, dip=216.21.170.221, enrichments.host.sip.known_info.local=YES, dp=80, threatintels.ip.dip=, enrichments.geo.sip.postalCode=test postalCode, duration=0.000, rpkt=0, enrichments.geo.dip.country=test country, original_string=2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|       A|       0|       0|       0|58c52fcb|00000000|000|000|       1|      40|       0|       0|    0|idle , enrichments.geo.dip.locID=1, enrichments.geo.sip.city=test city, enrichments.host.sip.known_info.type=printer, enrichments.geo.dip.latitude=test latitude, enrichments.geo.sip.country=test country, enrichments.geo.dip.city=test city, enrichments.geo.sip.dmaCode=test dmaCode, pkt=1, enrichments.geo.s
 ip.location_point=test longitude,test latitude, ruflags=0, roct=0, sip=10.0.2.15, tag=0, enrichments.geo.dip.dmaCode=test dmaCode, rtag=0, sp=39468, enrichments.geo.sip.longitude=test longitude, enrichments.geo.sip.latitude=test latitude, timestamp=1453994988512, app=0, threatintels.ip.sip=, enrichments.host.dip=, oct=40, end_reason=idle , enrichments.geo.sip.locID=1, risn=0, end_time=1453994988512, enrichments.geo.dip.postalCode=test postalCode, source.type=yaf, start_time=1453994988512, riflags=0, rtt=0.000, proto=6}
-{enrichments.geo.dip.longitude=test longitude, iflags=AP, enrichments.geo.dip.location_point=test longitude,test latitude, uflags=0, enrichments.host.sip.known_info.asset_value=important, isn=58c52fcb, dip=216.21.170.221, enrichments.host.sip.known_info.local=YES, dp=80, threatintels.ip.dip=, enrichments.geo.sip.postalCode=test postalCode, duration=0.000, rpkt=0, enrichments.geo.dip.country=test country, original_string=2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|      AP|       0|       0|       0|58c52fcb|00000000|000|000|       1|     148|       0|       0|    0|idle , enrichments.geo.dip.locID=1, enrichments.geo.sip.city=test city, enrichments.host.sip.known_info.type=printer, enrichments.geo.dip.latitude=test latitude, enrichments.geo.sip.country=test country, enrichments.geo.dip.city=test city, enrichments.geo.sip.dmaCode=test dmaCode, pkt=1, enrichments.geo.
 sip.location_point=test longitude,test latitude, ruflags=0, roct=0, sip=10.0.2.15, tag=0, enrichments.geo.dip.dmaCode=test dmaCode, rtag=0, sp=39468, enrichments.geo.sip.longitude=test longitude, enrichments.geo.sip.latitude=test latitude, timestamp=1453994988512, app=0, threatintels.ip.sip=, enrichments.host.dip=, oct=148, end_reason=idle , enrichments.geo.sip.locID=1, risn=0, end_time=1453994988512, enrichments.geo.dip.postalCode=test postalCode, source.type=yaf, start_time=1453994988512, riflags=0, rtt=0.000, proto=6}
-{enrichments.geo.dip.longitude=test longitude, iflags=A, enrichments.geo.dip.location_point=test longitude,test latitude, uflags=0, isn=22efa002, dip=10.0.2.15, dp=39468, threatintels.ip.dip=, enrichments.geo.sip.postalCode=test postalCode, duration=0.000, rpkt=0, enrichments.geo.dip.country=test country, original_string=2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|       A|       0|       0|       0|22efa002|00000000|000|000|       1|      40|       0|       0|    0|idle , enrichments.geo.dip.locID=1, enrichments.geo.sip.city=test city, enrichments.geo.dip.latitude=test latitude, enrichments.geo.sip.country=test country, enrichments.geo.dip.city=test city, enrichments.geo.sip.dmaCode=test dmaCode, pkt=1, enrichments.geo.sip.location_point=test longitude,test latitude, ruflags=0, roct=0, sip=216.21.170.221, tag=0, enrichments.geo.dip.dmaCode=test dmaCode, rtag=0, sp
 =80, enrichments.geo.sip.longitude=test longitude, enrichments.geo.sip.latitude=test latitude, timestamp=1453994988512, app=0, threatintels.ip.sip=, oct=40, end_reason=idle , enrichments.geo.sip.locID=1, risn=0, enrichments.host.dip.known_info.type=printer, end_time=1453994988512, enrichments.host.dip.known_info.asset_value=important, enrichments.geo.dip.postalCode=test postalCode, source.type=yaf, enrichments.host.sip=, start_time=1453994988512, riflags=0, rtt=0.000, proto=6, enrichments.host.dip.known_info.local=YES}
-{enrichments.geo.dip.longitude=test longitude, iflags=AP, enrichments.geo.dip.location_point=test longitude,test latitude, uflags=0, isn=22efa002, dip=10.0.2.15, dp=39468, threatintels.ip.dip=, enrichments.geo.sip.postalCode=test postalCode, duration=0.000, rpkt=0, enrichments.geo.dip.country=test country, original_string=2016-01-28 15:29:48.562|2016-01-28 15:29:48.562|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|      AP|       0|       0|       0|22efa002|00000000|000|000|       1|     604|       0|       0|    0|idle, enrichments.geo.dip.locID=1, enrichments.geo.sip.city=test city, enrichments.geo.dip.latitude=test latitude, enrichments.geo.sip.country=test country, enrichments.geo.dip.city=test city, enrichments.geo.sip.dmaCode=test dmaCode, pkt=1, enrichments.geo.sip.location_point=test longitude,test latitude, ruflags=0, roct=0, sip=216.21.170.221, tag=0, enrichments.geo.dip.dmaCode=test dmaCode, rtag=0, sp
 =80, enrichments.geo.sip.longitude=test longitude, enrichments.geo.sip.latitude=test latitude, timestamp=1453994988562, app=0, threatintels.ip.sip=, oct=604, end_reason=idle, enrichments.geo.sip.locID=1, risn=0, enrichments.host.dip.known_info.type=printer, end_time=1453994988562, enrichments.host.dip.known_info.asset_value=important, enrichments.geo.dip.postalCode=test postalCode, source.type=yaf, enrichments.host.sip=, start_time=1453994988562, riflags=0, rtt=0.000, proto=6, enrichments.host.dip.known_info.local=YES}
+{"adapter.threatinteladapter.end.ts":"1457102731219","enrichments.geo.dip.location_point":"test longitude,test latitude","isn":"22efa001","index.elasticsearchwriter.ts":"1457102731220","dip":"10.0.2.15","dp":39468,"rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|      AS|       0|       0|       0|22efa001|00000000|000|000|       1|      44|       0|       0|    0|idle","enrichments.geo.dip.locID":"1","enrichments.geo.sip.city":"test city","enrichmentjoinbolt.joiner.ts":"1457102731206","adapter.hostfromjsonlistadapter.begin.ts":"1457102731185","tag":0,"enrichments.geo.dip.dmaCode":"test dmaCode","app":0,"oct":44,"end_reason":"idle","enrichments.geo.sip.locID":"1","adapter.mockgeoadapter.begin.ts":"1457102731185","threatintelsplitterbolt.splitter.ts":"1457102731207","enrichments.geo.dip.postalCode":"test postalCode","start_time":1453994988512,
 "adapter.threatinteladapter.begin.ts":"1457102731210","riflags":0,"proto":6,"enrichments.host.dip.known_info.local":"YES","enrichments.geo.dip.longitude":"test longitude","iflags":"AS","uflags":0,"adapter.mockgeoadapter.end.ts":"1457102731198","adapter.hostfromjsonlistadapter.end.ts":"1457102731197","enrichments.geo.sip.postalCode":"test postalCode","duration":"0.000","enrichments.geo.dip.country":"test country","threatinteljoinbolt.joiner.ts":"1457102731220","enrichments.geo.dip.latitude":"test latitude","enrichments.geo.sip.country":"test country","enrichments.geo.dip.city":"test city","enrichments.geo.sip.dmaCode":"test dmaCode","pkt":1,"enrichments.geo.sip.location_point":"test longitude,test latitude","ruflags":0,"roct":0,"sip":"216.21.170.221","rtag":0,"sp":80,"enrichments.geo.sip.longitude":"test longitude","enrichments.geo.sip.latitude":"test latitude","timestamp":1453994988512,"risn":0,"enrichments.host.dip.known_info.type":"printer","end_time":1453994988512,"enrichments.ho
 st.dip.known_info.asset_value":"important","source.type":"yaf","rtt":"0.000"}
+{"adapter.threatinteladapter.end.ts":"1457102731221","enrichments.geo.dip.location_point":"test longitude,test latitude","enrichments.host.sip.known_info.asset_value":"important","isn":10000000,"index.elasticsearchwriter.ts":"1457102731221","dip":"10.0.2.3","dp":53,"rpkt":0,"original_string":"2016-01-28 15:29:48.502|2016-01-28 15:29:48.502|   0.000|   0.000| 17|                               10.0.2.15|37299|                                10.0.2.3|   53|       A|       0|       0|       0|10000000|00000000|000|000|       1|      56|       0|       0|    0|idle","enrichments.geo.dip.locID":"1","enrichments.geo.sip.city":"test city","enrichments.host.sip.known_info.type":"printer","enrichmentjoinbolt.joiner.ts":"1457102731208","adapter.hostfromjsonlistadapter.begin.ts":"1457102731197","tag":0,"enrichments.geo.dip.dmaCode":"test dmaCode","app":0,"oct":56,"end_reason":"idle","enrichments.geo.sip.locID":"1","adapter.mockgeoadapter.begin.ts":"1457102731198","threatintelsplitterbolt.splitt
 er.ts":"1457102731210","enrichments.geo.dip.postalCode":"test postalCode","start_time":1453994988502,"adapter.threatinteladapter.begin.ts":"1457102731219","riflags":0,"proto":17,"enrichments.geo.dip.longitude":"test longitude","iflags":"A","uflags":0,"adapter.mockgeoadapter.end.ts":"1457102731198","adapter.hostfromjsonlistadapter.end.ts":"1457102731197","enrichments.host.sip.known_info.local":"YES","threatintels.ip.dip.ip_threat_intel":"alert","enrichments.geo.sip.postalCode":"test postalCode","duration":"0.000","enrichments.geo.dip.country":"test country","threatinteljoinbolt.joiner.ts":"1457102731221","enrichments.geo.dip.latitude":"test latitude","enrichments.geo.sip.country":"test country","enrichments.geo.dip.city":"test city","enrichments.geo.sip.dmaCode":"test dmaCode","pkt":1,"enrichments.geo.sip.location_point":"test longitude,test latitude","ruflags":0,"roct":0,"sip":"10.0.2.15","rtag":0,"sp":37299,"enrichments.geo.sip.longitude":"test longitude","enrichments.geo.sip.latit
 ude":"test latitude","timestamp":1453994988502,"risn":0,"end_time":1453994988502,"is_alert":"true","source.type":"yaf","rtt":"0.000"}
+{"adapter.threatinteladapter.end.ts":"1457102731221","enrichments.geo.dip.location_point":"test longitude,test latitude","isn":0,"index.elasticsearchwriter.ts":"1457102731222","dip":"10.0.2.15","dp":37299,"rpkt":0,"original_string":"2016-01-28 15:29:48.504|2016-01-28 15:29:48.504|   0.000|   0.000| 17|                                10.0.2.3|   53|                               10.0.2.15|37299|       A|       0|       0|       0|00000000|00000000|000|000|       1|     312|       0|       0|    0|idle","enrichments.geo.dip.locID":"1","enrichments.geo.sip.city":"test city","enrichmentjoinbolt.joiner.ts":"1457102731209","adapter.hostfromjsonlistadapter.begin.ts":"1457102731197","tag":0,"enrichments.geo.dip.dmaCode":"test dmaCode","app":0,"oct":312,"end_reason":"idle","enrichments.geo.sip.locID":"1","adapter.mockgeoadapter.begin.ts":"1457102731198","threatintelsplitterbolt.splitter.ts":"1457102731210","enrichments.geo.dip.postalCode":"test postalCode","start_time":1453994988504,"adapter
 .threatinteladapter.begin.ts":"1457102731221","riflags":0,"proto":17,"enrichments.host.dip.known_info.local":"YES","enrichments.geo.dip.longitude":"test longitude","iflags":"A","uflags":0,"adapter.mockgeoadapter.end.ts":"1457102731199","adapter.hostfromjsonlistadapter.end.ts":"1457102731198","enrichments.geo.sip.postalCode":"test postalCode","duration":"0.000","enrichments.geo.dip.country":"test country","threatinteljoinbolt.joiner.ts":"1457102731222","enrichments.geo.dip.latitude":"test latitude","enrichments.geo.sip.country":"test country","enrichments.geo.dip.city":"test city","enrichments.geo.sip.dmaCode":"test dmaCode","pkt":1,"enrichments.geo.sip.location_point":"test longitude,test latitude","ruflags":0,"roct":0,"sip":"10.0.2.3","rtag":0,"sp":53,"enrichments.geo.sip.longitude":"test longitude","enrichments.geo.sip.latitude":"test latitude","timestamp":1453994988504,"risn":0,"enrichments.host.dip.known_info.type":"printer","end_time":1453994988504,"enrichments.host.dip.known_i
 nfo.asset_value":"important","is_alert":"true","source.type":"yaf","threatintels.ip.sip.ip_threat_intel":"alert","rtt":"0.000"}
+{"adapter.threatinteladapter.end.ts":"1457102731222","enrichments.geo.dip.location_point":"test longitude,test latitude","enrichments.host.sip.known_info.asset_value":"important","isn":0,"index.elasticsearchwriter.ts":"1457102731222","dip":"10.0.2.3","dp":53,"rpkt":0,"original_string":"2016-01-28 15:29:48.504|2016-01-28 15:29:48.504|   0.000|   0.000| 17|                               10.0.2.15|56303|                                10.0.2.3|   53|       A|       0|       0|       0|00000000|00000000|000|000|       1|      56|       0|       0|    0|idle","enrichments.geo.dip.locID":"1","enrichments.geo.sip.city":"test city","enrichments.host.sip.known_info.type":"printer","enrichmentjoinbolt.joiner.ts":"1457102731209","adapter.hostfromjsonlistadapter.begin.ts":"1457102731198","tag":0,"enrichments.geo.dip.dmaCode":"test dmaCode","app":0,"oct":56,"end_reason":"idle","enrichments.geo.sip.locID":"1","adapter.mockgeoadapter.begin.ts":"1457102731199","threatintelsplitterbolt.splitter.ts":
 "1457102731211","enrichments.geo.dip.postalCode":"test postalCode","start_time":1453994988504,"adapter.threatinteladapter.begin.ts":"1457102731221","riflags":0,"proto":17,"enrichments.geo.dip.longitude":"test longitude","iflags":"A","uflags":0,"adapter.mockgeoadapter.end.ts":"1457102731199","adapter.hostfromjsonlistadapter.end.ts":"1457102731198","enrichments.host.sip.known_info.local":"YES","threatintels.ip.dip.ip_threat_intel":"alert","enrichments.geo.sip.postalCode":"test postalCode","duration":"0.000","enrichments.geo.dip.country":"test country","threatinteljoinbolt.joiner.ts":"1457102731222","enrichments.geo.dip.latitude":"test latitude","enrichments.geo.sip.country":"test country","enrichments.geo.dip.city":"test city","enrichments.geo.sip.dmaCode":"test dmaCode","pkt":1,"enrichments.geo.sip.location_point":"test longitude,test latitude","ruflags":0,"roct":0,"sip":"10.0.2.15","rtag":0,"sp":56303,"enrichments.geo.sip.longitude":"test longitude","enrichments.geo.sip.latitude":"t
 est latitude","timestamp":1453994988504,"risn":0,"end_time":1453994988504,"is_alert":"true","source.type":"yaf","rtt":"0.000"}
+{"adapter.threatinteladapter.end.ts":"1457102731222","enrichments.geo.dip.location_point":"test longitude,test latitude","isn":0,"index.elasticsearchwriter.ts":"1457102731222","dip":"10.0.2.15","dp":56303,"rpkt":0,"original_string":"2016-01-28 15:29:48.506|2016-01-28 15:29:48.506|   0.000|   0.000| 17|                                10.0.2.3|   53|                               10.0.2.15|56303|       A|       0|       0|       0|00000000|00000000|000|000|       1|      84|       0|       0|    0|idle","enrichments.geo.dip.locID":"1","enrichments.geo.sip.city":"test city","enrichmentjoinbolt.joiner.ts":"1457102731210","adapter.hostfromjsonlistadapter.begin.ts":"1457102731198","tag":0,"enrichments.geo.dip.dmaCode":"test dmaCode","app":0,"oct":84,"end_reason":"idle","enrichments.geo.sip.locID":"1","adapter.mockgeoadapter.begin.ts":"1457102731199","threatintelsplitterbolt.splitter.ts":"1457102731212","enrichments.geo.dip.postalCode":"test postalCode","start_time":1453994988506,"adapter.
 threatinteladapter.begin.ts":"1457102731222","riflags":0,"proto":17,"enrichments.host.dip.known_info.local":"YES","enrichments.geo.dip.longitude":"test longitude","iflags":"A","uflags":0,"adapter.mockgeoadapter.end.ts":"1457102731199","adapter.hostfromjsonlistadapter.end.ts":"1457102731198","enrichments.geo.sip.postalCode":"test postalCode","duration":"0.000","enrichments.geo.dip.country":"test country","threatinteljoinbolt.joiner.ts":"1457102731222","enrichments.geo.dip.latitude":"test latitude","enrichments.geo.sip.country":"test country","enrichments.geo.dip.city":"test city","enrichments.geo.sip.dmaCode":"test dmaCode","pkt":1,"enrichments.geo.sip.location_point":"test longitude,test latitude","ruflags":0,"roct":0,"sip":"10.0.2.3","rtag":0,"sp":53,"enrichments.geo.sip.longitude":"test longitude","enrichments.geo.sip.latitude":"test latitude","timestamp":1453994988506,"risn":0,"enrichments.host.dip.known_info.type":"printer","end_time":1453994988506,"enrichments.host.dip.known_in
 fo.asset_value":"important","is_alert":"true","source.type":"yaf","threatintels.ip.sip.ip_threat_intel":"alert","rtt":"0.000"}
+{"adapter.threatinteladapter.end.ts":"1457102731222","enrichments.geo.dip.location_point":"test longitude,test latitude","enrichments.host.sip.known_info.asset_value":"important","isn":"58c52fca","index.elasticsearchwriter.ts":"1457102732038","dip":"216.21.170.221","dp":80,"rpkt":0,"original_string":"2016-01-28 15:29:48.508|2016-01-28 15:29:48.508|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|       S|       0|       0|       0|58c52fca|00000000|000|000|       1|      60|       0|       0|    0|idle","enrichments.geo.dip.locID":"1","enrichments.geo.sip.city":"test city","enrichments.host.sip.known_info.type":"printer","enrichmentjoinbolt.joiner.ts":"1457102731210","adapter.hostfromjsonlistadapter.begin.ts":"1457102731198","tag":0,"enrichments.geo.dip.dmaCode":"test dmaCode","app":0,"oct":60,"end_reason":"idle","enrichments.geo.sip.locID":"1","adapter.mockgeoadapter.begin.ts":"1457102731199","threatintelsplitterbol
 t.splitter.ts":"1457102731212","enrichments.geo.dip.postalCode":"test postalCode","start_time":1453994988508,"adapter.threatinteladapter.begin.ts":"1457102731222","riflags":0,"proto":6,"enrichments.geo.dip.longitude":"test longitude","iflags":"S","uflags":0,"adapter.mockgeoadapter.end.ts":"1457102731199","adapter.hostfromjsonlistadapter.end.ts":"1457102731198","enrichments.host.sip.known_info.local":"YES","enrichments.geo.sip.postalCode":"test postalCode","duration":"0.000","enrichments.geo.dip.country":"test country","threatinteljoinbolt.joiner.ts":"1457102731223","enrichments.geo.dip.latitude":"test latitude","enrichments.geo.sip.country":"test country","enrichments.geo.dip.city":"test city","enrichments.geo.sip.dmaCode":"test dmaCode","pkt":1,"enrichments.geo.sip.location_point":"test longitude,test latitude","ruflags":0,"roct":0,"sip":"10.0.2.15","rtag":0,"sp":39468,"enrichments.geo.sip.longitude":"test longitude","enrichments.geo.sip.latitude":"test latitude","timestamp":145399
 4988508,"risn":0,"end_time":1453994988508,"source.type":"yaf","rtt":"0.000"}
+{"adapter.threatinteladapter.end.ts":"1457102731223","enrichments.geo.dip.location_point":"test longitude,test latitude","enrichments.host.sip.known_info.asset_value":"important","isn":"58c52fcb","index.elasticsearchwriter.ts":"1457102732038","dip":"216.21.170.221","dp":80,"rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|       A|       0|       0|       0|58c52fcb|00000000|000|000|       1|      40|       0|       0|    0|idle ","enrichments.geo.dip.locID":"1","enrichments.geo.sip.city":"test city","enrichments.host.sip.known_info.type":"printer","enrichmentjoinbolt.joiner.ts":"1457102731210","adapter.hostfromjsonlistadapter.begin.ts":"1457102731198","tag":0,"enrichments.geo.dip.dmaCode":"test dmaCode","app":0,"oct":40,"end_reason":"idle ","enrichments.geo.sip.locID":"1","adapter.mockgeoadapter.begin.ts":"1457102731199","threatintelsplitterb
 olt.splitter.ts":"1457102731212","enrichments.geo.dip.postalCode":"test postalCode","start_time":1453994988512,"adapter.threatinteladapter.begin.ts":"1457102731223","riflags":0,"proto":6,"enrichments.geo.dip.longitude":"test longitude","iflags":"A","uflags":0,"adapter.mockgeoadapter.end.ts":"1457102731199","adapter.hostfromjsonlistadapter.end.ts":"1457102731198","enrichments.host.sip.known_info.local":"YES","enrichments.geo.sip.postalCode":"test postalCode","duration":"0.000","enrichments.geo.dip.country":"test country","threatinteljoinbolt.joiner.ts":"1457102731223","enrichments.geo.dip.latitude":"test latitude","enrichments.geo.sip.country":"test country","enrichments.geo.dip.city":"test city","enrichments.geo.sip.dmaCode":"test dmaCode","pkt":1,"enrichments.geo.sip.location_point":"test longitude,test latitude","ruflags":0,"roct":0,"sip":"10.0.2.15","rtag":0,"sp":39468,"enrichments.geo.sip.longitude":"test longitude","enrichments.geo.sip.latitude":"test latitude","timestamp":1453
 994988512,"risn":0,"end_time":1453994988512,"source.type":"yaf","rtt":"0.000"}
+{"adapter.threatinteladapter.end.ts":"1457102731223","enrichments.geo.dip.location_point":"test longitude,test latitude","enrichments.host.sip.known_info.asset_value":"important","isn":"58c52fcb","index.elasticsearchwriter.ts":"1457102732038","dip":"216.21.170.221","dp":80,"rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|      AP|       0|       0|       0|58c52fcb|00000000|000|000|       1|     148|       0|       0|    0|idle ","enrichments.geo.dip.locID":"1","enrichments.geo.sip.city":"test city","enrichments.host.sip.known_info.type":"printer","enrichmentjoinbolt.joiner.ts":"1457102731210","adapter.hostfromjsonlistadapter.begin.ts":"1457102731198","tag":0,"enrichments.geo.dip.dmaCode":"test dmaCode","app":0,"oct":148,"end_reason":"idle ","enrichments.geo.sip.locID":"1","adapter.mockgeoadapter.begin.ts":"1457102731199","threatintelsplitter
 bolt.splitter.ts":"1457102731212","enrichments.geo.dip.postalCode":"test postalCode","start_time":1453994988512,"adapter.threatinteladapter.begin.ts":"1457102731223","riflags":0,"proto":6,"enrichments.geo.dip.longitude":"test longitude","iflags":"AP","uflags":0,"adapter.mockgeoadapter.end.ts":"1457102731199","adapter.hostfromjsonlistadapter.end.ts":"1457102731198","enrichments.host.sip.known_info.local":"YES","enrichments.geo.sip.postalCode":"test postalCode","duration":"0.000","enrichments.geo.dip.country":"test country","threatinteljoinbolt.joiner.ts":"1457102731225","enrichments.geo.dip.latitude":"test latitude","enrichments.geo.sip.country":"test country","enrichments.geo.dip.city":"test city","enrichments.geo.sip.dmaCode":"test dmaCode","pkt":1,"enrichments.geo.sip.location_point":"test longitude,test latitude","ruflags":0,"roct":0,"sip":"10.0.2.15","rtag":0,"sp":39468,"enrichments.geo.sip.longitude":"test longitude","enrichments.geo.sip.latitude":"test latitude","timestamp":14
 53994988512,"risn":0,"end_time":1453994988512,"source.type":"yaf","rtt":"0.000"}
+{"adapter.threatinteladapter.end.ts":"1457102731225","enrichments.geo.dip.location_point":"test longitude,test latitude","isn":"22efa002","index.elasticsearchwriter.ts":"1457102732038","dip":"10.0.2.15","dp":39468,"rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|       A|       0|       0|       0|22efa002|00000000|000|000|       1|      40|       0|       0|    0|idle ","enrichments.geo.dip.locID":"1","enrichments.geo.sip.city":"test city","enrichmentjoinbolt.joiner.ts":"1457102731211","adapter.hostfromjsonlistadapter.begin.ts":"1457102731198","tag":0,"enrichments.geo.dip.dmaCode":"test dmaCode","app":0,"oct":40,"end_reason":"idle ","enrichments.geo.sip.locID":"1","adapter.mockgeoadapter.begin.ts":"1457102731199","threatintelsplitterbolt.splitter.ts":"1457102731212","enrichments.geo.dip.postalCode":"test postalCode","start_time":145399498851
 2,"adapter.threatinteladapter.begin.ts":"1457102731223","riflags":0,"proto":6,"enrichments.host.dip.known_info.local":"YES","enrichments.geo.dip.longitude":"test longitude","iflags":"A","uflags":0,"adapter.mockgeoadapter.end.ts":"1457102731199","adapter.hostfromjsonlistadapter.end.ts":"1457102731198","enrichments.geo.sip.postalCode":"test postalCode","duration":"0.000","enrichments.geo.dip.country":"test country","threatinteljoinbolt.joiner.ts":"1457102731225","enrichments.geo.dip.latitude":"test latitude","enrichments.geo.sip.country":"test country","enrichments.geo.dip.city":"test city","enrichments.geo.sip.dmaCode":"test dmaCode","pkt":1,"enrichments.geo.sip.location_point":"test longitude,test latitude","ruflags":0,"roct":0,"sip":"216.21.170.221","rtag":0,"sp":80,"enrichments.geo.sip.longitude":"test longitude","enrichments.geo.sip.latitude":"test latitude","timestamp":1453994988512,"risn":0,"enrichments.host.dip.known_info.type":"printer","end_time":1453994988512,"enrichments.h
 ost.dip.known_info.asset_value":"important","source.type":"yaf","rtt":"0.000"}
+{"adapter.threatinteladapter.end.ts":"1457102731226","enrichments.geo.dip.location_point":"test longitude,test latitude","isn":"22efa002","index.elasticsearchwriter.ts":"1457102732038","dip":"10.0.2.15","dp":39468,"rpkt":0,"original_string":"2016-01-28 15:29:48.562|2016-01-28 15:29:48.562|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|      AP|       0|       0|       0|22efa002|00000000|000|000|       1|     604|       0|       0|    0|idle","enrichments.geo.dip.locID":"1","enrichments.geo.sip.city":"test city","enrichmentjoinbolt.joiner.ts":"1457102731211","adapter.hostfromjsonlistadapter.begin.ts":"1457102731198","tag":0,"enrichments.geo.dip.dmaCode":"test dmaCode","app":0,"oct":604,"end_reason":"idle","enrichments.geo.sip.locID":"1","adapter.mockgeoadapter.begin.ts":"1457102731199","threatintelsplitterbolt.splitter.ts":"1457102731213","enrichments.geo.dip.postalCode":"test postalCode","start_time":1453994988562
 ,"adapter.threatinteladapter.begin.ts":"1457102731226","riflags":0,"proto":6,"enrichments.host.dip.known_info.local":"YES","enrichments.geo.dip.longitude":"test longitude","iflags":"AP","uflags":0,"adapter.mockgeoadapter.end.ts":"1457102731199","adapter.hostfromjsonlistadapter.end.ts":"1457102731198","enrichments.geo.sip.postalCode":"test postalCode","duration":"0.000","enrichments.geo.dip.country":"test country","threatinteljoinbolt.joiner.ts":"1457102731226","enrichments.geo.dip.latitude":"test latitude","enrichments.geo.sip.country":"test country","enrichments.geo.dip.city":"test city","enrichments.geo.sip.dmaCode":"test dmaCode","pkt":1,"enrichments.geo.sip.location_point":"test longitude,test latitude","ruflags":0,"roct":0,"sip":"216.21.170.221","rtag":0,"sp":80,"enrichments.geo.sip.longitude":"test longitude","enrichments.geo.sip.latitude":"test latitude","timestamp":1453994988562,"risn":0,"enrichments.host.dip.known_info.type":"printer","end_time":1453994988562,"enrichments.h
 ost.dip.known_info.asset_value":"important","source.type":"yaf","rtt":"0.000"}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/SnortParsed
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/SnortParsed b/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/SnortParsed
index 4b74794..86236ea 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/SnortParsed
+++ b/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/SnortParsed
@@ -1,3 +1,3 @@
-{"msg":"\"Consecutive TCP small segments exceeding threshold\"","sig_rev":"1","dst":"10.0.2.15","dstport":"22","ethsrc":"52:54:00:12:35:02","tcpseq":"0x9AFF3D7","dgmlen":"64","icmpid":"","tcplen":"","tcpwindow":"0xFFFF","icmpseq":"","tcpack":"0xC8761D52","original_string":"01\/27-16:01:04.877970 ,129,12,1,\"Consecutive TCP small segments exceeding threshold\",TCP,10.0.2.2,56642,10.0.2.15,22,52:54:00:12:35:02,08:00:27:7F:93:2D,0x4E,***AP***,0x9AFF3D7,0xC8761D52,,0xFFFF,64,0,59677,64,65536,,,,","icmpcode":"","tos":"0","id":"59677","timestamp":1453932941970,"ethdst":"08:00:27:7F:93:2D","src":"10.0.2.2","ttl":"64","source.type":"test","ethlen":"0x4E","iplen":"65536","icmptype":"","proto":"TCP","srcport":"56642","tcpflags":"***AP***","sig_id":"12","sig_generator":"129"}
-{"msg":"\"Consecutive TCP small segments exceeding threshold\"","sig_rev":"1","dst":"10.0.2.15","dstport":"50895","ethsrc":"52:54:00:12:35:02","tcpseq":"0xDB45F7A","dgmlen":"96","icmpid":"","tcplen":"","tcpwindow":"0xFFFF","icmpseq":"","tcpack":"0x7701DD5B","original_string":"02\/22-15:56:48.612494 ,129,12,1,\"Consecutive TCP small segments exceeding threshold\",TCP,96.44.142.5,80,10.0.2.15,50895,52:54:00:12:35:02,08:00:27:7F:93:2D,0x6E,***AP***,0xDB45F7A,0x7701DD5B,,0xFFFF,64,0,16785,96,98304,,,,","icmpcode":"","tos":"0","id":"16785","timestamp":1456178820494,"ethdst":"08:00:27:7F:93:2D","src":"96.44.142.5","ttl":"64","source.type":"test","ethlen":"0x6E","iplen":"98304","icmptype":"","proto":"TCP","srcport":"80","tcpflags":"***AP***","sig_id":"12","sig_generator":"129"}
-{"msg":"\"Consecutive TCP small segments exceeding threshold\"","sig_rev":"1","dst":"10.0.2.15","dstport":"50895","ethsrc":"52:54:00:12:35:02","tcpseq":"0xDB508F2","dgmlen":"152","icmpid":"","tcplen":"","tcpwindow":"0xFFFF","icmpseq":"","tcpack":"0x7701DD5B","original_string":"02\/22-15:56:48.616775 ,129,12,1,\"Consecutive TCP small segments exceeding threshold\",TCP,96.44.142.5,80,10.0.2.15,50895,52:54:00:12:35:02,08:00:27:7F:93:2D,0xA6,***AP***,0xDB508F2,0x7701DD5B,,0xFFFF,64,0,16824,152,155648,,,,","icmpcode":"","tos":"0","id":"16824","timestamp":1456178824775,"ethdst":"08:00:27:7F:93:2D","src":"96.44.142.5","ttl":"64","source.type":"test","ethlen":"0xA6","iplen":"155648","icmptype":"","proto":"TCP","srcport":"80","tcpflags":"***AP***","sig_id":"12","sig_generator":"129"}
+{"msg":"\"Consecutive TCP small segments exceeding threshold\"","sig_rev":"1","dst":"10.0.2.15","dstport":"22","ethsrc":"52:54:00:12:35:02","tcpseq":"0x9AFF3D7","dgmlen":"64","icmpid":"","tcplen":"","tcpwindow":"0xFFFF","icmpseq":"","tcpack":"0xC8761D52","original_string":"01\/27-16:01:04.877970 ,129,12,1,\"Consecutive TCP small segments exceeding threshold\",TCP,10.0.2.2,56642,10.0.2.15,22,52:54:00:12:35:02,08:00:27:7F:93:2D,0x4E,***AP***,0x9AFF3D7,0xC8761D52,,0xFFFF,64,0,59677,64,65536,,,,","icmpcode":"","tos":"0","id":"59677","timestamp":1453932941970,"ethdst":"08:00:27:7F:93:2D","src":"10.0.2.2","ttl":"64","source.type":"test","ethlen":"0x4E","iplen":"65536","icmptype":"","proto":"TCP","srcport":"56642","tcpflags":"***AP***","sig_id":"12","sig_generator":"129", "is_alert" : "true"}
+{"msg":"\"Consecutive TCP small segments exceeding threshold\"","sig_rev":"1","dst":"10.0.2.15","dstport":"50895","ethsrc":"52:54:00:12:35:02","tcpseq":"0xDB45F7A","dgmlen":"96","icmpid":"","tcplen":"","tcpwindow":"0xFFFF","icmpseq":"","tcpack":"0x7701DD5B","original_string":"02\/22-15:56:48.612494 ,129,12,1,\"Consecutive TCP small segments exceeding threshold\",TCP,96.44.142.5,80,10.0.2.15,50895,52:54:00:12:35:02,08:00:27:7F:93:2D,0x6E,***AP***,0xDB45F7A,0x7701DD5B,,0xFFFF,64,0,16785,96,98304,,,,","icmpcode":"","tos":"0","id":"16785","timestamp":1456178820494,"ethdst":"08:00:27:7F:93:2D","src":"96.44.142.5","ttl":"64","source.type":"test","ethlen":"0x6E","iplen":"98304","icmptype":"","proto":"TCP","srcport":"80","tcpflags":"***AP***","sig_id":"12","sig_generator":"129", "is_alert" : "true"}
+{"msg":"\"Consecutive TCP small segments exceeding threshold\"","sig_rev":"1","dst":"10.0.2.15","dstport":"50895","ethsrc":"52:54:00:12:35:02","tcpseq":"0xDB508F2","dgmlen":"152","icmpid":"","tcplen":"","tcpwindow":"0xFFFF","icmpseq":"","tcpack":"0x7701DD5B","original_string":"02\/22-15:56:48.616775 ,129,12,1,\"Consecutive TCP small segments exceeding threshold\",TCP,96.44.142.5,80,10.0.2.15,50895,52:54:00:12:35:02,08:00:27:7F:93:2D,0xA6,***AP***,0xDB508F2,0x7701DD5B,,0xFFFF,64,0,16824,152,155648,,,,","icmpcode":"","tos":"0","id":"16824","timestamp":1456178824775,"ethdst":"08:00:27:7F:93:2D","src":"96.44.142.5","ttl":"64","source.type":"test","ethlen":"0xA6","iplen":"155648","icmptype":"","proto":"TCP","srcport":"80","tcpflags":"***AP***","sig_id":"12","sig_generator":"129", "is_alert" : "true"}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
index ef1318e..6e62e84 100644
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
@@ -17,7 +17,8 @@
  */
 package org.apache.metron.integration;
 
-import com.google.common.base.Function;
+import com.google.common.base.*;
+import com.google.common.collect.Iterables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.metron.Constants;
@@ -32,16 +33,19 @@ import org.apache.metron.integration.util.integration.ReadinessState;
 import org.apache.metron.integration.util.integration.components.ElasticSearchComponent;
 import org.apache.metron.integration.util.integration.components.FluxTopologyComponent;
 import org.apache.metron.integration.util.integration.components.KafkaWithZKComponent;
+import org.apache.metron.integration.util.mock.MockGeoAdapter;
 import org.apache.metron.integration.util.mock.MockHTable;
 import org.apache.metron.integration.util.threatintel.ThreatIntelHelper;
 import org.apache.metron.reference.lookup.LookupKV;
 import org.apache.metron.utils.SourceConfigUtils;
 import org.junit.Assert;
 import org.junit.Test;
+import org.apache.metron.utils.JSONUtils;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.io.Serializable;
 import java.text.SimpleDateFormat;
 import java.util.*;
@@ -66,7 +70,7 @@ public class EnrichmentIntegrationTest {
   @Test
   public void test() throws Exception {
     final String dateFormat = "yyyy.MM.dd.hh";
-    final String index = "yaf_" + new SimpleDateFormat(dateFormat).format(new Date());
+    final String index = "yaf_index_" + new SimpleDateFormat(dateFormat).format(new Date());
     String yafConfig = "{\n" +
             "  \"index\": \"yaf\",\n" +
             "  \"batchSize\": 5,\n" +
@@ -142,7 +146,9 @@ public class EnrichmentIntegrationTest {
             .withComponent("kafka", kafkaComponent)
             .withComponent("elasticsearch", esComponent)
             .withComponent("storm", fluxComponent)
-            .withTimeBetweenAttempts(10000)
+            .withMillisecondsBetweenAttempts(10000)
+            .withNumRetries(30)
+            .withMaxTimeMS(300000)
             .build();
     runner.start();
     fluxComponent.submitTopology();
@@ -154,7 +160,7 @@ public class EnrichmentIntegrationTest {
                 ElasticSearchComponent elasticSearchComponent = runner.getComponent("elasticsearch", ElasticSearchComponent.class);
                 if(elasticSearchComponent.hasIndex(index)) {
                   try {
-                    docs = elasticSearchComponent.getAllIndexedDocs(index, "yaf");
+                    docs = elasticSearchComponent.getAllIndexedDocs(index, "yaf_doc");
                   } catch (IOException e) {
                     throw new IllegalStateException("Unable to retrieve indexed documents.", e);
                   }
@@ -177,19 +183,209 @@ public class EnrichmentIntegrationTest {
 
     List<byte[]> sampleIndexedMessages = TestUtils.readSampleData(sampleIndexedPath);
     Assert.assertEquals(sampleIndexedMessages.size(), docs.size());
-    for (int i = 0; i < docs.size(); i++) {
-      String doc = docs.get(i).toString();
-      String sampleIndexedMessage = new String(sampleIndexedMessages.get(i));
-      assertEqual(sampleIndexedMessage, doc);
+
+    for (Map<String, Object> doc : docs) {
+      baseValidation(doc);
+
+      hostEnrichmentValidation(doc);
+      geoEnrichmentValidation(doc);
+      threatIntelValidation(doc);
+
     }
     runner.stop();
   }
-  public static void assertEqual(String doc1, String doc2) {
-    Assert.assertEquals(doc1.length(), doc2.length());
-    char[] c1 = doc1.toCharArray();
-    Arrays.sort(c1);
-    char[] c2 = doc2.toCharArray();
-    Arrays.sort(c2);
-    Assert.assertArrayEquals(c1, c2);
+
+  public static void baseValidation(Map<String, Object> jsonDoc) {
+    assertEnrichmentsExists("threatintels.", setOf("ip"), jsonDoc.keySet());
+    assertEnrichmentsExists("enrichments.", setOf("geo", "host"), jsonDoc.keySet());
+    for(Map.Entry<String, Object> kv : jsonDoc.entrySet()) {
+      //ensure no values are empty.
+      Assert.assertTrue(kv.getValue().toString().length() > 0);
+    }
+    //ensure we always have a source ip and destination ip
+    Assert.assertNotNull(jsonDoc.get("sip"));
+    Assert.assertNotNull(jsonDoc.get("dip"));
+  }
+
+  private static class EvaluationPayload {
+    Map<String, Object> indexedDoc;
+    String key;
+    public EvaluationPayload(Map<String, Object> indexedDoc, String key) {
+      this.indexedDoc = indexedDoc;
+      this.key = key;
+    }
+  }
+
+  private static enum HostEnrichments implements Predicate<EvaluationPayload>{
+    LOCAL_LOCATION(new Predicate<EvaluationPayload>() {
+
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.local").equals("YES");
+      }
+    })
+    ,UNKNOWN_LOCATION(new Predicate<EvaluationPayload>() {
+
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.local").equals("UNKNOWN");
+      }
+    })
+    ,IMPORTANT(new Predicate<EvaluationPayload>() {
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.asset_value").equals("important");
+      }
+    })
+    ,PRINTER_TYPE(new Predicate<EvaluationPayload>() {
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.type").equals("printer");
+      }
+    })
+    ,WEBSERVER_TYPE(new Predicate<EvaluationPayload>() {
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.type").equals("webserver");
+      }
+    })
+    ,UNKNOWN_TYPE(new Predicate<EvaluationPayload>() {
+      @Override
+      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
+        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.type").equals("unknown");
+      }
+    })
+    ;
+
+    Predicate<EvaluationPayload> _predicate;
+    HostEnrichments(Predicate<EvaluationPayload> predicate) {
+      this._predicate = predicate;
+    }
+
+    public boolean apply(EvaluationPayload payload) {
+      return _predicate.apply(payload);
+    }
+
+  }
+
+  private static void assertEnrichmentsExists(String topLevel, Set<String> expectedEnrichments, Set<String> keys) {
+    for(String key : keys) {
+      if(key.startsWith(topLevel)) {
+        String secondLevel = Iterables.get(Splitter.on(".").split(key), 1);
+        String message = "Found an enrichment/threat intel (" + secondLevel + ") that I didn't expect (expected enrichments :"
+                       + Joiner.on(",").join(expectedEnrichments) + "), but it was not there.  If you've created a new"
+                       + " enrichment, then please add a validation method to this unit test.  Otherwise, it's a solid error"
+                       + " and should be investigated.";
+        Assert.assertTrue( message, expectedEnrichments.contains(secondLevel));
+      }
+    }
   }
+  private static void threatIntelValidation(Map<String, Object> indexedDoc) {
+    if(keyPatternExists("threatintels.", indexedDoc)) {
+      //if we have any threat intel messages, we want to tag is_alert to true
+      Assert.assertEquals(indexedDoc.get("is_alert"), "true");
+    }
+    else {
+      //For YAF this is the case, but if we do snort later on, this will be invalid.
+      Assert.assertNull(indexedDoc.get("is_alert"));
+    }
+    //ip threat intels
+    if(keyPatternExists("threatintels.ip.", indexedDoc)) {
+      if(indexedDoc.get("sip").equals("10.0.2.3")) {
+        Assert.assertEquals(indexedDoc.get("threatintels.ip.sip.ip_threat_intel"), "alert");
+      }
+      else if(indexedDoc.get("dip").equals("10.0.2.3")) {
+        Assert.assertEquals(indexedDoc.get("threatintels.ip.dip.ip_threat_intel"), "alert");
+      }
+      else {
+        Assert.fail("There was a threat intels that I did not expect.");
+      }
+    }
+
+  }
+
+  private static void geoEnrichmentValidation(Map<String, Object> indexedDoc) {
+    //should have geo enrichment on every message due to mock geo adapter
+    Assert.assertEquals(indexedDoc.get("enrichments.geo.dip.location_point"), MockGeoAdapter.DEFAULT_LOCATION_POINT);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo.sip.location_point"), MockGeoAdapter.DEFAULT_LOCATION_POINT);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo.dip.longitude"), MockGeoAdapter.DEFAULT_LONGITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo.sip.longitude"), MockGeoAdapter.DEFAULT_LONGITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo.dip.city"), MockGeoAdapter.DEFAULT_CITY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo.sip.city"), MockGeoAdapter.DEFAULT_CITY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo.dip.latitude"), MockGeoAdapter.DEFAULT_LATITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo.sip.latitude"), MockGeoAdapter.DEFAULT_LATITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo.dip.country"), MockGeoAdapter.DEFAULT_COUNTRY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo.sip.country"), MockGeoAdapter.DEFAULT_COUNTRY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo.dip.dmaCode"), MockGeoAdapter.DEFAULT_DMACODE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo.sip.dmaCode"), MockGeoAdapter.DEFAULT_DMACODE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo.dip.postalCode"), MockGeoAdapter.DEFAULT_POSTAL_CODE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo.sip.postalCode"), MockGeoAdapter.DEFAULT_POSTAL_CODE);
+  }
+
+  private static void hostEnrichmentValidation(Map<String, Object> indexedDoc) {
+    boolean enriched = false;
+    //important local printers
+    {
+      Set<String> ips = setOf("10.0.2.15", "10.60.10.254");
+      if (ips.contains(indexedDoc.get("sip"))) {
+        //this is a local, important, printer
+        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+                ,HostEnrichments.IMPORTANT
+                ,HostEnrichments.PRINTER_TYPE
+                ).apply(new EvaluationPayload(indexedDoc, "sip"))
+        );
+        enriched = true;
+      }
+      if (ips.contains(indexedDoc.get("dip"))) {
+        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+                ,HostEnrichments.IMPORTANT
+                ,HostEnrichments.PRINTER_TYPE
+                ).apply(new EvaluationPayload(indexedDoc, "dip"))
+        );
+        enriched = true;
+      }
+    }
+    //important local webservers
+    {
+      Set<String> ips = setOf("10.1.128.236");
+      if (ips.contains(indexedDoc.get("sip"))) {
+        //this is a local, important, webserver
+        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+                ,HostEnrichments.IMPORTANT
+                ,HostEnrichments.WEBSERVER_TYPE
+                ).apply(new EvaluationPayload(indexedDoc, "sip"))
+        );
+        enriched = true;
+      }
+      if (ips.contains(indexedDoc.get("dip"))) {
+        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+                ,HostEnrichments.IMPORTANT
+                ,HostEnrichments.WEBSERVER_TYPE
+                ).apply(new EvaluationPayload(indexedDoc, "dip"))
+        );
+        enriched = true;
+      }
+    }
+    if(!enriched) {
+      Assert.assertFalse(keyPatternExists("enrichments.host", indexedDoc));
+    }
+  }
+
+
+  private static boolean keyPatternExists(String pattern, Map<String, Object> indexedObj) {
+    for(String k : indexedObj.keySet()) {
+      if(k.startsWith(pattern)) {
+        return true;
+      }
+    }
+    return false;
+  }
+  private static Set<String> setOf(String... items) {
+    Set<String> ret = new HashSet<>();
+    for(String item : items) {
+      ret.add(item);
+    }
+    return ret;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java
index c55a069..80688b7 100644
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java
@@ -18,27 +18,14 @@
 package org.apache.metron.integration;
 
 import com.google.common.base.Function;
-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
-import kafka.consumer.ConsumerIterator;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.producer.Producer;
-import kafka.message.MessageAndMetadata;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.metron.Constants;
 import org.apache.metron.integration.util.TestUtils;
 import org.apache.metron.integration.util.UnitTestHelper;
 import org.apache.metron.integration.util.integration.ComponentRunner;
 import org.apache.metron.integration.util.integration.Processor;
 import org.apache.metron.integration.util.integration.ReadinessState;
-import org.apache.metron.integration.util.integration.components.ElasticSearchComponent;
 import org.apache.metron.integration.util.integration.components.FluxTopologyComponent;
 import org.apache.metron.integration.util.integration.components.KafkaWithZKComponent;
-import org.apache.metron.integration.util.integration.util.KafkaUtil;
-import org.apache.metron.spout.pcap.HDFSWriterCallback;
-import org.apache.metron.test.converters.HexStringConverter;
 import org.apache.metron.utils.SourceConfigUtils;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Assert;
@@ -95,7 +82,7 @@ public abstract class ParserIntegrationTest {
     ComponentRunner runner = new ComponentRunner.Builder()
             .withComponent("kafka", kafkaComponent)
             .withComponent("storm", fluxComponent)
-            .withTimeBetweenAttempts(5000)
+            .withMillisecondsBetweenAttempts(5000)
             .build();
     runner.start();
     fluxComponent.submitTopology();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/TestUtils.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/TestUtils.java
index 594700b..a3db041 100644
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/TestUtils.java
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/TestUtils.java
@@ -29,7 +29,6 @@ public class TestUtils {
     BufferedReader br = new BufferedReader(new FileReader(samplePath));
     List<byte[]> ret = new ArrayList<>();
     for (String line = null; (line = br.readLine()) != null; ) {
-      long ts = System.currentTimeMillis();
       ret.add(line.getBytes());
     }
     br.close();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockGeoAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockGeoAdapter.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockGeoAdapter.java
index 62ae618..ee71cda 100644
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockGeoAdapter.java
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockGeoAdapter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.integration.util.mock;
 
+import com.google.common.base.Joiner;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 import org.json.simple.JSONObject;
 
@@ -25,6 +26,15 @@ import java.io.Serializable;
 public class MockGeoAdapter implements EnrichmentAdapter<String>,
         Serializable {
 
+  public static final String DEFAULT_LOC_ID = "1";
+  public static final String DEFAULT_COUNTRY = "test country";
+  public static final String DEFAULT_CITY = "test city";
+  public static final String DEFAULT_POSTAL_CODE = "test postalCode";
+  public static final String DEFAULT_LATITUDE = "test latitude";
+  public static final String DEFAULT_LONGITUDE = "test longitude";
+  public static final String DEFAULT_DMACODE= "test dmaCode";
+  public static final String DEFAULT_LOCATION_POINT= Joiner.on(',').join(DEFAULT_LONGITUDE, DEFAULT_LATITUDE);
+
   @Override
   public void logAccess(String value) {
 
@@ -32,14 +42,14 @@ public class MockGeoAdapter implements EnrichmentAdapter<String>,
 
   public JSONObject enrich(String metadata) {
     JSONObject enriched = new JSONObject();
-    enriched.put("locID", "1");
-    enriched.put("country", "test country");
-    enriched.put("city", "test city");
-    enriched.put("postalCode", "test postalCode");
-    enriched.put("latitude", "test latitude");
-    enriched.put("longitude", "test longitude");
-    enriched.put("dmaCode", "test dmaCode");
-    enriched.put("location_point", enriched.get("longitude") + "," + enriched.get("latitude"));
+    enriched.put("locID", DEFAULT_LOC_ID);
+    enriched.put("country", DEFAULT_COUNTRY);
+    enriched.put("city", DEFAULT_CITY);
+    enriched.put("postalCode", DEFAULT_POSTAL_CODE);
+    enriched.put("latitude", DEFAULT_LATITUDE);
+    enriched.put("longitude", DEFAULT_LONGITUDE);
+    enriched.put("dmaCode", DEFAULT_DMACODE);
+    enriched.put("location_point", DEFAULT_LOCATION_POINT);
     return enriched;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 499e323..d8ac9d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,7 +45,7 @@
 						<exclude>**/.*</exclude>
 						<exclude>**/.*/**</exclude>
 						<exclude>**/*.seed</exclude>
-            <exclude>**/*.iml</exclude>
+						<exclude>**/*.iml</exclude>
 						<exclude>**/ansible.cfg</exclude>
 						<exclude>site/**</exclude>
 						<exclude>metron-ui/lib/public/**</exclude>
@@ -54,7 +54,6 @@
 						<exclude>**/src/main/resources/Sample*/**</exclude>
 						<exclude>**/dependency-reduced-pom.xml</exclude>
 					        <exclude>**/files/opensoc-ui</exclude>
-					        <exclude>**/*.iml</exclude>
 					</excludes>
 				</configuration>
 			</plugin>


[2/3] incubator-metron git commit: METRON-58 Remediate Deployment Integration Testing Issues (dlyle65535 via cestella) closes apache/incubator-metron#36

Posted by ce...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
new file mode 100644
index 0000000..d704908
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public enum JSONUtils {
+  INSTANCE;
+  private static ThreadLocal<ObjectMapper> _mapper = new ThreadLocal<ObjectMapper>() {
+    /**
+     * Creates the {@link ObjectMapper} that the calling thread will use.
+     * {@code ThreadLocal} invokes this method the first time a thread
+     * calls {@link #get}, and again only if {@link #remove} was called
+     * before a later {@link #get}, so each thread that goes through
+     * {@code JSONUtils} works with its own mapper instance instead of
+     * sharing a single one.
+     * <p>
+     * The mapper is built with the no-argument {@code ObjectMapper}
+     * constructor; no modules or features are configured here.  Every
+     * {@code load} and {@code toJSON} method of this enum obtains its
+     * mapper via {@code _mapper.get()}, so this is the only place in
+     * the enum where a mapper is instantiated.
+     * <p>
+     * Unlike the inherited default, this never returns {@code null}.
+     *
+     * @return a new {@code ObjectMapper} for the current thread
+     */
+    @Override
+    protected ObjectMapper initialValue() {
+      return new ObjectMapper();
+    }
+  };
+
+  public <T> T load(InputStream is, Class<T> clazz) throws IOException {
+    return _mapper.get().readValue(is, clazz);
+  }
+
+  public <T> T load(String is, Class<T> clazz) throws IOException {
+    return _mapper.get().readValue(is, clazz);
+  }
+
+  public String toJSON(Object o, boolean pretty) throws JsonProcessingException {
+    if(pretty) {
+      return _mapper.get().writerWithDefaultPrettyPrinter().writeValueAsString(o);
+    }
+    else {
+      return _mapper.get().writeValueAsString(o);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Common/src/test/resources/config/source/bro-config.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/resources/config/source/bro-config.json b/metron-streaming/Metron-Common/src/test/resources/config/source/bro-config.json
index fcbfc03..34109b8 100644
--- a/metron-streaming/Metron-Common/src/test/resources/config/source/bro-config.json
+++ b/metron-streaming/Metron-Common/src/test/resources/config/source/bro-config.json
@@ -3,11 +3,12 @@
   "batchSize": 5,
   "enrichmentFieldMap":
   {
-    "geo": ["id.orig_h"],
-    "host": ["id.orig_h"]
+    "geo": ["ip_dst_addr", "ip_src_addr"],
+    "host": ["host"]
   },
   "threatIntelFieldMap":
   {
-    "ip": ["id.orig_h"]
+    "ip": ["ip_dst_addr", "ip_src_addr"]
   }
-}
\ No newline at end of file
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Common/src/test/resources/config/source/snort-config.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/resources/config/source/snort-config.json b/metron-streaming/Metron-Common/src/test/resources/config/source/snort-config.json
index ceb441e..1208637 100644
--- a/metron-streaming/Metron-Common/src/test/resources/config/source/snort-config.json
+++ b/metron-streaming/Metron-Common/src/test/resources/config/source/snort-config.json
@@ -3,11 +3,12 @@
   "batchSize": 1,
   "enrichmentFieldMap":
   {
-    "geo": ["src", "dst"],
-    "host": ["src", "dst"]
+    "geo": ["ip_dst_addr", "ip_src_addr"],
+    "host": ["host"]
   },
   "threatIntelFieldMap":
   {
-    "ip": ["src", "dst"]
+    "ip": ["ip_dst_addr", "ip_src_addr"]
   }
-}
\ No newline at end of file
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Common/src/test/resources/config/source/yaf-config.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/resources/config/source/yaf-config.json b/metron-streaming/Metron-Common/src/test/resources/config/source/yaf-config.json
index abf4ff4..65de961 100644
--- a/metron-streaming/Metron-Common/src/test/resources/config/source/yaf-config.json
+++ b/metron-streaming/Metron-Common/src/test/resources/config/source/yaf-config.json
@@ -3,11 +3,12 @@
   "batchSize": 5,
   "enrichmentFieldMap":
   {
-    "geo": ["sip", "dip"],
-    "host": ["sip", "dip"]
+    "geo": ["ip_dst_addr", "ip_src_addr"],
+    "host": ["host"]
   },
   "threatIntelFieldMap":
   {
-    "ip": ["sip", "dip"]
+    "ip": ["ip_dst_addr", "ip_src_addr"]
   }
-}
\ No newline at end of file
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
index 7d7ef98..02f9f9b 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
@@ -103,7 +103,7 @@ public class ThreatIntelBulkLoader  {
                 return o;
             }
         })
-        ,AS_OF_TIME_FORMAT("t", new OptionHandler() {
+        ,AS_OF_TIME_FORMAT("z", new OptionHandler() {
             @Nullable
             @Override
             public Option apply(@Nullable String s) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
index 10e1e71..a2cec5a 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -25,10 +25,7 @@ import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
 
@@ -67,8 +64,20 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
   public JSONObject joinMessages(Map<String, JSONObject> streamMessageMap) {
     JSONObject message = new JSONObject();
     for (String key : streamMessageMap.keySet()) {
-      message.putAll(streamMessageMap.get(key));
+      JSONObject obj = streamMessageMap.get(key);
+      message.putAll(obj);
     }
+    List<Object> emptyKeys = new ArrayList<>();
+    for(Object key : message.keySet()) {
+      Object value = message.get(key);
+      if(value.toString().length() == 0) {
+        emptyKeys.add(key);
+      }
+    }
+    for(Object o : emptyKeys) {
+      message.remove(o);
+    }
+    message.put(getClass().getSimpleName().toLowerCase() + ".joiner.ts", "" + System.currentTimeMillis());
     return message;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
index 5839f39..51508d8 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
@@ -87,6 +87,7 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
             }
         } else {
             message = (JSONObject) tuple.getValueByField(messageFieldName);
+            message.put(getClass().getSimpleName().toLowerCase() + ".splitter.ts", "" + System.currentTimeMillis());
         }
         return message;
     }
@@ -103,6 +104,7 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
     @SuppressWarnings("unchecked")
     @Override
     public Map<String, JSONObject> splitMessage(JSONObject message) {
+
         Map<String, JSONObject> streamMessageMap = new HashMap<>();
         String sourceType = TopologyUtils.getSourceType(message);
         Map<String, List<String>> enrichmentFieldMap = getFieldMap(sourceType);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index b184975..b5c4c44 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -148,6 +148,7 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
     String key = tuple.getStringByField("key");
     JSONObject rawMessage = (JSONObject) tuple.getValueByField("message");
     JSONObject enrichedMessage = new JSONObject();
+    enrichedMessage.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".begin.ts", "" + System.currentTimeMillis());
     try {
       if (rawMessage == null || rawMessage.isEmpty())
         throw new Exception("Could not parse binary stream to JSON");
@@ -174,11 +175,10 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
           } else {
             enrichedMessage.put(field, "");
           }
-          if (enrichmentType.equals("host")) {
-            String test = "";
-          }
         }
       }
+
+      enrichedMessage.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis());
       if (!enrichedMessage.isEmpty()) {
         collector.emit(enrichmentType, new Values(key, enrichedMessage));
       }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
index ba17fdb..3516ee0 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.enrichment.bolt;
 
+import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,5 +38,15 @@ public class ThreatIntelJoinBolt extends EnrichmentJoinBolt {
     return configurations.get(sourceType).getThreatIntelFieldMap();
   }
 
-
+  @Override
+  public JSONObject joinMessages(Map<String, JSONObject> streamMessageMap) {
+    JSONObject ret = super.joinMessages(streamMessageMap);
+    for(Object key : ret.keySet()) {
+      if(key.toString().startsWith("threatintels") && !key.toString().endsWith(".ts")) {
+        ret.put("is_alert" , "true");
+        break;
+      }
+    }
+    return ret;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
index 9db6398..b95f4b8 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
@@ -63,7 +63,7 @@ public class ThreatIntelAdapter implements EnrichmentAdapter<String>,Serializabl
             throw new RuntimeException("Unable to retrieve value", e);
         }
         if(isThreat) {
-            enriched.put("threat_source", config.getHBaseTable());
+            enriched.put(config.getHBaseTable(), "alert");
             _LOG.trace("Enriched value => " + enriched);
         }
         //throw new RuntimeException("Unable to retrieve value " + value);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/TelemetryIndexingBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/TelemetryIndexingBolt.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/TelemetryIndexingBolt.java
index ff151c7..21ecb18 100644
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/TelemetryIndexingBolt.java
+++ b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/TelemetryIndexingBolt.java
@@ -92,6 +92,17 @@ public class TelemetryIndexingBolt extends AbstractIndexingBolt {
 	}
 
 	/**
+	 *
+	 * @param IndexName
+	 *            name of the index in ElasticSearch/Solr/etc...
+	 * @return instance of bolt
+	 */
+	public TelemetryIndexingBolt withIndexName(String IndexName) {
+		_IndexName = IndexName;
+		return this;
+	}
+
+	/**
 	 * 
 	 * @param ClusterName
 	 *            name of cluster to index into in ElasticSearch/Solr/etc...

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
index a0df685..2769efe 100644
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
+++ b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
@@ -27,6 +27,8 @@ import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.text.SimpleDateFormat;
@@ -42,6 +44,8 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
   private String host;
   private int port;
   private SimpleDateFormat dateFormat;
+  private static final Logger LOG = LoggerFactory
+          .getLogger(ElasticsearchWriter.class);
 
   public ElasticsearchWriter(String clusterName, String host, int port, String dateFormat) {
     this.clusterName = clusterName;
@@ -64,7 +68,8 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
       builder.put(optionalSettings);
     }
     client = new TransportClient(builder.build())
-            .addTransportAddress(new InetSocketTransportAddress(host, port));
+            .addTransportAddress(new InetSocketTransportAddress(host, port))
+            ;
 
   }
 
@@ -77,8 +82,9 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
       if (configuration != null) {
         indexName = configuration.getIndex();
       }
-      IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName + "_" + indexPostfix,
+      IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName + "_index_" + indexPostfix,
               sourceType);
+
       indexRequestBuilder.setSource(message.toJSONString());
       bulkRequest.add(indexRequestBuilder);
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
index 04b943c..9aa8d72 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
@@ -89,8 +89,13 @@ public class BasicBroParser extends BasicParser {
             long timestamp = 0L;
             if (payload.containsKey("timestamp")) {
                 try {
-                    timestamp = Long.parseLong(payload.get("timestamp").toString());
+                    String broTimestamp = payload.get("timestamp").toString();
+                    String convertedTimestamp = broTimestamp.replace(".","");
+                    convertedTimestamp = convertedTimestamp.substring(0,13);
+                    timestamp = Long.parseLong(convertedTimestamp);
                     payload.put("timestamp", timestamp);
+                    payload.put("bro_timestamp",broTimestamp);
+                    _LOG.trace(String.format("[Metron] new bro record - timestamp : %s", payload.get("timestamp")));
                 } catch (NumberFormatException nfe) {
                     _LOG.error(String.format("[Metron] timestamp is invalid: %s", payload.get("timestamp")));
                     payload.put("timestamp", 0);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
index 6d89428..27b05a4 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
@@ -111,6 +111,7 @@ public class BasicSnortParser extends BasicParser {
 
 			// add original msg; required by 'checkForSchemaCorrectness'
 			jsonMessage.put("original_string", csvMessage);
+			jsonMessage.put("is_alert", "true");
 			messages.add(jsonMessage);
 		} catch (Exception e) {
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
index e2c5f32..f080e96 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
@@ -89,6 +89,28 @@ public class BasicBroParserTest extends TestCase {
 	}
 
 	@SuppressWarnings("rawtypes")
+	public void testHttpDecimalBroMessage() throws ParseException {
+		String rawMessage = "{\"http\":{\"ts\":1457149494.166991,\"uid\":\"CTo78A11g7CYbbOHvj\",\"id.orig_h\":\"192.249.113.37\",\"id.orig_p\":58808,\"id.resp_h\":\"72.163.4.161\",\"id.resp_p\":80,\"trans_depth\":1,\"method\":\"GET\",\"host\":\"www.cisco.com\",\"uri\":\"/\",\"user_agent\":\"curl/7.22.0 (x86_64-pc-linux-gnu) libcurl/7.22.0 OpenSSL/1.0.1 zlib/1.2.3.4 libidn/1.23 librtmp/2.3\",\"request_body_len\":0,\"response_body_len\":25523,\"status_code\":200,\"status_msg\":\"OK\",\"tags\":[],\"resp_fuids\":[\"FJDyMC15lxUn5ngPfd\"],\"resp_mime_types\":[\"text/html\"]}}";
+		String expectedTimestamp = "1457149494166";
+		Map rawMessageMap = (Map) jsonParser.parse(rawMessage);
+		JSONObject rawJson = (JSONObject) rawMessageMap.get(rawMessageMap.keySet().iterator().next());
+
+		JSONObject broJson = broParser.parse(rawMessage.getBytes()).get(0);
+		Assert.assertEquals(broJson.get("timestamp").toString(), expectedTimestamp);
+		Assert.assertEquals(broJson.get("ip_src_addr").toString(), rawJson.get("id.orig_h").toString());
+		Assert.assertEquals(broJson.get("ip_dst_addr").toString(), rawJson.get("id.resp_h").toString());
+		Assert.assertEquals(broJson.get("ip_src_port").toString(), rawJson.get("id.orig_p").toString());
+		Assert.assertEquals(broJson.get("ip_dst_port").toString(), rawJson.get("id.resp_p").toString());
+		Assert.assertTrue(broJson.get("original_string").toString().startsWith(rawMessageMap.keySet().iterator().next().toString().toUpperCase()));
+
+		Assert.assertEquals(broJson.get("uid").toString(), rawJson.get("uid").toString());
+		Assert.assertEquals(broJson.get("method").toString(), rawJson.get("method").toString());
+		Assert.assertEquals(broJson.get("host").toString(), rawJson.get("host").toString());
+		Assert.assertEquals(broJson.get("resp_mime_types").toString(), rawJson.get("resp_mime_types").toString());
+	}
+
+
+	@SuppressWarnings("rawtypes")
 	public void testDnsBroMessage() throws ParseException {
 		String rawMessage = "{\"dns\":{\"ts\":1402308259609,\"uid\":\"CuJT272SKaJSuqO0Ia\",\"id.orig_h\":\"10.122.196.204\",\"id.orig_p\":33976,\"id.resp_h\":\"144.254.71.184\",\"id.resp_p\":53,\"proto\":\"udp\",\"trans_id\":62418,\"query\":\"www.cisco.com\",\"qclass\":1,\"qclass_name\":\"C_INTERNET\",\"qtype\":28,\"qtype_name\":\"AAAA\",\"rcode\":0,\"rcode_name\":\"NOERROR\",\"AA\":true,\"TC\":false,\"RD\":true,\"RA\":true,\"Z\":0,\"answers\":[\"www.cisco.com.akadns.net\",\"origin-www.cisco.com\",\"2001:420:1201:2::a\"],\"TTLs\":[3600.0,289.0,14.0],\"rejected\":false}}";
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-MessageParsers/src/test/resources/BroParserTest.log
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/resources/BroParserTest.log b/metron-streaming/Metron-MessageParsers/src/test/resources/BroParserTest.log
index e71f28e..2fcdd5a 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/resources/BroParserTest.log
+++ b/metron-streaming/Metron-MessageParsers/src/test/resources/BroParserTest.log
@@ -1,3 +1,4 @@
 {"http":{"ts":1402307733473,"uid":"CTo78A11g7CYbbOHvj","id.orig_h":"192.249.113.37","id.orig_p":58808,"id.resp_h":"72.163.4.161","id.resp_p":80,"trans_depth":1,"method":"GET","host":"www.cisco.com","uri":"/","user_agent":"curl/7.22.0 (x86_64-pc-linux-gnu) libcurl/7.22.0 OpenSSL/1.0.1 zlib/1.2.3.4 libidn/1.23 librtmp/2.3","request_body_len":0,"response_body_len":25523,"status_code":200,"status_msg":"OK","tags":[],"resp_fuids":["FJDyMC15lxUn5ngPfd"],"resp_mime_types":["text/html"]}}
 {"dns":{"ts":1402308259609,"uid":"CuJT272SKaJSuqO0Ia","id.orig_h":"10.122.196.204","id.orig_p":33976,"id.resp_h":"144.254.71.184","id.resp_p":53,"proto":"udp","trans_id":62418,"query":"www.cisco.com","qclass":1,"qclass_name":"C_INTERNET","qtype":28,"qtype_name":"AAAA","rcode":0,"rcode_name":"NOERROR","AA":true,"TC":false,"RD":true,"RA":true,"Z":0,"answers":["www.cisco.com.akadns.net","origin-www.cisco.com","2001:420:1201:2::a"],"TTLs":[3600.0,289.0,14.0],"rejected":false}}
-{"files":{"analyzers": ["X509","MD5","SHA1"],"conn_uids":["C4tygJ3qxJBEJEBCeh"],"depth": 0,"duration": 0.0,"fuid":"FZEBC33VySG0nHSoO9","is_orig": false,"local_orig": false,"md5": "eba37166385e3ef42464ed9752e99f1b","missing_bytes": 0,"overflow_bytes": 0,"protocol": "files","rx_hosts": ["10.220.15.205"],"seen_bytes": 1136,"sha1": "73e42686657aece354fbf685712361658f2f4357","source": "SSL","timedout": false,"ts": "1425845251334","tx_hosts": ["68.171.237.7"]}}
\ No newline at end of file
+{"files":{"analyzers": ["X509","MD5","SHA1"],"conn_uids":["C4tygJ3qxJBEJEBCeh"],"depth": 0,"duration": 0.0,"fuid":"FZEBC33VySG0nHSoO9","is_orig": false,"local_orig": false,"md5": "eba37166385e3ef42464ed9752e99f1b","missing_bytes": 0,"overflow_bytes": 0,"protocol": "files","rx_hosts": ["10.220.15.205"],"seen_bytes": 1136,"sha1": "73e42686657aece354fbf685712361658f2f4357","source": "SSL","timedout": false,"ts": "1425845251334","tx_hosts": ["68.171.237.7"]}}
+{"http": {"ts":1457149494.166991,"uid":"C5xbFM2QfGB8OZKPrg","id.orig_h":"192.168.138.158","id.orig_p":49195,"id.resp_h":"188.165.164.184","id.resp_p":80,"trans_depth":1,"method":"GET","host":"ip-addr.es","uri":"/","user_agent":"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0)","request_body_len":0,"response_body_len":0,"tags":[]}}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ComponentRunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ComponentRunner.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ComponentRunner.java
index f9a8ca2..2c31759 100644
--- a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ComponentRunner.java
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ComponentRunner.java
@@ -26,11 +26,23 @@ public class ComponentRunner {
         LinkedHashMap<String, InMemoryComponent> components;
         String[] startupOrder;
         String[] shutdownOrder;
-        long timeBetweenAttempts;
+        long timeBetweenAttempts = 1000;
+        int numRetries = 5;
+        long maxTimeMS = 120000;
         public Builder() {
             components = new LinkedHashMap<String, InMemoryComponent>();
         }
 
+        public Builder withNumRetries(int numRetries) {
+            this.numRetries = numRetries;
+            return this;
+        }
+
+        public Builder withMaxTimeMS(long maxTimeMS) {
+            this.maxTimeMS = maxTimeMS;
+            return this;
+        }
+
         public Builder withComponent(String name, InMemoryComponent component) {
             components.put(name, component);
             return this;
@@ -44,7 +56,7 @@ public class ComponentRunner {
             this.shutdownOrder = shutdownOrder;
             return this;
         }
-        public Builder withTimeBetweenAttempts(long timeBetweenAttempts) {
+        public Builder withMillisecondsBetweenAttempts(long timeBetweenAttempts) {
             this.timeBetweenAttempts = timeBetweenAttempts;
             return this;
         }
@@ -63,7 +75,7 @@ public class ComponentRunner {
             if(startupOrder == null) {
                 startupOrder = toOrderedList(components);
             }
-            return new ComponentRunner(components, startupOrder, shutdownOrder, timeBetweenAttempts);
+            return new ComponentRunner(components, startupOrder, shutdownOrder, timeBetweenAttempts, numRetries, maxTimeMS);
         }
 
     }
@@ -72,16 +84,22 @@ public class ComponentRunner {
     String[] startupOrder;
     String[] shutdownOrder;
     long timeBetweenAttempts;
+    int numRetries;
+    long maxTimeMS;
     public ComponentRunner( LinkedHashMap<String, InMemoryComponent> components
                           , String[] startupOrder
                           , String[] shutdownOrder
                           , long timeBetweenAttempts
+                          , int numRetries
+                          , long maxTimeMS
                           )
     {
         this.components = components;
         this.startupOrder = startupOrder;
         this.shutdownOrder = shutdownOrder;
         this.timeBetweenAttempts = timeBetweenAttempts;
+        this.numRetries = numRetries;
+        this.maxTimeMS = maxTimeMS;
     }
 
     public <T extends InMemoryComponent> T getComponent(String name, Class<T> clazz) {
@@ -103,17 +121,14 @@ public class ComponentRunner {
         }
     }
 
-    public <T> T process(Processor<T> successState) {
-        return process(successState, 5, 120000);
-    }
 
-    public <T> T process(Processor<T> successState, int numRetries, long maxTimeMs) {
+    public <T> T process(Processor<T> successState) {
         int retryCount = 0;
         long start = System.currentTimeMillis();
         while(true) {
             long duration = System.currentTimeMillis() - start;
-            if(duration > maxTimeMs) {
-                throw new RuntimeException("Took too long to complete: " + duration + " > " + maxTimeMs);
+            if(duration > maxTimeMS) {
+                throw new RuntimeException("Took too long to complete: " + duration + " > " + maxTimeMS);
             }
             ReadinessState state = successState.process(this);
             if(state == ReadinessState.READY) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/pom.xml b/metron-streaming/Metron-Topologies/pom.xml
index 9a7d8df..4530e6d 100644
--- a/metron-streaming/Metron-Topologies/pom.xml
+++ b/metron-streaming/Metron-Topologies/pom.xml
@@ -150,8 +150,11 @@
             <type>pom</type>
             <scope>provided</scope>
         </dependency>
-
-
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>1.2</version>
+        </dependency>
         <dependency>
             <groupId>com.github.ptgoetz</groupId>
             <artifactId>storm-hbase</artifactId>
@@ -251,16 +254,10 @@
                                 <excludes>
                                     <exclude>storm:storm-core:*</exclude>
                                     <exclude>storm:storm-lib:*</exclude>
-                                    <exclude>*slf4j*</exclude>
+                                    <exclude>org.slf4j.impl*</exclude>
+                                    <exclude>org.slf4j:slf4j-log4j*</exclude>
                                 </excludes>
                             </artifactSet>
-                            <!--relocations>
-                                <relocation>
-                                    <pattern>com.google.common</pattern>
-                                    <shadedPattern>org.apache.metron.guava.common</shadedPattern>
-                                </relocation>
-                            </relocations-->
-
                             <transformers>
                                 <transformer
                                         implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
index ef8b2e2..13ccd0c 100644
--- a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.utils;
 
+import org.apache.commons.cli.*;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -27,6 +28,7 @@ import org.apache.zookeeper.KeeperException;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.PrintWriter;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.List;
@@ -80,16 +82,39 @@ public class SourceConfigUtils {
   }
 
   public static void main(String[] args) {
+
+      Options options = new Options();
+      options.addOption("p", true, "Path to source option files");
+      options.addOption("z", true, "Zookeeper Quroum URL (zk1:2181,zk2:2181,...");
+
     try {
-      File root = new File("./metron-streaming/Metron-Common/src/test/resources/config/source/");
-      for(File child: root.listFiles()) {
-        writeToZookeeperFromFile(child.getName().replaceFirst("-config.json", ""), child.getPath(), "node1:2181");
+      CommandLineParser parser = new BasicParser();
+      CommandLine cmd = parser.parse( options, args);
+
+      if( !cmd.hasOption('p') || !cmd.hasOption('z') ){
+        final PrintWriter writer = new PrintWriter(System.out);
+        final HelpFormatter usageFormatter = new HelpFormatter();
+        usageFormatter.printUsage(writer, 80, "Apache Metron SourceConfigUtils", options);
+        writer.close();
+        System.exit(1);
+      }
+
+      String sourcePath = cmd.getOptionValue('p');
+      String zkQuorum = cmd.getOptionValue('z');
+
+
+      File root = new File(sourcePath);
+
+      if( root.isDirectory() ) {
+        for (File child : root.listFiles()) {
+          writeToZookeeperFromFile(child.getName().replaceFirst("-config.json", ""), child.getPath(), zkQuorum);
+        }
       }
-      SourceConfigUtils.dumpConfigs("node1:2181");
+
+      SourceConfigUtils.dumpConfigs(zkQuorum);
+
     } catch (Exception e) {
       e.printStackTrace();
     }
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
index e805fba..5d2786d 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
@@ -18,8 +18,9 @@
 ##### Kafka #####
 
 kafka.zk=zkpr1:2181,zkpr2:2181,zkpr3:2181
+kafka.broker=kfka1:6667
 spout.kafka.topic.asa=asa
-spout.kafka.topic.bro=bro_raw
+spout.kafka.topic.bro=bro
 spout.kafka.topic.fireeye=fireeye
 spout.kafka.topic.ise=ise
 spout.kafka.topic.lancope=lancope

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
index fb594b5..42412fa 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
@@ -18,7 +18,6 @@ name: "bro"
 config:
     topology.workers: 1
 
-
 components:
     -   id: "parser"
         className: "org.apache.metron.parsing.parsers.BasicBroParser"
@@ -45,21 +44,11 @@ components:
             -   name: "ignoreZkOffsets"
                 value: true
             -   name: "startOffsetTime"
-                value: -1
+                value: -2
             -   name: "socketTimeoutMs"
                 value: 1000000
 
 spouts:
-    -   id: "testingSpout"
-        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
-        parallelism: 1
-        configMethods:
-            -   name: "withFilename"
-                args:
-                    - "SampleInput/YafExampleOutput"
-            -   name: "withRepeating"
-                args:
-                    - true
     -   id: "kafkaSpout"
         className: "storm.kafka.KafkaSpout"
         constructorArgs:
@@ -70,7 +59,7 @@ bolts:
         className: "org.apache.metron.bolt.ParserBolt"
         constructorArgs:
             - "${kafka.zk}"
-            - "yaf"
+            - "${spout.kafka.topic.bro}"
             - ref: "parser"
             - ref: "writer"
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/test.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/test.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/test.yaml
index 3bd3eed..39131f3 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/test.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/test.yaml
@@ -50,16 +50,6 @@ components:
                 value: 1000000
 
 spouts:
-    -   id: "testingSpout"
-        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
-        parallelism: 1
-        configMethods:
-            -   name: "withFilename"
-                args:
-                    - "SampleInput/YafExampleOutput"
-            -   name: "withRepeating"
-                args:
-                    - false
     -   id: "kafkaSpout"
         className: "storm.kafka.KafkaSpout"
         constructorArgs:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
index 8033374..ec36f2c 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
@@ -17,6 +17,7 @@
 name: "enrichment"
 config:
     topology.workers: 1
+    topology.acker.executors: 0
 
 components:
 # Enrichment
@@ -129,19 +130,9 @@ components:
             -   name: "ignoreZkOffsets"
                 value: true
             -   name: "startOffsetTime"
-                value: -1
+                value: -2
 
 spouts:
-    -   id: "testingSpout"
-        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
-        parallelism: 1
-        configMethods:
-            -   name: "withFilename"
-                args:
-                    - "SampleInput/YafExampleOutput"
-            -   name: "withRepeating"
-                args:
-                    - true
     -   id: "kafkaSpout"
         className: "storm.kafka.KafkaSpout"
         constructorArgs:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/paloalto/test.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/paloalto/test.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/paloalto/test.yaml
index e56e16f..07a9f48 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/paloalto/test.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/paloalto/test.yaml
@@ -44,7 +44,7 @@ components:
             -   name: "ignoreZkOffsets"
                 value: true
             -   name: "startOffsetTime"
-                value: -2
+                value: -1
 
 spouts:
     -   id: "testingSpout"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/parse.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/parse.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/parse.yaml
index dabaa7d..bfc8527 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/parse.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/parse.yaml
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-name: "yaf-test"
+name: "pcap-parse"
 config:
     topology.workers: 1
 
@@ -45,7 +45,7 @@ components:
             -   name: "ignoreZkOffsets"
                 value: true
             -   name: "startOffsetTime"
-                value: -2
+                value: -1
 
 spouts:
     -   id: "kafkaSpout"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml
index f7b0f20..5bdbc17 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml
@@ -19,12 +19,12 @@ config:
     topology.workers: 1
 
 components:
+# Parser
     -   id: "parser"
         className: "org.apache.metron.parsing.parsers.PcapParser"
         configMethods:
             -   name: "withTsPrecision"
                 args: ["MICRO"]
-
 # Threat Intel
     -   id: "ipThreatIntelConfig"
         className: "org.apache.metron.threatintel.ThreatIntelConfig"
@@ -50,7 +50,7 @@ components:
     -   id: "ipThreatIntelEnrichment"
         className: "org.apache.metron.domain.Enrichment"
         properties:
-           - name: "name"
+           - name: "type"
              value: "ip"
            - name: "fields"
              value: ["message/ip_src_addr", "message/ip_dst_addr"]
@@ -62,35 +62,8 @@ components:
             -   name: "add"
                 args:
                     - ref: "ipThreatIntelEnrichment"
-#Enrichment
-#    -   id: "jdbcConfig"
-#        className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
-#        properties:
-#            -   name: "host"
-#                value: "${mysql.ip}"
-#            -   name: "port"
-#                value: ${mysql.port}
-#            -   name: "username"
-#                value: "${mysql.username}"
-#            -   name: "password"
-#                value: "${mysql.password}"
-#            -   name: "table"
-#                value: "GEO"
-#    -   id: "geoEnrichmentAdapter"
-#        className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
-#        configMethods:
-#            -   name: "withJdbcConfig"
-#                args:
-#                    - ref: "jdbcConfig"
-#    -   id: "geoEnrichment"
-#        className: "org.apache.metron.domain.Enrichment"
-#        properties:
-#            -   name: "name"
-#                value:  "geo"
-#            -   name: "fields"
-#                value: ["ip_src_addr", "ip_dst_addr"]
-#            -   name: "adapter"
-#                ref: "geoEnrichmentAdapter"
+# Enrichment
+
     -   id: "hostEnrichmentAdapter"
         className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
         constructorArgs:
@@ -98,7 +71,7 @@ components:
     -   id: "hostEnrichment"
         className: "org.apache.metron.domain.Enrichment"
         properties:
-            -   name: "name"
+            -   name: "type"
                 value:  "host"
             -   name: "fields"
                 value: ["ip_src_addr", "ip_dst_addr"]
@@ -107,12 +80,10 @@ components:
     -   id: "enrichments"
         className: "java.util.ArrayList"
         configMethods:
-#            -   name: "add"
-#                args:
-#                    - ref: "geoEnrichment"
             -   name: "add"
                 args:
                     - ref: "hostEnrichment"
+#indexing
     -   id: "indexAdapter"
         className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
     -   id: "metricConfig"
@@ -190,10 +161,13 @@ components:
             # id
             - "${spout.kafka.topic.pcap}"
         properties:
-            -   name: "forceFromStart"
+            -   name: "ignoreZkOffsets"
                 value: true
             -   name: "startOffsetTime"
                 value: -1
+            -   name: "socketTimeoutMs"
+                value: 1000000
+#hbase bolt
     -   id: "hbaseConfig"
         className: "org.apache.metron.hbase.TupleTableConfig"
         configMethods:
@@ -217,7 +191,6 @@ spouts:
         className: "storm.kafka.KafkaSpout"
         constructorArgs:
             - ref: "kafkaConfig"
-
 bolts:
     -   id: "hbaseBolt"
         className: "org.apache.metron.hbase.HBaseBolt"
@@ -226,6 +199,8 @@ bolts:
             - "${kafka.zk}"
     -   id: "parserBolt"
         className: "org.apache.metron.bolt.PcapParserBolt"
+        constructorArgs:
+            - "${kafka.zk}"
         configMethods:
             -   name: "withMessageParser"
                 args:
@@ -235,6 +210,8 @@ bolts:
                     - ref: "enrichments"
     -   id: "indexingBolt"
         className: "org.apache.metron.indexing.TelemetryIndexingBolt"
+        constructorArgs:
+            - "${kafka.zk}"
         configMethods:
             -   name: "withIndexIP"
                 args:
@@ -265,6 +242,8 @@ bolts:
                     - ref: "metricConfig"
     -   id: "errorIndexingBolt"
         className: "org.apache.metron.indexing.TelemetryIndexingBolt"
+        constructorArgs:
+            - "${kafka.zk}"
         configMethods:
             -   name: "withIndexIP"
                 args:
@@ -293,15 +272,19 @@ bolts:
             -   name: "withMetricConfiguration"
                 args:
                     - ref: "metricConfig"
-    # Threat Intel Bolts
+# Threat Intel Bolts
     -   id: "threatIntelSplitBolt"
         className: "org.apache.metron.enrichment.bolt.EnrichmentSplitterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
         configMethods:
             -   name: "withEnrichments"
                 args:
                     - ref: "threatIntels"
     -   id: "ipThreatIntelBolt"
         className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+        constructorArgs:
+            - "${kafka.zk}"
         configMethods:
             -   name: "withEnrichment"
                 args:
@@ -312,29 +295,21 @@ bolts:
                 args: [10]
     -   id: "threatIntelJoinBolt"
         className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
+        constructorArgs:
+            - "${kafka.zk}"
         configMethods:
             -   name: "withEnrichments"
                 args:
                     - ref: "threatIntels"
-            -   name: "withType"
-                args:
-                    - "alerts"
             -   name: "withMaxCacheSize"
                 args: [10000]
             -   name: "withMaxTimeRetain"
                 args: [10]
-#    -   id: "geoEnrichmentBolt"
-#        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-#        configMethods:
-#            -   name: "withEnrichment"
-#                args:
-#                    - ref: "geoEnrichment"
-#            -   name: "withMaxCacheSize"
-#                args: [10000]
-#            -   name: "withMaxTimeRetain"
-#                args: [10]
+# Enrichment Bolts
     -   id: "hostEnrichmentBolt"
         className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+        constructorArgs:
+            - "${kafka.zk}"
         configMethods:
             -   name: "withEnrichment"
                 args:
@@ -345,6 +320,8 @@ bolts:
                 args: [10]
     -   id: "joinBolt"
         className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
+        constructorArgs:
+            - "${kafka.zk}"
         configMethods:
         -   name: "withEnrichments"
             args:
@@ -355,11 +332,13 @@ bolts:
             args: [10]
 
 streams:
+#parser
     -   name: "spout -> parser"
         from: "kafkaSpout"
         to: "parserBolt"
         grouping:
             type: SHUFFLE
+#hbase
     -   name: "parser -> hbase"
         from: "parserBolt"
         to: "hbaseBolt"
@@ -367,6 +346,7 @@ streams:
             streamId: "raw"
             type: FIELDS
             args: ["key"]
+#enrichment
     -   name: "parser -> host"
         from: "parserBolt"
         to: "hostEnrichmentBolt"
@@ -374,13 +354,6 @@ streams:
             streamId: "host"
             type: FIELDS
             args: ["key"]
-#    -   name: "parser -> geo"
-#        from: "parserBolt"
-#        to: "geoEnrichmentBolt"
-#        grouping:
-#            streamId: "geo"
-#            type: FIELDS
-#            args: ["key"]
     -   name: "parser -> join"
         from: "parserBolt"
         to: "joinBolt"
@@ -388,13 +361,6 @@ streams:
             streamId: "message"
             type: FIELDS
             args: ["key"]
-#    -   name: "geo -> join"
-#        from: "geoEnrichmentBolt"
-#        to: "joinBolt"
-#        grouping:
-#            streamId: "geo"
-#            type: FIELDS
-#            args: ["key"]
     -   name: "host -> join"
         from: "hostEnrichmentBolt"
         to: "joinBolt"
@@ -402,6 +368,7 @@ streams:
             streamId: "host"
             type: FIELDS
             args: ["key"]
+
 #threat intel
     -   name: "enrichmentJoin -> threatSplit"
         from: "joinBolt"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/remote.yaml
index 7f52d0f..3354d73 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/remote.yaml
@@ -47,16 +47,6 @@ components:
                 value: -1
 
 spouts:
-    -   id: "testingSpout"
-        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
-        parallelism: 1
-        configMethods:
-            -   name: "withFilename"
-                args:
-                    - "SampleInput/YafExampleOutput"
-            -   name: "withRepeating"
-                args:
-                    - true
     -   id: "kafkaSpout"
         className: "storm.kafka.KafkaSpout"
         constructorArgs:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/test.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/test.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/test.yaml
index bdbea97..2734ead 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/test.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/test.yaml
@@ -42,21 +42,11 @@ components:
             - "${spout.kafka.topic.snort}"
         properties:
             -   name: "ignoreZkOffsets"
-                value: true
+                value: false
             -   name: "startOffsetTime"
                 value: -2
 
 spouts:
-    -   id: "testingSpout"
-        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
-        parallelism: 1
-        configMethods:
-            -   name: "withFilename"
-                args:
-                    - "SampleInput/YafExampleOutput"
-            -   name: "withRepeating"
-                args:
-                    - false
     -   id: "kafkaSpout"
         className: "storm.kafka.KafkaSpout"
         constructorArgs:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/remote.yaml
index 98395e9..f1a8ea5 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/remote.yaml
@@ -14,11 +14,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-name: "yaf-test"
+name: "yaf"
 config:
     topology.workers: 1
 
-
 components:
     -   id: "parser"
         className: "org.apache.metron.parsing.parsers.GrokParser"
@@ -63,16 +62,6 @@ components:
                 value: 1000000
 
 spouts:
-    -   id: "testingSpout"
-        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
-        parallelism: 1
-        configMethods:
-            -   name: "withFilename"
-                args:
-                    - "SampleInput/YafExampleOutput"
-            -   name: "withRepeating"
-                args:
-                    - true
     -   id: "kafkaSpout"
         className: "storm.kafka.KafkaSpout"
         constructorArgs:
@@ -83,7 +72,7 @@ bolts:
         className: "org.apache.metron.bolt.ParserBolt"
         constructorArgs:
             - "${kafka.zk}"
-            - "yaf"
+            - "${spout.kafka.topic.yaf}"
             - ref: "parser"
             - ref: "writer"
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/test.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/test.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/test.yaml
index 021d3f8..fe764d4 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/test.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/test.yaml
@@ -63,16 +63,6 @@ components:
                 value: 1000000
 
 spouts:
-    -   id: "testingSpout"
-        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
-        parallelism: 1
-        configMethods:
-            -   name: "withFilename"
-                args:
-                    - "SampleInput/YafExampleOutput"
-            -   name: "withRepeating"
-                args:
-                    - false
     -   id: "kafkaSpout"
         className: "storm.kafka.KafkaSpout"
         constructorArgs:


[3/3] incubator-metron git commit: METRON-58 Remediate Deployment Integration Testing Issues (dlyle65535 via cestella) closes apache/incubator-metron#36

Posted by ce...@apache.org.
METRON-58 Remediate Deployment Integration Testing Issues (dlyle65535 via cestella) closes apache/incubator-metron#36


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/2e9f2c6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/2e9f2c6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/2e9f2c6c

Branch: refs/heads/master
Commit: 2e9f2c6ceac70fb19d38281a1abe1d3ee0d088bb
Parents: 6638a71
Author: dlyle65535 <dl...@gmail.com>
Authored: Mon Mar 7 18:24:31 2016 -0500
Committer: cstella <ce...@gmail.com>
Committed: Mon Mar 7 18:24:31 2016 -0500

----------------------------------------------------------------------
 .../inventory/metron_example/group_vars/all     |   6 -
 .../inventory/multinode-vagrant/group_vars/all  |   6 -
 .../inventory/singlenode-vagrant/group_vars/all |  11 +-
 deployment/playbooks/ambari_install.yml         |  14 +-
 deployment/playbooks/metron_full_install.yml    |   4 +
 deployment/playbooks/metron_install.yml         |  17 +-
 deployment/roles/ambari_common/tasks/main.yml   |   2 +-
 .../roles/ambari_config/defaults/main.yml       |  30 +++
 .../roles/ambari_config/vars/single_node_vm.yml |  40 ++--
 .../roles/ambari_master/defaults/main.yml       |  19 ++
 deployment/roles/ambari_master/tasks/main.yml   |   7 +
 deployment/roles/bro/defaults/main.yml          |  20 ++
 deployment/roles/bro/tasks/main.yml             |   8 +
 .../roles/elasticsearch/defaults/main.yml       |  20 ++
 deployment/roles/elasticsearch/tasks/main.yml   |   4 +-
 deployment/roles/hadoop_setup/defaults/main.yml |  25 ++
 deployment/roles/hadoop_setup/tasks/main.yml    |   3 +-
 .../roles/metron_common/defaults/main.yml       |  19 ++
 deployment/roles/metron_common/vars/main.yml    |  19 --
 .../roles/metron_streaming/defaults/main.yml    |  31 +++
 .../roles/metron_streaming/files/extractor.json |  11 +
 .../files/source/bro-config.json                |  14 ++
 .../files/source/pcap-config.json               |  14 ++
 .../files/source/snort-config.json              |  14 ++
 .../files/source/yaf-config.json                |  14 ++
 .../roles/metron_streaming/handlers/main.yml    |   4 +-
 .../metron_streaming/tasks/full_topology.yml    |  26 +++
 .../roles/metron_streaming/tasks/main.yml       |  24 +-
 .../metron_streaming/tasks/small_topology.yml   |  26 +++
 .../metron_streaming/tasks/source_config.yml    |  31 +++
 .../metron_streaming/tasks/threat_intel.yml     |  48 ++++
 .../metron_streaming/templates/threat_ip.csv    |  37 +++
 deployment/roles/mysql/files/geoip_ddl.sql      |  49 ----
 deployment/roles/mysql/files/mylogin.cnf        |  19 --
 .../mysql57-community-release-el6-7.noarch.rpm  | Bin 8848 -> 0 bytes
 deployment/roles/mysql/handlers/main.yml        |  19 --
 deployment/roles/mysql/tasks/main.yml           |  85 -------
 deployment/roles/mysql/templates/.my.cnf        |  20 --
 deployment/roles/mysql/vars/main.yml            |  20 --
 deployment/roles/mysql_client/tasks/main.yml    |  34 +++
 .../roles/mysql_client/templates/db_config.sql  |  21 ++
 .../roles/mysql_server/files/geoip_ddl.sql      |  49 ++++
 .../mysql57-community-release-el6-7.noarch.rpm  | Bin 0 -> 8848 bytes
 deployment/roles/mysql_server/handlers/main.yml |  19 ++
 deployment/roles/mysql_server/tasks/main.yml    |  86 +++++++
 deployment/roles/mysql_server/templates/.my.cnf |  20 ++
 deployment/roles/mysql_server/vars/main.yml     |  20 ++
 deployment/roles/pcap_replay/defaults/main.yml  |  21 ++
 .../roles/pcap_replay/templates/pcap-replay     |   2 +-
 deployment/roles/pcap_replay/vars/main.yml      |  21 --
 .../roles/tap_interface/defaults/main.yml       |  19 ++
 deployment/roles/tap_interface/tasks/main.yml   |  30 +++
 deployment/roles/yaf/defaults/main.yml          |  29 +++
 deployment/roles/yaf/tasks/main.yml             |   9 +-
 deployment/roles/yaf/vars/main.yml              |  22 --
 metron-streaming/Metron-Common/pom.xml          |   5 +
 .../metron/bolt/BulkMessageWriterBolt.java      |   5 +-
 .../java/org/apache/metron/bolt/JoinBolt.java   |   1 +
 .../org/apache/metron/domain/Enrichment.java    |  11 +
 .../java/org/apache/metron/utils/JSONUtils.java |  70 ++++++
 .../resources/config/source/bro-config.json     |   9 +-
 .../resources/config/source/snort-config.json   |   9 +-
 .../resources/config/source/yaf-config.json     |   9 +-
 .../dataloads/bulk/ThreatIntelBulkLoader.java   |   2 +-
 .../enrichment/bolt/EnrichmentJoinBolt.java     |  19 +-
 .../enrichment/bolt/EnrichmentSplitterBolt.java |   2 +
 .../enrichment/bolt/GenericEnrichmentBolt.java  |   6 +-
 .../enrichment/bolt/ThreatIntelJoinBolt.java    |  13 +-
 .../metron/threatintel/ThreatIntelAdapter.java  |   2 +-
 .../metron/indexing/TelemetryIndexingBolt.java  |  11 +
 .../metron/writer/ElasticsearchWriter.java      |  10 +-
 .../metron/parsing/parsers/BasicBroParser.java  |   7 +-
 .../parsing/parsers/BasicSnortParser.java       |   1 +
 .../metron/parsing/test/BasicBroParserTest.java |  22 ++
 .../src/test/resources/BroParserTest.log        |   3 +-
 .../util/integration/ComponentRunner.java       |  33 ++-
 metron-streaming/Metron-Topologies/pom.xml      |  17 +-
 .../apache/metron/utils/SourceConfigUtils.java  |  37 ++-
 .../Metron_Configs/etc/env/config.properties    |   3 +-
 .../Metron_Configs/topologies/bro/remote.yaml   |  15 +-
 .../Metron_Configs/topologies/bro/test.yaml     |  10 -
 .../topologies/enrichment/remote.yaml           |  13 +-
 .../topologies/paloalto/test.yaml               |   2 +-
 .../Metron_Configs/topologies/pcap/parse.yaml   |   4 +-
 .../Metron_Configs/topologies/pcap/remote.yaml  |  97 +++-----
 .../Metron_Configs/topologies/snort/remote.yaml |  10 -
 .../Metron_Configs/topologies/snort/test.yaml   |  12 +-
 .../Metron_Configs/topologies/yaf/remote.yaml   |  15 +-
 .../Metron_Configs/topologies/yaf/test.yaml     |  10 -
 .../src/main/resources/SampleIndexed/YafIndexed |  20 +-
 .../src/main/resources/SampleParsed/SnortParsed |   6 +-
 .../integration/EnrichmentIntegrationTest.java  | 226 +++++++++++++++++--
 .../integration/ParserIntegrationTest.java      |  15 +-
 .../metron/integration/util/TestUtils.java      |   1 -
 .../integration/util/mock/MockGeoAdapter.java   |  26 ++-
 pom.xml                                         |   3 +-
 96 files changed, 1382 insertions(+), 592 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/inventory/metron_example/group_vars/all
----------------------------------------------------------------------
diff --git a/deployment/inventory/metron_example/group_vars/all b/deployment/inventory/metron_example/group_vars/all
index b8cf9dc..3a26769 100644
--- a/deployment/inventory/metron_example/group_vars/all
+++ b/deployment/inventory/metron_example/group_vars/all
@@ -31,12 +31,6 @@ pcap_hbase_table: pcap
 tracker_hbase_table: access_tracker
 threatintel_ip_hbase_table: malicious_ip
 
-# kafka
-pycapa_topic: pcap
-bro_topic: bro
-yaf_topic: ipfix
-snort_topic: snort
-
 #elasticsearch
 elasticsearch_transport_port: 9300
 elasticsearch_network_interface: eth0

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/inventory/multinode-vagrant/group_vars/all
----------------------------------------------------------------------
diff --git a/deployment/inventory/multinode-vagrant/group_vars/all b/deployment/inventory/multinode-vagrant/group_vars/all
index bb41e89..fc3b56d 100644
--- a/deployment/inventory/multinode-vagrant/group_vars/all
+++ b/deployment/inventory/multinode-vagrant/group_vars/all
@@ -28,12 +28,6 @@ pcap_hbase_table: pcap
 tracker_hbase_table: access_tracker
 threatintel_ip_hbase_table: malicious_ip
 
-# kafka
-pycapa_topic: pcap
-bro_topic: bro
-yaf_topic: ipfix
-snort_topic: snort
-
 #elasticsearch
 elasticsearch_transport_port: 9300
 elasticsearch_network_interface: eth1

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/inventory/singlenode-vagrant/group_vars/all
----------------------------------------------------------------------
diff --git a/deployment/inventory/singlenode-vagrant/group_vars/all b/deployment/inventory/singlenode-vagrant/group_vars/all
index 1e08a6a..6405eea 100644
--- a/deployment/inventory/singlenode-vagrant/group_vars/all
+++ b/deployment/inventory/singlenode-vagrant/group_vars/all
@@ -28,12 +28,6 @@ pcap_hbase_table: pcap
 tracker_hbase_table: access_tracker
 threatintel_ip_hbase_table: malicious_ip
 
-# kafka
-pycapa_topic: pcap
-bro_topic: bro
-yaf_topic: ipfix
-snort_topic: snort
-
 #elasticsearch
 elasticsearch_transport_port: 9300
 elasticsearch_network_interface: eth1
@@ -55,7 +49,7 @@ snort_version: "2.9.8.0-1"
 snort_alert_csv_path: "/var/log/snort/alert.csv"
 
 #PCAP Replay
-pcap_replay: True
+pcap_replay: False
 pcap_replay_interface: eth1
 
 #data directories - only required to override defaults
@@ -73,3 +67,6 @@ storm_local_dir: "/data1/hadoop/storm"
 kafka_log_dirs: "/data1/kafka-log"
 elasticsearch_data_dir: "/data1/elasticsearch,/data2/elasticsearch"
 
+ambari_server_mem: 512
+topology_name: small_topology.yml
+threat_intel_bulk_load: False

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/playbooks/ambari_install.yml
----------------------------------------------------------------------
diff --git a/deployment/playbooks/ambari_install.yml b/deployment/playbooks/ambari_install.yml
index e1da427..c7f8249 100644
--- a/deployment/playbooks/ambari_install.yml
+++ b/deployment/playbooks/ambari_install.yml
@@ -19,17 +19,29 @@
   sudo: yes
   roles:
     - role: ambari_common
+  tags:
+    - ambari-prereqs
+    - hdp-install
 
 - hosts: ambari_master
   sudo: yes
   roles:
-    - role: ambari_master
+    - role:  ambari_master
+  tags:
+    - ambari-server
+    - hdp-install
 
 - hosts: ambari_slave
   sudo: yes
   roles:
     - role: ambari_slave
+  tags:
+    - ambari-agent
+    - hdp-install
 
 - hosts: ambari_master
   roles:
     - role: ambari_config
+  tags:
+    - hdp-install
+    - hdp-deploy

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/playbooks/metron_full_install.yml
----------------------------------------------------------------------
diff --git a/deployment/playbooks/metron_full_install.yml b/deployment/playbooks/metron_full_install.yml
index 38203da..26ffd62 100644
--- a/deployment/playbooks/metron_full_install.yml
+++ b/deployment/playbooks/metron_full_install.yml
@@ -16,4 +16,8 @@
 #
 ---
 - include: ambari_install.yml
+  tags:
+    - ambari
 - include: metron_install.yml
+  tags:
+    - metron

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/playbooks/metron_install.yml
----------------------------------------------------------------------
diff --git a/deployment/playbooks/metron_install.yml b/deployment/playbooks/metron_install.yml
index ad070c9..b8646fc 100644
--- a/deployment/playbooks/metron_install.yml
+++ b/deployment/playbooks/metron_install.yml
@@ -19,12 +19,16 @@
   sudo: yes
   roles:
     - role: metron_common
+  tags:
+    - metron-prereqs
 
 - hosts: hadoop_client
   sudo: yes
   roles:
     - role: ambari_gather_facts
     - role: hadoop_setup
+  tags:
+    - metron-prereqs
 
 - hosts: search
   sudo: yes
@@ -38,13 +42,22 @@
 - hosts: mysql
   sudo: yes
   roles:
-    - role: mysql
+    - role: mysql_server
   tags:
-    - mysql
+    - mysql-server
+
+- hosts: ambari_slave
+  sudo: yes
+  roles:
+    - role: mysql_client
+  tags:
+    - mysql-client
+
 
 - hosts: sensors
   sudo: yes
   roles:
+    - { role: tap_interface, when: install_tap | default(False) == True }
     - role: ambari_gather_facts
     - role: flume
     - role: pycapa

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/ambari_common/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/ambari_common/tasks/main.yml b/deployment/roles/ambari_common/tasks/main.yml
index 992468e..35f3fce 100644
--- a/deployment/roles/ambari_common/tasks/main.yml
+++ b/deployment/roles/ambari_common/tasks/main.yml
@@ -59,7 +59,7 @@
 - name: install epel-repo rpm
   yum: pkg=/tmp/epel-release.rpm state=installed
 
-- name: Download HDP repo
+- name: Download Ambari repo
   get_url: url="{{ rhel_ambari_install_url }}" dest=/etc/yum.repos.d/ambari.repo
 
 - name: Clean yum

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/ambari_config/defaults/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/ambari_config/defaults/main.yml b/deployment/roles/ambari_config/defaults/main.yml
new file mode 100644
index 0000000..507b6e3
--- /dev/null
+++ b/deployment/roles/ambari_config/defaults/main.yml
@@ -0,0 +1,30 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+zookeeper_data_dir: /hadoop/zookeeper
+namenode_checkpoint_dir: /hadoop/hdfs/namesecondary
+namenode_name_dir: /hadoop/hdfs/namenode
+datanode_data_dir: /hadoop/hdfs/data
+journalnode_edits_dir: /hadoop/hdfs/journalnode
+jhs_recovery_store_ldb_path: /hadoop/mapreduce/jhs
+nodemanager_local_dirs: /hadoop/yarn/local
+timeline_ldb_store_path: /hadoop/yarn/timeline
+timeline_ldb_state_path: /hadoop/yarn/timeline
+nodemanager_log_dirs: /hadoop/yarn/log
+storm_local_dir: /hadoop/storm
+kafka_log_dirs: /kafka-log
+cluster_type: small_cluster

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/ambari_config/vars/single_node_vm.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/ambari_config/vars/single_node_vm.yml b/deployment/roles/ambari_config/vars/single_node_vm.yml
index d0d3b78..6b18825 100644
--- a/deployment/roles/ambari_config/vars/single_node_vm.yml
+++ b/deployment/roles/ambari_config/vars/single_node_vm.yml
@@ -29,47 +29,49 @@ zookeeper_slave: [ZOOKEEPER_CLIENT]
 hbase_master: [HBASE_MASTER, HBASE_CLIENT]
 hbase_slave: [HBASE_REGIONSERVER]
 
-metron_components: "{{ hadoop_master | union(zookeeper_master) | union(storm_master) | union(spark_master) | union(hbase_master) | union(hadoop_slave) | union(zookeeper_slave) | union(storm_slave) | union(spark_slave) | union(kafka_broker) | union(hbase_slave) }}"
+metron_components: "{{ hadoop_master | union(zookeeper_master) | union(storm_master) | union(hbase_master) | union(hadoop_slave) | union(zookeeper_slave) | union(storm_slave) | union(kafka_broker) | union(hbase_slave) }}"
 
 cluster_name: "metron_cluster"
 blueprint_name: "metron_blueprint"
 
 configurations:
   - zoo.cfg:
-      dataDir: '{{ zookeeper_data_dir | default("/hadoop/zookeeper") }}'
+      dataDir: '{{ zookeeper_data_dir }}'
   - hadoop-env:
-      namenode_heapsize: 1024
-      dtnode_heapsize: 1024
+      hadoop_heapsize: 1024
+      namenode_heapsize: 512
+      dtnode_heapsize: 512
+      namenode_opt_permsize: 128m
   - hbase-env:
-      hbase_regionserver_heapsize: 1024
-      hbase_master_heapsize: 1024
+      hbase_regionserver_heapsize: 512
+      hbase_master_heapsize: 512
+      hbase_regionserver_xmn_max: 512
   - hdfs-site:
-      dfs.namenode.checkpoint.dir: '{{ namenode_checkpoint_dir | default("/hadoop/hdfs/namesecondary") }}'
-      dfs.namenode.name.dir: '{{ namenode_name_dir | default("/hadoop/hdfs/namenode") }}'
-      dfs.datanode.data.dir: '{{ datanode_data_dir | default("/hadoop/hdfs/data" ) }}'
-      dfs.journalnode.edits.dir: '{{ journalnode_edits_dir | default("/hadoop/hdfs/journalnode") }}'
+      dfs.namenode.checkpoint.dir: '{{ namenode_checkpoint_dir  }}'
+      dfs.namenode.name.dir: '{{ namenode_name_dir }}'
+      dfs.datanode.data.dir: '{{ datanode_data_dir }}'
+      dfs.journalnode.edits.dir: '{{ journalnode_edits_dir }}'
   - yarn-env:
       nodemanager_heapsize: 512
       yarn_heapsize: 512
       apptimelineserver_heapsize : 512
+      resourcemanager_heapsize: 1024
   - mapred-env:
       jobhistory_heapsize: 256
   - mapred-site:
-      mapreduce.jobhistory.recovery.store.leveldb.path : '{{ jhs_recovery_store_ldb_path | default("/hadoop/mapreduce/jhs") }}'
+      mapreduce.jobhistory.recovery.store.leveldb.path : '{{ jhs_recovery_store_ldb_path }}'
   - yarn-site:
-      yarn.nodemanager.resource.memory-mb: 1024
-      yarn.scheduler.maximum-allocation-mb: 1024
-      yarn.nodemanager.local-dirs : '{{ nodemanager_local_dirs| default("/hadoop/yarn/local") }}'
-      yarn.timeline-service.leveldb-timeline-store.path: '{{ timeline_ldb_store_path | default("/hadoop/yarn/timeline") }}'
-      yarn.timeline-service.leveldb-state-store.path: '{{ timeline_ldb_state_path| default("/hadoop/yarn/timeline") }}'
-      yarn.nodemanager.log-dirs: '{{ nodemanager_log_dirs| default("/hadoop/yarn/log") }}'
+      yarn.nodemanager.local-dirs : '{{ nodemanager_local_dirs }}'
+      yarn.timeline-service.leveldb-timeline-store.path: '{{ timeline_ldb_store_path }}'
+      yarn.timeline-service.leveldb-state-store.path: '{{ timeline_ldb_state_path }}'
+      yarn.nodemanager.log-dirs: '{{ nodemanager_log_dirs }}'
   - storm-site:
       supervisor.slots.ports: "[6700, 6701, 6702, 6703]"
-      storm.local.dir: '{{ storm_local_dir | default("/hadoop/storm") }}'
+      storm.local.dir: '{{ storm_local_dir }}'
   - kafka-env:
       content: "{% raw %}\n#!/bin/bash\n\n# Set KAFKA specific environment variables here.\n\n# The java implementation to use.\nexport KAFKA_HEAP_OPTS=\"-Xms256M -Xmx256M\"\nexport KAFKA_JVM_PERFORMANCE_OPTS=\"-server -XX:+UseG1GC -XX:+DisableExplicitGC -Djava.awt.headless=true\"\nexport JAVA_HOME={{java64_home}}\nexport PATH=$PATH:$JAVA_HOME/bin\nexport PID_DIR={{kafka_pid_dir}}\nexport LOG_DIR={{kafka_log_dir}}\nexport KAFKA_KERBEROS_PARAMS={{kafka_kerberos_params}}\n# Add kafka sink to classpath and related depenencies\nif [ -e \"/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar\" ]; then\n  export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar\n  export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/lib/*\nfi\nif [ -f /etc/kafka/conf/kafka-ranger-env.sh ]; then\n   . /etc/kafka/conf/kafka-ranger-env.sh\nfi{% endraw %}"
   - kafka-broker:
-      log.dirs: '{{ kafka_log_dirs | default("/kafka-log") }}'
+      log.dirs: '{{ kafka_log_dirs }}'
 
 blueprint:
   stack_name: HDP

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/ambari_master/defaults/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/ambari_master/defaults/main.yml b/deployment/roles/ambari_master/defaults/main.yml
new file mode 100644
index 0000000..3b8cc73
--- /dev/null
+++ b/deployment/roles/ambari_master/defaults/main.yml
@@ -0,0 +1,19 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+ambari_server_mem: 2048
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/ambari_master/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/ambari_master/tasks/main.yml b/deployment/roles/ambari_master/tasks/main.yml
index 8c78f06..daf4e41 100644
--- a/deployment/roles/ambari_master/tasks/main.yml
+++ b/deployment/roles/ambari_master/tasks/main.yml
@@ -24,6 +24,13 @@
   register: ambari_server_setup
   failed_when: ambari_server_setup.stderr
 
+- name: Set Ambari Server Max Memory
+  replace:
+    dest: /var/lib/ambari-server/ambari-env.sh
+    regexp:  "\ -Xmx2048m\ "
+    replace: " -Xmx{{ ambari_server_mem }}m "
+    backup: no
+
 - name: start ambari server
   service: name=ambari-server state=restarted
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/bro/defaults/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/bro/defaults/main.yml b/deployment/roles/bro/defaults/main.yml
new file mode 100644
index 0000000..c7a2c1f
--- /dev/null
+++ b/deployment/roles/bro/defaults/main.yml
@@ -0,0 +1,20 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+bro_crontab_minutes: 0-59/5
+bro_crontab_job: /usr/local/bro/bin/broctl cron
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/bro/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/bro/tasks/main.yml b/deployment/roles/bro/tasks/main.yml
index 04dfe8f..0191052 100644
--- a/deployment/roles/bro/tasks/main.yml
+++ b/deployment/roles/bro/tasks/main.yml
@@ -31,6 +31,7 @@
     - python-devel
     - swig
     - zlib-devel
+    - perl
 
 - include: librdkafka.yml
 
@@ -46,3 +47,10 @@
 
 - name: Start bro
   shell: /usr/local/bro/bin/broctl start
+
+- name: Bro Cronjob
+  cron:
+    name: Bro Cron
+    minute: "{{ bro_crontab_minutes }}"
+    job: "{{ bro_crontab_job }}"
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/elasticsearch/defaults/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/elasticsearch/defaults/main.yml b/deployment/roles/elasticsearch/defaults/main.yml
new file mode 100644
index 0000000..d91fa1a
--- /dev/null
+++ b/deployment/roles/elasticsearch/defaults/main.yml
@@ -0,0 +1,20 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+elasticsearch_data_dir: /var/lib/elasticsearch
+elasticsearch_network_interface: eth0
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/elasticsearch/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/elasticsearch/tasks/main.yml b/deployment/roles/elasticsearch/tasks/main.yml
index 555666b..fa8d4f3 100644
--- a/deployment/roles/elasticsearch/tasks/main.yml
+++ b/deployment/roles/elasticsearch/tasks/main.yml
@@ -55,10 +55,10 @@
   with_items:
     - { regexp: '#cluster\.name', line: 'cluster.name: metron' }
     - { regexp: '#network\.host:', line: 'network.host: _{{
-    elasticsearch_network_interface | default("eth0") }}:ipv4_' }
+    elasticsearch_network_interface  }}:ipv4_' }
     - { regexp: '#discovery\.zen\.ping\.unicast\.hosts',
     line: 'discovery.zen.ping.unicast.hosts: [ {{ es_hosts }} ]'}
-    - { regexp: '#path\.data', line: 'path.data: {{     elasticsearch_data_dir | default("/var/lib/elasticsearch")}}' }
+    - { regexp: '#path\.data', line: 'path.data: {{     elasticsearch_data_dir }}' }
   notify: restart elasticsearch
 
 - name: Start Elasticsearch.

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/hadoop_setup/defaults/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/hadoop_setup/defaults/main.yml b/deployment/roles/hadoop_setup/defaults/main.yml
new file mode 100644
index 0000000..c783cea
--- /dev/null
+++ b/deployment/roles/hadoop_setup/defaults/main.yml
@@ -0,0 +1,25 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+num_partitions: 1
+retention_in_gb: 10
+pycapa_topic: pcap
+bro_topic: bro
+yaf_topic: ipfix
+snort_topic: snort
+enrichments_topic: enrichments
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/hadoop_setup/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/hadoop_setup/tasks/main.yml b/deployment/roles/hadoop_setup/tasks/main.yml
index 5e77b99..5b6c47c 100644
--- a/deployment/roles/hadoop_setup/tasks/main.yml
+++ b/deployment/roles/hadoop_setup/tasks/main.yml
@@ -26,10 +26,11 @@
 
 #if kafka topic
 - name: Create Kafka topics
-  shell: "{{ kafka_home }}/bin/kafka-topics.sh --zookeeper {{ zookeeper_url }} --create --topic {{ item }} --partitions 1 --replication-factor 1"
+  shell: "{{ kafka_home }}/bin/kafka-topics.sh --zookeeper {{ zookeeper_url }} --create --topic {{ item }} --partitions {{ num_partitions }} --replication-factor 1 --config retention.bytes={{ retention_in_gb * 1024 * 1024 * 1024}}"
   ignore_errors: yes
   with_items:
     - "{{ pycapa_topic }}"
     - "{{ bro_topic }}"
     - "{{ yaf_topic }}"
     - "{{ snort_topic }}"
+    - "{{ enrichments_topic }}"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/metron_common/defaults/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_common/defaults/main.yml b/deployment/roles/metron_common/defaults/main.yml
new file mode 100644
index 0000000..50aaefd
--- /dev/null
+++ b/deployment/roles/metron_common/defaults/main.yml
@@ -0,0 +1,19 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+metron_jar_name: Metron-Topologies-{{ metron_version }}.jar
+metron_jar_path: "{{ playbook_dir }}/../../metron-streaming/Metron-Topologies/target/{{ metron_jar_name }}"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/metron_common/vars/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_common/vars/main.yml b/deployment/roles/metron_common/vars/main.yml
deleted file mode 100644
index 50aaefd..0000000
--- a/deployment/roles/metron_common/vars/main.yml
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
----
-metron_jar_name: Metron-Topologies-{{ metron_version }}.jar
-metron_jar_path: "{{ playbook_dir }}/../../metron-streaming/Metron-Topologies/target/{{ metron_jar_name }}"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/metron_streaming/defaults/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/defaults/main.yml b/deployment/roles/metron_streaming/defaults/main.yml
new file mode 100644
index 0000000..cb425f9
--- /dev/null
+++ b/deployment/roles/metron_streaming/defaults/main.yml
@@ -0,0 +1,31 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+source_config_path: "{{ metron_directory }}/config/source"
+threat_intel_bulk_load: True
+threat_intel_bin: "{{ metron_directory }}/bin/threatintel_bulk_load.sh"
+threat_intel_host: "{{ groups.ambari_master[0] }}"
+threat_intel_work_dir: /tmp/ti_bulk
+threat_intel_csv_filename: "threat_ip.csv"
+threat_intel_csv_filepath: "../roles/metron_streaming/templates/{{ threat_intel_csv_filename }}"
+
+topology_name: full_topology.yml
+pycapa_topic: pcap
+bro_topic: bro
+yaf_topic: ipfix
+snort_topic: snort
+enrichments_topic: enrichments

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/metron_streaming/files/extractor.json
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/files/extractor.json b/deployment/roles/metron_streaming/files/extractor.json
new file mode 100644
index 0000000..81429e8
--- /dev/null
+++ b/deployment/roles/metron_streaming/files/extractor.json
@@ -0,0 +1,11 @@
+{
+  "config": {
+    "columns": {
+      "ip": 0
+    },
+    "indicator_column": "ip",
+    "separator": ","
+  },
+  "extractor": "CSV"
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/metron_streaming/files/source/bro-config.json
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/files/source/bro-config.json b/deployment/roles/metron_streaming/files/source/bro-config.json
new file mode 100644
index 0000000..34109b8
--- /dev/null
+++ b/deployment/roles/metron_streaming/files/source/bro-config.json
@@ -0,0 +1,14 @@
+{
+  "index": "bro",
+  "batchSize": 5,
+  "enrichmentFieldMap":
+  {
+    "geo": ["ip_dst_addr", "ip_src_addr"],
+    "host": ["host"]
+  },
+  "threatIntelFieldMap":
+  {
+    "ip": ["ip_dst_addr", "ip_src_addr"]
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/metron_streaming/files/source/pcap-config.json
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/files/source/pcap-config.json b/deployment/roles/metron_streaming/files/source/pcap-config.json
new file mode 100644
index 0000000..4b9c639
--- /dev/null
+++ b/deployment/roles/metron_streaming/files/source/pcap-config.json
@@ -0,0 +1,14 @@
+{
+  "index": "pcap",
+  "batchSize": 5,
+  "enrichmentFieldMap":
+  {
+    "geo": ["ip_src_addr", "ip_dst_addr"],
+    "host": ["ip_src_addr", "ip_dst_addr"]
+  },
+  "threatIntelFieldMap":
+  {
+    "ip": ["ip_src_addr", "ip_dst_addr"]
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/metron_streaming/files/source/snort-config.json
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/files/source/snort-config.json b/deployment/roles/metron_streaming/files/source/snort-config.json
new file mode 100644
index 0000000..1208637
--- /dev/null
+++ b/deployment/roles/metron_streaming/files/source/snort-config.json
@@ -0,0 +1,14 @@
+{
+  "index": "snort",
+  "batchSize": 1,
+  "enrichmentFieldMap":
+  {
+    "geo": ["ip_dst_addr", "ip_src_addr"],
+    "host": ["host"]
+  },
+  "threatIntelFieldMap":
+  {
+    "ip": ["ip_dst_addr", "ip_src_addr"]
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/metron_streaming/files/source/yaf-config.json
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/files/source/yaf-config.json b/deployment/roles/metron_streaming/files/source/yaf-config.json
new file mode 100644
index 0000000..65de961
--- /dev/null
+++ b/deployment/roles/metron_streaming/files/source/yaf-config.json
@@ -0,0 +1,14 @@
+{
+  "index": "yaf",
+  "batchSize": 5,
+  "enrichmentFieldMap":
+  {
+    "geo": ["ip_dst_addr", "ip_src_addr"],
+    "host": ["host"]
+  },
+  "threatIntelFieldMap":
+  {
+    "ip": ["ip_dst_addr", "ip_src_addr"]
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/metron_streaming/handlers/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/handlers/main.yml b/deployment/roles/metron_streaming/handlers/main.yml
index 112c5ca..634d591 100644
--- a/deployment/roles/metron_streaming/handlers/main.yml
+++ b/deployment/roles/metron_streaming/handlers/main.yml
@@ -15,5 +15,5 @@
 #  limitations under the License.
 #
 ---
-- name: restart elasticsearch
-  service: name=elasticsearch state=restarted
+- name: Load Source Config
+  shell: java -cp {{ metron_directory }}/lib/{{ metron_jar_name }}::/usr/hdp/current/hadoop-client/lib/slf4j-api-1.7.10.jar org.apache.metron.utils.SourceConfigUtils -p {{ source_config_path }} -z {{ zookeeper_url }} && touch {{ source_config_path }}/configured

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/metron_streaming/tasks/full_topology.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/tasks/full_topology.yml b/deployment/roles/metron_streaming/tasks/full_topology.yml
new file mode 100644
index 0000000..060caf8
--- /dev/null
+++ b/deployment/roles/metron_streaming/tasks/full_topology.yml
@@ -0,0 +1,26 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+
+- name: Submit Metron topologies
+  command: storm jar {{ metron_directory }}/lib/{{ metron_jar_name }} org.apache.storm.flux.Flux --remote {{ item }} --filter {{ metron_properties_config_path }}
+  with_items:
+    - "{{ metron_directory }}/config/topologies/bro/remote.yaml"
+    - "{{ metron_directory }}/config/topologies/snort/remote.yaml"
+    - "{{ metron_directory }}/config/topologies/yaf/remote.yaml"
+    - "{{ metron_directory }}/config/topologies/pcap/parse.yaml"
+    - "{{ metron_directory }}/config/topologies/enrichment/remote.yaml"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/metron_streaming/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/tasks/main.yml b/deployment/roles/metron_streaming/tasks/main.yml
index 7d6fe9c..c1e1642 100644
--- a/deployment/roles/metron_streaming/tasks/main.yml
+++ b/deployment/roles/metron_streaming/tasks/main.yml
@@ -52,6 +52,10 @@
     - "etc"
     - "topologies"
 
+- name: Get Default mysql password
+  include_vars: "../roles/mysql_server/vars/main.yml"  # mysql_server role vars define mysql_root_password
+  when: mysql_root_password is undefined  # skipped when a password was already supplied
+
 - name: Configure Metron topologies
   lineinfile: >
     dest={{ metron_properties_config_path }}
@@ -59,6 +63,7 @@
     line="{{ item.line }}"
   with_items:
     - { regexp: "kafka.zk=", line: "kafka.zk={{ zookeeper_url }}" }
+    - { regexp: "kafka.broker=", line: "kafka.broker={{ kafka_broker_url }}" }
     - { regexp: "es.ip=", line: "es.ip={{ groups.search[0] }}" }
     - { regexp: "es.port=", line: "es.port={{ elasticsearch_transport_port }}" }
     - { regexp: "es.clustername=", line: "es.clustername={{ elasticsearch_cluster_name }}" }
@@ -70,7 +75,8 @@
     - { regexp: "threat.intel.tracker.cf=", line: "threat.intel.tracker.cf=t" }
     - { regexp: "threat.intel.ip.table=", line: "threat.intel.ip.table={{ threatintel_ip_hbase_table }}" }
     - { regexp: "threat.intel.ip.cf=", line: "threat.intel.ip.cf=t" }
-    - { regexp: "mysql.ip=", line: "mysql.ip={{ groups.search[0] }}" }
+    - { regexp: "mysql.ip=", line: "mysql.ip={{ groups.mysql[0] }}" }
+    - { regexp: "mysql.password=", line: "mysql.password={{ mysql_root_password }}" }
 
 - name: Add Elasticsearch templates for topologies
   uri:
@@ -80,11 +86,11 @@
     status_code: 200
     body_format: json
 
-- name: Submit Metron topologies
-  command: storm jar {{ metron_directory }}/lib/{{ metron_jar_name }} org.apache.storm.flux.Flux --remote {{ item }} --filter {{ metron_properties_config_path }}
-  ignore_errors: yes
-  with_items:
-    - "{{ metron_directory }}/config/topologies/pcap/remote.yaml"
-    - "{{ metron_directory }}/config/topologies/bro/remote.yaml"
-    - "{{ metron_directory }}/config/topologies/snort/remote.yaml"
-    - "{{ metron_directory }}/config/topologies/yaf/remote.yaml"
+- include: source_config.yml
+  run_once: true
+- include: threat_intel.yml
+  run_once: true
+  when: threat_intel_bulk_load | bool  # bool filter also accepts string values such as "true" passed via -e
+
+- include: "{{ topology_name }}"
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/metron_streaming/tasks/small_topology.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/tasks/small_topology.yml b/deployment/roles/metron_streaming/tasks/small_topology.yml
new file mode 100644
index 0000000..6707210
--- /dev/null
+++ b/deployment/roles/metron_streaming/tasks/small_topology.yml
@@ -0,0 +1,26 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+
+- name: Submit Metron topologies
+  command: storm jar {{ metron_directory }}/lib/{{ metron_jar_name }} org.apache.storm.flux.Flux --remote {{ item }} --filter {{ metron_properties_config_path }}
+  with_items:
+    - "{{ metron_directory }}/config/topologies/bro/remote.yaml"
+    - "{{ metron_directory }}/config/topologies/pcap/parse.yaml"
+    - "{{ metron_directory }}/config/topologies/yaf/remote.yaml"
+    - "{{ metron_directory }}/config/topologies/enrichment/remote.yaml"
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/metron_streaming/tasks/source_config.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/tasks/source_config.yml b/deployment/roles/metron_streaming/tasks/source_config.yml
new file mode 100644
index 0000000..9233bac
--- /dev/null
+++ b/deployment/roles/metron_streaming/tasks/source_config.yml
@@ -0,0 +1,31 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+- name: Create Source Config Directory
+  file:
+    path: "{{ source_config_path }}"
+    state: directory
+
+- name: Copy Source Config Files
+  copy:
+    src: "{{ item }}"
+    dest: "{{ source_config_path }}"
+    mode: 0644
+  with_fileglob:
+    - ../roles/metron_streaming/files/source/*.json
+  notify: Load Source Config
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/metron_streaming/tasks/threat_intel.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/tasks/threat_intel.yml b/deployment/roles/metron_streaming/tasks/threat_intel.yml
new file mode 100644
index 0000000..0439e46
--- /dev/null
+++ b/deployment/roles/metron_streaming/tasks/threat_intel.yml
@@ -0,0 +1,48 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+- name: Create root user HDFS directory
+  command: su - hdfs -c "hdfs dfs -mkdir -p /user/root && hdfs dfs -chown root:root /user/root"
+
+- name: Create Bulk load working Directory
+  file:
+    path: "{{ threat_intel_work_dir }}"
+    state: directory
+
+- name: Copy extractor.json to {{ inventory_hostname }}
+  copy:
+    src: ../roles/metron_streaming/files/extractor.json
+    dest: "{{  threat_intel_work_dir }}"
+    mode: 0644
+
+- name: Copy Bulk Load CSV File
+  template:
+    src: "{{ threat_intel_csv_filepath }}"
+    dest: "{{ threat_intel_work_dir }}/{{ threat_intel_csv_filename }}"
+    mode: 0644
+
+- name: Copy Bulk Load CSV File to HDFS
+  command: "hdfs dfs -put -f {{ threat_intel_work_dir }}/{{ threat_intel_csv_filename }} ."  # -f overwrites a copy left in HDFS by an earlier run that stopped before clean-up
+
+- name: Run Threat Intel Bulk Load
+  shell: "{{ threat_intel_bin }} -f t --table malicious_ip -e {{ threat_intel_work_dir }}/extractor.json  -i /user/root && touch {{ threat_intel_work_dir }}/loaded"
+  args:
+    creates: "{{ threat_intel_work_dir }}/loaded"
+
+- name: Clean up HDFS File
+  command: "hdfs dfs -rm {{ threat_intel_csv_filename }}"
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/metron_streaming/templates/threat_ip.csv
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/templates/threat_ip.csv b/deployment/roles/metron_streaming/templates/threat_ip.csv
new file mode 100644
index 0000000..3ac38f3
--- /dev/null
+++ b/deployment/roles/metron_streaming/templates/threat_ip.csv
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#Add single column of ip address to alert
+#Public lists are available on the internet
+# example: 
+23.113.113.105
+24.107.205.249
+24.108.62.255
+24.224.153.71
+27.4.1.212
+27.131.149.102
+31.24.30.31
+31.131.251.33
+31.186.99.250
+31.192.209.119
+31.192.209.150
+31.200.244.17
+37.34.52.185
+37.58.112.101
+37.99.146.27
+37.128.132.96
+37.140.195.177
+37.140.199.100

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/mysql/files/geoip_ddl.sql
----------------------------------------------------------------------
diff --git a/deployment/roles/mysql/files/geoip_ddl.sql b/deployment/roles/mysql/files/geoip_ddl.sql
deleted file mode 100644
index 02616c6..0000000
--- a/deployment/roles/mysql/files/geoip_ddl.sql
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-CREATE DATABASE IF NOT EXISTS GEO;
-
-USE GEO;
-
-DROP TABLE IF EXISTS `blocks`;
-CREATE TABLE  `blocks` ( `startIPNum` int(10) unsigned NOT NULL,`endIPNum` int(10) unsigned NOT NULL,`locID`
-int(10) unsigned NOT NULL, PRIMARY KEY  (`startIPNum`,`endIPNum`) )
-ENGINE=MyISAM DEFAULT CHARSET=latin1 PACK_KEYS=1 DELAY_KEY_WRITE=1;
-
-DROP TABLE IF EXISTS `location`;
-CREATE TABLE  `location` (`locID` int(10) unsigned NOT NULL,`country` char(2) default NULL,`region` char(2)
- default NULL,`city` varchar(45) default NULL,`postalCode` char(7) default NULL,`latitude` double default
-NULL,`longitude` double default NULL,`dmaCode` char(3) default NULL,`areaCode` char(3) default NULL,PRIMARY KEY
-  (`locID`),KEY `Index_Country` (`country`) ) ENGINE=MyISAM DEFAULT CHARSET=latin1 ROW_FORMAT=FIXED;
-
-load data infile '/var/lib/mysql-files/GeoLiteCity-Blocks.csv'  into table `blocks`  fields terminated by ',' optionally enclosed by '"'  lines terminated by '\n' ignore 2 lines;
-load data infile '/var/lib/mysql-files/GeoLiteCity-Location.csv'  into table `location`  fields terminated by ',' optionally enclosed by '"'  lines terminated by '\n' ignore 2 lines;
-
-
-DELIMITER $$
-DROP FUNCTION IF EXISTS `IPTOLOCID` $$
-CREATE FUNCTION `IPTOLOCID`( ip VARCHAR(15)) RETURNS int(10) unsigned
-  BEGIN
-    DECLARE ipn INTEGER UNSIGNED;
-    DECLARE locID_var INTEGER;
-    IF ip LIKE '192.168.%' OR ip LIKE '10.%' THEN RETURN 0;
-    END IF;
-    SET ipn = INET_ATON(ip);
-    SELECT locID INTO locID_var FROM `blocks` INNER JOIN (SELECT MAX(startIPNum) AS start FROM `blocks` WHERE startIPNum <= ipn) AS s ON (startIPNum = s.start) WHERE endIPNum >= ipn;
-    RETURN locID_var;
-  END
-$$
-DELIMITER ;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/mysql/files/mylogin.cnf
----------------------------------------------------------------------
diff --git a/deployment/roles/mysql/files/mylogin.cnf b/deployment/roles/mysql/files/mylogin.cnf
deleted file mode 100644
index b8d5781..0000000
--- a/deployment/roles/mysql/files/mylogin.cnf
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-
-[client]
-user=root
-password=P@ssw0rd
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/mysql/files/mysql57-community-release-el6-7.noarch.rpm
----------------------------------------------------------------------
diff --git a/deployment/roles/mysql/files/mysql57-community-release-el6-7.noarch.rpm b/deployment/roles/mysql/files/mysql57-community-release-el6-7.noarch.rpm
deleted file mode 100644
index 8603602..0000000
Binary files a/deployment/roles/mysql/files/mysql57-community-release-el6-7.noarch.rpm and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/mysql/handlers/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/mysql/handlers/main.yml b/deployment/roles/mysql/handlers/main.yml
deleted file mode 100644
index 112c5ca..0000000
--- a/deployment/roles/mysql/handlers/main.yml
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
----
-- name: restart elasticsearch
-  service: name=elasticsearch state=restarted

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/mysql/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/mysql/tasks/main.yml b/deployment/roles/mysql/tasks/main.yml
deleted file mode 100644
index 91db896..0000000
--- a/deployment/roles/mysql/tasks/main.yml
+++ /dev/null
@@ -1,85 +0,0 @@
-#
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
----
-- name: Create temporary directories
-  file:
-    path: "/tmp/{{ item }}"
-    state: directory
-    mode: 0755
-  with_items:
-    - "geoip"
-
-- name: Distribute Mysql
-  copy:
-    src: "{{ mysql_rpm_version }}.rpm"
-    dest: /tmp
-
-- name: Install Msyql Yum Repository
-  yum:
-    name: "/tmp/{{ mysql_rpm_version }}.rpm"
-
-- name: Install MySQL
-  yum:
-    name: "{{ item }}"
-    state: latest
-  with_items:
-    - "mysql-community-server"
-    - "MySQL-python"
-
-- name: Start MySQL
-  service:
-    name: mysqld
-    state: started
-    enabled: yes
-
-- name: Retrieve temporary root password
-  shell: "grep 'temporary password' /var/log/mysqld.log | sed 's/.*root@localhost: //'"
-  args:
-    creates: ~/.my.cnf
-  register: temp_root_password
-
-- name: Update mysql root password
-  command: "mysqladmin --user=root --password='{{ temp_root_password.stdout }}' password '{{ mysql_root_password }}'"
-  ignore_errors: yes
-  args:
-    creates: ~/.my.cnf
-
-- name: Copy mylogin.cnf
-  copy:
-    src:  mylogin.cnf
-    dest: ~/.my.cnf
-
-- name: Download GeoIP databases
-  unarchive:
-    src:  http://geolite.maxmind.com/download/geoip/database/GeoLiteCity_CSV/GeoLiteCity-latest.tar.xz
-    dest: /tmp/geoip
-    copy: no
-    creates: /tmp/geopip/*/GeoLiteCity-Blocks.csv
-
-- name: Copy to MySQL import directory
-  shell: "cp /tmp/geoip/*/*.csv /var/lib/mysql-files/"
-
-- name: Copy DDL
-  copy:
-    src: geoip_ddl.sql
-    dest: /tmp/geoip_ddl.sql
-
-- name: Import GeoIP DDL
-  mysql_db:
-    name: all
-    state: import
-    target: /tmp/geoip_ddl.sql

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/mysql/templates/.my.cnf
----------------------------------------------------------------------
diff --git a/deployment/roles/mysql/templates/.my.cnf b/deployment/roles/mysql/templates/.my.cnf
deleted file mode 100644
index d5c0825..0000000
--- a/deployment/roles/mysql/templates/.my.cnf
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
-[client]
-user=root
-password={{ mysql_root_password }}
-host=localhost
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/mysql/vars/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/mysql/vars/main.yml b/deployment/roles/mysql/vars/main.yml
deleted file mode 100644
index ccf2426..0000000
--- a/deployment/roles/mysql/vars/main.yml
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
----
-mysql_rpm_version: mysql57-community-release-el6-7.noarch
-mysql_root_password: P@ssw0rd
-

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/mysql_client/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/mysql_client/tasks/main.yml b/deployment/roles/mysql_client/tasks/main.yml
new file mode 100644
index 0000000..5c98eb9
--- /dev/null
+++ b/deployment/roles/mysql_client/tasks/main.yml
@@ -0,0 +1,34 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+
+- name: Get Default mysql password
+  include_vars: "../roles/mysql_server/vars/main.yml"  # mysql_server role vars define mysql_root_password
+  when: mysql_root_password is undefined  # skipped when a password was already supplied
+
+- name: Allow remote login to mysql
+  template:
+    src: "../roles/mysql_client/templates/db_config.sql"
+    dest: "/tmp/{{ansible_fqdn}}.sql"
+  delegate_to: "{{ groups.mysql[0] }}"
+
+- name: Import DB_Config
+  mysql_db:
+    name: "all"
+    state: "import"
+    target: "/tmp/{{ansible_fqdn}}.sql"
+  delegate_to: "{{ groups.mysql[0] }}"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/mysql_client/templates/db_config.sql
----------------------------------------------------------------------
diff --git a/deployment/roles/mysql_client/templates/db_config.sql b/deployment/roles/mysql_client/templates/db_config.sql
new file mode 100644
index 0000000..c407a13
--- /dev/null
+++ b/deployment/roles/mysql_client/templates/db_config.sql
@@ -0,0 +1,21 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+CREATE USER IF NOT EXISTS 'root'@'{{ ansible_fqdn }}' IDENTIFIED BY '{{ mysql_root_password }}'; -- IF NOT EXISTS lets the script be re-imported once the account exists
+SET PASSWORD FOR 'root'@'{{ ansible_fqdn }}' = PASSWORD('{{ mysql_root_password }}'); -- re-applies the configured password on every import
+GRANT ALL PRIVILEGES ON *.* to 'root'@'{{ ansible_fqdn }}' WITH GRANT OPTION;
+FLUSH PRIVILEGES;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/mysql_server/files/geoip_ddl.sql
----------------------------------------------------------------------
diff --git a/deployment/roles/mysql_server/files/geoip_ddl.sql b/deployment/roles/mysql_server/files/geoip_ddl.sql
new file mode 100644
index 0000000..02616c6
--- /dev/null
+++ b/deployment/roles/mysql_server/files/geoip_ddl.sql
@@ -0,0 +1,49 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+CREATE DATABASE IF NOT EXISTS GEO;
+
+USE GEO;
+
+DROP TABLE IF EXISTS `blocks`;
+CREATE TABLE  `blocks` ( `startIPNum` int(10) unsigned NOT NULL,`endIPNum` int(10) unsigned NOT NULL,`locID`
+int(10) unsigned NOT NULL, PRIMARY KEY  (`startIPNum`,`endIPNum`) )
+ENGINE=MyISAM DEFAULT CHARSET=latin1 PACK_KEYS=1 DELAY_KEY_WRITE=1;
+
+DROP TABLE IF EXISTS `location`;
+CREATE TABLE  `location` (`locID` int(10) unsigned NOT NULL,`country` char(2) default NULL,`region` char(2)
+ default NULL,`city` varchar(45) default NULL,`postalCode` char(7) default NULL,`latitude` double default
+NULL,`longitude` double default NULL,`dmaCode` char(3) default NULL,`areaCode` char(3) default NULL,PRIMARY KEY
+  (`locID`),KEY `Index_Country` (`country`) ) ENGINE=MyISAM DEFAULT CHARSET=latin1 ROW_FORMAT=FIXED;
+
+load data infile '/var/lib/mysql-files/GeoLiteCity-Blocks.csv'  into table `blocks`  fields terminated by ',' optionally enclosed by '"'  lines terminated by '\n' ignore 2 lines;
+load data infile '/var/lib/mysql-files/GeoLiteCity-Location.csv'  into table `location`  fields terminated by ',' optionally enclosed by '"'  lines terminated by '\n' ignore 2 lines;
+
+
+DELIMITER $$
+DROP FUNCTION IF EXISTS `IPTOLOCID` $$
+CREATE FUNCTION `IPTOLOCID`( ip VARCHAR(15)) RETURNS int(10) unsigned
+  BEGIN
+    DECLARE ipn INTEGER UNSIGNED;
+    DECLARE locID_var INTEGER;
+    IF ip LIKE '192.168.%' OR ip LIKE '10.%' THEN RETURN 0;
+    END IF;
+    SET ipn = INET_ATON(ip);
+    SELECT locID INTO locID_var FROM `blocks` INNER JOIN (SELECT MAX(startIPNum) AS start FROM `blocks` WHERE startIPNum <= ipn) AS s ON (startIPNum = s.start) WHERE endIPNum >= ipn;
+    RETURN locID_var;
+  END
+$$
+DELIMITER ;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/mysql_server/files/mysql57-community-release-el6-7.noarch.rpm
----------------------------------------------------------------------
diff --git a/deployment/roles/mysql_server/files/mysql57-community-release-el6-7.noarch.rpm b/deployment/roles/mysql_server/files/mysql57-community-release-el6-7.noarch.rpm
new file mode 100644
index 0000000..8603602
Binary files /dev/null and b/deployment/roles/mysql_server/files/mysql57-community-release-el6-7.noarch.rpm differ

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/mysql_server/handlers/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/mysql_server/handlers/main.yml b/deployment/roles/mysql_server/handlers/main.yml
new file mode 100644
index 0000000..112c5ca
--- /dev/null
+++ b/deployment/roles/mysql_server/handlers/main.yml
@@ -0,0 +1,19 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+- name: restart elasticsearch
+  service: name=elasticsearch state=restarted

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/mysql_server/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/mysql_server/tasks/main.yml b/deployment/roles/mysql_server/tasks/main.yml
new file mode 100644
index 0000000..987c160
--- /dev/null
+++ b/deployment/roles/mysql_server/tasks/main.yml
@@ -0,0 +1,86 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+- name: Create temporary directories
+  file:
+    path: "/tmp/{{ item }}"
+    state: directory
+    mode: 0755
+  with_items:
+    - "geoip"
+
+- name: Distribute Mysql
+  copy:
+    src: "{{ mysql_rpm_version }}.rpm"
+    dest: /tmp
+
+- name: Install MySQL Yum Repository
+  yum:
+    name: "/tmp/{{ mysql_rpm_version }}.rpm"  # release RPM copied to /tmp by the preceding "Distribute Mysql" task
+
+- name: Install MySQL
+  yum:
+    name: "{{ item }}"
+    state: latest
+  with_items:
+    - "mysql-community-server"
+    - "MySQL-python"
+
+- name: Start MySQL
+  service:
+    name: mysqld
+    state: started
+    enabled: yes
+
+- name: Retrieve temporary root password
+  shell: "grep 'temporary password' /var/log/mysqld.log | sed 's/.*root@localhost: //'"
+  args:
+    creates: ~/.my.cnf
+  register: temp_root_password
+
+- name: Update mysql root password
+  command: "mysqladmin --user=root --password='{{ temp_root_password.stdout }}' password '{{ mysql_root_password }}'"
+  ignore_errors: yes
+  args:
+    creates: ~/.my.cnf
+
+- name: Create .my.cnf
+  template:
+    src: "../roles/mysql_server/templates/.my.cnf"
+    dest: ~/.my.cnf
+    mode: 0600  # rendered file contains the root password in plain text; keep it owner-only
+
+- name: Download GeoIP databases
+  unarchive:
+    src:  http://geolite.maxmind.com/download/geoip/database/GeoLiteCity_CSV/GeoLiteCity-latest.tar.xz
+    dest: /tmp/geoip
+    copy: no  # do not push src from the controller
+    creates: /tmp/geoip/*/GeoLiteCity-Blocks.csv  # path now matches dest (was /tmp/geopip); NOTE(review): confirm unarchive expands globs in creates
+
+- name: Copy to MySQL import directory
+  shell: "cp /tmp/geoip/*/*.csv /var/lib/mysql-files/"
+
+- name: Copy DDL
+  copy:
+    src: geoip_ddl.sql
+    dest: /tmp/geoip_ddl.sql
+
+- name: Import GeoIP DDL
+  mysql_db:
+    name: all
+    state: import
+    target: /tmp/geoip_ddl.sql

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/mysql_server/templates/.my.cnf
----------------------------------------------------------------------
diff --git a/deployment/roles/mysql_server/templates/.my.cnf b/deployment/roles/mysql_server/templates/.my.cnf
new file mode 100644
index 0000000..d5c0825
--- /dev/null
+++ b/deployment/roles/mysql_server/templates/.my.cnf
@@ -0,0 +1,20 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+[client]
+user=root
+password={{ mysql_root_password }}
+host=localhost
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/mysql_server/vars/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/mysql_server/vars/main.yml b/deployment/roles/mysql_server/vars/main.yml
new file mode 100644
index 0000000..ccf2426
--- /dev/null
+++ b/deployment/roles/mysql_server/vars/main.yml
@@ -0,0 +1,20 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+mysql_rpm_version: mysql57-community-release-el6-7.noarch
+mysql_root_password: P@ssw0rd
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/pcap_replay/defaults/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/pcap_replay/defaults/main.yml b/deployment/roles/pcap_replay/defaults/main.yml
new file mode 100644
index 0000000..b1fae1e
--- /dev/null
+++ b/deployment/roles/pcap_replay/defaults/main.yml
@@ -0,0 +1,21 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+pcap_replay_interface: eth0
+pcap_path: /opt/pcap-replay
+tcpreplay_version: 4.1.1
+tcpreplay_prefix: /opt

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/pcap_replay/templates/pcap-replay
----------------------------------------------------------------------
diff --git a/deployment/roles/pcap_replay/templates/pcap-replay b/deployment/roles/pcap_replay/templates/pcap-replay
index 56dc40c..b9ae0c3 100644
--- a/deployment/roles/pcap_replay/templates/pcap-replay
+++ b/deployment/roles/pcap_replay/templates/pcap-replay
@@ -24,7 +24,7 @@
 
 DAEMON_PATH="{{ pcap_path }}"
 PCAPIN=`ls $DAEMON_PATH/*.pcap 2> /dev/null`
-IFACE="{{ pcap_replay_interface | default("eth0") }}"
+IFACE="{{ pcap_replay_interface }}"
 EXTRA_ARGS="${@:2}"
 DAEMON="{{ tcpreplay_prefix }}/bin/tcpreplay"
 DAEMONOPTS="--intf1=$IFACE --loop=0 $EXTRA_ARGS $PCAPIN"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/pcap_replay/vars/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/pcap_replay/vars/main.yml b/deployment/roles/pcap_replay/vars/main.yml
deleted file mode 100644
index b1fae1e..0000000
--- a/deployment/roles/pcap_replay/vars/main.yml
+++ /dev/null
@@ -1,21 +0,0 @@
-#
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
----
-pcap_replay_interface: eth0
-pcap_path: /opt/pcap-replay
-tcpreplay_version: 4.1.1
-tcpreplay_prefix: /opt

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/tap_interface/defaults/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/tap_interface/defaults/main.yml b/deployment/roles/tap_interface/defaults/main.yml
new file mode 100644
index 0000000..ca752b4
--- /dev/null
+++ b/deployment/roles/tap_interface/defaults/main.yml
@@ -0,0 +1,19 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+tap_if: tap0
+tap_ip: 10.0.0.1

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/tap_interface/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/tap_interface/tasks/main.yml b/deployment/roles/tap_interface/tasks/main.yml
new file mode 100644
index 0000000..d4590f7
--- /dev/null
+++ b/deployment/roles/tap_interface/tasks/main.yml
@@ -0,0 +1,30 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+- name: Install tunctl
+  yum: name=tunctl
+
+#TODO - only run when tap_if does not exist
+- name: Create {{ tap_if }}
+  command: tunctl -p -t {{ tap_if }}
+
+- name: Bring up {{ tap_if }} on {{ tap_ip }}
+  command: ifconfig {{ tap_if }} {{ tap_ip }} up
+
+- name:  Put {{ tap_if }} in PROMISC
+  command: ip link set {{ tap_if }} promisc on
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/yaf/defaults/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/yaf/defaults/main.yml b/deployment/roles/yaf/defaults/main.yml
new file mode 100644
index 0000000..f804cb5
--- /dev/null
+++ b/deployment/roles/yaf/defaults/main.yml
@@ -0,0 +1,29 @@
+#
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+fixbuf_version: 1.7.1
+yaf_version: 2.8.0
+yaf_home: /opt/yaf
+yaf_topic: ipfix
+hdp_repo_def: http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.3.2.0/hdp.repo
+yaf: /usr/local/bin/yaf
+yaf_args: ""
+yafscii: /usr/local/bin/yafscii
+yaf_log: /var/log/yaf.log
+yaf_lock: /var/lock/subsys/yaf
+kafka_prod: /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/yaf/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/yaf/tasks/main.yml b/deployment/roles/yaf/tasks/main.yml
index 1e1194d..468a4f9 100644
--- a/deployment/roles/yaf/tasks/main.yml
+++ b/deployment/roles/yaf/tasks/main.yml
@@ -43,13 +43,6 @@
 - name: Install kafka
   yum: name=kafka
 
-- set_fact:
-    yaf: /usr/local/bin/yaf
-    yafscii: /usr/local/bin/yafscii
-    yaf_log: /var/log/yaf.log
-    yaf_lock: /var/lock/subsys/yaf
-    kafka_prod: /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh
-
 - name: Check for Java at "{{ java_home }}"
   stat: path="{{ java_home }}"
   register: jdk_dir
@@ -69,6 +62,6 @@
   when: not jdk_dir.stat.exists
 
 - name: Start yaf
-  shell: "daemonize -c {{ yaf_home }} -e {{ yaf_log }} -o {{ yaf_log }} -l {{ yaf_lock }} {{ yaf }} --in {{ sniff_interface }} --live pcap | {{ yafscii }} --tabular | {{ kafka_prod }} --broker-list {{ kafka_broker_url }} --topic {{ yaf_topic }}"
+  shell: "daemonize -c {{ yaf_home }} -e {{ yaf_log }} -o {{ yaf_log }} -l {{ yaf_lock }} {{ yaf }} --in {{ sniff_interface }} --live pcap {{ yaf_args }} | {{ yafscii }} --tabular | {{ kafka_prod }} --broker-list {{ kafka_broker_url }} --topic {{ yaf_topic }}"
   args:
     creates: "{{ yaf_lock }}"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/deployment/roles/yaf/vars/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/yaf/vars/main.yml b/deployment/roles/yaf/vars/main.yml
deleted file mode 100644
index 1d53958..0000000
--- a/deployment/roles/yaf/vars/main.yml
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
----
-fixbuf_version: 1.7.1
-yaf_version: 2.8.0
-yaf_home: /opt/yaf
-yaf_topic: ipfix
-hdp_repo_def: http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.3.2.0/hdp.repo
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/pom.xml b/metron-streaming/Metron-Common/pom.xml
index c4fc5aa..605c7ed 100644
--- a/metron-streaming/Metron-Common/pom.xml
+++ b/metron-streaming/Metron-Common/pom.xml
@@ -222,6 +222,11 @@
                 <version>1.4</version>
                 <configuration>
                     <createDependencyReducedPom>true</createDependencyReducedPom>
+                    <artifactSet>
+                        <excludes>
+                            <exclude>*slf4j*</exclude>
+                        </excludes>
+                    </artifactSet>
                 </configuration>
                 <executions>
                     <execution>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
index 6d094ee..a8fda69 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
@@ -66,6 +66,7 @@ public class BulkMessageWriterBolt extends ConfiguredBolt {
   @Override
   public void execute(Tuple tuple) {
     JSONObject message = (JSONObject) tuple.getValueByField("message");
+    message.put("index." + bulkMessageWriter.getClass().getSimpleName().toLowerCase() + ".ts", "" + System.currentTimeMillis());
     String sourceType = TopologyUtils.getSourceType(message);
     SourceConfig configuration = configurations.get(sourceType);
     int batchSize = configuration != null ? configuration.getBatchSize() : 1;
@@ -80,7 +81,9 @@ public class BulkMessageWriterBolt extends ConfiguredBolt {
       sourceMessageMap.put(sourceType, messageList);
     } else {
       try {
-        bulkMessageWriter.write(sourceType, configuration, tupleList, messageList);
+
+        String esType = sourceType + "_doc";
+        bulkMessageWriter.write(esType, configuration, tupleList, messageList);
         for(Tuple t: tupleList) {
           collector.ack(t);
         }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java
index dac1c0a..653eade 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java
@@ -27,6 +27,7 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Sets;
+import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.HashMap;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
index 7079d5c..6f43739 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
@@ -20,10 +20,12 @@ package org.apache.metron.domain;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 
 import java.io.Serializable;
+import java.util.List;
 
 public class Enrichment<T extends EnrichmentAdapter> implements Serializable {
 
   private String type;
+  private List<String> fields;
   private T adapter;
 
   public Enrichment() {}
@@ -33,6 +35,15 @@ public class Enrichment<T extends EnrichmentAdapter> implements Serializable {
     this.adapter = adapter;
   }
 
+
+  public List<String> getFields() {
+    return fields;
+  }
+
+  public void setFields(List<String> fields) {
+    this.fields = fields;
+  }
+
   public String getType() {
     return type;
   }