You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/08 00:25:08 UTC

[2/3] incubator-metron git commit: METRON-58 Remediate Deployment Integration Testing Issues (dlyle65535 via cestella) closes apache/incubator-metron#36

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
new file mode 100644
index 0000000..d704908
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Thin JSON (de)serialization helper backed by Jackson's {@link ObjectMapper}.
+ * Implemented as a single-element enum; use {@code JSONUtils.INSTANCE}.
+ */
+public enum JSONUtils {
+  INSTANCE;
+
+  /** One {@link ObjectMapper} per thread, created the first time that thread calls {@code get()}. */
+  private static final ThreadLocal<ObjectMapper> _mapper = new ThreadLocal<ObjectMapper>() {
+    @Override
+    protected ObjectMapper initialValue() {
+      return new ObjectMapper();
+    }
+  };
+
+  /**
+   * Deserializes JSON read from a stream into an instance of {@code clazz}.
+   *
+   * @throws IOException if reading or binding fails (propagated from the mapper)
+   */
+  public <T> T load(InputStream is, Class<T> clazz) throws IOException {
+    return _mapper.get().readValue(is, clazz);
+  }
+
+  /**
+   * Deserializes a JSON string into an instance of {@code clazz}.
+   * Note that {@code is} holds the JSON text itself, not a stream or a path.
+   */
+  public <T> T load(String is, Class<T> clazz) throws IOException {
+    return _mapper.get().readValue(is, clazz);
+  }
+
+  /**
+   * Serializes {@code o} to JSON.
+   *
+   * @param pretty if true, use the mapper's default pretty printer
+   * @return the JSON text
+   */
+  public String toJSON(Object o, boolean pretty) throws JsonProcessingException {
+    ObjectMapper mapper = _mapper.get();
+    return pretty ? mapper.writerWithDefaultPrettyPrinter().writeValueAsString(o)
+                  : mapper.writeValueAsString(o);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Common/src/test/resources/config/source/bro-config.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/resources/config/source/bro-config.json b/metron-streaming/Metron-Common/src/test/resources/config/source/bro-config.json
index fcbfc03..34109b8 100644
--- a/metron-streaming/Metron-Common/src/test/resources/config/source/bro-config.json
+++ b/metron-streaming/Metron-Common/src/test/resources/config/source/bro-config.json
@@ -3,11 +3,12 @@
   "batchSize": 5,
   "enrichmentFieldMap":
   {
-    "geo": ["id.orig_h"],
-    "host": ["id.orig_h"]
+    "geo": ["ip_dst_addr", "ip_src_addr"],
+    "host": ["host"]
   },
   "threatIntelFieldMap":
   {
-    "ip": ["id.orig_h"]
+    "ip": ["ip_dst_addr", "ip_src_addr"]
   }
-}
\ No newline at end of file
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Common/src/test/resources/config/source/snort-config.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/resources/config/source/snort-config.json b/metron-streaming/Metron-Common/src/test/resources/config/source/snort-config.json
index ceb441e..1208637 100644
--- a/metron-streaming/Metron-Common/src/test/resources/config/source/snort-config.json
+++ b/metron-streaming/Metron-Common/src/test/resources/config/source/snort-config.json
@@ -3,11 +3,12 @@
   "batchSize": 1,
   "enrichmentFieldMap":
   {
-    "geo": ["src", "dst"],
-    "host": ["src", "dst"]
+    "geo": ["ip_dst_addr", "ip_src_addr"],
+    "host": ["host"]
   },
   "threatIntelFieldMap":
   {
-    "ip": ["src", "dst"]
+    "ip": ["ip_dst_addr", "ip_src_addr"]
   }
-}
\ No newline at end of file
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Common/src/test/resources/config/source/yaf-config.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/resources/config/source/yaf-config.json b/metron-streaming/Metron-Common/src/test/resources/config/source/yaf-config.json
index abf4ff4..65de961 100644
--- a/metron-streaming/Metron-Common/src/test/resources/config/source/yaf-config.json
+++ b/metron-streaming/Metron-Common/src/test/resources/config/source/yaf-config.json
@@ -3,11 +3,12 @@
   "batchSize": 5,
   "enrichmentFieldMap":
   {
-    "geo": ["sip", "dip"],
-    "host": ["sip", "dip"]
+    "geo": ["ip_dst_addr", "ip_src_addr"],
+    "host": ["host"]
   },
   "threatIntelFieldMap":
   {
-    "ip": ["sip", "dip"]
+    "ip": ["ip_dst_addr", "ip_src_addr"]
   }
-}
\ No newline at end of file
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
index 7d7ef98..02f9f9b 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
@@ -103,7 +103,7 @@ public class ThreatIntelBulkLoader  {
                 return o;
             }
         })
