You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/21 18:05:55 UTC
[13/43] incubator-metron git commit: METRON-56 Create unified
enrichment topology (merrimanr via cestella) closes
apache/incubator-metron#33
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/SnortParsed
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/SnortParsed b/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/SnortParsed
new file mode 100644
index 0000000..4b74794
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/SnortParsed
@@ -0,0 +1,3 @@
+{"msg":"\"Consecutive TCP small segments exceeding threshold\"","sig_rev":"1","dst":"10.0.2.15","dstport":"22","ethsrc":"52:54:00:12:35:02","tcpseq":"0x9AFF3D7","dgmlen":"64","icmpid":"","tcplen":"","tcpwindow":"0xFFFF","icmpseq":"","tcpack":"0xC8761D52","original_string":"01\/27-16:01:04.877970 ,129,12,1,\"Consecutive TCP small segments exceeding threshold\",TCP,10.0.2.2,56642,10.0.2.15,22,52:54:00:12:35:02,08:00:27:7F:93:2D,0x4E,***AP***,0x9AFF3D7,0xC8761D52,,0xFFFF,64,0,59677,64,65536,,,,","icmpcode":"","tos":"0","id":"59677","timestamp":1453932941970,"ethdst":"08:00:27:7F:93:2D","src":"10.0.2.2","ttl":"64","source.type":"test","ethlen":"0x4E","iplen":"65536","icmptype":"","proto":"TCP","srcport":"56642","tcpflags":"***AP***","sig_id":"12","sig_generator":"129"}
+{"msg":"\"Consecutive TCP small segments exceeding threshold\"","sig_rev":"1","dst":"10.0.2.15","dstport":"50895","ethsrc":"52:54:00:12:35:02","tcpseq":"0xDB45F7A","dgmlen":"96","icmpid":"","tcplen":"","tcpwindow":"0xFFFF","icmpseq":"","tcpack":"0x7701DD5B","original_string":"02\/22-15:56:48.612494 ,129,12,1,\"Consecutive TCP small segments exceeding threshold\",TCP,96.44.142.5,80,10.0.2.15,50895,52:54:00:12:35:02,08:00:27:7F:93:2D,0x6E,***AP***,0xDB45F7A,0x7701DD5B,,0xFFFF,64,0,16785,96,98304,,,,","icmpcode":"","tos":"0","id":"16785","timestamp":1456178820494,"ethdst":"08:00:27:7F:93:2D","src":"96.44.142.5","ttl":"64","source.type":"test","ethlen":"0x6E","iplen":"98304","icmptype":"","proto":"TCP","srcport":"80","tcpflags":"***AP***","sig_id":"12","sig_generator":"129"}
+{"msg":"\"Consecutive TCP small segments exceeding threshold\"","sig_rev":"1","dst":"10.0.2.15","dstport":"50895","ethsrc":"52:54:00:12:35:02","tcpseq":"0xDB508F2","dgmlen":"152","icmpid":"","tcplen":"","tcpwindow":"0xFFFF","icmpseq":"","tcpack":"0x7701DD5B","original_string":"02\/22-15:56:48.616775 ,129,12,1,\"Consecutive TCP small segments exceeding threshold\",TCP,96.44.142.5,80,10.0.2.15,50895,52:54:00:12:35:02,08:00:27:7F:93:2D,0xA6,***AP***,0xDB508F2,0x7701DD5B,,0xFFFF,64,0,16824,152,155648,,,,","icmpcode":"","tos":"0","id":"16824","timestamp":1456178824775,"ethdst":"08:00:27:7F:93:2D","src":"96.44.142.5","ttl":"64","source.type":"test","ethlen":"0xA6","iplen":"155648","icmptype":"","proto":"TCP","srcport":"80","tcpflags":"***AP***","sig_id":"12","sig_generator":"129"}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/YafExampleParsed
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/YafExampleParsed b/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/YafExampleParsed
new file mode 100644
index 0000000..57f07b1
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/YafExampleParsed
@@ -0,0 +1,10 @@
+{"iflags":"AS","uflags":0,"isn":"22efa001","dip":"10.0.2.15","dp":39468,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512| 0.000| 0.000| 6| 216.21.170.221| 80| 10.0.2.15|39468| AS| 0| 0| 0|22efa001|00000000|000|000| 1| 44| 0| 0| 0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"216.21.170.221","tag":0,"rtag":0,"sp":80,"timestamp":1453994988512,"app":0,"oct":44,"end_reason":"idle","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","proto":6}
+{"iflags":"A","uflags":0,"isn":10000000,"dip":"10.0.2.3","dp":53,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.502|2016-01-28 15:29:48.502| 0.000| 0.000| 17| 10.0.2.15|37299| 10.0.2.3| 53| A| 0| 0| 0|10000000|00000000|000|000| 1| 56| 0| 0| 0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.15","tag":0,"rtag":0,"sp":37299,"timestamp":1453994988502,"app":0,"oct":56,"end_reason":"idle","risn":0,"end_time":1453994988502,"source.type":"yaf","start_time":1453994988502,"riflags":0,"rtt":"0.000","proto":17}
+{"iflags":"A","uflags":0,"isn":0,"dip":"10.0.2.15","dp":37299,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.504|2016-01-28 15:29:48.504| 0.000| 0.000| 17| 10.0.2.3| 53| 10.0.2.15|37299| A| 0| 0| 0|00000000|00000000|000|000| 1| 312| 0| 0| 0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.3","tag":0,"rtag":0,"sp":53,"timestamp":1453994988504,"app":0,"oct":312,"end_reason":"idle","risn":0,"end_time":1453994988504,"source.type":"yaf","start_time":1453994988504,"riflags":0,"rtt":"0.000","proto":17}
+{"iflags":"A","uflags":0,"isn":0,"dip":"10.0.2.3","dp":53,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.504|2016-01-28 15:29:48.504| 0.000| 0.000| 17| 10.0.2.15|56303| 10.0.2.3| 53| A| 0| 0| 0|00000000|00000000|000|000| 1| 56| 0| 0| 0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.15","tag":0,"rtag":0,"sp":56303,"timestamp":1453994988504,"app":0,"oct":56,"end_reason":"idle","risn":0,"end_time":1453994988504,"source.type":"yaf","start_time":1453994988504,"riflags":0,"rtt":"0.000","proto":17}
+{"iflags":"A","uflags":0,"isn":0,"dip":"10.0.2.15","dp":56303,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.506|2016-01-28 15:29:48.506| 0.000| 0.000| 17| 10.0.2.3| 53| 10.0.2.15|56303| A| 0| 0| 0|00000000|00000000|000|000| 1| 84| 0| 0| 0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.3","tag":0,"rtag":0,"sp":53,"timestamp":1453994988506,"app":0,"oct":84,"end_reason":"idle","risn":0,"end_time":1453994988506,"source.type":"yaf","start_time":1453994988506,"riflags":0,"rtt":"0.000","proto":17}
+{"iflags":"S","uflags":0,"isn":"58c52fca","dip":"216.21.170.221","dp":80,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.508|2016-01-28 15:29:48.508| 0.000| 0.000| 6| 10.0.2.15|39468| 216.21.170.221| 80| S| 0| 0| 0|58c52fca|00000000|000|000| 1| 60| 0| 0| 0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.15","tag":0,"rtag":0,"sp":39468,"timestamp":1453994988508,"app":0,"oct":60,"end_reason":"idle","risn":0,"end_time":1453994988508,"source.type":"yaf","start_time":1453994988508,"riflags":0,"rtt":"0.000","proto":6}
+{"iflags":"A","uflags":0,"isn":"58c52fcb","dip":"216.21.170.221","dp":80,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512| 0.000| 0.000| 6| 10.0.2.15|39468| 216.21.170.221| 80| A| 0| 0| 0|58c52fcb|00000000|000|000| 1| 40| 0| 0| 0|idle ","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.15","tag":0,"rtag":0,"sp":39468,"timestamp":1453994988512,"app":0,"oct":40,"end_reason":"idle ","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","proto":6}
+{"iflags":"AP","uflags":0,"isn":"58c52fcb","dip":"216.21.170.221","dp":80,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512| 0.000| 0.000| 6| 10.0.2.15|39468| 216.21.170.221| 80| AP| 0| 0| 0|58c52fcb|00000000|000|000| 1| 148| 0| 0| 0|idle ","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.15","tag":0,"rtag":0,"sp":39468,"timestamp":1453994988512,"app":0,"oct":148,"end_reason":"idle ","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","proto":6}
+{"iflags":"A","uflags":0,"isn":"22efa002","dip":"10.0.2.15","dp":39468,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512| 0.000| 0.000| 6| 216.21.170.221| 80| 10.0.2.15|39468| A| 0| 0| 0|22efa002|00000000|000|000| 1| 40| 0| 0| 0|idle ","pkt":1,"ruflags":0,"roct":0,"sip":"216.21.170.221","tag":0,"rtag":0,"sp":80,"timestamp":1453994988512,"app":0,"oct":40,"end_reason":"idle ","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","proto":6}
+{"iflags":"AP","uflags":0,"isn":"22efa002","dip":"10.0.2.15","dp":39468,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.562|2016-01-28 15:29:48.562| 0.000| 0.000| 6| 216.21.170.221| 80| 10.0.2.15|39468| AP| 0| 0| 0|22efa002|00000000|000|000| 1| 604| 0| 0| 0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"216.21.170.221","tag":0,"rtag":0,"sp":80,"timestamp":1453994988562,"app":0,"oct":604,"end_reason":"idle","risn":0,"end_time":1453994988562,"source.type":"yaf","start_time":1453994988562,"riflags":0,"rtt":"0.000","proto":6}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
new file mode 100644
index 0000000..ef1318e
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+import com.google.common.base.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.Constants;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.integration.util.TestUtils;
+import org.apache.metron.integration.util.UnitTestHelper;
+import org.apache.metron.integration.util.integration.ComponentRunner;
+import org.apache.metron.integration.util.integration.Processor;
+import org.apache.metron.integration.util.integration.ReadinessState;
+import org.apache.metron.integration.util.integration.components.ElasticSearchComponent;
+import org.apache.metron.integration.util.integration.components.FluxTopologyComponent;
+import org.apache.metron.integration.util.integration.components.KafkaWithZKComponent;
+import org.apache.metron.integration.util.mock.MockHTable;
+import org.apache.metron.integration.util.threatintel.ThreatIntelHelper;
+import org.apache.metron.reference.lookup.LookupKV;
+import org.apache.metron.utils.SourceConfigUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+public class EnrichmentIntegrationTest {
+
+ private String fluxPath = "src/main/resources/Metron_Configs/topologies/enrichment/test.yaml";
+ private String indexDir = "target/elasticsearch";
+ private String sampleParsedPath = "src/main/resources/SampleParsed/YafExampleParsed";
+ private String sampleIndexedPath = "src/main/resources/SampleIndexed/YafIndexed";
+ private Map<String, String> sourceConfigs = new HashMap<>();
+
+ public static class Provider implements TableProvider, Serializable {
+ MockHTable.Provider provider = new MockHTable.Provider();
+ @Override
+ public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+ return provider.getTable(config, tableName);
+ }
+ }
+
+
+ @Test
+ public void test() throws Exception {
+ final String dateFormat = "yyyy.MM.dd.hh";
+ final String index = "yaf_" + new SimpleDateFormat(dateFormat).format(new Date());
+ String yafConfig = "{\n" +
+ " \"index\": \"yaf\",\n" +
+ " \"batchSize\": 5,\n" +
+ " \"enrichmentFieldMap\":\n" +
+ " {\n" +
+ " \"geo\": [\"sip\", \"dip\"],\n" +
+ " \"host\": [\"sip\", \"dip\"]\n" +
+ " },\n" +
+ " \"threatIntelFieldMap\":\n" +
+ " {\n" +
+ " \"ip\": [\"sip\", \"dip\"]\n" +
+ " }\n" +
+ "}";
+ sourceConfigs.put("yaf", yafConfig);
+ final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath);
+ final String cf = "cf";
+ final String trackerHBaseTable = "tracker";
+ final String ipThreatIntelTable = "ip_threat_intel";
+ final Properties topologyProperties = new Properties() {{
+ setProperty("org.apache.metron.enrichment.host.known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"},\n" +
+ "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"},\n" +
+ "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"},\n" +
+ "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}]");
+ setProperty("hbase.provider.impl","" + Provider.class.getName());
+ setProperty("threat.intel.tracker.table", trackerHBaseTable);
+ setProperty("threat.intel.tracker.cf", cf);
+ setProperty("threat.intel.ip.table", ipThreatIntelTable);
+ setProperty("threat.intel.ip.cf", cf);
+ setProperty("es.clustername", "metron");
+ setProperty("es.port", "9300");
+ setProperty("es.ip", "localhost");
+ setProperty("index.date.format", dateFormat);
+ }};
+ final KafkaWithZKComponent kafkaComponent = new KafkaWithZKComponent().withTopics(new ArrayList<KafkaWithZKComponent.Topic>() {{
+ add(new KafkaWithZKComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
+ }})
+ .withPostStartCallback(new Function<KafkaWithZKComponent, Void>() {
+ @Nullable
+ @Override
+ public Void apply(@Nullable KafkaWithZKComponent kafkaWithZKComponent) {
+ topologyProperties.setProperty("kafka.zk", kafkaWithZKComponent.getZookeeperConnect());
+ try {
+ for(String sourceType: sourceConfigs.keySet()) {
+ SourceConfigUtils.writeToZookeeper(sourceType, sourceConfigs.get(sourceType).getBytes(), kafkaWithZKComponent.getZookeeperConnect());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+ });
+
+ ElasticSearchComponent esComponent = new ElasticSearchComponent.Builder()
+ .withHttpPort(9211)
+ .withIndexDir(new File(indexDir))
+ .build();
+
+ //create MockHBaseTables
+ final MockHTable trackerTable = (MockHTable)MockHTable.Provider.addToCache(trackerHBaseTable, cf);
+ final MockHTable ipTable = (MockHTable)MockHTable.Provider.addToCache(ipThreatIntelTable, cf);
+ ThreatIntelHelper.INSTANCE.load(ipTable, cf, new ArrayList<LookupKV<ThreatIntelKey, ThreatIntelValue>>(){{
+ add(new LookupKV<>(new ThreatIntelKey("10.0.2.3"), new ThreatIntelValue(new HashMap<String, String>())));
+ }});
+
+ FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
+ .withTopologyLocation(new File(fluxPath))
+ .withTopologyName("test")
+ .withTopologyProperties(topologyProperties)
+ .build();
+
+ UnitTestHelper.verboseLogging();
+ ComponentRunner runner = new ComponentRunner.Builder()
+ .withComponent("kafka", kafkaComponent)
+ .withComponent("elasticsearch", esComponent)
+ .withComponent("storm", fluxComponent)
+ .withTimeBetweenAttempts(10000)
+ .build();
+ runner.start();
+ fluxComponent.submitTopology();
+ kafkaComponent.writeMessages(Constants.ENRICHMENT_TOPIC, inputMessages);
+ List<Map<String, Object>> docs =
+ runner.process(new Processor<List<Map<String, Object>>> () {
+ List<Map<String, Object>> docs = null;
+ public ReadinessState process(ComponentRunner runner){
+ ElasticSearchComponent elasticSearchComponent = runner.getComponent("elasticsearch", ElasticSearchComponent.class);
+ if(elasticSearchComponent.hasIndex(index)) {
+ try {
+ docs = elasticSearchComponent.getAllIndexedDocs(index, "yaf");
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to retrieve indexed documents.", e);
+ }
+ if(docs.size() < inputMessages.size()) {
+ return ReadinessState.NOT_READY;
+ }
+ else {
+ return ReadinessState.READY;
+ }
+ }
+ else {
+ return ReadinessState.NOT_READY;
+ }
+ }
+
+ public List<Map<String, Object>> getResult() {
+ return docs;
+ }
+ });
+
+ List<byte[]> sampleIndexedMessages = TestUtils.readSampleData(sampleIndexedPath);
+ Assert.assertEquals(sampleIndexedMessages.size(), docs.size());
+ for (int i = 0; i < docs.size(); i++) {
+ String doc = docs.get(i).toString();
+ String sampleIndexedMessage = new String(sampleIndexedMessages.get(i));
+ assertEqual(sampleIndexedMessage, doc);
+ }
+ runner.stop();
+ }
+ public static void assertEqual(String doc1, String doc2) {
+ Assert.assertEquals(doc1.length(), doc2.length());
+ char[] c1 = doc1.toCharArray();
+ Arrays.sort(c1);
+ char[] c2 = doc2.toCharArray();
+ Arrays.sort(c2);
+ Assert.assertArrayEquals(c1, c2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java
new file mode 100644
index 0000000..c55a069
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+import com.google.common.base.Function;
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.consumer.ConsumerIterator;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.producer.Producer;
+import kafka.message.MessageAndMetadata;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.metron.Constants;
+import org.apache.metron.integration.util.TestUtils;
+import org.apache.metron.integration.util.UnitTestHelper;
+import org.apache.metron.integration.util.integration.ComponentRunner;
+import org.apache.metron.integration.util.integration.Processor;
+import org.apache.metron.integration.util.integration.ReadinessState;
+import org.apache.metron.integration.util.integration.components.ElasticSearchComponent;
+import org.apache.metron.integration.util.integration.components.FluxTopologyComponent;
+import org.apache.metron.integration.util.integration.components.KafkaWithZKComponent;
+import org.apache.metron.integration.util.integration.util.KafkaUtil;
+import org.apache.metron.spout.pcap.HDFSWriterCallback;
+import org.apache.metron.test.converters.HexStringConverter;
+import org.apache.metron.utils.SourceConfigUtils;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.*;
+import java.util.*;
+
+public abstract class ParserIntegrationTest {
+
+ public abstract String getFluxPath();
+ public abstract String getSampleInputPath();
+ public abstract String getSampleParsedPath();
+ public abstract String getSourceType();
+ public abstract String getSourceConfig();
+ public abstract String getFluxTopicProperty();
+
+ @Test
+ public void test() throws Exception {
+
+ final String kafkaTopic = "test";
+
+ final List<byte[]> inputMessages = TestUtils.readSampleData(getSampleInputPath());
+
+ final Properties topologyProperties = new Properties() {{
+ setProperty(getFluxTopicProperty(), kafkaTopic);
+ }};
+ final KafkaWithZKComponent kafkaComponent = new KafkaWithZKComponent().withTopics(new ArrayList<KafkaWithZKComponent.Topic>() {{
+ add(new KafkaWithZKComponent.Topic(kafkaTopic, 1));
+ }})
+ .withPostStartCallback(new Function<KafkaWithZKComponent, Void>() {
+ @Nullable
+ @Override
+ public Void apply(@Nullable KafkaWithZKComponent kafkaWithZKComponent) {
+ topologyProperties.setProperty("kafka.zk", kafkaWithZKComponent.getZookeeperConnect());
+ try {
+ SourceConfigUtils.writeToZookeeper(getSourceType(), getSourceConfig().getBytes(), kafkaWithZKComponent.getZookeeperConnect());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+ });
+
+ topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
+ FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
+ .withTopologyLocation(new File(getFluxPath()))
+ .withTopologyName("test")
+ .withTopologyProperties(topologyProperties)
+ .build();
+
+ UnitTestHelper.verboseLogging();
+ ComponentRunner runner = new ComponentRunner.Builder()
+ .withComponent("kafka", kafkaComponent)
+ .withComponent("storm", fluxComponent)
+ .withTimeBetweenAttempts(5000)
+ .build();
+ runner.start();
+ fluxComponent.submitTopology();
+ kafkaComponent.writeMessages(kafkaTopic, inputMessages);
+ List<byte[]> outputMessages =
+ runner.process(new Processor<List<byte[]>>() {
+ List<byte[]> messages = null;
+
+ public ReadinessState process(ComponentRunner runner) {
+ KafkaWithZKComponent kafkaWithZKComponent = runner.getComponent("kafka", KafkaWithZKComponent.class);
+ List<byte[]> outputMessages = kafkaWithZKComponent.readMessages(Constants.ENRICHMENT_TOPIC);
+ if (outputMessages.size() == inputMessages.size()) {
+ messages = outputMessages;
+ return ReadinessState.READY;
+ } else {
+ return ReadinessState.NOT_READY;
+ }
+ }
+
+ public List<byte[]> getResult() {
+ return messages;
+ }
+ });
+ List<byte[]> sampleParsedMessages = TestUtils.readSampleData(getSampleParsedPath());
+ Assert.assertEquals(sampleParsedMessages.size(), outputMessages.size());
+ for (int i = 0; i < outputMessages.size(); i++) {
+ String sampleParsedMessage = new String(sampleParsedMessages.get(i));
+ String outputMessage = new String(outputMessages.get(i));
+ assertJSONEqual(sampleParsedMessage, outputMessage);
+ }
+ runner.stop();
+
+ }
+
+ public static void assertJSONEqual(String doc1, String doc2) throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ Map m1 = mapper.readValue(doc1, Map.class);
+ Map m2 = mapper.readValue(doc2, Map.class);
+ Assert.assertEquals(m1.size(), m2.size());
+ for(Object k : m1.keySet()) {
+ Object v1 = m1.get(k);
+ Object v2 = m2.get(k);
+
+ if(v2 == null) {
+ Assert.fail("Unable to find key: " + k + " in output");
+ }
+ if(k.equals("timestamp")) {
+ //TODO: Take the ?!?@ timestamps out of the reference file.
+ Assert.assertEquals(v1.toString().length(), v2.toString().length());
+ }
+ else if(!v2.equals(v1)) {
+ Assert.assertEquals("value mismatch for " + k ,v1, v2);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/SnortIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/SnortIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/SnortIntegrationTest.java
new file mode 100644
index 0000000..7508ad7
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/SnortIntegrationTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+public class SnortIntegrationTest extends ParserIntegrationTest {
+
+ @Override
+ public String getFluxPath() {
+ return "src/main/resources/Metron_Configs/topologies/snort/test.yaml";
+ }
+
+ @Override
+ public String getSampleInputPath() {
+ return "src/main/resources/SampleInput/SnortOutput";
+ }
+
+ @Override
+ public String getSampleParsedPath() {
+ return "src/main/resources/SampleParsed/SnortParsed";
+ }
+
+ @Override
+ public String getSourceType() {
+ return "snort";
+ }
+
+ @Override
+ public String getSourceConfig() {
+ return "{\"index\": \"snort\"," +
+ " \"batchSize\": 1," +
+ " \"enrichmentFieldMap\":" +
+ " {" +
+ " \"geo\": [\"src\", \"dst\"]," +
+ " \"host\": [\"src\", \"dst\"]" +
+ " }," +
+ " \"threatIntelFieldMap\":" +
+ " {" +
+ " \"ip\": [\"src\", \"dst\"]" +
+ " }" +
+ "}";
+ }
+
+ @Override
+ public String getFluxTopicProperty() {
+ return "spout.kafka.topic.snort";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/YafIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/YafIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/YafIntegrationTest.java
new file mode 100644
index 0000000..cf91bea
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/YafIntegrationTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+public class YafIntegrationTest extends ParserIntegrationTest {
+
+ @Override
+ public String getFluxPath() {
+ return "src/main/resources/Metron_Configs/topologies/yaf/test.yaml";
+ }
+
+ @Override
+ public String getSampleInputPath() {
+ return "src/main/resources/SampleInput/YafExampleOutput";
+ }
+
+ @Override
+ public String getSampleParsedPath() {
+ return "src/main/resources/SampleParsed/YafExampleParsed";
+ }
+
+ @Override
+ public String getSourceType() {
+ return "yaf";
+ }
+
+ @Override
+ public String getSourceConfig() {
+ return "{\"index\": \"yaf\"," +
+ " \"batchSize\": 5," +
+ " \"enrichmentFieldMap\":" +
+ " {" +
+ " \"geo\": [\"sip\", \"dip\"]," +
+ " \"host\": [\"sip\", \"dip\"]" +
+ " }," +
+ " \"threatIntelFieldMap\":" +
+ " {" +
+ " \"ip\": [\"sip\", \"dip\"]" +
+ " }" +
+ "}";
+ }
+
+ @Override
+ public String getFluxTopicProperty() {
+ return "spout.kafka.topic.yaf";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java
deleted file mode 100644
index 3337855..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.pcap;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.hbase.HTableProvider;
-import org.apache.metron.hbase.TableProvider;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
-import org.apache.metron.integration.util.UnitTestHelper;
-import org.apache.metron.integration.util.integration.ComponentRunner;
-import org.apache.metron.integration.util.integration.Processor;
-import org.apache.metron.integration.util.integration.ReadinessState;
-import org.apache.metron.integration.util.integration.components.ElasticSearchComponent;
-import org.apache.metron.integration.util.integration.components.FluxTopologyComponent;
-import org.apache.metron.integration.util.mock.MockHTable;
-import org.apache.metron.integration.util.threatintel.ThreatIntelHelper;
-import org.apache.metron.parsing.parsers.PcapParser;
-import org.apache.metron.reference.lookup.LookupKV;
-import org.apache.metron.test.converters.HexStringConverter;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
-import org.apache.metron.threatintel.ThreatIntelResults;
-import org.json.simple.JSONObject;
-import org.junit.Assert;
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-import java.io.*;
-import java.text.SimpleDateFormat;
-import java.util.*;
-
-public class PcapIntegrationTest {
-
- private String topologiesDir = "src/main/resources/Metron_Configs/topologies";
- private String targetDir = "target";
-
- public static class Provider implements TableProvider, Serializable{
-
- MockHTable.Provider provider = new MockHTable.Provider();
- @Override
- public HTableInterface getTable(Configuration config, String tableName) throws IOException {
- return provider.getTable(config, tableName);
- }
- }
-
- @Test
- public void testTopology() throws Exception {
- if(!new File(topologiesDir).exists()) {
- topologiesDir = UnitTestHelper.findDir("topologies");
- }
- if(!new File(targetDir).exists()) {
- targetDir = UnitTestHelper.findDir("target");
- }
- Assert.assertNotNull(topologiesDir);
- Assert.assertNotNull(targetDir);
- final List<String> expectedPcapIds= getExpectedPcap(new File(topologiesDir + "/../../SampleInput/PCAPExampleOutput"));
- Assert.assertTrue("Expected non-zero number of PCAP Ids from the sample data", expectedPcapIds.size() > 0);
- System.out.println("Using topologies directory: " + topologiesDir);
-
- ElasticSearchComponent esComponent = new ElasticSearchComponent.Builder()
- .withHttpPort(9211)
- .withIndexDir(new File(targetDir + "/elasticsearch"))
- .build();
- final String cf = "cf";
- final String trackerHBaseTable = "tracker";
- final String ipThreatIntelTable = "ip_threat_intel";
- Properties topologyProperties = new Properties() {{
- setProperty("input.path", "src/main/resources/");
- setProperty("es.port", "9300");
- setProperty("es.ip", "localhost");
- setProperty("es.clustername", "metron");
- setProperty("mysql.ip", "node1");
- setProperty("mysql.port", "3306");
- setProperty("mysql.username", "root");
- setProperty("mysql.password", "P@ssw0rd");
- setProperty("pcap.binary.converter", "FROM_HEX_STRING");
- setProperty("testing.repeating", "false");
- setProperty("org.apache.metron.metrics.reporter.graphite", "false");
- setProperty("org.apache.metron.metrics.reporter.console", "false");
- setProperty("org.apache.metron.metrics.reporter.jmx", "false");
- setProperty("org.apache.metron.metrics.TelemetryParserBolt.acks","true");
- setProperty("org.apache.metron.metrics.TelemetryParserBolt.emits", "true");
- setProperty("org.apache.metron.metrics.TelemetryParserBolt.fails","true");
- setProperty("org.apache.metron.metrics.GenericEnrichmentBolt.acks","true");
- setProperty("org.apache.metron.metrics.GenericEnrichmentBolt.emits","true");
- setProperty("org.apache.metron.metrics.GenericEnrichmentBolt.fails","true");
- setProperty("org.apache.metron.metrics.TelemetryIndexingBolt.acks", "true");
- setProperty("org.apache.metron.metrics.TelemetryIndexingBolt.emits","true");
- setProperty("org.apache.metron.metrics.TelemetryIndexingBolt.fails","true");
- setProperty("kafka.zk", "localhost:2000,localhost:2000");
- setProperty("bolt.hbase.table.name", "pcap_test");
- setProperty("bolt.hbase.table.fields", "t:value");
- setProperty("bolt.hbase.table.key.tuple.field.name", "key");
- setProperty("bolt.hbase.table.timestamp.tuple.field.name", "timestamp");
- setProperty("bolt.hbase.enable.batching", "false");
- setProperty("bolt.hbase.write.buffer.size.in.bytes", "2000000");
- setProperty("bolt.hbase.durability", "SKIP_WAL");
- setProperty("bolt.hbase.partitioner.region.info.refresh.interval.mins","60");
- setProperty("hbase.provider.impl","" + Provider.class.getName());
- setProperty("threat.intel.tracker.table", trackerHBaseTable);
- setProperty("threat.intel.tracker.cf", cf);
- setProperty("threat.intel.ip.table", ipThreatIntelTable);
- setProperty("threat.intel.ip.cf", cf);
- setProperty("org.apache.metron.enrichment.host.known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"}," +
- "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"}," +
- "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}," +
- "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", " +
- "\"type\":\"printer\", \"asset_value\" : \"important\"}]");
- }};
- //create MockHBaseTables
- final MockHTable trackerTable = (MockHTable)MockHTable.Provider.addToCache(trackerHBaseTable, cf);
- final MockHTable ipTable = (MockHTable)MockHTable.Provider.addToCache(ipThreatIntelTable, cf);
- ThreatIntelHelper.INSTANCE.load(ipTable, cf, new ArrayList<LookupKV<ThreatIntelKey, ThreatIntelValue>>(){{
- add(new LookupKV<>(new ThreatIntelKey("10.0.2.3"), new ThreatIntelValue(new HashMap<String, String>())));
- }});
- final MockHTable pcapTable = (MockHTable) MockHTable.Provider.addToCache("pcap_test", "t");
- FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
- .withTopologyLocation(new File(topologiesDir + "/pcap/local.yaml"))
- .withTopologyName("pcap")
- .withTopologyProperties(topologyProperties)
- .build();
- //UnitTestHelper.verboseLogging();
- ComponentRunner runner = new ComponentRunner.Builder()
- .withComponent("elasticsearch", esComponent)
- .withComponent("storm", fluxComponent)
- .build();
-
- final String index = getIndex();
- System.out.println("Index of the run: " + index);
- runner.start();
- fluxComponent.submitTopology();
- List<Map<String, Object>> docs =
- runner.process(new Processor<List<Map<String, Object>>> () {
- List<Map<String, Object>> docs = null;
- public ReadinessState process(ComponentRunner runner){
- ElasticSearchComponent elasticSearchComponent = runner.getComponent("elasticsearch", ElasticSearchComponent.class);
- if(elasticSearchComponent.hasIndex(index)) {
- try {
- docs = elasticSearchComponent.getAllIndexedDocs(index);
- } catch (IOException e) {
- throw new IllegalStateException("Unable to retrieve indexed documents.", e);
- }
- if(docs.size() < expectedPcapIds.size() && pcapTable.getPutLog().size() < expectedPcapIds.size()) {
- return ReadinessState.NOT_READY;
- }
- else {
- return ReadinessState.READY;
- }
- }
- else {
- return ReadinessState.NOT_READY;
- }
- }
-
- public List<Map<String, Object>> getResult() {
- return docs;
- }
- });
-
- Assert.assertEquals(expectedPcapIds.size(), pcapTable.getPutLog().size());
- UnitTestHelper.assertSetEqual("PCap IDs from Index"
- , new HashSet<>(expectedPcapIds)
- , convertToSet(Iterables.transform(docs, DOC_TO_PCAP_ID))
- );
- UnitTestHelper.assertSetEqual("PCap IDs from HBase"
- , new HashSet<>(expectedPcapIds)
- , convertToSet(Iterables.transform(pcapTable.getPutLog(), RK_TO_PCAP_ID))
- );
- Iterable<JSONObject> packetsFromHBase = Iterables.transform(pcapTable.getPutLog(), PUT_TO_PCAP);
- Assert.assertEquals(expectedPcapIds.size(), Iterables.size(packetsFromHBase));
-
- List<Map<String, Object>> allDocs= runner.getComponent("elasticsearch", ElasticSearchComponent.class).getAllIndexedDocs(index, null);
- boolean hasThreat = false;
- for(Map<String, Object> d : allDocs) {
- Map<String, Object> message = (Map<String, Object>) d.get("message");
- Set<String> ips = new HashSet<>(Arrays.asList((String)message.get("ip_dst_addr"), (String)message.get("ip_src_addr")));
- if(ips.contains("10.0.2.3")) {
- hasThreat = true;
- Map<String, Object> alerts = (Map<String, Object>) ((Map<String, Object>) d.get("alerts")).get("ip");
- Assert.assertTrue( ((Map<String,Object>)alerts.get("ip_dst_addr")).size() > 0
- || ((Map<String,Object>)alerts.get("ip_src_addr")).size() > 0
- );
- }
- }
- Assert.assertTrue(hasThreat);
- MockHTable.Provider.clear();
- runner.stop();
- }
-
- public static Set<String> convertToSet(Iterable<String> strings) {
- Set<String> ret = new HashSet<String>();
- Iterables.addAll(ret, strings);
- return ret;
- }
- public static final Function<Put, String> RK_TO_PCAP_ID = new Function<Put, String>() {
- @Nullable
- public String apply(@Nullable Put put) {
- String rk =new String(put.getRow());
- return Joiner.on("-").join(Iterables.limit(Splitter.on('-').split(rk), 5));
- }
- };
-
- public static final Function<Map<String, Object>, String> DOC_TO_PCAP_ID = new Function<Map<String, Object>, String>() {
-
- @Nullable
- public String apply(@Nullable Map<String, Object> doc) {
- return (String)doc.get("pcap_id");
- }
- };
-
- public static final Function<Put, JSONObject> PUT_TO_PCAP = new
- Function<Put, JSONObject>() {
- @Nullable
- public JSONObject apply(@Nullable Put put) {
- try {
- return putToPcap(put);
- } catch (IOException e) {
- throw new RuntimeException("Unable to convert put to PCAP: " + put);
- }
- }
- };
-
-
-
- private static List<String> getExpectedPcap(File rawFile) throws IOException {
- List<String> ret = new ArrayList<String>();
- PcapParser parser = new PcapParser();
- parser.withTsPrecision("MICRO");
- parser.init();
- BufferedReader br = new BufferedReader(new FileReader(rawFile));
- for(String line = null; (line = br.readLine()) != null;) {
- byte[] pcapBytes = new HexStringConverter().convert(line);
- List<JSONObject> list = parser.parse(pcapBytes);
- for(JSONObject message : list) {
- ret.add((String) message.get("pcap_id"));
- }
- }
- return ret;
- }
-
- private static String getIndex() {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy.MM.dd.hh");
- Date d = new Date();
- return "pcap_index_" + sdf.format(d);
- }
-
- private static JSONObject putToPcap(Put p) throws IOException {
- PcapParser parser = new PcapParser();
- parser.init();
- List<Cell> cells = p.get(Bytes.toBytes("t"), Bytes.toBytes("value"));
- Assert.assertEquals(1, cells.size());
- List<JSONObject> messages = parser.parse(cells.get(0).getValueArray());
- Assert.assertEquals(1, messages.size());
- return messages.get(0);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/TestUtils.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/TestUtils.java
new file mode 100644
index 0000000..594700b
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/TestUtils.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestUtils {
+
+ public static List<byte[]> readSampleData(String samplePath) throws IOException {
+ BufferedReader br = new BufferedReader(new FileReader(samplePath));
+ List<byte[]> ret = new ArrayList<>();
+ for (String line = null; (line = br.readLine()) != null; ) {
+ long ts = System.currentTimeMillis();
+ ret.add(line.getBytes());
+ }
+ br.close();
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a4c773d..499e323 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,7 +51,7 @@
<exclude>metron-ui/lib/public/**</exclude>
<exclude>**/src/main/resources/patterns/**</exclude>
<exclude>**/src/test/resources/**</exclude>
- <exclude>**/src/main/resources/SampleInput/**</exclude>
+ <exclude>**/src/main/resources/Sample*/**</exclude>
<exclude>**/dependency-reduced-pom.xml</exclude>
<exclude>**/files/opensoc-ui</exclude>
<exclude>**/*.iml</exclude>