You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/21 18:05:55 UTC

[13/43] incubator-metron git commit: METRON-56 Create unified enrichment topology (merrimanr via cestella) closes apache/incubator-metron#33

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/SnortParsed
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/SnortParsed b/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/SnortParsed
new file mode 100644
index 0000000..4b74794
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/SnortParsed
@@ -0,0 +1,3 @@
+{"msg":"\"Consecutive TCP small segments exceeding threshold\"","sig_rev":"1","dst":"10.0.2.15","dstport":"22","ethsrc":"52:54:00:12:35:02","tcpseq":"0x9AFF3D7","dgmlen":"64","icmpid":"","tcplen":"","tcpwindow":"0xFFFF","icmpseq":"","tcpack":"0xC8761D52","original_string":"01\/27-16:01:04.877970 ,129,12,1,\"Consecutive TCP small segments exceeding threshold\",TCP,10.0.2.2,56642,10.0.2.15,22,52:54:00:12:35:02,08:00:27:7F:93:2D,0x4E,***AP***,0x9AFF3D7,0xC8761D52,,0xFFFF,64,0,59677,64,65536,,,,","icmpcode":"","tos":"0","id":"59677","timestamp":1453932941970,"ethdst":"08:00:27:7F:93:2D","src":"10.0.2.2","ttl":"64","source.type":"test","ethlen":"0x4E","iplen":"65536","icmptype":"","proto":"TCP","srcport":"56642","tcpflags":"***AP***","sig_id":"12","sig_generator":"129"}
+{"msg":"\"Consecutive TCP small segments exceeding threshold\"","sig_rev":"1","dst":"10.0.2.15","dstport":"50895","ethsrc":"52:54:00:12:35:02","tcpseq":"0xDB45F7A","dgmlen":"96","icmpid":"","tcplen":"","tcpwindow":"0xFFFF","icmpseq":"","tcpack":"0x7701DD5B","original_string":"02\/22-15:56:48.612494 ,129,12,1,\"Consecutive TCP small segments exceeding threshold\",TCP,96.44.142.5,80,10.0.2.15,50895,52:54:00:12:35:02,08:00:27:7F:93:2D,0x6E,***AP***,0xDB45F7A,0x7701DD5B,,0xFFFF,64,0,16785,96,98304,,,,","icmpcode":"","tos":"0","id":"16785","timestamp":1456178820494,"ethdst":"08:00:27:7F:93:2D","src":"96.44.142.5","ttl":"64","source.type":"test","ethlen":"0x6E","iplen":"98304","icmptype":"","proto":"TCP","srcport":"80","tcpflags":"***AP***","sig_id":"12","sig_generator":"129"}
+{"msg":"\"Consecutive TCP small segments exceeding threshold\"","sig_rev":"1","dst":"10.0.2.15","dstport":"50895","ethsrc":"52:54:00:12:35:02","tcpseq":"0xDB508F2","dgmlen":"152","icmpid":"","tcplen":"","tcpwindow":"0xFFFF","icmpseq":"","tcpack":"0x7701DD5B","original_string":"02\/22-15:56:48.616775 ,129,12,1,\"Consecutive TCP small segments exceeding threshold\",TCP,96.44.142.5,80,10.0.2.15,50895,52:54:00:12:35:02,08:00:27:7F:93:2D,0xA6,***AP***,0xDB508F2,0x7701DD5B,,0xFFFF,64,0,16824,152,155648,,,,","icmpcode":"","tos":"0","id":"16824","timestamp":1456178824775,"ethdst":"08:00:27:7F:93:2D","src":"96.44.142.5","ttl":"64","source.type":"test","ethlen":"0xA6","iplen":"155648","icmptype":"","proto":"TCP","srcport":"80","tcpflags":"***AP***","sig_id":"12","sig_generator":"129"}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/YafExampleParsed
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/YafExampleParsed b/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/YafExampleParsed
new file mode 100644
index 0000000..57f07b1
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/YafExampleParsed
@@ -0,0 +1,10 @@
+{"iflags":"AS","uflags":0,"isn":"22efa001","dip":"10.0.2.15","dp":39468,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|      AS|       0|       0|       0|22efa001|00000000|000|000|       1|      44|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"216.21.170.221","tag":0,"rtag":0,"sp":80,"timestamp":1453994988512,"app":0,"oct":44,"end_reason":"idle","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","proto":6}
+{"iflags":"A","uflags":0,"isn":10000000,"dip":"10.0.2.3","dp":53,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.502|2016-01-28 15:29:48.502|   0.000|   0.000| 17|                               10.0.2.15|37299|                                10.0.2.3|   53|       A|       0|       0|       0|10000000|00000000|000|000|       1|      56|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.15","tag":0,"rtag":0,"sp":37299,"timestamp":1453994988502,"app":0,"oct":56,"end_reason":"idle","risn":0,"end_time":1453994988502,"source.type":"yaf","start_time":1453994988502,"riflags":0,"rtt":"0.000","proto":17}
+{"iflags":"A","uflags":0,"isn":0,"dip":"10.0.2.15","dp":37299,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.504|2016-01-28 15:29:48.504|   0.000|   0.000| 17|                                10.0.2.3|   53|                               10.0.2.15|37299|       A|       0|       0|       0|00000000|00000000|000|000|       1|     312|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.3","tag":0,"rtag":0,"sp":53,"timestamp":1453994988504,"app":0,"oct":312,"end_reason":"idle","risn":0,"end_time":1453994988504,"source.type":"yaf","start_time":1453994988504,"riflags":0,"rtt":"0.000","proto":17}
+{"iflags":"A","uflags":0,"isn":0,"dip":"10.0.2.3","dp":53,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.504|2016-01-28 15:29:48.504|   0.000|   0.000| 17|                               10.0.2.15|56303|                                10.0.2.3|   53|       A|       0|       0|       0|00000000|00000000|000|000|       1|      56|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.15","tag":0,"rtag":0,"sp":56303,"timestamp":1453994988504,"app":0,"oct":56,"end_reason":"idle","risn":0,"end_time":1453994988504,"source.type":"yaf","start_time":1453994988504,"riflags":0,"rtt":"0.000","proto":17}
+{"iflags":"A","uflags":0,"isn":0,"dip":"10.0.2.15","dp":56303,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.506|2016-01-28 15:29:48.506|   0.000|   0.000| 17|                                10.0.2.3|   53|                               10.0.2.15|56303|       A|       0|       0|       0|00000000|00000000|000|000|       1|      84|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.3","tag":0,"rtag":0,"sp":53,"timestamp":1453994988506,"app":0,"oct":84,"end_reason":"idle","risn":0,"end_time":1453994988506,"source.type":"yaf","start_time":1453994988506,"riflags":0,"rtt":"0.000","proto":17}
+{"iflags":"S","uflags":0,"isn":"58c52fca","dip":"216.21.170.221","dp":80,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.508|2016-01-28 15:29:48.508|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|       S|       0|       0|       0|58c52fca|00000000|000|000|       1|      60|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.15","tag":0,"rtag":0,"sp":39468,"timestamp":1453994988508,"app":0,"oct":60,"end_reason":"idle","risn":0,"end_time":1453994988508,"source.type":"yaf","start_time":1453994988508,"riflags":0,"rtt":"0.000","proto":6}
+{"iflags":"A","uflags":0,"isn":"58c52fcb","dip":"216.21.170.221","dp":80,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|       A|       0|       0|       0|58c52fcb|00000000|000|000|       1|      40|       0|       0|    0|idle ","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.15","tag":0,"rtag":0,"sp":39468,"timestamp":1453994988512,"app":0,"oct":40,"end_reason":"idle ","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","proto":6}
+{"iflags":"AP","uflags":0,"isn":"58c52fcb","dip":"216.21.170.221","dp":80,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|      AP|       0|       0|       0|58c52fcb|00000000|000|000|       1|     148|       0|       0|    0|idle ","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.15","tag":0,"rtag":0,"sp":39468,"timestamp":1453994988512,"app":0,"oct":148,"end_reason":"idle ","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","proto":6}
+{"iflags":"A","uflags":0,"isn":"22efa002","dip":"10.0.2.15","dp":39468,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|       A|       0|       0|       0|22efa002|00000000|000|000|       1|      40|       0|       0|    0|idle ","pkt":1,"ruflags":0,"roct":0,"sip":"216.21.170.221","tag":0,"rtag":0,"sp":80,"timestamp":1453994988512,"app":0,"oct":40,"end_reason":"idle ","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","proto":6}
+{"iflags":"AP","uflags":0,"isn":"22efa002","dip":"10.0.2.15","dp":39468,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.562|2016-01-28 15:29:48.562|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|      AP|       0|       0|       0|22efa002|00000000|000|000|       1|     604|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"216.21.170.221","tag":0,"rtag":0,"sp":80,"timestamp":1453994988562,"app":0,"oct":604,"end_reason":"idle","risn":0,"end_time":1453994988562,"source.type":"yaf","start_time":1453994988562,"riflags":0,"rtt":"0.000","proto":6}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
new file mode 100644
index 0000000..ef1318e
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+import com.google.common.base.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.Constants;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.integration.util.TestUtils;
+import org.apache.metron.integration.util.UnitTestHelper;
+import org.apache.metron.integration.util.integration.ComponentRunner;
+import org.apache.metron.integration.util.integration.Processor;
+import org.apache.metron.integration.util.integration.ReadinessState;
+import org.apache.metron.integration.util.integration.components.ElasticSearchComponent;
+import org.apache.metron.integration.util.integration.components.FluxTopologyComponent;
+import org.apache.metron.integration.util.integration.components.KafkaWithZKComponent;
+import org.apache.metron.integration.util.mock.MockHTable;
+import org.apache.metron.integration.util.threatintel.ThreatIntelHelper;
+import org.apache.metron.reference.lookup.LookupKV;
+import org.apache.metron.utils.SourceConfigUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+public class EnrichmentIntegrationTest {
+
+  private String fluxPath = "src/main/resources/Metron_Configs/topologies/enrichment/test.yaml";
+  private String indexDir = "target/elasticsearch";
+  private String sampleParsedPath = "src/main/resources/SampleParsed/YafExampleParsed";
+  private String sampleIndexedPath = "src/main/resources/SampleIndexed/YafIndexed";
+  private Map<String, String> sourceConfigs = new HashMap<>();
+
+  public static class Provider implements TableProvider, Serializable {
+    MockHTable.Provider  provider = new MockHTable.Provider();
+    @Override
+    public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+      return provider.getTable(config, tableName);
+    }
+  }
+
+
+  @Test
+  public void test() throws Exception {
+    final String dateFormat = "yyyy.MM.dd.hh";
+    final String index = "yaf_" + new SimpleDateFormat(dateFormat).format(new Date());
+    String yafConfig = "{\n" +
+            "  \"index\": \"yaf\",\n" +
+            "  \"batchSize\": 5,\n" +
+            "  \"enrichmentFieldMap\":\n" +
+            "  {\n" +
+            "    \"geo\": [\"sip\", \"dip\"],\n" +
+            "    \"host\": [\"sip\", \"dip\"]\n" +
+            "  },\n" +
+            "  \"threatIntelFieldMap\":\n" +
+            "  {\n" +
+            "    \"ip\": [\"sip\", \"dip\"]\n" +
+            "  }\n" +
+            "}";
+    sourceConfigs.put("yaf", yafConfig);
+    final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath);
+    final String cf = "cf";
+    final String trackerHBaseTable = "tracker";
+    final String ipThreatIntelTable = "ip_threat_intel";
+    final Properties topologyProperties = new Properties() {{
+      setProperty("org.apache.metron.enrichment.host.known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"},\n" +
+              "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"},\n" +
+              "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"},\n" +
+              "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}]");
+      setProperty("hbase.provider.impl","" + Provider.class.getName());
+      setProperty("threat.intel.tracker.table", trackerHBaseTable);
+      setProperty("threat.intel.tracker.cf", cf);
+      setProperty("threat.intel.ip.table", ipThreatIntelTable);
+      setProperty("threat.intel.ip.cf", cf);
+      setProperty("es.clustername", "metron");
+      setProperty("es.port", "9300");
+      setProperty("es.ip", "localhost");
+      setProperty("index.date.format", dateFormat);
+    }};
+    final KafkaWithZKComponent kafkaComponent = new KafkaWithZKComponent().withTopics(new ArrayList<KafkaWithZKComponent.Topic>() {{
+      add(new KafkaWithZKComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
+    }})
+            .withPostStartCallback(new Function<KafkaWithZKComponent, Void>() {
+              @Nullable
+              @Override
+              public Void apply(@Nullable KafkaWithZKComponent kafkaWithZKComponent) {
+                topologyProperties.setProperty("kafka.zk", kafkaWithZKComponent.getZookeeperConnect());
+                try {
+                  for(String sourceType: sourceConfigs.keySet()) {
+                    SourceConfigUtils.writeToZookeeper(sourceType, sourceConfigs.get(sourceType).getBytes(), kafkaWithZKComponent.getZookeeperConnect());
+                  }
+                } catch (Exception e) {
+                  e.printStackTrace();
+                }
+                return null;
+              }
+            });
+
+    ElasticSearchComponent esComponent = new ElasticSearchComponent.Builder()
+            .withHttpPort(9211)
+            .withIndexDir(new File(indexDir))
+            .build();
+
+    //create MockHBaseTables
+    final MockHTable trackerTable = (MockHTable)MockHTable.Provider.addToCache(trackerHBaseTable, cf);
+    final MockHTable ipTable = (MockHTable)MockHTable.Provider.addToCache(ipThreatIntelTable, cf);
+    ThreatIntelHelper.INSTANCE.load(ipTable, cf, new ArrayList<LookupKV<ThreatIntelKey, ThreatIntelValue>>(){{
+      add(new LookupKV<>(new ThreatIntelKey("10.0.2.3"), new ThreatIntelValue(new HashMap<String, String>())));
+    }});
+
+    FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
+            .withTopologyLocation(new File(fluxPath))
+            .withTopologyName("test")
+            .withTopologyProperties(topologyProperties)
+            .build();
+
+    UnitTestHelper.verboseLogging();
+    ComponentRunner runner = new ComponentRunner.Builder()
+            .withComponent("kafka", kafkaComponent)
+            .withComponent("elasticsearch", esComponent)
+            .withComponent("storm", fluxComponent)
+            .withTimeBetweenAttempts(10000)
+            .build();
+    runner.start();
+    fluxComponent.submitTopology();
+    kafkaComponent.writeMessages(Constants.ENRICHMENT_TOPIC, inputMessages);
+    List<Map<String, Object>> docs =
+            runner.process(new Processor<List<Map<String, Object>>> () {
+              List<Map<String, Object>> docs = null;
+              public ReadinessState process(ComponentRunner runner){
+                ElasticSearchComponent elasticSearchComponent = runner.getComponent("elasticsearch", ElasticSearchComponent.class);
+                if(elasticSearchComponent.hasIndex(index)) {
+                  try {
+                    docs = elasticSearchComponent.getAllIndexedDocs(index, "yaf");
+                  } catch (IOException e) {
+                    throw new IllegalStateException("Unable to retrieve indexed documents.", e);
+                  }
+                  if(docs.size() < inputMessages.size()) {
+                    return ReadinessState.NOT_READY;
+                  }
+                  else {
+                    return ReadinessState.READY;
+                  }
+                }
+                else {
+                  return ReadinessState.NOT_READY;
+                }
+              }
+
+              public List<Map<String, Object>> getResult() {
+                return docs;
+              }
+            });
+
+    List<byte[]> sampleIndexedMessages = TestUtils.readSampleData(sampleIndexedPath);
+    Assert.assertEquals(sampleIndexedMessages.size(), docs.size());
+    for (int i = 0; i < docs.size(); i++) {
+      String doc = docs.get(i).toString();
+      String sampleIndexedMessage = new String(sampleIndexedMessages.get(i));
+      assertEqual(sampleIndexedMessage, doc);
+    }
+    runner.stop();
+  }
+  public static void assertEqual(String doc1, String doc2) {
+    Assert.assertEquals(doc1.length(), doc2.length());
+    char[] c1 = doc1.toCharArray();
+    Arrays.sort(c1);
+    char[] c2 = doc2.toCharArray();
+    Arrays.sort(c2);
+    Assert.assertArrayEquals(c1, c2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java
new file mode 100644
index 0000000..c55a069
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+import com.google.common.base.Function;
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.consumer.ConsumerIterator;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.producer.Producer;
+import kafka.message.MessageAndMetadata;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.metron.Constants;
+import org.apache.metron.integration.util.TestUtils;
+import org.apache.metron.integration.util.UnitTestHelper;
+import org.apache.metron.integration.util.integration.ComponentRunner;
+import org.apache.metron.integration.util.integration.Processor;
+import org.apache.metron.integration.util.integration.ReadinessState;
+import org.apache.metron.integration.util.integration.components.ElasticSearchComponent;
+import org.apache.metron.integration.util.integration.components.FluxTopologyComponent;
+import org.apache.metron.integration.util.integration.components.KafkaWithZKComponent;
+import org.apache.metron.integration.util.integration.util.KafkaUtil;
+import org.apache.metron.spout.pcap.HDFSWriterCallback;
+import org.apache.metron.test.converters.HexStringConverter;
+import org.apache.metron.utils.SourceConfigUtils;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.*;
+import java.util.*;
+
+public abstract class ParserIntegrationTest {
+
+  public abstract String getFluxPath();
+  public abstract String getSampleInputPath();
+  public abstract String getSampleParsedPath();
+  public abstract String getSourceType();
+  public abstract String getSourceConfig();
+  public abstract String getFluxTopicProperty();
+
+  @Test
+  public void test() throws Exception {
+
+    final String kafkaTopic = "test";
+
+    final List<byte[]> inputMessages = TestUtils.readSampleData(getSampleInputPath());
+
+    final Properties topologyProperties = new Properties() {{
+      setProperty(getFluxTopicProperty(), kafkaTopic);
+    }};
+    final KafkaWithZKComponent kafkaComponent = new KafkaWithZKComponent().withTopics(new ArrayList<KafkaWithZKComponent.Topic>() {{
+      add(new KafkaWithZKComponent.Topic(kafkaTopic, 1));
+    }})
+            .withPostStartCallback(new Function<KafkaWithZKComponent, Void>() {
+              @Nullable
+              @Override
+              public Void apply(@Nullable KafkaWithZKComponent kafkaWithZKComponent) {
+                topologyProperties.setProperty("kafka.zk", kafkaWithZKComponent.getZookeeperConnect());
+                try {
+                  SourceConfigUtils.writeToZookeeper(getSourceType(), getSourceConfig().getBytes(), kafkaWithZKComponent.getZookeeperConnect());
+                } catch (Exception e) {
+                  e.printStackTrace();
+                }
+                return null;
+              }
+            });
+
+    topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
+    FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
+            .withTopologyLocation(new File(getFluxPath()))
+            .withTopologyName("test")
+            .withTopologyProperties(topologyProperties)
+            .build();
+
+    UnitTestHelper.verboseLogging();
+    ComponentRunner runner = new ComponentRunner.Builder()
+            .withComponent("kafka", kafkaComponent)
+            .withComponent("storm", fluxComponent)
+            .withTimeBetweenAttempts(5000)
+            .build();
+    runner.start();
+    fluxComponent.submitTopology();
+    kafkaComponent.writeMessages(kafkaTopic, inputMessages);
+    List<byte[]> outputMessages =
+            runner.process(new Processor<List<byte[]>>() {
+              List<byte[]> messages = null;
+
+              public ReadinessState process(ComponentRunner runner) {
+                KafkaWithZKComponent kafkaWithZKComponent = runner.getComponent("kafka", KafkaWithZKComponent.class);
+                List<byte[]> outputMessages = kafkaWithZKComponent.readMessages(Constants.ENRICHMENT_TOPIC);
+                if (outputMessages.size() == inputMessages.size()) {
+                  messages = outputMessages;
+                  return ReadinessState.READY;
+                } else {
+                  return ReadinessState.NOT_READY;
+                }
+              }
+
+              public List<byte[]> getResult() {
+                return messages;
+              }
+            });
+    List<byte[]> sampleParsedMessages = TestUtils.readSampleData(getSampleParsedPath());
+    Assert.assertEquals(sampleParsedMessages.size(), outputMessages.size());
+    for (int i = 0; i < outputMessages.size(); i++) {
+      String sampleParsedMessage = new String(sampleParsedMessages.get(i));
+      String outputMessage = new String(outputMessages.get(i));
+      assertJSONEqual(sampleParsedMessage, outputMessage);
+    }
+    runner.stop();
+
+  }
+
+  public static void assertJSONEqual(String doc1, String doc2) throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    Map m1 = mapper.readValue(doc1, Map.class);
+    Map m2 = mapper.readValue(doc2, Map.class);
+    Assert.assertEquals(m1.size(), m2.size());
+    for(Object k : m1.keySet()) {
+      Object v1 = m1.get(k);
+      Object v2 = m2.get(k);
+
+      if(v2 == null) {
+        Assert.fail("Unable to find key: " + k + " in output");
+      }
+      if(k.equals("timestamp")) {
+        //TODO: Take the ?!?@ timestamps out of the reference file.
+        Assert.assertEquals(v1.toString().length(), v2.toString().length());
+      }
+      else if(!v2.equals(v1)) {
+        Assert.assertEquals("value mismatch for " + k ,v1, v2);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/SnortIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/SnortIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/SnortIntegrationTest.java
new file mode 100644
index 0000000..7508ad7
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/SnortIntegrationTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+public class SnortIntegrationTest extends ParserIntegrationTest {
+
+  @Override
+  public String getFluxPath() {
+    return "src/main/resources/Metron_Configs/topologies/snort/test.yaml";
+  }
+
+  @Override
+  public String getSampleInputPath() {
+    return "src/main/resources/SampleInput/SnortOutput";
+  }
+
+  @Override
+  public String getSampleParsedPath() {
+    return "src/main/resources/SampleParsed/SnortParsed";
+  }
+
+  @Override
+  public String getSourceType() {
+    return "snort";
+  }
+
+  @Override
+  public String getSourceConfig() {
+    return "{\"index\": \"snort\"," +
+            " \"batchSize\": 1," +
+            " \"enrichmentFieldMap\":" +
+            "  {" +
+            "    \"geo\": [\"src\", \"dst\"]," +
+            "    \"host\": [\"src\", \"dst\"]" +
+            "  }," +
+            "  \"threatIntelFieldMap\":" +
+            "  {" +
+            "    \"ip\": [\"src\", \"dst\"]" +
+            "  }" +
+            "}";
+  }
+
+  @Override
+  public String getFluxTopicProperty() {
+    return "spout.kafka.topic.snort";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/YafIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/YafIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/YafIntegrationTest.java
new file mode 100644
index 0000000..cf91bea
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/YafIntegrationTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
/**
 * Integration test for the YAF parser topology.
 * <p>
 * Supplies the YAF-specific flux topology location, sample input and
 * expected-output files, source type, and enrichment/threat-intel
 * configuration consumed by the shared {@code ParserIntegrationTest} harness.
 */
public class YafIntegrationTest extends ParserIntegrationTest {

  // Flux YAML that wires up the YAF test topology.
  @Override
  public String getFluxPath() {
    return "src/main/resources/Metron_Configs/topologies/yaf/test.yaml";
  }

  // Raw YAF flow records fed into the topology as input.
  @Override
  public String getSampleInputPath() {
    return "src/main/resources/SampleInput/YafExampleOutput";
  }

  // Expected parsed output, one JSON message per line.
  @Override
  public String getSampleParsedPath() {
    return "src/main/resources/SampleParsed/YafExampleParsed";
  }

  @Override
  public String getSourceType() {
    return "yaf";
  }

  /**
   * Source configuration: index name, batch size, the fields ({@code sip} /
   * {@code dip}) to run through geo/host enrichment, and the fields checked
   * against IP threat intel.
   * NOTE(review): the pieces concatenate into a single JSON document —
   * presumably parsed downstream, so the resulting string must stay
   * byte-identical; confirm before reformatting.
   */
  @Override
  public String getSourceConfig() {
    return "{\"index\": \"yaf\"," +
            " \"batchSize\": 5," +
            " \"enrichmentFieldMap\":" +
            "  {" +
            "    \"geo\": [\"sip\", \"dip\"]," +
            "    \"host\": [\"sip\", \"dip\"]" +
            "  }," +
            "  \"threatIntelFieldMap\":" +
            "  {" +
            "    \"ip\": [\"sip\", \"dip\"]" +
            "  }" +
            "}";
  }

  // Property (in the flux properties file) naming the Kafka input topic.
  @Override
  public String getFluxTopicProperty() {
    return "spout.kafka.topic.yaf";
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java
deleted file mode 100644
index 3337855..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.pcap;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.hbase.HTableProvider;
-import org.apache.metron.hbase.TableProvider;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
-import org.apache.metron.integration.util.UnitTestHelper;
-import org.apache.metron.integration.util.integration.ComponentRunner;
-import org.apache.metron.integration.util.integration.Processor;
-import org.apache.metron.integration.util.integration.ReadinessState;
-import org.apache.metron.integration.util.integration.components.ElasticSearchComponent;
-import org.apache.metron.integration.util.integration.components.FluxTopologyComponent;
-import org.apache.metron.integration.util.mock.MockHTable;
-import org.apache.metron.integration.util.threatintel.ThreatIntelHelper;
-import org.apache.metron.parsing.parsers.PcapParser;
-import org.apache.metron.reference.lookup.LookupKV;
-import org.apache.metron.test.converters.HexStringConverter;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
-import org.apache.metron.threatintel.ThreatIntelResults;
-import org.json.simple.JSONObject;
-import org.junit.Assert;
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-import java.io.*;
-import java.text.SimpleDateFormat;
-import java.util.*;
-
-public class PcapIntegrationTest {
-
-    private String topologiesDir = "src/main/resources/Metron_Configs/topologies";
-    private String targetDir = "target";
-
-    public static class Provider implements TableProvider, Serializable{
-
-        MockHTable.Provider  provider = new MockHTable.Provider();
-        @Override
-        public HTableInterface getTable(Configuration config, String tableName) throws IOException {
-            return provider.getTable(config, tableName);
-        }
-    }
-
-    @Test
-    public void testTopology() throws Exception {
-        if(!new File(topologiesDir).exists()) {
-            topologiesDir = UnitTestHelper.findDir("topologies");
-        }
-        if(!new File(targetDir).exists()) {
-            targetDir = UnitTestHelper.findDir("target");
-        }
-        Assert.assertNotNull(topologiesDir);
-        Assert.assertNotNull(targetDir);
-        final List<String> expectedPcapIds= getExpectedPcap(new File(topologiesDir + "/../../SampleInput/PCAPExampleOutput"));
-        Assert.assertTrue("Expected non-zero number of PCAP Ids from the sample data", expectedPcapIds.size() > 0);
-        System.out.println("Using topologies directory: " + topologiesDir);
-
-        ElasticSearchComponent esComponent = new ElasticSearchComponent.Builder()
-                                                                       .withHttpPort(9211)
-                                                                       .withIndexDir(new File(targetDir + "/elasticsearch"))
-                                                                       .build();
-        final String cf = "cf";
-        final String trackerHBaseTable = "tracker";
-        final String ipThreatIntelTable = "ip_threat_intel";
-        Properties topologyProperties = new Properties() {{
-            setProperty("input.path", "src/main/resources/");
-            setProperty("es.port", "9300");
-            setProperty("es.ip", "localhost");
-            setProperty("es.clustername", "metron");
-            setProperty("mysql.ip", "node1");
-            setProperty("mysql.port", "3306");
-            setProperty("mysql.username", "root");
-            setProperty("mysql.password", "P@ssw0rd");
-            setProperty("pcap.binary.converter", "FROM_HEX_STRING");
-            setProperty("testing.repeating", "false");
-            setProperty("org.apache.metron.metrics.reporter.graphite", "false");
-            setProperty("org.apache.metron.metrics.reporter.console", "false");
-            setProperty("org.apache.metron.metrics.reporter.jmx", "false");
-            setProperty("org.apache.metron.metrics.TelemetryParserBolt.acks","true");
-            setProperty("org.apache.metron.metrics.TelemetryParserBolt.emits", "true");
-            setProperty("org.apache.metron.metrics.TelemetryParserBolt.fails","true");
-            setProperty("org.apache.metron.metrics.GenericEnrichmentBolt.acks","true");
-            setProperty("org.apache.metron.metrics.GenericEnrichmentBolt.emits","true");
-            setProperty("org.apache.metron.metrics.GenericEnrichmentBolt.fails","true");
-            setProperty("org.apache.metron.metrics.TelemetryIndexingBolt.acks", "true");
-            setProperty("org.apache.metron.metrics.TelemetryIndexingBolt.emits","true");
-            setProperty("org.apache.metron.metrics.TelemetryIndexingBolt.fails","true");
-            setProperty("kafka.zk", "localhost:2000,localhost:2000");
-            setProperty("bolt.hbase.table.name", "pcap_test");
-            setProperty("bolt.hbase.table.fields", "t:value");
-            setProperty("bolt.hbase.table.key.tuple.field.name", "key");
-            setProperty("bolt.hbase.table.timestamp.tuple.field.name", "timestamp");
-            setProperty("bolt.hbase.enable.batching", "false");
-            setProperty("bolt.hbase.write.buffer.size.in.bytes", "2000000");
-            setProperty("bolt.hbase.durability", "SKIP_WAL");
-            setProperty("bolt.hbase.partitioner.region.info.refresh.interval.mins","60");
-            setProperty("hbase.provider.impl","" + Provider.class.getName());
-            setProperty("threat.intel.tracker.table", trackerHBaseTable);
-            setProperty("threat.intel.tracker.cf", cf);
-            setProperty("threat.intel.ip.table", ipThreatIntelTable);
-            setProperty("threat.intel.ip.cf", cf);
-            setProperty("org.apache.metron.enrichment.host.known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"}," +
-                    "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"}," +
-                    "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}," +
-                    "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", " +
-                    "\"type\":\"printer\", \"asset_value\" : \"important\"}]");
-        }};
-        //create MockHBaseTables
-        final MockHTable trackerTable = (MockHTable)MockHTable.Provider.addToCache(trackerHBaseTable, cf);
-        final MockHTable ipTable = (MockHTable)MockHTable.Provider.addToCache(ipThreatIntelTable, cf);
-        ThreatIntelHelper.INSTANCE.load(ipTable, cf, new ArrayList<LookupKV<ThreatIntelKey, ThreatIntelValue>>(){{
-            add(new LookupKV<>(new ThreatIntelKey("10.0.2.3"), new ThreatIntelValue(new HashMap<String, String>())));
-        }});
-        final MockHTable pcapTable = (MockHTable) MockHTable.Provider.addToCache("pcap_test", "t");
-        FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
-                                                                       .withTopologyLocation(new File(topologiesDir + "/pcap/local.yaml"))
-                                                                       .withTopologyName("pcap")
-                                                                       .withTopologyProperties(topologyProperties)
-                                                                       .build();
-        //UnitTestHelper.verboseLogging();
-        ComponentRunner runner = new ComponentRunner.Builder()
-                                                    .withComponent("elasticsearch", esComponent)
-                                                    .withComponent("storm", fluxComponent)
-                                                    .build();
-
-        final String index = getIndex();
-        System.out.println("Index of the run: " + index);
-        runner.start();
-        fluxComponent.submitTopology();
-        List<Map<String, Object>> docs =
-        runner.process(new Processor<List<Map<String, Object>>> () {
-            List<Map<String, Object>> docs = null;
-            public ReadinessState process(ComponentRunner runner){
-                ElasticSearchComponent elasticSearchComponent = runner.getComponent("elasticsearch", ElasticSearchComponent.class);
-                if(elasticSearchComponent.hasIndex(index)) {
-                    try {
-                        docs = elasticSearchComponent.getAllIndexedDocs(index);
-                    } catch (IOException e) {
-                        throw new IllegalStateException("Unable to retrieve indexed documents.", e);
-                    }
-                    if(docs.size() < expectedPcapIds.size() && pcapTable.getPutLog().size() < expectedPcapIds.size()) {
-                        return ReadinessState.NOT_READY;
-                    }
-                    else {
-                        return ReadinessState.READY;
-                    }
-                }
-                else {
-                    return ReadinessState.NOT_READY;
-                }
-            }
-
-            public List<Map<String, Object>> getResult() {
-                return docs;
-            }
-        });
-
-        Assert.assertEquals(expectedPcapIds.size(), pcapTable.getPutLog().size());
-        UnitTestHelper.assertSetEqual("PCap IDs from Index"
-                                     , new HashSet<>(expectedPcapIds)
-                                     , convertToSet(Iterables.transform(docs, DOC_TO_PCAP_ID))
-                                     );
-        UnitTestHelper.assertSetEqual("PCap IDs from HBase"
-                                     , new HashSet<>(expectedPcapIds)
-                                     , convertToSet(Iterables.transform(pcapTable.getPutLog(), RK_TO_PCAP_ID))
-                                     );
-        Iterable<JSONObject> packetsFromHBase = Iterables.transform(pcapTable.getPutLog(), PUT_TO_PCAP);
-        Assert.assertEquals(expectedPcapIds.size(), Iterables.size(packetsFromHBase));
-
-        List<Map<String, Object>> allDocs= runner.getComponent("elasticsearch", ElasticSearchComponent.class).getAllIndexedDocs(index, null);
-        boolean hasThreat = false;
-        for(Map<String, Object> d : allDocs) {
-            Map<String, Object> message = (Map<String, Object>) d.get("message");
-            Set<String> ips = new HashSet<>(Arrays.asList((String)message.get("ip_dst_addr"), (String)message.get("ip_src_addr")));
-            if(ips.contains("10.0.2.3")) {
-                hasThreat = true;
-                Map<String, Object> alerts = (Map<String, Object>) ((Map<String, Object>) d.get("alerts")).get("ip");
-                Assert.assertTrue(  ((Map<String,Object>)alerts.get("ip_dst_addr")).size() > 0
-                                 || ((Map<String,Object>)alerts.get("ip_src_addr")).size() > 0
-                                 );
-            }
-        }
-        Assert.assertTrue(hasThreat);
-        MockHTable.Provider.clear();
-        runner.stop();
-    }
-
-    public static Set<String> convertToSet(Iterable<String> strings) {
-        Set<String> ret = new HashSet<String>();
-        Iterables.addAll(ret, strings);
-        return ret;
-    }
-    public static final Function<Put, String> RK_TO_PCAP_ID = new Function<Put, String>() {
-        @Nullable
-        public String apply(@Nullable Put put) {
-            String rk =new String(put.getRow());
-            return Joiner.on("-").join(Iterables.limit(Splitter.on('-').split(rk), 5));
-        }
-    };
-
-    public static final Function<Map<String, Object>, String> DOC_TO_PCAP_ID = new Function<Map<String, Object>, String>() {
-
-        @Nullable
-        public String apply(@Nullable Map<String, Object> doc) {
-            return (String)doc.get("pcap_id");
-        }
-    };
-
-    public static final Function<Put, JSONObject> PUT_TO_PCAP = new
-            Function<Put, JSONObject>() {
-        @Nullable
-        public JSONObject apply(@Nullable Put put) {
-            try {
-                return putToPcap(put);
-            } catch (IOException e) {
-                throw new RuntimeException("Unable to convert put to PCAP: " + put);
-            }
-        }
-    };
-
-
-
-    private static List<String> getExpectedPcap(File rawFile) throws IOException {
-        List<String> ret = new ArrayList<String>();
-        PcapParser parser = new PcapParser();
-        parser.withTsPrecision("MICRO");
-        parser.init();
-        BufferedReader br = new BufferedReader(new FileReader(rawFile));
-        for(String line = null; (line = br.readLine()) != null;) {
-            byte[] pcapBytes = new HexStringConverter().convert(line);
-            List<JSONObject> list = parser.parse(pcapBytes);
-            for(JSONObject message : list) {
-                ret.add((String) message.get("pcap_id"));
-            }
-        }
-        return ret;
-    }
-
-    private static String getIndex() {
-        SimpleDateFormat sdf = new SimpleDateFormat("yyyy.MM.dd.hh");
-        Date d = new Date();
-        return "pcap_index_" + sdf.format(d);
-    }
-
-    private static JSONObject putToPcap(Put p) throws IOException {
-        PcapParser parser = new PcapParser();
-        parser.init();
-        List<Cell> cells = p.get(Bytes.toBytes("t"), Bytes.toBytes("value"));
-        Assert.assertEquals(1, cells.size());
-        List<JSONObject> messages = parser.parse(cells.get(0).getValueArray());
-        Assert.assertEquals(1, messages.size());
-        return messages.get(0);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/TestUtils.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/TestUtils.java
new file mode 100644
index 0000000..594700b
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/TestUtils.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
/** File-reading helpers shared by the parser integration tests. */
public class TestUtils {

  /**
   * Reads a sample data file and returns its lines as raw byte arrays,
   * one entry per line (line terminators are stripped by {@code readLine}).
   *
   * @param samplePath path to the sample file on the local filesystem
   * @return the file's lines, in order, encoded with the platform default
   *         charset. NOTE(review): keeps the original bare {@code getBytes()}
   *         for behavioral compatibility — confirm whether UTF-8 should be
   *         pinned explicitly.
   * @throws IOException if the file cannot be opened or read
   */
  public static List<byte[]> readSampleData(String samplePath) throws IOException {
    List<byte[]> lines = new ArrayList<>();
    // try-with-resources closes the reader even if readLine throws; the
    // original closed only on the success path and also computed an unused
    // per-line timestamp, both fixed here.
    try (BufferedReader br = new BufferedReader(new FileReader(samplePath))) {
      for (String line; (line = br.readLine()) != null; ) {
        lines.add(line.getBytes());
      }
    }
    return lines;
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a4c773d..499e323 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,7 +51,7 @@
 						<exclude>metron-ui/lib/public/**</exclude>
 						<exclude>**/src/main/resources/patterns/**</exclude>
 						<exclude>**/src/test/resources/**</exclude>
-						<exclude>**/src/main/resources/SampleInput/**</exclude>
+						<exclude>**/src/main/resources/Sample*/**</exclude>
 						<exclude>**/dependency-reduced-pom.xml</exclude>
 					        <exclude>**/files/opensoc-ui</exclude>
 					        <exclude>**/*.iml</exclude>