-        ,AS_OF_TIME_FORMAT("t", new OptionHandler() {
+        ,AS_OF_TIME_FORMAT("z", new OptionHandler() {
             @Nullable
             @Override
             public Option apply(@Nullable String s) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
index 10e1e71..a2cec5a 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -25,10 +25,7 @@ import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
 
@@ -67,8 +64,20 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
   public JSONObject joinMessages(Map<String, JSONObject> streamMessageMap) {
     JSONObject message = new JSONObject();
     for (String key : streamMessageMap.keySet()) {
-      message.putAll(streamMessageMap.get(key));
+      JSONObject obj = streamMessageMap.get(key);
+      message.putAll(obj);
     }
+    // Drop fields whose value renders as an empty string. Null values are skipped
+    // rather than dereferenced, so a null field can no longer raise a
+    // NullPointerException here; it is left in the message untouched.
+    for(Iterator<?> it = message.values().iterator(); it.hasNext();) {
+      Object value = it.next();
+      if(value != null && value.toString().isEmpty()) {
+        it.remove();
+      }
+    }
+    // Stamp the joined message with the wall-clock time (epoch millis) of the join.
+    message.put(getClass().getSimpleName().toLowerCase() + ".joiner.ts", "" + System.currentTimeMillis());
     return message;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
index 5839f39..51508d8 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
@@ -87,6 +87,7 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
             }
         } else {
             message = (JSONObject) tuple.getValueByField(messageFieldName);
+            message.put(getClass().getSimpleName().toLowerCase() + ".splitter.ts", "" + System.currentTimeMillis());
         }
         return message;
     }
@@ -103,6 +104,7 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
     @SuppressWarnings("unchecked")
     @Override
     public Map<String, JSONObject> splitMessage(JSONObject message) {
+
         Map<String, JSONObject> streamMessageMap = new HashMap<>();
         String sourceType = TopologyUtils.getSourceType(message);
         Map<String, List<String>> enrichmentFieldMap = getFieldMap(sourceType);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index b184975..b5c4c44 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -148,6 +148,7 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
     String key = tuple.getStringByField("key");
     JSONObject rawMessage = (JSONObject) tuple.getValueByField("message");
     JSONObject enrichedMessage = new JSONObject();
+    enrichedMessage.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".begin.ts", "" + System.currentTimeMillis());
     try {
       if (rawMessage == null || rawMessage.isEmpty())
         throw new Exception("Could not parse binary stream to JSON");
@@ -174,11 +175,10 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
           } else {
             enrichedMessage.put(field, "");
           }
-          if (enrichmentType.equals("host")) {
-            String test = "";
-          }
         }
       }
+
+      enrichedMessage.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis());
       if (!enrichedMessage.isEmpty()) {
         collector.emit(enrichmentType, new Values(key, enrichedMessage));
       }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
index ba17fdb..3516ee0 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.enrichment.bolt;
 
+import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,5 +38,15 @@ public class ThreatIntelJoinBolt extends EnrichmentJoinBolt {
     return configurations.get(sourceType).getThreatIntelFieldMap();
   }
 
-
+  /** Joins via the parent, then sets "is_alert" if any key starts with "threatintels" and is not a ".ts" stamp. */
+  @Override
+  public JSONObject joinMessages(Map<String, JSONObject> streamMessageMap) {
+    JSONObject joined = super.joinMessages(streamMessageMap);
+    for(Object field : joined.keySet()) {
+      String name = field.toString();
+      boolean threatHit = name.startsWith("threatintels") && !name.endsWith(".ts");
+      if(threatHit) { joined.put("is_alert" , "true"); break; }
+    }
+    return joined;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
index 9db6398..b95f4b8 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
@@ -63,7 +63,7 @@ public class ThreatIntelAdapter implements EnrichmentAdapter<String>,Serializabl
             throw new RuntimeException("Unable to retrieve value", e);
         }
         if(isThreat) {
-            enriched.put("threat_source", config.getHBaseTable());
+            enriched.put(config.getHBaseTable(), "alert");
             _LOG.trace("Enriched value => " + enriched);
         }
         //throw new RuntimeException("Unable to retrieve value " + value);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/TelemetryIndexingBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/TelemetryIndexingBolt.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/TelemetryIndexingBolt.java
