You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/02/10 21:17:40 UTC

[2/4] incubator-metron git commit: METRON-33 Execute Enrichments in Parallel (merrimanr via cestella) closes apache/incubator-metron#19

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/PcapParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/PcapParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/PcapParser.java
index a0cb8c8..0841762 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/PcapParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/PcapParser.java
@@ -3,11 +3,17 @@ package org.apache.metron.parsing.parsers;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
+import org.apache.metron.parser.interfaces.MessageParser;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
 import org.krakenapps.pcap.decoder.ethernet.EthernetDecoder;
 import org.krakenapps.pcap.decoder.ethernet.EthernetType;
 import org.krakenapps.pcap.decoder.ip.IpDecoder;
@@ -24,42 +30,59 @@ import org.apache.metron.pcap.MetronEthernetDecoder;
 import org.apache.metron.pcap.PacketInfo;
 import org.apache.metron.pcap.PcapByteInputStream;
 
-/**
- * The Class PcapParser.
- * 
- * @author sheetal
- * @version $Revision: 1.0 $
- */
-public final class PcapParser {
+public class PcapParser implements MessageParser<JSONObject>, Serializable {
 
-  /** The Constant LOG. */
   private static final Logger LOG = Logger.getLogger(PcapParser.class);
 
-  /** The ETHERNET_DECODER. */
-  private static final EthernetDecoder ETHERNET_DECODER = new MetronEthernetDecoder();
-
-  /** The ip decoder. */
-  private static final IpDecoder IP_DECODER = new IpDecoder();
-
-  // /** The tcp decoder. */
-  // private static final TcpDecoder TCP_DECODER = new TcpDecoder(new
-  // TcpPortProtocolMapper());
-  //
-  // /** The udp decoder. */
-  // private static final UdpDecoder UDP_DECODER = new UdpDecoder(new
-  // UdpPortProtocolMapper());
+  private EthernetDecoder ethernetDecoder;
+  private long timePrecisionDivisor = 1L;
+
+  public PcapParser withTsPrecision(String tsPrecision) {
+    if (tsPrecision.equalsIgnoreCase("MILLI")) {
+      //Convert nanos to millis
+      LOG.info("Configured for MILLI, setting timePrecisionDivisor to 1000000L" );
+      timePrecisionDivisor = 1000000L;
+    } else if (tsPrecision.equalsIgnoreCase("MICRO")) {
+      //Convert nanos to micro
+      LOG.info("Configured for MICRO, setting timePrecisionDivisor to 1000L" );
+      timePrecisionDivisor = 1000L;
+    } else if (tsPrecision.equalsIgnoreCase("NANO")) {
+      //Keep nano as is.
+      LOG.info("Configured for NANO, setting timePrecisionDivisor to 1L" );
+      timePrecisionDivisor = 1L;
+    } else {
+      LOG.info("bolt.parser.ts.precision not set. Default to NANO");
+      timePrecisionDivisor = 1L;
+    }
+    return this;
+  }
 
-  static {
-    // IP_DECODER.register(InternetProtocol.TCP, TCP_DECODER);
-    // IP_DECODER.register(InternetProtocol.UDP, UDP_DECODER);
-    ETHERNET_DECODER.register(EthernetType.IPV4, IP_DECODER);
+  @Override
+  public void init() {
+    ethernetDecoder = new MetronEthernetDecoder();
+    IpDecoder ipDecoder = new IpDecoder();
+    ethernetDecoder.register(EthernetType.IPV4, ipDecoder);
   }
 
-  /**
-   * Instantiates a new pcap parser.
-   */
-  private PcapParser() { // $codepro.audit.disable emptyMethod
+  @Override
+  public List<JSONObject> parse(byte[] pcap) {
+    List<JSONObject> messages = new ArrayList<>();
+    List<PacketInfo> packetInfoList = new ArrayList<>();
+    try {
+      packetInfoList = getPacketInfo(pcap);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    for (PacketInfo packetInfo : packetInfoList) {
+      JSONObject message = (JSONObject) JSONValue.parse(packetInfo.getJsonIndexDoc());
+      messages.add(message);
+    }
+    return messages;
+  }
 
+  @Override
+  public boolean validate(JSONObject message) {
+    return true;
   }
 
   /**
@@ -73,7 +96,7 @@ public final class PcapParser {
    * @throws IOException
    *           Signals that an I/O exception has occurred.
    */
-  public static List<PacketInfo> parse(byte[] pcap) throws IOException {
+  public List<PacketInfo> getPacketInfo(byte[] pcap) throws IOException {
     List<PacketInfo> packetInfoList = new ArrayList<PacketInfo>();
 
     PcapByteInputStream pcapByteInputStream = new PcapByteInputStream(pcap);
@@ -96,7 +119,7 @@ public final class PcapParser {
         // LOG.trace("Got packet # " + ++packetCounter);
 
         // LOG.trace(packet.getPacketData());
-        ETHERNET_DECODER.decode(packet);
+        ethernetDecoder.decode(packet);
 
         PacketHeader packetHeader = packet.getPacketHeader();
         Ipv4Packet ipv4Packet = Ipv4Packet.parse(packet.getPacketData());
@@ -148,13 +171,13 @@ public final class PcapParser {
     double totalIterations = 1000000;
     double parallelism = 64;
     double targetEvents = 1000000;
-
+    PcapParser pcapParser = new PcapParser();
     File fin = new File("/Users/sheetal/Downloads/bad_packets/bad_packet_1405988125427.pcap");
     File fout = new File(fin.getAbsolutePath() + ".parsed");
     byte[] pcapBytes = FileUtils.readFileToByteArray(fin);
     long startTime = System.currentTimeMillis();
     for (int i = 0; i < totalIterations; i++) {
-      List<PacketInfo> list = parse(pcapBytes);
+      List<PacketInfo> list = pcapParser.getPacketInfo(pcapBytes);
 
       for (PacketInfo packetInfo : list) {
         System.out.println(packetInfo.getJsonIndexDoc());

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/utils/GrokUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/utils/GrokUtils.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/utils/GrokUtils.java
new file mode 100644
index 0000000..b2b5f7a
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/utils/GrokUtils.java
@@ -0,0 +1,26 @@
+package org.apache.metron.parsing.utils;
+import java.io.Serializable;
+
+import com.google.code.regexp.Pattern;
+
+public class GrokUtils implements Serializable {
+
+	private static final long serialVersionUID = 7465176887422419286L;
+	/**
+	   * Extract Grok patter like %{FOO} to FOO, Also Grok pattern with semantic.
+	   */
+	  public static final Pattern GROK_PATTERN = Pattern.compile(
+	      "%\\{" +
+	      "(?<name>" +
+	        "(?<pattern>[A-z0-9]+)" +
+	          "(?::(?<subname>[A-z0-9_:;\\/\\s\\.]+))?" +
+	          ")" +
+	          "(?:=(?<definition>" +
+	            "(?:" +
+	            "(?:[^{}]+|\\.+)+" +
+	            ")+" +
+	            ")" +
+	      ")?" +
+	      "\\}");
+
+	}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/utils/ParserUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/utils/ParserUtils.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/utils/ParserUtils.java
new file mode 100644
index 0000000..e1a3e2c
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/utils/ParserUtils.java
@@ -0,0 +1,55 @@
+package org.apache.metron.parsing.utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.json.simple.JSONObject;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Locale;
+import java.util.TimeZone;
+
+public class ParserUtils {
+
+  public static final String PREFIX = "stream2file";
+  public static final String SUFFIX = ".tmp";
+
+  public static File stream2file(InputStream in) throws IOException {
+    final File tempFile = File.createTempFile(PREFIX, SUFFIX);
+    tempFile.deleteOnExit();
+    try (FileOutputStream out = new FileOutputStream(tempFile)) {
+      IOUtils.copy(in, out);
+    }
+    return tempFile;
+  }
+
+  public static Long convertToEpoch(String m, String d, String ts,
+                                    boolean adjust_timezone) throws ParseException {
+    d = d.trim();
+    if (d.length() <= 2) {
+      d = "0" + d;
+    }
+    Date date = new SimpleDateFormat("MMM", Locale.ENGLISH).parse(m);
+    Calendar cal = Calendar.getInstance();
+    cal.setTime(date);
+    String month = String.valueOf(cal.get(Calendar.MONTH));
+    int year = Calendar.getInstance().get(Calendar.YEAR);
+    if (month.length() <= 2) {
+      month = "0" + month;
+    }
+    String coglomerated_ts = year + "-" + month + "-" + d + " " + ts;
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    if (adjust_timezone) {
+      sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+    }
+    date = sdf.parse(coglomerated_ts);
+    long timeInMillisSinceEpoch = date.getTime();
+    return timeInMillisSinceEpoch;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
index 2f643d7..acd919c 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
@@ -8,7 +8,7 @@ import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
-
+import org.junit.Assert;
 import org.apache.metron.parsing.parsers.BasicBroParser;
 
 public class BasicBroParserTest extends TestCase {
@@ -21,12 +21,12 @@ public class BasicBroParserTest extends TestCase {
 
 	/**
 	 * Constructs a new <code>BasicBroParserTest</code> instance.
-	 * 
+	 *
 	 * @throws Exception
 	 */
 	public BasicBroParserTest() throws Exception {
 		broParser = new BasicBroParser();
-		jsonParser = new JSONParser();		
+		jsonParser = new JSONParser();
 	}
 
     public void testUnwrappedBroMessage() throws ParseException {
@@ -34,91 +34,91 @@ public class BasicBroParserTest extends TestCase {
 
         JSONObject rawJson = (JSONObject)jsonParser.parse(rawMessage);
 
-        JSONObject broJson = broParser.parse(rawMessage.getBytes());
-
-        assertEquals(broJson.get("timestamp"), Long.parseLong(rawJson.get("timestamp").toString()));
-        assertEquals(broJson.get("ip_src_addr").toString(), rawJson.get("source_ip").toString());
-        assertEquals(broJson.get("ip_dst_addr").toString(), rawJson.get("dest_ip").toString());
-        assertEquals(broJson.get("ip_src_port"), rawJson.get("source_port"));
-        assertEquals(broJson.get("ip_dst_port"), rawJson.get("dest_port"));
-        assertEquals(broJson.get("uid").toString(), rawJson.get("uid").toString());
-        assertEquals(broJson.get("trans_id").toString(), rawJson.get("trans_id").toString());
-        assertEquals(broJson.get("sensor").toString(), rawJson.get("sensor").toString());
-        assertEquals(broJson.get("protocol").toString(), rawJson.get("type").toString());
-        assertEquals(broJson.get("rcode").toString(), rawJson.get("rcode").toString());
-        assertEquals(broJson.get("rcode_name").toString(), rawJson.get("rcode_name").toString());
-        assertTrue(broJson.get("original_string").toString().startsWith("DNS"));
+        JSONObject broJson = broParser.parse(rawMessage.getBytes()).get(0);
+
+				Assert.assertEquals(broJson.get("timestamp"), Long.parseLong(rawJson.get("timestamp").toString()));
+			  Assert.assertEquals(broJson.get("ip_src_addr").toString(), rawJson.get("source_ip").toString());
+			  Assert.assertEquals(broJson.get("ip_dst_addr").toString(), rawJson.get("dest_ip").toString());
+			  Assert.assertEquals(broJson.get("ip_src_port"), rawJson.get("source_port"));
+        Assert.assertEquals(broJson.get("ip_dst_port"), rawJson.get("dest_port"));
+        Assert.assertEquals(broJson.get("uid").toString(), rawJson.get("uid").toString());
+        Assert.assertEquals(broJson.get("trans_id").toString(), rawJson.get("trans_id").toString());
+        Assert.assertEquals(broJson.get("sensor").toString(), rawJson.get("sensor").toString());
+        Assert.assertEquals(broJson.get("protocol").toString(), rawJson.get("type").toString());
+        Assert.assertEquals(broJson.get("rcode").toString(), rawJson.get("rcode").toString());
+        Assert.assertEquals(broJson.get("rcode_name").toString(), rawJson.get("rcode_name").toString());
+				Assert.assertTrue(broJson.get("original_string").toString().startsWith("DNS"));
     }
 
 	@SuppressWarnings("rawtypes")
 	public void testHttpBroMessage() throws ParseException {
 		String rawMessage = "{\"http\":{\"ts\":1402307733473,\"uid\":\"CTo78A11g7CYbbOHvj\",\"id.orig_h\":\"192.249.113.37\",\"id.orig_p\":58808,\"id.resp_h\":\"72.163.4.161\",\"id.resp_p\":80,\"trans_depth\":1,\"method\":\"GET\",\"host\":\"www.cisco.com\",\"uri\":\"/\",\"user_agent\":\"curl/7.22.0 (x86_64-pc-linux-gnu) libcurl/7.22.0 OpenSSL/1.0.1 zlib/1.2.3.4 libidn/1.23 librtmp/2.3\",\"request_body_len\":0,\"response_body_len\":25523,\"status_code\":200,\"status_msg\":\"OK\",\"tags\":[],\"resp_fuids\":[\"FJDyMC15lxUn5ngPfd\"],\"resp_mime_types\":[\"text/html\"]}}";
-		
+
 		Map rawMessageMap = (Map) jsonParser.parse(rawMessage);
 		JSONObject rawJson = (JSONObject) rawMessageMap.get(rawMessageMap.keySet().iterator().next());
-		
-		JSONObject broJson = broParser.parse(rawMessage.getBytes());
-		assertEquals(broJson.get("timestamp").toString(), rawJson.get("ts").toString());
-		assertEquals(broJson.get("ip_src_addr").toString(), rawJson.get("id.orig_h").toString());
-		assertEquals(broJson.get("ip_dst_addr").toString(), rawJson.get("id.resp_h").toString());
-		assertEquals(broJson.get("ip_src_port").toString(), rawJson.get("id.orig_p").toString());
-		assertEquals(broJson.get("ip_dst_port").toString(), rawJson.get("id.resp_p").toString());
-		assertTrue(broJson.get("original_string").toString().startsWith(rawMessageMap.keySet().iterator().next().toString().toUpperCase()));
-		
-		assertEquals(broJson.get("uid").toString(), rawJson.get("uid").toString());
-		assertEquals(broJson.get("method").toString(), rawJson.get("method").toString());
-		assertEquals(broJson.get("host").toString(), rawJson.get("host").toString());
-		assertEquals(broJson.get("resp_mime_types").toString(), rawJson.get("resp_mime_types").toString());
+
+		JSONObject broJson = broParser.parse(rawMessage.getBytes()).get(0);
+		Assert.assertEquals(broJson.get("timestamp").toString(), rawJson.get("ts").toString());
+		Assert.assertEquals(broJson.get("ip_src_addr").toString(), rawJson.get("id.orig_h").toString());
+		Assert.assertEquals(broJson.get("ip_dst_addr").toString(), rawJson.get("id.resp_h").toString());
+		Assert.assertEquals(broJson.get("ip_src_port").toString(), rawJson.get("id.orig_p").toString());
+		Assert.assertEquals(broJson.get("ip_dst_port").toString(), rawJson.get("id.resp_p").toString());
+		Assert.assertTrue(broJson.get("original_string").toString().startsWith(rawMessageMap.keySet().iterator().next().toString().toUpperCase()));
+
+		Assert.assertEquals(broJson.get("uid").toString(), rawJson.get("uid").toString());
+		Assert.assertEquals(broJson.get("method").toString(), rawJson.get("method").toString());
+		Assert.assertEquals(broJson.get("host").toString(), rawJson.get("host").toString());
+		Assert.assertEquals(broJson.get("resp_mime_types").toString(), rawJson.get("resp_mime_types").toString());
 	}
-	
+
 	@SuppressWarnings("rawtypes")
 	public void testDnsBroMessage() throws ParseException {
 		String rawMessage = "{\"dns\":{\"ts\":1402308259609,\"uid\":\"CuJT272SKaJSuqO0Ia\",\"id.orig_h\":\"10.122.196.204\",\"id.orig_p\":33976,\"id.resp_h\":\"144.254.71.184\",\"id.resp_p\":53,\"proto\":\"udp\",\"trans_id\":62418,\"query\":\"www.cisco.com\",\"qclass\":1,\"qclass_name\":\"C_INTERNET\",\"qtype\":28,\"qtype_name\":\"AAAA\",\"rcode\":0,\"rcode_name\":\"NOERROR\",\"AA\":true,\"TC\":false,\"RD\":true,\"RA\":true,\"Z\":0,\"answers\":[\"www.cisco.com.akadns.net\",\"origin-www.cisco.com\",\"2001:420:1201:2::a\"],\"TTLs\":[3600.0,289.0,14.0],\"rejected\":false}}";
-		
+
 		Map rawMessageMap = (Map) jsonParser.parse(rawMessage);
 		JSONObject rawJson = (JSONObject) rawMessageMap.get(rawMessageMap.keySet().iterator().next());
-		
-		JSONObject broJson = broParser.parse(rawMessage.getBytes());
-		assertEquals(broJson.get("timestamp").toString(), rawJson.get("ts").toString());
-		assertEquals(broJson.get("ip_src_addr").toString(), rawJson.get("id.orig_h").toString());
-		assertEquals(broJson.get("ip_dst_addr").toString(), rawJson.get("id.resp_h").toString());
-		assertEquals(broJson.get("ip_src_port").toString(), rawJson.get("id.orig_p").toString());
-		assertEquals(broJson.get("ip_dst_port").toString(), rawJson.get("id.resp_p").toString());
-		assertTrue(broJson.get("original_string").toString().startsWith(rawMessageMap.keySet().iterator().next().toString().toUpperCase()));
-		
-		assertEquals(broJson.get("qtype").toString(), rawJson.get("qtype").toString());
-		assertEquals(broJson.get("trans_id").toString(), rawJson.get("trans_id").toString());
+
+		JSONObject broJson = broParser.parse(rawMessage.getBytes()).get(0);
+		Assert.assertEquals(broJson.get("timestamp").toString(), rawJson.get("ts").toString());
+		Assert.assertEquals(broJson.get("ip_src_addr").toString(), rawJson.get("id.orig_h").toString());
+		Assert.assertEquals(broJson.get("ip_dst_addr").toString(), rawJson.get("id.resp_h").toString());
+		Assert.assertEquals(broJson.get("ip_src_port").toString(), rawJson.get("id.orig_p").toString());
+		Assert.assertEquals(broJson.get("ip_dst_port").toString(), rawJson.get("id.resp_p").toString());
+		Assert.assertTrue(broJson.get("original_string").toString().startsWith(rawMessageMap.keySet().iterator().next().toString().toUpperCase()));
+
+		Assert.assertEquals(broJson.get("qtype").toString(), rawJson.get("qtype").toString());
+		Assert.assertEquals(broJson.get("trans_id").toString(), rawJson.get("trans_id").toString());
 	}
-	
+
 	@SuppressWarnings("rawtypes")
 	public void testFilesBroMessage() throws ParseException {
 		String rawMessage = "{\"files\":{\"analyzers\": [\"X509\",\"MD5\",\"SHA1\"],\"conn_uids\":[\"C4tygJ3qxJBEJEBCeh\"],\"depth\": 0,\"duration\": 0.0,\"fuid\":\"FZEBC33VySG0nHSoO9\",\"is_orig\": false,\"local_orig\": false,\"md5\": \"eba37166385e3ef42464ed9752e99f1b\",\"missing_bytes\": 0,\"overflow_bytes\": 0,\"rx_hosts\": [\"10.220.15.205\"],\"seen_bytes\": 1136,\"sha1\": \"73e42686657aece354fbf685712361658f2f4357\",\"source\": \"SSL\",\"timedout\": false,\"ts\": \"1425845251334\",\"tx_hosts\": [\"68.171.237.7\"]}}";
-		
+
 		Map rawMessageMap = (Map) jsonParser.parse(rawMessage);
 		JSONObject rawJson = (JSONObject) rawMessageMap.get(rawMessageMap.keySet().iterator().next());
-		
-		JSONObject broJson = broParser.parse(rawMessage.getBytes());
-		assertEquals(broJson.get("timestamp").toString(), rawJson.get("ts").toString());
-		assertEquals(broJson.get("ip_src_addr").toString(), ((JSONArray)rawJson.get("tx_hosts")).get(0).toString());
-		assertEquals(broJson.get("ip_dst_addr").toString(), ((JSONArray)rawJson.get("rx_hosts")).get(0).toString());
-		assertTrue(broJson.get("original_string").toString().startsWith(rawMessageMap.keySet().iterator().next().toString().toUpperCase()));
-		
-		assertEquals(broJson.get("fuid").toString(), rawJson.get("fuid").toString());
-		assertEquals(broJson.get("md5").toString(), rawJson.get("md5").toString());
-		assertEquals(broJson.get("analyzers").toString(), rawJson.get("analyzers").toString());
+
+		JSONObject broJson = broParser.parse(rawMessage.getBytes()).get(0);
+		Assert.assertEquals(broJson.get("timestamp").toString(), rawJson.get("ts").toString());
+		Assert.assertEquals(broJson.get("ip_src_addr").toString(), ((JSONArray)rawJson.get("tx_hosts")).get(0).toString());
+		Assert.assertEquals(broJson.get("ip_dst_addr").toString(), ((JSONArray)rawJson.get("rx_hosts")).get(0).toString());
+		Assert.assertTrue(broJson.get("original_string").toString().startsWith(rawMessageMap.keySet().iterator().next().toString().toUpperCase()));
+
+		Assert.assertEquals(broJson.get("fuid").toString(), rawJson.get("fuid").toString());
+		Assert.assertEquals(broJson.get("md5").toString(), rawJson.get("md5").toString());
+		Assert.assertEquals(broJson.get("analyzers").toString(), rawJson.get("analyzers").toString());
 	}
-	
+
 	@SuppressWarnings("rawtypes")
 	public void testProtocolKeyCleanedUp() throws ParseException {
 		String rawMessage = "{\"ht*tp\":{\"ts\":1402307733473,\"uid\":\"CTo78A11g7CYbbOHvj\",\"id.orig_h\":\"192.249.113.37\",\"id.orig_p\":58808,\"id.resp_h\":\"72.163.4.161\",\"id.resp_p\":80,\"trans_depth\":1,\"method\":\"GET\",\"host\":\"www.cisco.com\",\"uri\":\"/\",\"user_agent\":\"curl/7.22.0 (x86_64-pc-linux-gnu) libcurl/7.22.0 OpenSSL/1.0.1 zlib/1.2.3.4 libidn/1.23 librtmp/2.3\",\"request_body_len\":0,\"response_body_len\":25523,\"status_code\":200,\"status_msg\":\"OK\",\"tags\":[],\"resp_fuids\":[\"FJDyMC15lxUn5ngPfd\"],\"resp_mime_types\":[\"text/html\"]}}";
-		
+
 		Map rawMessageMap = (Map) jsonParser.parse(rawMessage);
 		JSONObject rawJson = (JSONObject) rawMessageMap.get(rawMessageMap.keySet().iterator().next());
-		
-		JSONObject broJson = broParser.parse(rawMessage.getBytes());
-		
-		assertEquals(broJson.get("timestamp").toString(), rawJson.get("ts").toString());
-		assertEquals(broJson.get("ip_src_addr").toString(), rawJson.get("id.orig_h").toString());
-		assertTrue(broJson.get("original_string").toString().startsWith("HTTP"));
+
+		JSONObject broJson = broParser.parse(rawMessage.getBytes()).get(0);
+
+		Assert.assertEquals(broJson.get("timestamp").toString(), rawJson.get("ts").toString());
+		Assert.assertEquals(broJson.get("ip_src_addr").toString(), rawJson.get("id.orig_h").toString());
+		Assert.assertTrue(broJson.get("original_string").toString().startsWith("HTTP"));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicFireEyeParserTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicFireEyeParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicFireEyeParserTest.java
index 9adf446..1eefc27 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicFireEyeParserTest.java
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicFireEyeParserTest.java
@@ -78,12 +78,18 @@ public class BasicFireEyeParserTest extends AbstractConfigTest
 	}
 
 	/**
-	 * Test method for {@link org.apache.metron.parsing.parsers.BasicFireEyeParser#parse(java.lang.String)}.
+	 * Test method for
+	 *
+	 *
+	 *
+	 *
+	 *
+	 * {@link org.apache.metron.parsing.parsers.BasicFireEyeParser#parse(byte[])}.
 	 */
 	@SuppressWarnings({ "rawtypes"})
 	public void testParse() {
 		for (String inputString : getInputStrings()) {
-			JSONObject parsed = parser.parse(inputString.getBytes());
+			JSONObject parsed = parser.parse(inputString.getBytes()).get(0);
 			Assert.assertNotNull(parsed);
 		
 			JSONParser parser = new JSONParser();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicIseParserTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicIseParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicIseParserTest.java
index e8842b9..03d31a3 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicIseParserTest.java
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicIseParserTest.java
@@ -110,7 +110,7 @@ public class BasicIseParserTest extends AbstractSchemaTest {
 	 */
 	public void testParse() throws ParseException, IOException, Exception {
         for (String inputString : getInputStrings()) {
-            JSONObject parsed = parser.parse(inputString.getBytes());
+            JSONObject parsed = parser.parse(inputString.getBytes()).get(0);
             assertNotNull(parsed);
         
             System.out.println(parsed);
@@ -139,7 +139,7 @@ public class BasicIseParserTest extends AbstractSchemaTest {
 	/**
 	 * Sets the iseParser.
 	 * 
-	 * @param iseParser
+	 * @param parser
 	 */
 
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicLancopeParserTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicLancopeParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicLancopeParserTest.java
index 9667699..70cf089 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicLancopeParserTest.java
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicLancopeParserTest.java
@@ -104,7 +104,7 @@ public class BasicLancopeParserTest extends AbstractSchemaTest {
     public void testParse() throws IOException, Exception {
         
         for (String inputString : getInputStrings()) {
-            JSONObject parsed = parser.parse(inputString.getBytes());
+            JSONObject parsed = parser.parse(inputString.getBytes()).get(0);
             assertNotNull(parsed);
         
             System.out.println(parsed);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicPaloAltoFirewallParserTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicPaloAltoFirewallParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicPaloAltoFirewallParserTest.java
index 7a0b9d4..d989b6c 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicPaloAltoFirewallParserTest.java
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicPaloAltoFirewallParserTest.java
@@ -72,12 +72,13 @@ public class BasicPaloAltoFirewallParserTest extends AbstractConfigTest {
 		}
 
 		/**
-		 * Test method for {@link org.apache.metron.parsing.parsers.BasicSourcefireParser#parse(java.lang.String)}.
+		 * Test method for
+		 * {@link org.apache.metron.parsing.parsers.BasicSourcefireParser#parse(byte[])}.
 		 */
 		@SuppressWarnings({ "rawtypes" })
 		public void testParse() {
 			for (String inputString : getInputStrings()) {
-				JSONObject parsed = paParser.parse(inputString.getBytes());
+				JSONObject parsed = paParser.parse(inputString.getBytes()).get(0);
 				Assert.assertNotNull(parsed);
 			
 				System.out.println(parsed);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicSourcefireParserTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicSourcefireParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicSourcefireParserTest.java
index dd56eeb..3ac44fb 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicSourcefireParserTest.java
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicSourcefireParserTest.java
@@ -78,13 +78,13 @@ public class BasicSourcefireParserTest extends AbstractConfigTest
 	}
 
 	/**
-	 * Test method for {@link org.apache.metron.parsing.parsers.BasicSourcefireParser#parse(java.lang.String)}.
+	 * Test method for {@link org.apache.metron.parsing.parsers.BasicSourcefireParser#parse(byte[])}.
 	 */
 	@SuppressWarnings({ "rawtypes", "unused" })
 	public void testParse() {
 		for (String sourceFireString : getSourceFireStrings()) {
 		    byte[] srcBytes = sourceFireString.getBytes();
-			JSONObject parsed = sourceFireParser.parse(sourceFireString.getBytes());
+			JSONObject parsed = sourceFireParser.parse(sourceFireString.getBytes()).get(0);
 			Assert.assertNotNull(parsed);
 		
 			System.out.println(parsed);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BroParserTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BroParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BroParserTest.java
index 01a1210..2a543fe 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BroParserTest.java
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BroParserTest.java
@@ -85,7 +85,7 @@ public class BroParserTest extends AbstractConfigTest {
 	public void testParse() throws ParseException {
 
 		for (String inputString : getInputStrings()) {
-			JSONObject cleanJson = parser.parse(inputString.getBytes());
+			JSONObject cleanJson = parser.parse(inputString.getBytes()).get(0);
 			Assert.assertNotNull(cleanJson);
 			System.out.println(cleanJson);
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/GrokAsaParserTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/GrokAsaParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/GrokAsaParserTest.java
index 86f23bd..a59d5c8 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/GrokAsaParserTest.java
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/GrokAsaParserTest.java
@@ -75,13 +75,13 @@ public class GrokAsaParserTest extends AbstractConfigTest{
 		}
 
 		/**
-		 * Test method for {@link org.apache.metron.parsing.parsers.BasicSourcefireParser#parse(java.lang.String)}.
+		 * Test method for {@link org.apache.metron.parsing.parsers.BasicSourcefireParser#parse(byte[])}.
 		 */
 		@SuppressWarnings({ "rawtypes" })
 		public void testParse() {
 		    
 			for (String grokAsaString : getGrokAsaStrings()) {
-				JSONObject parsed = grokAsaParser.parse(grokAsaString.getBytes());
+				JSONObject parsed = grokAsaParser.parse(grokAsaString.getBytes()).get(0);
 				Assert.assertNotNull(parsed);
 			
 				System.out.println(parsed);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java
index 13f4c99..b922916 100644
--- a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java
@@ -109,7 +109,13 @@ public class GenericInternalTestSpout extends BaseRichSpout {
 		
 		if(cnt < jsons.size())
 		{
-			_collector.emit(new Values(_converter.convert(jsons.get(cnt))));
+			byte[] value;
+			if (_converter != null) {
+			  value = _converter.convert(jsons.get(cnt));
+			} else {
+				value = jsons.get(cnt).getBytes();
+			}
+			_collector.emit(new Values(value));
 		}
 		cnt ++;
 		

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/whitelists/known_hosts.conf
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/whitelists/known_hosts.conf b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/whitelists/known_hosts.conf
index 1c11207..d20b050 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/whitelists/known_hosts.conf
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/whitelists/known_hosts.conf
@@ -1,3 +1,4 @@
 10.1.128.236={"local":"YES", "type":"webserver", "asset_value" : "important"}
 10.1.128.237={"local":"UNKNOWN", "type":"unknown", "asset_value" : "important"}
-10.60.10.254={"local":"YES", "type":"printer", "asset_value" : "important"}
\ No newline at end of file
+10.60.10.254={"local":"YES", "type":"printer", "asset_value" : "important"}
+10.0.2.15={"local":"YES", "type":"webserver", "asset_value" : "important"}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/local.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/local.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/local.yaml
index 8615d8f..6c6c63b 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/local.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/local.yaml
@@ -3,36 +3,58 @@ config:
     topology.workers: 1
 
 components:
-    -   id: "asaParser"
+    -   id: "parser"
         className: "org.apache.metron.parsing.parsers.GrokAsaParser"
-    -   id: "genericMessageFilter"
-        className: "org.apache.metron.filters.GenericMessageFilter"
-    -   id: "geoKeys"
-        className: "java.util.ArrayList"
-        configMethods:
-            -   name: "add"
-                args: ["ip_src_addr"]
-            -   name: "add"
-                args: ["ip_dst_addr"]
+    -   id: "jdbcConfig"
+        className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
+        properties:
+            -   name: "host"
+                value: "${mysql.ip}"
+            -   name: "port"
+                value: ${mysql.port}
+            -   name: "username"
+                value: "${mysql.username}"
+            -   name: "password"
+                value: "${mysql.password}"
+            -   name: "table"
+                value: "GEO"
     -   id: "geoEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.geo.GeoMysqlAdapter"
-        constructorArgs:
-            - "${mysql.ip}"
-            - ${mysql.port}
-            - "${mysql.username}"
-            - "${mysql.password}"
-            - "GEO"
-    -   id: "hostsKeys"
-        className: "java.util.ArrayList"
+        className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
         configMethods:
-            -   name: "add"
-                args: ["ip_src_addr"]
-            -   name: "add"
-                args: ["ip_dst_addr"]
+            -   name: "withJdbcConfig"
+                args:
+                    - ref: "jdbcConfig"
+    -   id: "geoEnrichment"
+        className: "org.apache.metron.domain.Enrichment"
+        properties:
+            -   name: "name"
+                value:  "geo"
+            -   name: "fields"
+                value: ["ip_src_addr", "ip_dst_addr"]
+            -   name: "adapter"
+                ref: "geoEnrichmentAdapter"
     -   id: "hostEnrichmentAdapter"
         className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
         constructorArgs:
             - '${org.apache.metron.enrichment.host.known_hosts}'
+    -   id: "hostEnrichment"
+        className: "org.apache.metron.domain.Enrichment"
+        properties:
+            -   name: "name"
+                value:  "host"
+            -   name: "fields"
+                value: ["ip_src_addr", "ip_dst_addr"]
+            -   name: "adapter"
+                ref: "hostEnrichmentAdapter"
+    -   id: "enrichments"
+        className: "java.util.ArrayList"
+        configMethods:
+            -   name: "add"
+                args:
+                    - ref: "geoEnrichment"
+            -   name: "add"
+                args:
+                    - ref: "hostEnrichment"
     -   id: "indexAdapter"
         className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
     -   id: "alertsConfig"
@@ -135,60 +157,14 @@ spouts:
 
 bolts:
     -   id: "parserBolt"
-        className: "org.apache.metron.parsing.TelemetryParserBolt"
+        className: "org.apache.metron.bolt.TelemetryParserBolt"
         configMethods:
             -   name: "withMessageParser"
                 args:
-                    - ref: "asaParser"
-            -   name: "withOutputFieldName"
+                    - ref: "parser"
+            -   name: "withEnrichments"
                 args:
-                    - "asa"
-            -   name: "withMessageFilter"
-                args:
-                    - ref: "genericMessageFilter"
-            -   name: "withMetricConfig"
-                args:
-                    - ref: "metricConfig"
-    -   id: "geoEnrichmentBolt"
-        className: "org.apache.metron.enrichment.common.GenericEnrichmentBolt"
-        configMethods:
-            -   name: "withEnrichmentTag"
-                args: ["geo"]
-            -   name: "withAdapter"
-                args:
-                    - ref: "geoEnrichmentAdapter"
-            -   name: "withMaxTimeRetain"
-                args: [10]
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withOutputFieldName"
-                args: ["asa"]
-            -   name: "withKeys"
-                args:
-                    - ref: "geoKeys"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
-    -   id: "hostEnrichmentBolt"
-        className: "org.apache.metron.enrichment.common.GenericEnrichmentBolt"
-        configMethods:
-            -   name: "withEnrichmentTag"
-                args: ["host"]
-            -   name: "withAdapter"
-                args:
-                    - ref: "hostEnrichmentAdapter"
-            -   name: "withMaxTimeRetain"
-                args: [10]
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withOutputFieldName"
-                args: ["asa"]
-            -   name: "withKeys"
-                args:
-                    - ref: "hostsKeys"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
+                    - ref: "enrichments"
     -   id: "indexingBolt"
         className: "org.apache.metron.indexing.TelemetryIndexingBolt"
         configMethods:
@@ -297,6 +273,36 @@ bolts:
             -   name: "withMetricConfiguration"
                 args:
                     - ref: "metricConfig"
+    -   id: "geoEnrichmentBolt"
+        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+        configMethods:
+            -   name: "withEnrichment"
+                args:
+                    - ref: "geoEnrichment"
+            -   name: "withMaxCacheSize"
+                args: [10000]
+            -   name: "withMaxTimeRetain"
+                args: [10]
+    -   id: "hostEnrichmentBolt"
+        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+        configMethods:
+            -   name: "withEnrichment"
+                args:
+                    - ref: "hostEnrichment"
+            -   name: "withMaxCacheSize"
+                args: [10000]
+            -   name: "withMaxTimeRetain"
+                args: [10]
+    -   id: "joinBolt"
+        className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
+        configMethods:
+        -   name: "withEnrichments"
+            args:
+                - ref: "enrichments"
+        -   name: "withMaxCacheSize"
+            args: [10000]
+        -   name: "withMaxTimeRetain"
+            args: [10]
 
 streams:
     -   name: "spout -> parser"
@@ -304,22 +310,43 @@ streams:
         to: "parserBolt"
         grouping:
             type: SHUFFLE
+    -   name: "parser -> host"
+        from: "parserBolt"
+        to: "hostEnrichmentBolt"
+        grouping:
+            streamId: "host"
+            type: FIELDS
+            args: ["key"]
     -   name: "parser -> geo"
         from: "parserBolt"
         to: "geoEnrichmentBolt"
         grouping:
+            streamId: "geo"
             type: FIELDS
+            args: ["key"]
+    -   name: "parser -> join"
+        from: "parserBolt"
+        to: "joinBolt"
+        grouping:
             streamId: "message"
+            type: FIELDS
             args: ["key"]
-    -   name: "geo -> host"
+    -   name: "geo -> join"
         from: "geoEnrichmentBolt"
-        to: "hostEnrichmentBolt"
+        to: "joinBolt"
         grouping:
+            streamId: "geo"
             type: FIELDS
-            streamId: "message"
             args: ["key"]
-    -   name: "host -> alerts"
+    -   name: "host -> join"
         from: "hostEnrichmentBolt"
+        to: "joinBolt"
+        grouping:
+            streamId: "host"
+            type: FIELDS
+            args: ["key"]
+    -   name: "join -> alerts"
+        from: "joinBolt"
         to: "alertsBolt"
         grouping:
             streamId: "message"
@@ -331,8 +358,8 @@ streams:
         grouping:
             streamId: "message"
             type: SHUFFLE
-    -   name: "alerts -> indexing"
-        from: "alertsBolt"
+    -   name: "join -> indexing"
+        from: "joinBolt"
         to: "indexingBolt"
         grouping:
             streamId: "message"
@@ -355,4 +382,4 @@ streams:
         to: "errorIndexingBolt"
         grouping:
             streamId: "error"
-            type: SHUFFLE
+            type: SHUFFLE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml
index e3b01d6..01492c5 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml
@@ -3,36 +3,58 @@ config:
     topology.workers: 1
 
 components:
-    -   id: "asaParser"
+    -   id: "parser"
         className: "org.apache.metron.parsing.parsers.GrokAsaParser"
-    -   id: "genericMessageFilter"
-        className: "org.apache.metron.filters.GenericMessageFilter"
-    -   id: "geoKeys"
-        className: "java.util.ArrayList"
-        configMethods:
-            -   name: "add"
-                args: ["ip_src_addr"]
-            -   name: "add"
-                args: ["ip_dst_addr"]
+    -   id: "jdbcConfig"
+        className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
+        properties:
+            -   name: "host"
+                value: "${mysql.ip}"
+            -   name: "port"
+                value: ${mysql.port}
+            -   name: "username"
+                value: "${mysql.username}"
+            -   name: "password"
+                value: "${mysql.password}"
+            -   name: "table"
+                value: "GEO"
     -   id: "geoEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.geo.GeoMysqlAdapter"
-        constructorArgs:
-            - "${mysql.ip}"
-            - ${mysql.port}
-            - "${mysql.username}"
-            - "${mysql.password}"
-            - "GEO"
-    -   id: "hostsKeys"
-        className: "java.util.ArrayList"
+        className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
         configMethods:
-            -   name: "add"
-                args: ["ip_src_addr"]
-            -   name: "add"
-                args: ["ip_dst_addr"]
+            -   name: "withJdbcConfig"
+                args:
+                    - ref: "jdbcConfig"
+    -   id: "geoEnrichment"
+        className: "org.apache.metron.domain.Enrichment"
+        properties:
+            -   name: "name"
+                value:  "geo"
+            -   name: "fields"
+                value: ["ip_src_addr", "ip_dst_addr"]
+            -   name: "adapter"
+                ref: "geoEnrichmentAdapter"
     -   id: "hostEnrichmentAdapter"
         className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
         constructorArgs:
             - '${org.apache.metron.enrichment.host.known_hosts}'
+    -   id: "hostEnrichment"
+        className: "org.apache.metron.domain.Enrichment"
+        properties:
+            -   name: "name"
+                value:  "host"
+            -   name: "fields"
+                value: ["ip_src_addr", "ip_dst_addr"]
+            -   name: "adapter"
+                ref: "hostEnrichmentAdapter"
+    -   id: "enrichments"
+        className: "java.util.ArrayList"
+        configMethods:
+            -   name: "add"
+                args:
+                    - ref: "geoEnrichment"
+            -   name: "add"
+                args:
+                    - ref: "hostEnrichment"
     -   id: "indexAdapter"
         className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
     -   id: "alertsConfig"
@@ -130,11 +152,11 @@ components:
             # zookeeper hosts
             - ref: "zkHosts"
             # topic name
-            - "${spout.kafka.topic.asa}"
+            - "${spout.kafka.topic.pcap}"
             # zk root
             - ""
             # id
-            - "${spout.kafka.topic.asa}"
+            - "${spout.kafka.topic.pcap}"
         properties:
             -   name: "forceFromStart"
                 value: true
@@ -149,60 +171,14 @@ spouts:
 
 bolts:
     -   id: "parserBolt"
-        className: "org.apache.metron.parsing.TelemetryParserBolt"
+        className: "org.apache.metron.bolt.TelemetryParserBolt"
         configMethods:
             -   name: "withMessageParser"
                 args:
-                    - ref: "asaParser"
-            -   name: "withOutputFieldName"
-                args:
-                    - "asa"
-            -   name: "withMessageFilter"
+                    - ref: "parser"
+            -   name: "withEnrichments"
                 args:
-                    - ref: "genericMessageFilter"
-            -   name: "withMetricConfig"
-                args:
-                    - ref: "metricConfig"
-    -   id: "geoEnrichmentBolt"
-        className: "org.apache.metron.enrichment.common.GenericEnrichmentBolt"
-        configMethods:
-            -   name: "withEnrichmentTag"
-                args: ["geo"]
-            -   name: "withAdapter"
-                args:
-                    - ref: "geoEnrichmentAdapter"
-            -   name: "withMaxTimeRetain"
-                args: [10]
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withOutputFieldName"
-                args: ["asa"]
-            -   name: "withKeys"
-                args:
-                    - ref: "geoKeys"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
-    -   id: "hostEnrichmentBolt"
-        className: "org.apache.metron.enrichment.common.GenericEnrichmentBolt"
-        configMethods:
-            -   name: "withEnrichmentTag"
-                args: ["host"]
-            -   name: "withAdapter"
-                args:
-                    - ref: "hostEnrichmentAdapter"
-            -   name: "withMaxTimeRetain"
-                args: [10]
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withOutputFieldName"
-                args: ["asa"]
-            -   name: "withKeys"
-                args:
-                    - ref: "hostsKeys"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
+                    - ref: "enrichments"
     -   id: "indexingBolt"
         className: "org.apache.metron.indexing.TelemetryIndexingBolt"
         configMethods:
@@ -311,6 +287,36 @@ bolts:
             -   name: "withMetricConfiguration"
                 args:
                     - ref: "metricConfig"
+    -   id: "geoEnrichmentBolt"
+        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+        configMethods:
+            -   name: "withEnrichment"
+                args:
+                    - ref: "geoEnrichment"
+            -   name: "withMaxCacheSize"
+                args: [10000]
+            -   name: "withMaxTimeRetain"
+                args: [10]
+    -   id: "hostEnrichmentBolt"
+        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+        configMethods:
+            -   name: "withEnrichment"
+                args:
+                    - ref: "hostEnrichment"
+            -   name: "withMaxCacheSize"
+                args: [10000]
+            -   name: "withMaxTimeRetain"
+                args: [10]
+    -   id: "joinBolt"
+        className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
+        configMethods:
+        -   name: "withEnrichments"
+            args:
+                - ref: "enrichments"
+        -   name: "withMaxCacheSize"
+            args: [10000]
+        -   name: "withMaxTimeRetain"
+            args: [10]
 
 streams:
     -   name: "spout -> parser"
@@ -318,22 +324,43 @@ streams:
         to: "parserBolt"
         grouping:
             type: SHUFFLE
+    -   name: "parser -> host"
+        from: "parserBolt"
+        to: "hostEnrichmentBolt"
+        grouping:
+            streamId: "host"
+            type: FIELDS
+            args: ["key"]
     -   name: "parser -> geo"
         from: "parserBolt"
         to: "geoEnrichmentBolt"
         grouping:
+            streamId: "geo"
             type: FIELDS
+            args: ["key"]
+    -   name: "parser -> join"
+        from: "parserBolt"
+        to: "joinBolt"
+        grouping:
             streamId: "message"
+            type: FIELDS
             args: ["key"]
-    -   name: "geo -> host"
+    -   name: "geo -> join"
         from: "geoEnrichmentBolt"
-        to: "hostEnrichmentBolt"
+        to: "joinBolt"
         grouping:
+            streamId: "geo"
             type: FIELDS
-            streamId: "message"
             args: ["key"]
-    -   name: "host -> alerts"
+    -   name: "host -> join"
         from: "hostEnrichmentBolt"
+        to: "joinBolt"
+        grouping:
+            streamId: "host"
+            type: FIELDS
+            args: ["key"]
+    -   name: "join -> alerts"
+        from: "joinBolt"
         to: "alertsBolt"
         grouping:
             streamId: "message"
@@ -345,8 +372,8 @@ streams:
         grouping:
             streamId: "message"
             type: SHUFFLE
-    -   name: "alerts -> indexing"
-        from: "alertsBolt"
+    -   name: "join -> indexing"
+        from: "joinBolt"
         to: "indexingBolt"
         grouping:
             streamId: "message"
@@ -369,4 +396,4 @@ streams:
         to: "errorIndexingBolt"
         grouping:
             streamId: "error"
-            type: SHUFFLE
+            type: SHUFFLE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/local.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/local.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/local.yaml
index e1ea080..8013b37 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/local.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/local.yaml
@@ -83,20 +83,11 @@ spouts:
 
 bolts:
     -   id: "parserBolt"
-        className: "org.apache.metron.parsing.TelemetryParserBolt"
+        className: "org.apache.metron.bolt.TelemetryParserBolt"
         configMethods:
             -   name: "withMessageParser"
                 args:
                     - ref: "broParser"
-            -   name: "withOutputFieldName"
-                args:
-                    - "bro"
-            -   name: "withMessageFilter"
-                args:
-                    - ref: "genericMessageFilter"
-            -   name: "withMetricConfig"
-                args:
-                    - ref: "metricConfig"
     -   id: "indexingBolt"
         className: "org.apache.metron.indexing.TelemetryIndexingBolt"
         configMethods:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
index 956e346..23fbc56 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
@@ -97,20 +97,11 @@ spouts:
 
 bolts:
     -   id: "parserBolt"
-        className: "org.apache.metron.parsing.TelemetryParserBolt"
+        className: "org.apache.metron.bolt.TelemetryParserBolt"
         configMethods:
             -   name: "withMessageParser"
                 args:
                     - ref: "broParser"
-            -   name: "withOutputFieldName"
-                args:
-                    - "bro"
-            -   name: "withMessageFilter"
-                args:
-                    - ref: "genericMessageFilter"
-            -   name: "withMetricConfig"
-                args:
-                    - ref: "metricConfig"
     -   id: "indexingBolt"
         className: "org.apache.metron.indexing.TelemetryIndexingBolt"
         configMethods:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/local.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/local.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/local.yaml
index 3390dd7..e519715 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/local.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/local.yaml
@@ -3,36 +3,58 @@ config:
     topology.workers: 1
 
 components:
-    -   id: "fireEyeParser"
+    -   id: "parser"
         className: "org.apache.metron.parsing.parsers.BasicFireEyeParser"
-    -   id: "genericMessageFilter"
-        className: "org.apache.metron.filters.GenericMessageFilter"
-    -   id: "geoKeys"
-        className: "java.util.ArrayList"
-        configMethods:
-            -   name: "add"
-                args: ["ip_src_addr"]
-            -   name: "add"
-                args: ["ip_dst_addr"]
+    -   id: "jdbcConfig"
+        className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
+        properties:
+            -   name: "host"
+                value: "${mysql.ip}"
+            -   name: "port"
+                value: ${mysql.port}
+            -   name: "username"
+                value: "${mysql.username}"
+            -   name: "password"
+                value: "${mysql.password}"
+            -   name: "table"
+                value: "GEO"
     -   id: "geoEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.geo.GeoMysqlAdapter"
-        constructorArgs:
-            - "${mysql.ip}"
-            - ${mysql.port}
-            - "${mysql.username}"
-            - "${mysql.password}"
-            - "GEO"
-    -   id: "hostsKeys"
-        className: "java.util.ArrayList"
+        className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
         configMethods:
-            -   name: "add"
-                args: ["ip_src_addr"]
-            -   name: "add"
-                args: ["ip_dst_addr"]
+            -   name: "withJdbcConfig"
+                args:
+                    - ref: "jdbcConfig"
+    -   id: "geoEnrichment"
+        className: "org.apache.metron.domain.Enrichment"
+        properties:
+            -   name: "name"
+                value:  "geo"
+            -   name: "fields"
+                value: ["ip_src_addr", "ip_dst_addr"]
+            -   name: "adapter"
+                ref: "geoEnrichmentAdapter"
     -   id: "hostEnrichmentAdapter"
         className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
         constructorArgs:
             - '${org.apache.metron.enrichment.host.known_hosts}'
+    -   id: "hostEnrichment"
+        className: "org.apache.metron.domain.Enrichment"
+        properties:
+            -   name: "name"
+                value:  "host"
+            -   name: "fields"
+                value: ["ip_src_addr", "ip_dst_addr"]
+            -   name: "adapter"
+                ref: "hostEnrichmentAdapter"
+    -   id: "enrichments"
+        className: "java.util.ArrayList"
+        configMethods:
+            -   name: "add"
+                args:
+                    - ref: "geoEnrichment"
+            -   name: "add"
+                args:
+                    - ref: "hostEnrichment"
     -   id: "indexAdapter"
         className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
     -   id: "alertsConfig"
@@ -135,60 +157,14 @@ spouts:
 
 bolts:
     -   id: "parserBolt"
-        className: "org.apache.metron.parsing.TelemetryParserBolt"
+        className: "org.apache.metron.bolt.TelemetryParserBolt"
         configMethods:
             -   name: "withMessageParser"
                 args:
-                    - ref: "fireEyeParser"
-            -   name: "withOutputFieldName"
+                    - ref: "parser"
+            -   name: "withEnrichments"
                 args:
-                    - "fireeye"
-            -   name: "withMessageFilter"
-                args:
-                    - ref: "genericMessageFilter"
-            -   name: "withMetricConfig"
-                args:
-                    - ref: "metricConfig"
-    -   id: "geoEnrichmentBolt"
-        className: "org.apache.metron.enrichment.common.GenericEnrichmentBolt"
-        configMethods:
-            -   name: "withEnrichmentTag"
-                args: ["geo"]
-            -   name: "withAdapter"
-                args:
-                    - ref: "geoEnrichmentAdapter"
-            -   name: "withMaxTimeRetain"
-                args: [10]
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withOutputFieldName"
-                args: ["fireeye"]
-            -   name: "withKeys"
-                args:
-                    - ref: "geoKeys"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
-    -   id: "hostEnrichmentBolt"
-        className: "org.apache.metron.enrichment.common.GenericEnrichmentBolt"
-        configMethods:
-            -   name: "withEnrichmentTag"
-                args: ["host"]
-            -   name: "withAdapter"
-                args:
-                    - ref: "hostEnrichmentAdapter"
-            -   name: "withMaxTimeRetain"
-                args: [10]
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withOutputFieldName"
-                args: ["fireeye"]
-            -   name: "withKeys"
-                args:
-                    - ref: "hostsKeys"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
+                    - ref: "enrichments"
     -   id: "indexingBolt"
         className: "org.apache.metron.indexing.TelemetryIndexingBolt"
         configMethods:
@@ -297,6 +273,36 @@ bolts:
             -   name: "withMetricConfiguration"
                 args:
                     - ref: "metricConfig"
+    -   id: "geoEnrichmentBolt"
+        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+        configMethods:
+            -   name: "withEnrichment"
+                args:
+                    - ref: "geoEnrichment"
+            -   name: "withMaxCacheSize"
+                args: [10000]
+            -   name: "withMaxTimeRetain"
+                args: [10]
+    -   id: "hostEnrichmentBolt"
+        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+        configMethods:
+            -   name: "withEnrichment"
+                args:
+                    - ref: "hostEnrichment"
+            -   name: "withMaxCacheSize"
+                args: [10000]
+            -   name: "withMaxTimeRetain"
+                args: [10]
+    -   id: "joinBolt"
+        className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
+        configMethods:
+        -   name: "withEnrichments"
+            args:
+                - ref: "enrichments"
+        -   name: "withMaxCacheSize"
+            args: [10000]
+        -   name: "withMaxTimeRetain"
+            args: [10]
 
 streams:
     -   name: "spout -> parser"
@@ -304,22 +310,43 @@ streams:
         to: "parserBolt"
         grouping:
             type: SHUFFLE
+    -   name: "parser -> host"
+        from: "parserBolt"
+        to: "hostEnrichmentBolt"
+        grouping:
+            streamId: "host"
+            type: FIELDS
+            args: ["key"]
     -   name: "parser -> geo"
         from: "parserBolt"
         to: "geoEnrichmentBolt"
         grouping:
+            streamId: "geo"
             type: FIELDS
+            args: ["key"]
+    -   name: "parser -> join"
+        from: "parserBolt"
+        to: "joinBolt"
+        grouping:
             streamId: "message"
+            type: FIELDS
             args: ["key"]
-    -   name: "geo -> host"
+    -   name: "geo -> join"
         from: "geoEnrichmentBolt"
-        to: "hostEnrichmentBolt"
+        to: "joinBolt"
         grouping:
+            streamId: "geo"
             type: FIELDS
-            streamId: "message"
             args: ["key"]
-    -   name: "host -> alerts"
+    -   name: "host -> join"
         from: "hostEnrichmentBolt"
+        to: "joinBolt"
+        grouping:
+            streamId: "host"
+            type: FIELDS
+            args: ["key"]
+    -   name: "join -> alerts"
+        from: "joinBolt"
         to: "alertsBolt"
         grouping:
             streamId: "message"
@@ -331,8 +358,8 @@ streams:
         grouping:
             streamId: "message"
             type: SHUFFLE
-    -   name: "alerts -> indexing"
-        from: "alertsBolt"
+    -   name: "join -> indexing"
+        from: "joinBolt"
         to: "indexingBolt"
         grouping:
             streamId: "message"
@@ -355,4 +382,4 @@ streams:
         to: "errorIndexingBolt"
         grouping:
             streamId: "error"
-            type: SHUFFLE
+            type: SHUFFLE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/remote.yaml
index b759adf..494c233 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/fireeye/remote.yaml
@@ -3,36 +3,58 @@ config:
     topology.workers: 1
 
 components:
-    -   id: "fireEyeParser"
+    -   id: "parser"
         className: "org.apache.metron.parsing.parsers.BasicFireEyeParser"
-    -   id: "genericMessageFilter"
-        className: "org.apache.metron.filters.GenericMessageFilter"
-    -   id: "geoKeys"
-        className: "java.util.ArrayList"
-        configMethods:
-            -   name: "add"
-                args: ["ip_src_addr"]
-            -   name: "add"
-                args: ["ip_dst_addr"]
+    -   id: "jdbcConfig"
+        className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
+        properties:
+            -   name: "host"
+                value: "${mysql.ip}"
+            -   name: "port"
+                value: ${mysql.port}
+            -   name: "username"
+                value: "${mysql.username}"
+            -   name: "password"
+                value: "${mysql.password}"
+            -   name: "table"
+                value: "GEO"
     -   id: "geoEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.geo.GeoMysqlAdapter"
-        constructorArgs:
-            - "${mysql.ip}"
-            - ${mysql.port}
-            - "${mysql.username}"
-            - "${mysql.password}"
-            - "GEO"
-    -   id: "hostsKeys"
-        className: "java.util.ArrayList"
+        className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
         configMethods:
-            -   name: "add"
-                args: ["ip_src_addr"]
-            -   name: "add"
-                args: ["ip_dst_addr"]
+            -   name: "withJdbcConfig"
+                args:
+                    - ref: "jdbcConfig"
+    -   id: "geoEnrichment"
+        className: "org.apache.metron.domain.Enrichment"
+        properties:
+            -   name: "name"
+                value:  "geo"
+            -   name: "fields"
+                value: ["ip_src_addr", "ip_dst_addr"]
+            -   name: "adapter"
+                ref: "geoEnrichmentAdapter"
     -   id: "hostEnrichmentAdapter"
         className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
         constructorArgs:
             - '${org.apache.metron.enrichment.host.known_hosts}'
+    -   id: "hostEnrichment"
+        className: "org.apache.metron.domain.Enrichment"
+        properties:
+            -   name: "name"
+                value:  "host"
+            -   name: "fields"
+                value: ["ip_src_addr", "ip_dst_addr"]
+            -   name: "adapter"
+                ref: "hostEnrichmentAdapter"
+    -   id: "enrichments"
+        className: "java.util.ArrayList"
+        configMethods:
+            -   name: "add"
+                args:
+                    - ref: "geoEnrichment"
+            -   name: "add"
+                args:
+                    - ref: "hostEnrichment"
     -   id: "indexAdapter"
         className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
     -   id: "alertsConfig"
@@ -130,11 +152,11 @@ components:
             # zookeeper hosts
             - ref: "zkHosts"
             # topic name
-            - "${spout.kafka.topic.fireeye}"
+            - "${spout.kafka.topic.pcap}"
             # zk root
             - ""
             # id
-            - "${spout.kafka.topic.fireeye}"
+            - "${spout.kafka.topic.pcap}"
         properties:
             -   name: "forceFromStart"
                 value: true
@@ -149,60 +171,14 @@ spouts:
 
 bolts:
     -   id: "parserBolt"
-        className: "org.apache.metron.parsing.TelemetryParserBolt"
+        className: "org.apache.metron.bolt.TelemetryParserBolt"
         configMethods:
             -   name: "withMessageParser"
                 args:
-                    - ref: "fireEyeParser"
-            -   name: "withOutputFieldName"
-                args:
-                    - "fireeye"
-            -   name: "withMessageFilter"
+                    - ref: "parser"
+            -   name: "withEnrichments"
                 args:
-                    - ref: "genericMessageFilter"
-            -   name: "withMetricConfig"
-                args:
-                    - ref: "metricConfig"
-    -   id: "geoEnrichmentBolt"
-        className: "org.apache.metron.enrichment.common.GenericEnrichmentBolt"
-        configMethods:
-            -   name: "withEnrichmentTag"
-                args: ["geo"]
-            -   name: "withAdapter"
-                args:
-                    - ref: "geoEnrichmentAdapter"
-            -   name: "withMaxTimeRetain"
-                args: [10]
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withOutputFieldName"
-                args: ["fireeye"]
-            -   name: "withKeys"
-                args:
-                    - ref: "geoKeys"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
-    -   id: "hostEnrichmentBolt"
-        className: "org.apache.metron.enrichment.common.GenericEnrichmentBolt"
-        configMethods:
-            -   name: "withEnrichmentTag"
-                args: ["host"]
-            -   name: "withAdapter"
-                args:
-                    - ref: "hostEnrichmentAdapter"
-            -   name: "withMaxTimeRetain"
-                args: [10]
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withOutputFieldName"
-                args: ["fireeye"]
-            -   name: "withKeys"
-                args:
-                    - ref: "hostsKeys"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
+                    - ref: "enrichments"
     -   id: "indexingBolt"
         className: "org.apache.metron.indexing.TelemetryIndexingBolt"
         configMethods:
@@ -311,6 +287,36 @@ bolts:
             -   name: "withMetricConfiguration"
                 args:
                     - ref: "metricConfig"
+    -   id: "geoEnrichmentBolt"
+        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+        configMethods:
+            -   name: "withEnrichment"
+                args:
+                    - ref: "geoEnrichment"
+            -   name: "withMaxCacheSize"
+                args: [10000]
+            -   name: "withMaxTimeRetain"
+                args: [10]
+    -   id: "hostEnrichmentBolt"
+        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+        configMethods:
+            -   name: "withEnrichment"
+                args:
+                    - ref: "hostEnrichment"
+            -   name: "withMaxCacheSize"
+                args: [10000]
+            -   name: "withMaxTimeRetain"
+                args: [10]
+    -   id: "joinBolt"
+        className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
+        configMethods:
+        -   name: "withEnrichments"
+            args:
+                - ref: "enrichments"
+        -   name: "withMaxCacheSize"
+            args: [10000]
+        -   name: "withMaxTimeRetain"
+            args: [10]
 
 streams:
     -   name: "spout -> parser"
@@ -318,22 +324,43 @@ streams:
         to: "parserBolt"
         grouping:
             type: SHUFFLE
+    -   name: "parser -> host"
+        from: "parserBolt"
+        to: "hostEnrichmentBolt"
+        grouping:
+            streamId: "host"
+            type: FIELDS
+            args: ["key"]
     -   name: "parser -> geo"
         from: "parserBolt"
         to: "geoEnrichmentBolt"
         grouping:
+            streamId: "geo"
             type: FIELDS
+            args: ["key"]
+    -   name: "parser -> join"
+        from: "parserBolt"
+        to: "joinBolt"
+        grouping:
             streamId: "message"
+            type: FIELDS
             args: ["key"]
-    -   name: "geo -> host"
+    -   name: "geo -> join"
         from: "geoEnrichmentBolt"
-        to: "hostEnrichmentBolt"
+        to: "joinBolt"
         grouping:
+            streamId: "geo"
             type: FIELDS
-            streamId: "message"
             args: ["key"]
-    -   name: "host -> alerts"
+    -   name: "host -> join"
         from: "hostEnrichmentBolt"
+        to: "joinBolt"
+        grouping:
+            streamId: "host"
+            type: FIELDS
+            args: ["key"]
+    -   name: "join -> alerts"
+        from: "joinBolt"
         to: "alertsBolt"
         grouping:
             streamId: "message"
@@ -345,8 +372,8 @@ streams:
         grouping:
             streamId: "message"
             type: SHUFFLE
-    -   name: "alerts -> indexing"
-        from: "alertsBolt"
+    -   name: "join -> indexing"
+        from: "joinBolt"
         to: "indexingBolt"
         grouping:
             streamId: "message"
@@ -369,4 +396,4 @@ streams:
         to: "errorIndexingBolt"
         grouping:
             streamId: "error"
-            type: SHUFFLE
+            type: SHUFFLE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/local.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/local.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/local.yaml
index 9722413..fc02ac4 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/local.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/local.yaml
@@ -83,20 +83,11 @@ spouts:
 
 bolts:
     -   id: "parserBolt"
-        className: "org.apache.metron.parsing.TelemetryParserBolt"
+        className: "org.apache.metron.bolt.TelemetryParserBolt"
         configMethods:
             -   name: "withMessageParser"
                 args:
                     - ref: "iseParser"
-            -   name: "withOutputFieldName"
-                args:
-                    - "ise"
-            -   name: "withMessageFilter"
-                args:
-                    - ref: "genericMessageFilter"
-            -   name: "withMetricConfig"
-                args:
-                    - ref: "metricConfig"
     -   id: "indexingBolt"
         className: "org.apache.metron.indexing.TelemetryIndexingBolt"
         configMethods:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/remote.yaml
index d425269..88272ef 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/ise/remote.yaml
@@ -97,20 +97,11 @@ spouts:
 
 bolts:
     -   id: "parserBolt"
-        className: "org.apache.metron.parsing.TelemetryParserBolt"
+        className: "org.apache.metron.bolt.TelemetryParserBolt"
         configMethods:
             -   name: "withMessageParser"
                 args:
                     - ref: "iseParser"
-            -   name: "withOutputFieldName"
-                args:
-                    - "ise"
-            -   name: "withMessageFilter"
-                args:
-                    - ref: "genericMessageFilter"
-            -   name: "withMetricConfig"
-                args:
-                    - ref: "metricConfig"
     -   id: "indexingBolt"
         className: "org.apache.metron.indexing.TelemetryIndexingBolt"
         configMethods:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/905b09f6/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/lancope/local.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/lancope/local.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/lancope/local.yaml
index f83924a..7df3163 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/lancope/local.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/lancope/local.yaml
@@ -3,25 +3,58 @@ config:
     topology.workers: 1
 
 components:
-    -   id: "lancopeParser"
+    -   id: "parser"
         className: "org.apache.metron.parsing.parsers.BasicLancopeParser"
-    -   id: "genericMessageFilter"
-        className: "org.apache.metron.filters.GenericMessageFilter"
-    -   id: "geoKeys"
+    -   id: "jdbcConfig"
+        className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
+        properties:
+            -   name: "host"
+                value: "${mysql.ip}"
+            -   name: "port"
+                value: ${mysql.port}
+            -   name: "username"
+                value: "${mysql.username}"
+            -   name: "password"
+                value: "${mysql.password}"
+            -   name: "table"
+                value: "GEO"
+    -   id: "geoEnrichmentAdapter"
+        className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
+        configMethods:
+            -   name: "withJdbcConfig"
+                args:
+                    - ref: "jdbcConfig"
+    -   id: "geoEnrichment"
+        className: "org.apache.metron.domain.Enrichment"
+        properties:
+            -   name: "name"
+                value:  "geo"
+            -   name: "fields"
+                value: ["ip_src_addr", "ip_dst_addr"]
+            -   name: "adapter"
+                ref: "geoEnrichmentAdapter"
+    -   id: "hostEnrichmentAdapter"
+        className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
+        constructorArgs:
+            - '${org.apache.metron.enrichment.host.known_hosts}'
+    -   id: "hostEnrichment"
+        className: "org.apache.metron.domain.Enrichment"
+        properties:
+            -   name: "name"
+                value:  "host"
+            -   name: "fields"
+                value: ["ip_src_addr", "ip_dst_addr"]
+            -   name: "adapter"
+                ref: "hostEnrichmentAdapter"
+    -   id: "enrichments"
         className: "java.util.ArrayList"
         configMethods:
             -   name: "add"
-                args: ["ip_src_addr"]
+                args:
+                    - ref: "geoEnrichment"
             -   name: "add"
-                args: ["ip_dst_addr"]
-    -   id: "geoEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.geo.GeoMysqlAdapter"
-        constructorArgs:
-            - "${mysql.ip}"
-            - ${mysql.port}
-            - "${mysql.username}"
-            - "${mysql.password}"
-            - "GEO"
+                args:
+                    - ref: "hostEnrichment"
     -   id: "indexAdapter"
         className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
     -   id: "alertsConfig"
@@ -124,40 +157,14 @@ spouts:
 
 bolts:
     -   id: "parserBolt"
-        className: "org.apache.metron.parsing.TelemetryParserBolt"
+        className: "org.apache.metron.bolt.TelemetryParserBolt"
         configMethods:
             -   name: "withMessageParser"
                 args:
-                    - ref: "lancopeParser"
-            -   name: "withOutputFieldName"
-                args:
-                    - "lancope"
-            -   name: "withMessageFilter"
-                args:
-                    - ref: "genericMessageFilter"
-            -   name: "withMetricConfig"
-                args:
-                    - ref: "metricConfig"
-    -   id: "geoEnrichmentBolt"
-        className: "org.apache.metron.enrichment.common.GenericEnrichmentBolt"
-        configMethods:
-            -   name: "withEnrichmentTag"
-                args: ["geo"]
-            -   name: "withAdapter"
-                args:
-                    - ref: "geoEnrichmentAdapter"
-            -   name: "withMaxTimeRetain"
-                args: [10]
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withOutputFieldName"
-                args: ["lancope"]
-            -   name: "withKeys"
-                args:
-                    - ref: "geoKeys"
-            -   name: "withMetricConfiguration"
+                    - ref: "parser"
+            -   name: "withEnrichments"
                 args:
-                    - ref: "metricConfig"
+                    - ref: "enrichments"
     -   id: "indexingBolt"
         className: "org.apache.metron.indexing.TelemetryIndexingBolt"
         configMethods:
@@ -266,6 +273,36 @@ bolts:
             -   name: "withMetricConfiguration"
                 args:
                     - ref: "metricConfig"
+    -   id: "geoEnrichmentBolt"
+        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+        configMethods:
+            -   name: "withEnrichment"
+                args:
+                    - ref: "geoEnrichment"
+            -   name: "withMaxCacheSize"
+                args: [10000]
+            -   name: "withMaxTimeRetain"
+                args: [10]
+    -   id: "hostEnrichmentBolt"
+        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+        configMethods:
+            -   name: "withEnrichment"
+                args:
+                    - ref: "hostEnrichment"
+            -   name: "withMaxCacheSize"
+                args: [10000]
+            -   name: "withMaxTimeRetain"
+                args: [10]
+    -   id: "joinBolt"
+        className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
+        configMethods:
+        -   name: "withEnrichments"
+            args:
+                - ref: "enrichments"
+        -   name: "withMaxCacheSize"
+            args: [10000]
+        -   name: "withMaxTimeRetain"
+            args: [10]
 
 streams:
     -   name: "spout -> parser"
@@ -273,19 +310,47 @@ streams:
         to: "parserBolt"
         grouping:
             type: SHUFFLE
+    -   name: "parser -> host"
+        from: "parserBolt"
+        to: "hostEnrichmentBolt"
+        grouping:
+            streamId: "host"
+            type: FIELDS
+            args: ["key"]
     -   name: "parser -> geo"
         from: "parserBolt"
         to: "geoEnrichmentBolt"
         grouping:
+            streamId: "geo"
             type: FIELDS
+            args: ["key"]
+    -   name: "parser -> join"
+        from: "parserBolt"
+        to: "joinBolt"
+        grouping:
             streamId: "message"
+            type: FIELDS
             args: ["key"]
-    -   name: "geo -> alerts"
+    -   name: "geo -> join"
         from: "geoEnrichmentBolt"
-        to: "alertsBolt"
+        to: "joinBolt"
+        grouping:
+            streamId: "geo"
+            type: FIELDS
+            args: ["key"]
+    -   name: "host -> join"
+        from: "hostEnrichmentBolt"
+        to: "joinBolt"
         grouping:
+            streamId: "host"
             type: FIELDS
+            args: ["key"]
+    -   name: "join -> alerts"
+        from: "joinBolt"
+        to: "alertsBolt"
+        grouping:
             streamId: "message"
+            type: FIELDS
             args: ["key"]
     -   name: "alerts -> alertsIndexing"
         from: "alertsBolt"
@@ -293,8 +358,8 @@ streams:
         grouping:
             streamId: "message"
             type: SHUFFLE
-    -   name: "alerts -> indexing"
-        from: "alertsBolt"
+    -   name: "join -> indexing"
+        from: "joinBolt"
         to: "indexingBolt"
         grouping:
             streamId: "message"
@@ -317,4 +382,4 @@ streams:
         to: "errorIndexingBolt"
         grouping:
             streamId: "error"
-            type: SHUFFLE
+            type: SHUFFLE
\ No newline at end of file