index ff151c7..21ecb18 100644
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/TelemetryIndexingBolt.java
+++ b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/TelemetryIndexingBolt.java
@@ -92,6 +92,17 @@ public class TelemetryIndexingBolt extends AbstractIndexingBolt {
 	}
 
 	/**
+	 * Stores the index name on this bolt and returns the bolt so calls can be chained.
+	 * @param IndexName
+	 *            name of the index in ElasticSearch/Solr/etc...
+	 * @return instance of bolt
+	 */
+	public TelemetryIndexingBolt withIndexName(String IndexName) {
+		_IndexName = IndexName;
+		return this;
+	}
+
+	/**
 	 * 
 	 * @param ClusterName
 	 *            name of cluster to index into in ElasticSearch/Solr/etc...

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
index a0df685..2769efe 100644
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
+++ b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
@@ -27,6 +27,8 @@ import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.text.SimpleDateFormat;
@@ -42,6 +44,8 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
   private String host;
   private int port;
   private SimpleDateFormat dateFormat;
+  private static final Logger LOG = LoggerFactory
+          .getLogger(ElasticsearchWriter.class);
 
   public ElasticsearchWriter(String clusterName, String host, int port, String dateFormat) {
     this.clusterName = clusterName;
@@ -64,7 +68,8 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
       builder.put(optionalSettings);
     }
     client = new TransportClient(builder.build())
-            .addTransportAddress(new InetSocketTransportAddress(host, port));
+            .addTransportAddress(new InetSocketTransportAddress(host, port))
+            ;
 
   }
 
@@ -77,8 +82,9 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
       if (configuration != null) {
         indexName = configuration.getIndex();
       }
-      IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName + "_" + indexPostfix,
+      IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName + "_index_" + indexPostfix,
               sourceType);
+
       indexRequestBuilder.setSource(message.toJSONString());
       bulkRequest.add(indexRequestBuilder);
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
index 04b943c..9aa8d72 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
@@ -89,8 +89,13 @@ public class BasicBroParser extends BasicParser {
             long timestamp = 0L;
             if (payload.containsKey("timestamp")) {
                 try {
-                    timestamp = Long.parseLong(payload.get("timestamp").toString());
+                    String broTimestamp = payload.get("timestamp").toString();
+                    String convertedTimestamp = broTimestamp.replace(".","");
+                    convertedTimestamp = convertedTimestamp.substring(0,13);
+                    timestamp = Long.parseLong(convertedTimestamp);
                     payload.put("timestamp", timestamp);
+                    payload.put("bro_timestamp",broTimestamp);
+                    _LOG.trace(String.format("[Metron] new bro record - timestamp : %s", payload.get("timestamp")));
                 } catch (NumberFormatException nfe) {
                     _LOG.error(String.format("[Metron] timestamp is invalid: %s", payload.get("timestamp")));
                     payload.put("timestamp", 0);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
index 6d89428..27b05a4 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
@@ -111,6 +111,7 @@ public class BasicSnortParser extends BasicParser {
 
 			// add original msg; required by 'checkForSchemaCorrectness'
 			jsonMessage.put("original_string", csvMessage);
+			jsonMessage.put("is_alert", "true");
 			messages.add(jsonMessage);
 		} catch (Exception e) {
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
index e2c5f32..f080e96 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
@@ -89,6 +89,28 @@ public class BasicBroParserTest extends TestCase {
 	}
 
+	@SuppressWarnings("rawtypes")
+	public void testHttpDecimalBroMessage() throws ParseException {
+		String rawMessage = "{\"http\":{\"ts\":1457149494.166991,\"uid\":\"CTo78A11g7CYbbOHvj\",\"id.orig_h\":\"192.249.113.37\",\"id.orig_p\":58808,\"id.resp_h\":\"72.163.4.161\",\"id.resp_p\":80,\"trans_depth\":1,\"method\":\"GET\",\"host\":\"www.cisco.com\",\"uri\":\"/\",\"user_agent\":\"curl/7.22.0 (x86_64-pc-linux-gnu) libcurl/7.22.0 OpenSSL/1.0.1 zlib/1.2.3.4 libidn/1.23 librtmp/2.3\",\"request_body_len\":0,\"response_body_len\":25523,\"status_code\":200,\"status_msg\":\"OK\",\"tags\":[],\"resp_fuids\":[\"FJDyMC15lxUn5ngPfd\"],\"resp_mime_types\":[\"text/html\"]}}";
+		String expectedTimestamp = "1457149494166"; // ts 1457149494.166991 (fractional seconds) expressed in whole milliseconds
+		Map rawMessageMap = (Map) jsonParser.parse(rawMessage);
+		JSONObject rawJson = (JSONObject) rawMessageMap.get(rawMessageMap.keySet().iterator().next());
+		// Run the same raw message through the Bro parser and compare its output with the raw JSON fields.
+		JSONObject broJson = broParser.parse(rawMessage.getBytes()).get(0);
+		Assert.assertEquals(broJson.get("timestamp").toString(), expectedTimestamp);
+		Assert.assertEquals(broJson.get("ip_src_addr").toString(), rawJson.get("id.orig_h").toString());
+		Assert.assertEquals(broJson.get("ip_dst_addr").toString(), rawJson.get("id.resp_h").toString());
+		Assert.assertEquals(broJson.get("ip_src_port").toString(), rawJson.get("id.orig_p").toString());
+		Assert.assertEquals(broJson.get("ip_dst_port").toString(), rawJson.get("id.resp_p").toString());
+		Assert.assertTrue(broJson.get("original_string").toString().startsWith(rawMessageMap.keySet().iterator().next().toString().toUpperCase()));
+		// These fields are asserted to carry the same string values as in the raw message.
+		Assert.assertEquals(broJson.get("uid").toString(), rawJson.get("uid").toString());
+		Assert.assertEquals(broJson.get("method").toString(), rawJson.get("method").toString());
+		Assert.assertEquals(broJson.get("host").toString(), rawJson.get("host").toString());
+		Assert.assertEquals(broJson.get("resp_mime_types").toString(), rawJson.get("resp_mime_types").toString());
+	}
+
+
+	@SuppressWarnings("rawtypes")
 	public void testDnsBroMessage() throws ParseException {
 		String rawMessage = "{\"dns\":{\"ts\":1402308259609,\"uid\":\"CuJT272SKaJSuqO0Ia\",\"id.orig_h\":\"10.122.196.204\",\"id.orig_p\":33976,\"id.resp_h\":\"144.254.71.184\",\"id.resp_p\":53,\"proto\":\"udp\",\"trans_id\":62418,\"query\":\"www.cisco.com\",\"qclass\":1,\"qclass_name\":\"C_INTERNET\",\"qtype\":28,\"qtype_name\":\"AAAA\",\"rcode\":0,\"rcode_name\":\"NOERROR\",\"AA\":true,\"TC\":false,\"RD\":true,\"RA\":true,\"Z\":0,\"answers\":[\"www.cisco.com.akadns.net\",\"origin-www.cisco.com\",\"2001:420:1201:2::a\"],\"TTLs\":[3600.0,289.0,14.0],\"rejected\":false}}";
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-MessageParsers/src/test/resources/BroParserTest.log
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/resources/BroParserTest.log b/metron-streaming/Metron-MessageParsers/src/test/resources/BroParserTest.log
index e71f28e..2fcdd5a 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/resources/BroParserTest.log
+++ b/metron-streaming/Metron-MessageParsers/src/test/resources/BroParserTest.log
@@ -1,3 +1,4 @@
 {"http":{"ts":1402307733473,"uid":"CTo78A11g7CYbbOHvj","id.orig_h":"192.249.113.37","id.orig_p":58808,"id.resp_h":"72.163.4.161","id.resp_p":80,"trans_depth":1,"method":"GET","host":"www.cisco.com","uri":"/","user_agent":"curl/7.22.0 (x86_64-pc-linux-gnu) libcurl/7.22.0 OpenSSL/1.0.1 zlib/1.2.3.4 libidn/1.23 librtmp/2.3","request_body_len":0,"response_body_len":25523,"status_code":200,"status_msg":"OK","tags":[],"resp_fuids":["FJDyMC15lxUn5ngPfd"],"resp_mime_types":["text/html"]}}
 {"dns":{"ts":1402308259609,"uid":"CuJT272SKaJSuqO0Ia","id.orig_h":"10.122.196.204","id.orig_p":33976,"id.resp_h":"144.254.71.184","id.resp_p":53,"proto":"udp","trans_id":62418,"query":"www.cisco.com","qclass":1,"qclass_name":"C_INTERNET","qtype":28,"qtype_name":"AAAA","rcode":0,"rcode_name":"NOERROR","AA":true,"TC":false,"RD":true,"RA":true,"Z":0,"answers":["www.cisco.com.akadns.net","origin-www.cisco.com","2001:420:1201:2::a"],"TTLs":[3600.0,289.0,14.0],"rejected":false}}
-{"files":{"analyzers": ["X509","MD5","SHA1"],"conn_uids":["C4tygJ3qxJBEJEBCeh"],"depth": 0,"duration": 0.0,"fuid":"FZEBC33VySG0nHSoO9","is_orig": false,"local_orig": false,"md5": "eba37166385e3ef42464ed9752e99f1b","missing_bytes": 0,"overflow_bytes": 0,"protocol": "files","rx_hosts": ["10.220.15.205"],"seen_bytes": 1136,"sha1": "73e42686657aece354fbf685712361658f2f4357","source": "SSL","timedout": false,"ts": "1425845251334","tx_hosts": ["68.171.237.7"]}}
\ No newline at end of file
+{"files":{"analyzers": ["X509","MD5","SHA1"],"conn_uids":["C4tygJ3qxJBEJEBCeh"],"depth": 0,"duration": 0.0,"fuid":"FZEBC33VySG0nHSoO9","is_orig": false,"local_orig": false,"md5": "eba37166385e3ef42464ed9752e99f1b","missing_bytes": 0,"overflow_bytes": 0,"protocol": "files","rx_hosts": ["10.220.15.205"],"seen_bytes": 1136,"sha1": "73e42686657aece354fbf685712361658f2f4357","source": "SSL","timedout": false,"ts": "1425845251334","tx_hosts": ["68.171.237.7"]}}
+{"http": {"ts":1457149494.166991,"uid":"C5xbFM2QfGB8OZKPrg","id.orig_h":"192.168.138.158","id.orig_p":49195,"id.resp_h":"188.165.164.184","id.resp_p":80,"trans_depth":1,"method":"GET","host":"ip-addr.es","uri":"/","user_agent":"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0)","request_body_len":0,"response_body_len":0,"tags":[]}}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ComponentRunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ComponentRunner.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ComponentRunner.java
index f9a8ca2..2c31759 100644
--- a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ComponentRunner.java
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ComponentRunner.java
@@ -26,11 +26,23 @@ public class ComponentRunner {
         LinkedHashMap<String, InMemoryComponent> components;
         String[] startupOrder;
         String[] shutdownOrder;
-        long timeBetweenAttempts;
+        long timeBetweenAttempts = 1000;
+        int numRetries = 5;
+        long maxTimeMS = 120000;
         public Builder() {
             components = new LinkedHashMap<String, InMemoryComponent>();
         }
 
+        /** Overrides the default of 5 for the retry count handed to the built ComponentRunner. */
+        public Builder withNumRetries(int numRetries) {
+            this.numRetries = numRetries;
+            return this;
+        }
+        /** Overrides the default of 120000 ms for the time limit handed to the built ComponentRunner. */
+        public Builder withMaxTimeMS(long maxTimeMS) {
+            this.maxTimeMS = maxTimeMS;
+            return this;
+        }
         public Builder withComponent(String name, InMemoryComponent component) {
             components.put(name, component);
             return this;
@@ -44,7 +56,7 @@ public class ComponentRunner {
             this.shutdownOrder = shutdownOrder;
             return this;
         }
-        public Builder withTimeBetweenAttempts(long timeBetweenAttempts) {
+        public Builder withMillisecondsBetweenAttempts(long timeBetweenAttempts) {
             this.timeBetweenAttempts = timeBetweenAttempts;
             return this;
         }
@@ -63,7 +75,7 @@ public class ComponentRunner {
             if(startupOrder == null) {
                 startupOrder = toOrderedList(components);
             }
-            return new ComponentRunner(components, startupOrder, shutdownOrder, timeBetweenAttempts);
+            return new ComponentRunner(components, startupOrder, shutdownOrder, timeBetweenAttempts, numRetries, maxTimeMS);
         }
 
     }
@@ -72,16 +84,22 @@ public class ComponentRunner {
     String[] startupOrder;
     String[] shutdownOrder;
     long timeBetweenAttempts;
+    int numRetries;
+    long maxTimeMS;
     public ComponentRunner( LinkedHashMap<String, InMemoryComponent> components
                           , String[] startupOrder
                           , String[] shutdownOrder
                           , long timeBetweenAttempts
+                          , int numRetries
+                          , long maxTimeMS
                           )
     {
         this.components = components;
         this.startupOrder = startupOrder;
         this.shutdownOrder = shutdownOrder;
         this.timeBetweenAttempts = timeBetweenAttempts;
+        this.numRetries = numRetries;
+        this.maxTimeMS = maxTimeMS;
     }
 
     public <T extends InMemoryComponent> T getComponent(String name, Class<T> clazz) {
@@ -103,17 +121,14 @@ public class ComponentRunner {
         }
     }
 
-    public <T> T process(Processor<T> successState) {
-        return process(successState, 5, 120000);
-    }
 
-    public <T> T process(Processor<T> successState, int numRetries, long maxTimeMs) {
+    public <T> T process(Processor<T> successState) {
         int retryCount = 0;
         long start = System.currentTimeMillis();
         while(true) {
             long duration = System.currentTimeMillis() - start;
-            if(duration > maxTimeMs) {
-                throw new RuntimeException("Took too long to complete: " + duration + " > " + maxTimeMs);
+            if(duration > maxTimeMS) {
+                throw new RuntimeException("Took too long to complete: " + duration + " > " + maxTimeMS);
             }
             ReadinessState state = successState.process(this);
             if(state == ReadinessState.READY) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/pom.xml b/metron-streaming/Metron-Topologies/pom.xml
index 9a7d8df..4530e6d 100644
--- a/metron-streaming/Metron-Topologies/pom.xml
+++ b/metron-streaming/Metron-Topologies/pom.xml
@@ -150,8 +150,11 @@
             <type>pom</type>
             <scope>provided</scope>
         </dependency>
-
-
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>1.2</version>
+        </dependency>
         <dependency>
             <groupId>com.github.ptgoetz</groupId>
             <artifactId>storm-hbase</artifactId>
@@ -251,16 +254,10 @@
                                 <excludes>
                                     <exclude>storm:storm-core:*</exclude>
                                     <exclude>storm:storm-lib:*</exclude>
-                                    <exclude>*slf4j*</exclude>
+                                    <exclude>org.slf4j.impl*</exclude>
+                                    <exclude>org.slf4j:slf4j-log4j*</exclude>
                                 </excludes>
                             </artifactSet>
-                            <!--relocations>
-                                <relocation>
-                                    <pattern>com.google.common</pattern>
-                                    <shadedPattern>org.apache.metron.guava.common</shadedPattern>
-                                </relocation>
-                            </relocations-->
-
                             <transformers>
                                 <transformer
                                         implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
index ef8b2e2..13ccd0c 100644
--- a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.utils;
 
+import org.apache.commons.cli.*;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -27,6 +28,7 @@ import org.apache.zookeeper.KeeperException;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.PrintWriter;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.List;
@@ -80,16 +82,55 @@ public class SourceConfigUtils {
   }
 
+  /**
+   * Command-line entry point: hands every file in a directory of source configs to
+   * {@code writeToZookeeperFromFile} and then calls {@code dumpConfigs} for the same quorum.
+   *
+   * <p>Each file is keyed by its name with a trailing {@code -config.json} removed.
+   * Both options are required; if either is missing the usage line is printed and
+   * the JVM exits with status 1.
+   *
+   * @param args {@code -p <directory of source config files> -z <zookeeper quorum>}
+   */
   public static void main(String[] args) {
+
+    Options options = new Options();
+    options.addOption("p", true, "Path to source option files");
+    options.addOption("z", true, "Zookeeper Quorum URL (zk1:2181,zk2:2181,...)");
+
     try {
-      File root = new File("./metron-streaming/Metron-Common/src/test/resources/config/source/");
-      for(File child: root.listFiles()) {
-        writeToZookeeperFromFile(child.getName().replaceFirst("-config.json", ""), child.getPath(), "node1:2181");
+      CommandLineParser parser = new BasicParser();
+      CommandLine cmd = parser.parse(options, args);
+
+      if (!cmd.hasOption('p') || !cmd.hasOption('z')) {
+        final PrintWriter writer = new PrintWriter(System.out);
+        final HelpFormatter usageFormatter = new HelpFormatter();
+        usageFormatter.printUsage(writer, 80, "Apache Metron SourceConfigUtils", options);
+        // Closing the writer flushes the usage text (and closes System.out) before exiting.
+        writer.close();
+        System.exit(1);
+      }
+
+      String sourcePath = cmd.getOptionValue('p');
+      String zkQuorum = cmd.getOptionValue('z');
+
+      File root = new File(sourcePath);
+      // listFiles() returns null when the path is not a directory or cannot be read.
+      File[] children = root.isDirectory() ? root.listFiles() : null;
+
+      if (children == null) {
+        System.err.println("Skipping upload, not a readable directory: " + sourcePath);
+      } else {
+        for (File child : children) {
+          // Escape and anchor the suffix so only a literal trailing "-config.json" is stripped.
+          String sourceName = child.getName().replaceFirst("-config\\.json$", "");
+          writeToZookeeperFromFile(sourceName, child.getPath(), zkQuorum);
+        }
       }
-      SourceConfigUtils.dumpConfigs("node1:2181");
+
+      SourceConfigUtils.dumpConfigs(zkQuorum);
+
     } catch (Exception e) {
       e.printStackTrace();
     }
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
index e805fba..5d2786d 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
@@ -18,8 +18,9 @@
 ##### Kafka #####
 
 kafka.zk=zkpr1:2181,zkpr2:2181,zkpr3:2181
+kafka.broker=kfka1:6667
 spout.kafka.topic.asa=asa
-spout.kafka.topic.bro=bro_raw
+spout.kafka.topic.bro=bro
 spout.kafka.topic.fireeye=fireeye
 spout.kafka.topic.ise=ise
 spout.kafka.topic.lancope=lancope

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
index fb594b5..42412fa 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
@@ -18,7 +18,6 @@ name: "bro"
 config:
     topology.workers: 1
 
-
 components:
     -   id: "parser"
         className: "org.apache.metron.parsing.parsers.BasicBroParser"
@@ -45,21 +44,11 @@ components:
             -   name: "ignoreZkOffsets"
                 value: true
             -   name: "startOffsetTime"
-                value: -1
+                value: -2
             -   name: "socketTimeoutMs"
                 value: 1000000
 
 spouts:
-    -   id: "testingSpout"
-        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
-        parallelism: 1
-        configMethods:
-            -   name: "withFilename"
-                args:
-                    - "SampleInput/YafExampleOutput"
-            -   name: "withRepeating"
-                args:
-                    - true
     -   id: "kafkaSpout"
         className: "storm.kafka.KafkaSpout"
         constructorArgs:
@@ -70,7 +59,7 @@ bolts:
         className: "org.apache.metron.bolt.ParserBolt"
         constructorArgs:
             - "${kafka.zk}"
-            - "yaf"
+            - "${spout.kafka.topic.bro}"
             - ref: "parser"
             - ref: "writer"
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/test.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/test.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/test.yaml
index 3bd3eed..39131f3 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/test.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/test.yaml
@@ -50,16 +50,6 @@ components:
                 value: 1000000
 
 spouts:
-    -   id: "testingSpout"
-        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
-        parallelism: 1
-        configMethods:
-            -   name: "withFilename"
-                args:
-                    - "SampleInput/YafExampleOutput"
-            -   name: "withRepeating"
-                args:
-                    - false
     -   id: "kafkaSpout"
         className: "storm.kafka.KafkaSpout"
         constructorArgs:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
index 8033374..ec36f2c 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
@@ -17,6 +17,7 @@
 name: "enrichment"
 config:
     topology.workers: 1
+    topology.acker.executors: 0
 
 components:
 # Enrichment
@@ -129,19 +130,9 @@ components:
             -   name: "ignoreZkOffsets"
                 value: true
             -   name: "startOffsetTime"
-                value: -1
+                value: -2
 
 spouts:
-    -   id: "testingSpout"
-        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
-        parallelism: 1
-        configMethods:
-            -   name: "withFilename"
-                args:
-                    - "SampleInput/YafExampleOutput"
-            -   name: "withRepeating"
-                args:
-                    - true
     -   id: "kafkaSpout"
         className: "storm.kafka.KafkaSpout"
         constructorArgs:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/paloalto/test.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/paloalto/test.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/paloalto/test.yaml
index e56e16f..07a9f48 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/paloalto/test.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/paloalto/test.yaml
@@ -44,7 +44,7 @@ components:
             -   name: "ignoreZkOffsets"
                 value: true
             -   name: "startOffsetTime"
-                value: -2
+                value: -1
 
 spouts:
     -   id: "testingSpout"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/parse.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/parse.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/parse.yaml
index dabaa7d..bfc8527 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/parse.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/parse.yaml
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-name: "yaf-test"
+name: "pcap-parse"
 config:
     topology.workers: 1
 
@@ -45,7 +45,7 @@ components:
             -   name: "ignoreZkOffsets"
                 value: true
             -   name: "startOffsetTime"
-                value: -2
+                value: -1
 
 spouts:
     -   id: "kafkaSpout"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml
index f7b0f20..5bdbc17 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml
@@ -19,12 +19,12 @@ config:
     topology.workers: 1
 
 components:
+# Parser
     -   id: "parser"
         className: "org.apache.metron.parsing.parsers.PcapParser"
         configMethods:
             -   name: "withTsPrecision"
                 args: ["MICRO"]
-
 # Threat Intel
     -   id: "ipThreatIntelConfig"
         className: "org.apache.metron.threatintel.ThreatIntelConfig"
@@ -50,7 +50,7 @@ components:
     -   id: "ipThreatIntelEnrichment"
         className: "org.apache.metron.domain.Enrichment"
         properties:
-           - name: "name"
+           - name: "type"
              value: "ip"
            - name: "fields"
              value: ["message/ip_src_addr", "message/ip_dst_addr"]
@@ -62,35 +62,8 @@ components:
             -   name: "add"
                 args:
                     - ref: "ipThreatIntelEnrichment"
-#Enrichment
-#    -   id: "jdbcConfig"
-#        className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
-#        properties:
-#            -   name: "host"
-#                value: "${mysql.ip}"
-#            -   name: "port"
-#                value: ${mysql.port}
-#            -   name: "username"
-#                value: "${mysql.username}"
-#            -   name: "password"
-#                value: "${mysql.password}"
-#            -   name: "table"
-#                value: "GEO"
-#    -   id: "geoEnrichmentAdapter"
-#        className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
-#        configMethods:
-#            -   name: "withJdbcConfig"
-#                args:
-#                    - ref: "jdbcConfig"
-#    -   id: "geoEnrichment"
-#        className: "org.apache.metron.domain.Enrichment"
-#        properties:
-#            -   name: "name"
-#                value:  "geo"
-#            -   name: "fields"
-#                value: ["ip_src_addr", "ip_dst_addr"]
-#            -   name: "adapter"
-#                ref: "geoEnrichmentAdapter"
+# Enrichment
+
     -   id: "hostEnrichmentAdapter"
         className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
         constructorArgs:
@@ -98,7 +71,7 @@ components:
     -   id: "hostEnrichment"
         className: "org.apache.metron.domain.Enrichment"
         properties:
-            -   name: "name"
+            -   name: "type"
                 value:  "host"
             -   name: "fields"
                 value: ["ip_src_addr", "ip_dst_addr"]
@@ -107,12 +80,10 @@ components:
     -   id: "enrichments"
         className: "java.util.ArrayList"
         configMethods:
-#            -   name: "add"
-#                args:
-#                    - ref: "geoEnrichment"
             -   name: "add"
                 args:
                     - ref: "hostEnrichment"
+#indexing
     -   id: "indexAdapter"
         className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
     -   id: "metricConfig"
@@ -190,10 +161,13 @@ components:
             # id
             - "${spout.kafka.topic.pcap}"
         properties:
-            -   name: "forceFromStart"
+            -   name: "ignoreZkOffsets"
                 value: true
             -   name: "startOffsetTime"
                 value: -1
+            -   name: "socketTimeoutMs"
+                value: 1000000
+#hbase bolt
     -   id: "hbaseConfig"
         className: "org.apache.metron.hbase.TupleTableConfig"
         configMethods:
@@ -217,7 +191,6 @@ spouts:
         className: "storm.kafka.KafkaSpout"
         constructorArgs:
             - ref: "kafkaConfig"
-
 bolts:
     -   id: "hbaseBolt"
         className: "org.apache.metron.hbase.HBaseBolt"
@@ -226,6 +199,8 @@ bolts:
             - "${kafka.zk}"
     -   id: "parserBolt"
         className: "org.apache.metron.bolt.PcapParserBolt"
+        constructorArgs:
+            - "${kafka.zk}"
         configMethods:
             -   name: "withMessageParser"
                 args:
@@ -235,6 +210,8 @@ bolts:
                     - ref: "enrichments"
     -   id: "indexingBolt"
         className: "org.apache.metron.indexing.TelemetryIndexingBolt"
+        constructorArgs:
+            - "${kafka.zk}"
         configMethods:
             -   name: "withIndexIP"
                 args:
@@ -265,6 +242,8 @@ bolts:
                     - ref: "metricConfig"
     -   id: "errorIndexingBolt"
         className: "org.apache.metron.indexing.TelemetryIndexingBolt"
+        constructorArgs:
+            - "${kafka.zk}"
         configMethods:
             -   name: "withIndexIP"
                 args:
@@ -293,15 +272,19 @@ bolts:
             -   name: "withMetricConfiguration"
                 args:
                     - ref: "metricConfig"
-    # Threat Intel Bolts
+# Threat Intel Bolts
     -   id: "threatIntelSplitBolt"
         className: "org.apache.metron.enrichment.bolt.EnrichmentSplitterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
         configMethods:
             -   name: "withEnrichments"
                 args:
                     - ref: "threatIntels"
     -   id: "ipThreatIntelBolt"
         className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+        constructorArgs:
+            - "${kafka.zk}"
         configMethods:
             -   name: "withEnrichment"
                 args:
@@ -312,29 +295,21 @@ bolts:
                 args: [10]
     -   id: "threatIntelJoinBolt"
         className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
+        constructorArgs:
+            - "${kafka.zk}"
         configMethods:
             -   name: "withEnrichments"
                 args:
                     - ref: "threatIntels"
-            -   name: "withType"
-                args:
-                    - "alerts"
             -   name: "withMaxCacheSize"
                 args: [10000]
             -   name: "withMaxTimeRetain"
                 args: [10]
-#    -   id: "geoEnrichmentBolt"
-#        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-#        configMethods:
-#            -   name: "withEnrichment"
-#                args:
-#                    - ref: "geoEnrichment"
-#            -   name: "withMaxCacheSize"
-#                args: [10000]
-#            -   name: "withMaxTimeRetain"
-#                args: [10]
+# Enrichment Bolts
     -   id: "hostEnrichmentBolt"
         className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+        constructorArgs:
+            - "${kafka.zk}"
         configMethods:
             -   name: "withEnrichment"
                 args:
@@ -345,6 +320,8 @@ bolts:
                 args: [10]
     -   id: "joinBolt"
         className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
+        constructorArgs:
+            - "${kafka.zk}"
         configMethods:
         -   name: "withEnrichments"
             args:
@@ -355,11 +332,13 @@ bolts:
             args: [10]
 
 streams:
+#parser
     -   name: "spout -> parser"
         from: "kafkaSpout"
         to: "parserBolt"
         grouping:
             type: SHUFFLE
+#hbase
     -   name: "parser -> hbase"
         from: "parserBolt"
         to: "hbaseBolt"
@@ -367,6 +346,7 @@ streams:
             streamId: "raw"
             type: FIELDS
             args: ["key"]
+#enrichment
     -   name: "parser -> host"
         from: "parserBolt"
         to: "hostEnrichmentBolt"
@@ -374,13 +354,6 @@ streams:
             streamId: "host"
             type: FIELDS
             args: ["key"]
-#    -   name: "parser -> geo"
-#        from: "parserBolt"
-#        to: "geoEnrichmentBolt"
-#        grouping:
-#            streamId: "geo"
-#            type: FIELDS
-#            args: ["key"]
     -   name: "parser -> join"
         from: "parserBolt"
         to: "joinBolt"
@@ -388,13 +361,6 @@ streams:
             streamId: "message"
             type: FIELDS
             args: ["key"]
-#    -   name: "geo -> join"
-#        from: "geoEnrichmentBolt"
-#        to: "joinBolt"
-#        grouping:
-#            streamId: "geo"
-#            type: FIELDS
-#            args: ["key"]
     -   name: "host -> join"
         from: "hostEnrichmentBolt"
         to: "joinBolt"
@@ -402,6 +368,7 @@ streams:
             streamId: "host"
             type: FIELDS
             args: ["key"]
+
 #threat intel
     -   name: "enrichmentJoin -> threatSplit"
         from: "joinBolt"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/remote.yaml
index 7f52d0f..3354d73 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/remote.yaml
@@ -47,16 +47,6 @@ components:
                 value: -1
 
 spouts:
-    -   id: "testingSpout"
-        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
-        parallelism: 1
-        configMethods:
-            -   name: "withFilename"
-                args:
-                    - "SampleInput/YafExampleOutput"
-            -   name: "withRepeating"
-                args:
-                    - true
     -   id: "kafkaSpout"
         className: "storm.kafka.KafkaSpout"
         constructorArgs:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/test.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/test.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/test.yaml
index bdbea97..2734ead 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/test.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/test.yaml
@@ -42,21 +42,11 @@ components:
             - "${spout.kafka.topic.snort}"
         properties:
             -   name: "ignoreZkOffsets"
-                value: true
+                value: false
             -   name: "startOffsetTime"
                 value: -2
 
 spouts:
-    -   id: "testingSpout"
-        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
-        parallelism: 1
-        configMethods:
-            -   name: "withFilename"
-                args:
-                    - "SampleInput/YafExampleOutput"
-            -   name: "withRepeating"
-                args:
-                    - false
     -   id: "kafkaSpout"
         className: "storm.kafka.KafkaSpout"
         constructorArgs:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/remote.yaml
index 98395e9..f1a8ea5 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/remote.yaml
@@ -14,11 +14,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-name: "yaf-test"
+name: "yaf"
 config:
     topology.workers: 1
 
-
 components:
     -   id: "parser"
         className: "org.apache.metron.parsing.parsers.GrokParser"
@@ -63,16 +62,6 @@ components:
                 value: 1000000
 
 spouts:
-    -   id: "testingSpout"
-        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
-        parallelism: 1
-        configMethods:
-            -   name: "withFilename"
-                args:
-                    - "SampleInput/YafExampleOutput"
-            -   name: "withRepeating"
-                args:
-                    - true
     -   id: "kafkaSpout"
         className: "storm.kafka.KafkaSpout"
         constructorArgs:
@@ -83,7 +72,7 @@ bolts:
         className: "org.apache.metron.bolt.ParserBolt"
         constructorArgs:
             - "${kafka.zk}"
-            - "yaf"
+            - "${spout.kafka.topic.yaf}"
             - ref: "parser"
             - ref: "writer"
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/2e9f2c6c/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/test.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/test.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/test.yaml
index 021d3f8..fe764d4 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/test.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/test.yaml
@@ -63,16 +63,6 @@ components:
                 value: 1000000
 
 spouts:
-    -   id: "testingSpout"
-        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
-        parallelism: 1
-        configMethods:
-            -   name: "withFilename"
-                args:
-                    - "SampleInput/YafExampleOutput"
-            -   name: "withRepeating"
-                args:
-                    - false
     -   id: "kafkaSpout"
         className: "storm.kafka.KafkaSpout"
         constructorArgs: