You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/02/16 18:10:19 UTC
[3/5] incubator-metron git commit: METRON-35 Implement threat
intelligence message enrichment closes apache/incubator-metron#22
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
new file mode 100644
index 0000000..0673bb3
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
@@ -0,0 +1,62 @@
+package org.apache.metron.dataloads.hbase.mr;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.metron.reference.lookup.LookupKey;
+import org.apache.metron.reference.lookup.accesstracker.AccessTracker;
+import org.apache.metron.reference.lookup.accesstracker.AccessTrackerUtil;
+
+import java.io.IOException;
+
+/**
+ * Table mapper that emits an HBase {@link Delete} for every scanned row whose key has not been
+ * seen by the access tracker, so that unused entries can be pruned from the table.
+ *
+ * <p>The access tracker is loaded once per task in {@link #setup(Context)} from the HBase table,
+ * column family and tracker name given by the {@code access_tracker_*} configuration keys.
+ *
+ * <p>Created by cstella on 2/5/16.
+ */
+public class PrunerMapper extends TableMapper<ImmutableBytesWritable, Delete> {
+  /** Configuration key: name of the HBase table holding the persisted access trackers. */
+  public static final String ACCESS_TRACKER_TABLE_CONF = "access_tracker_table";
+  /** Configuration key: column family of the access-tracker table. */
+  public static final String ACCESS_TRACKER_CF_CONF = "access_tracker_cf";
+  /** Configuration key: required non-negative timestamp handed to the tracker loader. */
+  public static final String TIMESTAMP_CONF = "access_tracker_timestamp";
+  /** Configuration key: name of the access tracker to load. */
+  public static final String ACCESS_TRACKER_NAME_CONF = "access_tracker_name";
+
+  /** Tracker consulted by {@link #map}; populated by {@link #setup(Context)}. */
+  AccessTracker tracker;
+
+  /**
+   * Loads the access tracker used to decide which rows to prune.
+   *
+   * @param context task context whose configuration supplies the {@code access_tracker_*} keys
+   * @throws IllegalStateException if the timestamp is missing or negative, or the trackers
+   *     cannot be loaded
+   * @throws IOException if the access-tracker table cannot be opened or closed
+   */
+  @Override
+  public void setup(Context context) throws IOException {
+    String atTable = context.getConfiguration().get(ACCESS_TRACKER_TABLE_CONF);
+    String atCF = context.getConfiguration().get(ACCESS_TRACKER_CF_CONF);
+    String atName = context.getConfiguration().get(ACCESS_TRACKER_NAME_CONF);
+    long timestamp = context.getConfiguration().getLong(TIMESTAMP_CONF, -1);
+    // Validate before opening the table so a bad configuration cannot leak an open HTable.
+    if (timestamp < 0) {
+      throw new IllegalStateException("Must specify a timestamp that is positive.");
+    }
+    // The table is only needed while the trackers are read, so close it once loading finishes.
+    // NOTE(review): assumes AccessTrackerUtil.loadAll fully materializes the tracker before
+    // returning (no lazy reads from the table afterwards) - confirm.
+    try (HTable table = new HTable(context.getConfiguration(), atTable)) {
+      tracker = loadTracker(table, atTable, atCF, atName, timestamp);
+    }
+  }
+
+  /**
+   * Loads the access trackers stored in {@code table} and reduces them, via AccessTrackerUtil,
+   * to the single tracker used by {@link #map}.
+   *
+   * @throws IllegalStateException wrapping any failure raised while loading
+   */
+  private static AccessTracker loadTracker(HTable table, String tableName, String cf, String name, long timestamp) {
+    try {
+      return AccessTrackerUtil.INSTANCE.loadAll(AccessTrackerUtil.INSTANCE.loadAll(table, cf, name, timestamp));
+    } catch (Exception e) {
+      throw new IllegalStateException("Unable to load the accesstrackers from table " + tableName, e);
+    }
+  }
+
+  /**
+   * Writes a {@link Delete} for the row if its key has not been seen by the tracker. Rows that
+   * have been seen produce no output.
+   *
+   * @throws IllegalStateException if no tracker is available or the key is null
+   */
+  @Override
+  public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
+    if (tracker == null || key == null) {
+      throw new IllegalStateException("Tracker = " + tracker + " key = " + key);
+    }
+    if (!tracker.hasSeen(toLookupKey(key.get()))) {
+      context.write(key, new Delete(key.get()));
+    }
+  }
+
+  /** Wraps raw row-key bytes in a {@link LookupKey} whose serialized form is exactly those bytes. */
+  protected LookupKey toLookupKey(final byte[] bytes) {
+    return new LookupKey() {
+      @Override
+      public byte[] toBytes() {
+        return bytes;
+      }
+    };
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/resources/hbase-site.xml b/metron-streaming/Metron-DataLoads/src/main/resources/hbase-site.xml
deleted file mode 100644
index a73469d..0000000
--- a/metron-streaming/Metron-DataLoads/src/main/resources/hbase-site.xml
+++ /dev/null
@@ -1,100 +0,0 @@
-<!--Tue Feb 11 02:34:08 2014 -->
-<configuration>
-
- <property>
- <name>hbase.regionserver.global.memstore.lowerLimit</name>
- <value>0.38</value>
- </property>
- <property>
- <name>zookeeper.session.timeout</name>
- <value>30000</value>
- </property>
-
- <property>
- <name>hbase.security.authorization</name>
- <value>false</value>
- </property>
- <property>
- <name>hbase.cluster.distributed</name>
- <value>true</value>
- </property>
-
- <property>
- <name>hbase.hstore.flush.retries.number</name>
- <value>120</value>
- </property>
- <property>
- <name>hbase.hregion.memstore.block.multiplier</name>
- <value>4</value>
- </property>
- <property>
- <name>hbase.hstore.blockingStoreFiles</name>
- <value>200</value>
- </property>
- <property>
- <name>hbase.defaults.for.version.skip</name>
- <value>true</value>
- </property>
- <property>
- <name>hbase.regionserver.global.memstore.upperLimit</name>
- <value>0.4</value>
- </property>
- <property>
- <name>hbase.hregion.memstore.mslab.enabled</name>
- <value>true</value>
- </property>
- <property>
- <name>hbase.client.keyvalue.maxsize</name>
- <value>10485760</value>
- </property>
- <property>
- <name>hbase.superuser</name>
- <value>hbase</value>
- </property>
- <property>
- <name>hfile.block.cache.size</name>
- <value>0.40</value>
- </property>
- <property>
- <name>zookeeper.znode.parent</name>
- <value>/hbase-unsecure</value>
- </property>
- <property>
- <name>hbase.hregion.max.filesize</name>
- <value>10737418240</value>
- </property>
- <property>
- <name>hbase.zookeeper.property.clientPort</name>
- <value>2181</value>
- </property>
- <property>
- <name>hbase.security.authentication</name>
- <value>simple</value>
- </property>
- <property>
- <name>hbase.client.scanner.caching</name>
- <value>100</value>
- </property>
- <property>
- <name>hbase.hregion.memstore.flush.size</name>
- <value>134217728</value>
- </property>
- <property>
- <name>hbase.hregion.majorcompaction</name>
- <value>86400000</value>
- </property>
- <property>
- <name>hbase.zookeeper.property.clientPort</name>
- <value>2181</value>
- </property>
-
- <property>
- <name>hbase.zookeeper.quorum</name>
- <value>zkpr1</value>
- </property>
-
- <property>
- <name>hbase.client.write.buffer</name>
- <value>500000000</value>
- </property>
-</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/ExtractorTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/ExtractorTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/ExtractorTest.java
new file mode 100644
index 0000000..e949cc7
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/ExtractorTest.java
@@ -0,0 +1,61 @@
+package org.apache.metron.dataloads.extractor;
+
+import com.google.common.collect.Iterables;
+import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests that extractors can be instantiated both directly by class name and through an
+ * {@link ExtractorHandler} JSON configuration.
+ *
+ * <p>Created by cstella on 2/3/16.
+ */
+public class ExtractorTest {
+  /** Extractor that ignores its input and always yields one result with indicator "dummy". */
+  public static class DummyExtractor implements Extractor
+  {
+
+    @Override
+    public Iterable<ThreatIntelResults> extract(String line) throws IOException {
+      ThreatIntelKey dummyKey = new ThreatIntelKey();
+      dummyKey.indicator = "dummy";
+      Map<String, String> fields = new HashMap<>();
+      fields.put("indicator", "dummy");
+      return Arrays.asList(new ThreatIntelResults(dummyKey, fields));
+    }
+
+    @Override
+    public void initialize(Map<String, Object> config) {
+      // No configuration needed.
+    }
+  }
+
+  /** Checks that the first extracted result is the one DummyExtractor produces. */
+  private static void assertDummyResult(Iterable<ThreatIntelResults> extracted) {
+    ThreatIntelResults first = Iterables.getFirst(extracted, null);
+    Assert.assertEquals("dummy", first.getKey().indicator);
+    Assert.assertEquals("dummy", first.getValue().get("indicator"));
+  }
+
+  @Test
+  public void testDummyExtractor() throws IllegalAccessException, InstantiationException, ClassNotFoundException, IOException {
+    Extractor byName = Extractors.create(DummyExtractor.class.getName());
+    assertDummyResult(byName.extract(null));
+  }
+
+  @Test
+  public void testExtractionLoading() throws Exception {
+    // Equivalent JSON:
+    // { "config" : {}, "extractor" : "org.apache.metron.dataloads.extractor.ExtractorTest$DummyExtractor" }
+    String config = "{\n" +
+        " \"config\" : {}\n" +
+        " ,\"extractor\" : \"org.apache.metron.dataloads.extractor.ExtractorTest$DummyExtractor\"\n" +
+        " }";
+    ExtractorHandler loaded = ExtractorHandler.load(config);
+    assertDummyResult(loaded.getExtractor().extract(null));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/csv/CSVExtractorTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/csv/CSVExtractorTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/csv/CSVExtractorTest.java
new file mode 100644
index 0000000..923cbb5
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/csv/CSVExtractorTest.java
@@ -0,0 +1,102 @@
+package org.apache.metron.dataloads.extractor.csv;
+
+import com.google.common.collect.Iterables;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Tests for the CSV extractor loaded through {@link ExtractorHandler}. The tests cover the
+ * JSON-object, JSON-list and comma-separated-string forms of the {@code columns} setting.
+ *
+ * <p>Created by cstella on 2/3/16.
+ */
+public class CSVExtractorTest {
+  @Test
+  public void testCSVExtractorMapColumns() throws Exception {
+    /*
+     Columns given as a JSON object mapping column name to index:
+     {
+       "config" : {
+         "columns" : {
+           "host" : 0
+           ,"meta" : 2
+         }
+         ,"indicator_column" : "host"
+         ,"separator" : ","
+       }
+       ,"extractor" : "CSV"
+     }
+    */
+    String config = "{\n" +
+        " \"config\" : {\n" +
+        " \"columns\" : {\n" +
+        " \"host\" : 0\n" +
+        " ,\"meta\" : 2\n" +
+        " }\n" +
+        " ,\"indicator_column\" : \"host\"\n" +
+        " ,\"separator\" : \",\"\n" +
+        " }\n" +
+        " ,\"extractor\" : \"CSV\"\n" +
+        " }";
+    ExtractorHandler handler = ExtractorHandler.load(config);
+    validate(handler);
+  }
+
+  @Test
+  public void testCSVExtractorListColumns() throws Exception {
+    /*
+     Columns given as a JSON list of "name:index" strings:
+     {
+       "config" : {
+         "columns" : ["host:0","meta:2"]
+         ,"indicator_column" : "host"
+         ,"separator" : ","
+       }
+       ,"extractor" : "CSV"
+     }
+    */
+    String config = "{\n" +
+        " \"config\" : {\n" +
+        " \"columns\" : [\"host:0\",\"meta:2\"]\n" +
+        " ,\"indicator_column\" : \"host\"\n" +
+        " ,\"separator\" : \",\"\n" +
+        " }\n" +
+        " ,\"extractor\" : \"CSV\"\n" +
+        " }";
+    ExtractorHandler handler = ExtractorHandler.load(config);
+    validate(handler);
+  }
+
+  @Test
+  public void testCSVExtractor() throws Exception {
+    /*
+     Columns given as a single comma-separated "name:index" string:
+     {
+       "config" : {
+         "columns" : "host:0,meta:2"
+         ,"indicator_column" : "host"
+         ,"separator" : ","
+       }
+       ,"extractor" : "CSV"
+     }
+    */
+    String config = "{\n" +
+        " \"config\" : {\n" +
+        " \"columns\" : \"host:0,meta:2\"\n" +
+        " ,\"indicator_column\" : \"host\"\n" +
+        " ,\"separator\" : \",\"\n" +
+        " }\n" +
+        " ,\"extractor\" : \"CSV\"\n" +
+        " }";
+    ExtractorHandler handler = ExtractorHandler.load(config);
+    validate(handler);
+  }
+
+  /**
+   * Checks that a data line yields one result keyed on the host column, carrying only the
+   * configured columns, and that a '#'-prefixed line yields no results.
+   */
+  public void validate(ExtractorHandler handler) throws IOException {
+    {
+      ThreatIntelResults results = Iterables.getFirst(handler.getExtractor().extract("google.com,1.0,foo"), null);
+      Assert.assertNotNull(results);
+      Assert.assertEquals("google.com", results.getKey().indicator);
+      Assert.assertEquals("google.com", results.getValue().get("host"));
+      Assert.assertEquals("foo", results.getValue().get("meta"));
+      Assert.assertEquals(2, results.getValue().size());
+    }
+    {
+      Iterable<ThreatIntelResults> results = handler.getExtractor().extract("#google.com,1.0,foo");
+      Assert.assertEquals(0, Iterables.size(results));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/stix/StixExtractorTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/stix/StixExtractorTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/stix/StixExtractorTest.java
new file mode 100644
index 0000000..d362241
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/stix/StixExtractorTest.java
@@ -0,0 +1,185 @@
+package org.apache.metron.dataloads.extractor.stix;
+
+import com.google.common.collect.Iterables;
+import org.apache.metron.dataloads.ThreatIntelBulkLoader;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Created by cstella on 2/9/16.
+ */
+public class StixExtractorTest {
+ @Test
+ public void testStixAddresses() throws Exception {
+ /**
+ <!--
+ STIX IP Watchlist Example
+
+ Copyright (c) 2015, The MITRE Corporation. All rights reserved.
+ The contents of this file are subject to the terms of the STIX License located at http://stix.mitre.org/about/termsofuse.html.
+
+ This example demonstrates a simple usage of STIX to represent a list of IP address indicators (watchlist of IP addresses). Cyber operations and malware analysis centers often share a list of suspected malicious IP addresses with information about what those IPs might indicate. This STIX package represents a list of three IP addresses with a short dummy description of what they represent.
+
+ It demonstrates the use of:
+
+ * STIX Indicators
+ * CybOX within STIX
+ * The CybOX Address Object (IP)
+ * CybOX Patterns (apply_condition="ANY")
+ * Controlled vocabularies
+
+ Created by Mark Davidson
+ -->
+ <stix:STIX_Package
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:stix="http://stix.mitre.org/stix-1"
+ xmlns:indicator="http://stix.mitre.org/Indicator-2"
+ xmlns:cybox="http://cybox.mitre.org/cybox-2"
+ xmlns:AddressObject="http://cybox.mitre.org/objects#AddressObject-2"
+ xmlns:cyboxVocabs="http://cybox.mitre.org/default_vocabularies-2"
+ xmlns:stixVocabs="http://stix.mitre.org/default_vocabularies-1"
+ xmlns:example="http://example.com/"
+ id="example:STIXPackage-33fe3b22-0201-47cf-85d0-97c02164528d"
+ timestamp="2014-05-08T09:00:00.000000Z"
+ version="1.2">
+ <stix:STIX_Header>
+ <stix:Title>Example watchlist that contains IP information.</stix:Title>
+ <stix:Package_Intent xsi:type="stixVocabs:PackageIntentVocab-1.0">Indicators - Watchlist</stix:Package_Intent>
+ </stix:STIX_Header>
+ <stix:Indicators>
+ <stix:Indicator xsi:type="indicator:IndicatorType" id="example:Indicator-33fe3b22-0201-47cf-85d0-97c02164528d" timestamp="2014-05-08T09:00:00.000000Z">
+ <indicator:Type xsi:type="stixVocabs:IndicatorTypeVocab-1.1">IP Watchlist</indicator:Type>
+ <indicator:Description>Sample IP Address Indicator for this watchlist. This contains one indicator with a set of three IP addresses in the watchlist.</indicator:Description>
+ <indicator:Observable id="example:Observable-1c798262-a4cd-434d-a958-884d6980c459">
+ <cybox:Object id="example:Object-1980ce43-8e03-490b-863a-ea404d12242e">
+ <cybox:Properties xsi:type="AddressObject:AddressObjectType" category="ipv4-addr">
+ <AddressObject:Address_Value condition="Equals" apply_condition="ANY">10.0.0.0##comma##10.0.0.1##comma##10.0.0.2</AddressObject:Address_Value>
+ </cybox:Properties>
+ </cybox:Object>
+ </indicator:Observable>
+ </stix:Indicator>
+ </stix:Indicators>
+ </stix:STIX_Package>
+
+
+ */
+ String stixDoc = "<!--\n" +
+ "STIX IP Watchlist Example\n" +
+ "\n" +
+ "Copyright (c) 2015, The MITRE Corporation. All rights reserved.\n" +
+ "The contents of this file are subject to the terms of the STIX License located at http://stix.mitre.org/about/termsofuse.html.\n" +
+ "\n" +
+ "This example demonstrates a simple usage of STIX to represent a list of IP address indicators (watchlist of IP addresses). Cyber operations and malware analysis centers often share a list of suspected malicious IP addresses with information about what those IPs might indicate. This STIX package represents a list of three IP addresses with a short dummy description of what they represent.\n" +
+ "\n" +
+ "It demonstrates the use of:\n" +
+ "\n" +
+ "* STIX Indicators\n" +
+ "* CybOX within STIX\n" +
+ "* The CybOX Address Object (IP)\n" +
+ "* CybOX Patterns (apply_condition=\"ANY\")\n" +
+ "* Controlled vocabularies\n" +
+ "\n" +
+ "Created by Mark Davidson\n" +
+ "-->\n" +
+ "<stix:STIX_Package\n" +
+ " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n" +
+ " xmlns:stix=\"http://stix.mitre.org/stix-1\"\n" +
+ " xmlns:indicator=\"http://stix.mitre.org/Indicator-2\"\n" +
+ " xmlns:cybox=\"http://cybox.mitre.org/cybox-2\"\n" +
+ " xmlns:AddressObject=\"http://cybox.mitre.org/objects#AddressObject-2\"\n" +
+ " xmlns:cyboxVocabs=\"http://cybox.mitre.org/default_vocabularies-2\"\n" +
+ " xmlns:stixVocabs=\"http://stix.mitre.org/default_vocabularies-1\"\n" +
+ " xmlns:example=\"http://example.com/\"\n" +
+ " id=\"example:STIXPackage-33fe3b22-0201-47cf-85d0-97c02164528d\"\n" +
+ " timestamp=\"2014-05-08T09:00:00.000000Z\"\n" +
+ " version=\"1.2\">\n" +
+ " <stix:STIX_Header>\n" +
+ " <stix:Title>Example watchlist that contains IP information.</stix:Title>\n" +
+ " <stix:Package_Intent xsi:type=\"stixVocabs:PackageIntentVocab-1.0\">Indicators - Watchlist</stix:Package_Intent>\n" +
+ " </stix:STIX_Header>\n" +
+ " <stix:Indicators>\n" +
+ " <stix:Indicator xsi:type=\"indicator:IndicatorType\" id=\"example:Indicator-33fe3b22-0201-47cf-85d0-97c02164528d\" timestamp=\"2014-05-08T09:00:00.000000Z\">\n" +
+ " <indicator:Type xsi:type=\"stixVocabs:IndicatorTypeVocab-1.1\">IP Watchlist</indicator:Type>\n" +
+ " <indicator:Description>Sample IP Address Indicator for this watchlist. This contains one indicator with a set of three IP addresses in the watchlist.</indicator:Description>\n" +
+ " <indicator:Observable id=\"example:Observable-1c798262-a4cd-434d-a958-884d6980c459\">\n" +
+ " <cybox:Object id=\"example:Object-1980ce43-8e03-490b-863a-ea404d12242e\">\n" +
+ " <cybox:Properties xsi:type=\"AddressObject:AddressObjectType\" category=\"ipv4-addr\">\n" +
+ " <AddressObject:Address_Value condition=\"Equals\" apply_condition=\"ANY\">10.0.0.0##comma##10.0.0.1##comma##10.0.0.2</AddressObject:Address_Value>\n" +
+ " </cybox:Properties>\n" +
+ " </cybox:Object>\n" +
+ " </indicator:Observable>\n" +
+ " </stix:Indicator>\n" +
+ " </stix:Indicators>\n" +
+ "</stix:STIX_Package>\n" +
+ "\n";
+ {
+ /**
+ {
+ "config" : {
+ "stix_address_categories" : "IPV_4_ADDR"
+ }
+ ,"extractor" : "STIX"
+ }
+ */
+ String config = "{\n" +
+ " \"config\" : {\n" +
+ " \"stix_address_categories\" : \"IPV_4_ADDR\"\n" +
+ " }\n" +
+ " ,\"extractor\" : \"STIX\"\n" +
+ " }";
+ ExtractorHandler handler = ExtractorHandler.load(config);
+ Extractor extractor = handler.getExtractor();
+ Iterable<ThreatIntelResults> results = extractor.extract(stixDoc);
+ Assert.assertEquals(3, Iterables.size(results));
+ Assert.assertEquals("10.0.0.0", Iterables.get(results, 0).getKey().indicator);
+ Assert.assertEquals("10.0.0.1", Iterables.get(results, 1).getKey().indicator);
+ Assert.assertEquals("10.0.0.2", Iterables.get(results, 2).getKey().indicator);
+ }
+ {
+ /**
+ {
+ "config" : {
+ }
+ ,"extractor" : "STIX"
+ }
+ */
+ String config = "{\n" +
+ " \"config\" : {\n" +
+ " }\n" +
+ " ,\"extractor\" : \"STIX\"\n" +
+ " }";
+ ExtractorHandler handler = ExtractorHandler.load(config);
+ Extractor extractor = handler.getExtractor();
+ Iterable<ThreatIntelResults> results = extractor.extract(stixDoc);
+ Assert.assertEquals(3, Iterables.size(results));
+ Assert.assertEquals("10.0.0.0", Iterables.get(results, 0).getKey().indicator);
+ Assert.assertEquals("10.0.0.1", Iterables.get(results, 1).getKey().indicator);
+ Assert.assertEquals("10.0.0.2", Iterables.get(results, 2).getKey().indicator);
+ }
+ {
+ /**
+ {
+ "config" : {
+ "stix_address_categories" : "IPV_6_ADDR"
+ }
+ ,"extractor" : "STIX"
+ }
+ */
+ String config = "{\n" +
+ " \"config\" : {\n" +
+ " \"stix_address_categories\" : \"IPV_6_ADDR\"\n" +
+ " }\n" +
+ " ,\"extractor\" : \"STIX\"\n" +
+ " }";
+ ExtractorHandler handler = ExtractorHandler.load(config);
+ Extractor extractor = handler.getExtractor();
+ Iterable<ThreatIntelResults> results = extractor.extract(stixDoc);
+ Assert.assertEquals(0, Iterables.size(results));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/HBaseConverterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/HBaseConverterTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/HBaseConverterTest.java
new file mode 100644
index 0000000..25cdbc7
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/HBaseConverterTest.java
@@ -0,0 +1,53 @@
+package org.apache.metron.dataloads.hbase;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.metron.threatintel.hbase.Converter;
+import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Round-trip tests for {@link ThreatIntelKey} byte serialization and for the HBase
+ * {@link Converter} (Put, Result and Get conversions).
+ *
+ * <p>Created by cstella on 2/3/16.
+ */
+public class HBaseConverterTest {
+  ThreatIntelKey key = new ThreatIntelKey("google");
+  Map<String, String> value = new HashMap<>();
+  {
+    value.put("foo", "bar");
+    value.put("grok", "baz");
+  }
+  Long timestamp = 7L;
+  ThreatIntelResults results = new ThreatIntelResults(key, value);
+
+  @Test
+  public void testKeySerialization() {
+    ThreatIntelKey roundTripped = ThreatIntelKey.fromBytes(key.toBytes());
+    Assert.assertEquals(key, roundTripped);
+  }
+
+  @Test
+  public void testPut() throws IOException {
+    Put asPut = Converter.INSTANCE.toPut("cf", key, value, timestamp);
+    Map.Entry<ThreatIntelResults, Long> expected = new AbstractMap.SimpleEntry<>(results, timestamp);
+    Assert.assertEquals(expected, Converter.INSTANCE.fromPut(asPut, "cf"));
+  }
+
+  @Test
+  public void testResult() throws IOException {
+    Result asResult = Converter.INSTANCE.toResult("cf", key, value, timestamp);
+    Map.Entry<ThreatIntelResults, Long> expected = new AbstractMap.SimpleEntry<>(results, timestamp);
+    Assert.assertEquals(expected, Converter.INSTANCE.fromResult(asResult, "cf"));
+  }
+
+  @Test
+  public void testGet() throws Exception {
+    Assert.assertArrayEquals(key.toBytes(), Converter.INSTANCE.toGet("cf", key).getRow());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java
new file mode 100644
index 0000000..179337e
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java
@@ -0,0 +1,91 @@
+package org.apache.metron.dataloads.hbase.mr;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.metron.dataloads.ThreatIntelBulkLoader;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.apache.metron.threatintel.hbase.Converter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End-to-end test of {@link ThreatIntelBulkLoader}. It runs the bulk-load job on a mini
+ * HBase/MapReduce cluster over a one-line CSV file and checks the row written to the target table.
+ *
+ * <p>Created by cstella on 2/5/16.
+ */
+public class BulkLoadMapperIntegrationTest {
+  /** The test util. */
+  private HBaseTestingUtility testUtil;
+
+  /** The test table. */
+  private HTable testTable;
+  String tableName = "malicious_domains";
+  String cf = "cf";
+  Configuration config = null;
+
+  @Before
+  public void setup() throws Exception {
+    Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true);
+    config = kv.getValue();
+    testUtil = kv.getKey();
+    testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
+  }
+
+  @After
+  public void teardown() throws Exception {
+    HBaseUtil.INSTANCE.teardown(testUtil);
+  }
+
+  @Test
+  public void test() throws IOException, ClassNotFoundException, InterruptedException {
+    /*
+     CSV extractor config, with columns given as a JSON list of "name:index" strings:
+     {
+       "config" : {
+         "columns" : ["host:0","meta:2"]
+         ,"indicator_column" : "host"
+         ,"separator" : ","
+       }
+       ,"extractor" : "CSV"
+     }
+    */
+    final String extractorConfig = "{\n" +
+        " \"config\" : {\n" +
+        " \"columns\" : [\"host:0\",\"meta:2\"]\n" +
+        " ,\"indicator_column\" : \"host\"\n" +
+        " ,\"separator\" : \",\"\n" +
+        " }\n" +
+        " ,\"extractor\" : \"CSV\"\n" +
+        " }";
+    Assert.assertNotNull(testTable);
+    FileSystem fs = FileSystem.get(config);
+    String contents = "google.com,1,foo";
+    HBaseUtil.INSTANCE.writeFile(contents, new Path("input.csv"), fs);
+    Job job = ThreatIntelBulkLoader.createJob(config, "input.csv", tableName, cf, extractorConfig, 0L);
+    Assert.assertTrue(job.waitForCompletion(true));
+    List<Map.Entry<ThreatIntelResults, Long>> results = new ArrayList<>();
+    ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
+    // Always release the scanner, even if converting a row fails.
+    try {
+      for (Result r : scanner) {
+        results.add(Converter.INSTANCE.fromResult(r, cf));
+      }
+    } finally {
+      scanner.close();
+    }
+    Assert.assertEquals(1, results.size());
+    Map.Entry<ThreatIntelResults, Long> row = results.get(0);
+    // JUnit's assertEquals takes (expected, actual); keep that order so failure messages read correctly.
+    Assert.assertEquals(0L, (long) row.getValue());
+    Assert.assertEquals("google.com", row.getKey().getKey().indicator);
+    Assert.assertEquals(2, row.getKey().getValue().size());
+    Assert.assertEquals("foo", row.getKey().getValue().get("meta"));
+    Assert.assertEquals("google.com", row.getKey().getValue().get("host"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperTest.java
new file mode 100644
index 0000000..1d30b5c
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperTest.java
@@ -0,0 +1,78 @@
+package org.apache.metron.dataloads.hbase.mr;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.apache.metron.threatintel.hbase.Converter;
+import org.apache.metron.threatintel.ThreatIntelKey;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Unit test for {@link BulkLoadMapper} with a CSV extractor. Emitted puts are captured in an
+ * in-memory map instead of being written to a MapReduce context.
+ *
+ * <p>Created by cstella on 2/3/16.
+ */
+public class BulkLoadMapperTest {
+  @Test
+  public void testMapper() throws IOException, InterruptedException {
+    /*
+     CSV extractor config, with columns given as a JSON list of "name:index" strings:
+     {
+       "config" : {
+         "columns" : ["host:0","meta:2"]
+         ,"indicator_column" : "host"
+         ,"separator" : ","
+       }
+       ,"extractor" : "CSV"
+     }
+    */
+    final String extractorConfig = "{\n" +
+        " \"config\" : {\n" +
+        " \"columns\" : [\"host:0\",\"meta:2\"]\n" +
+        " ,\"indicator_column\" : \"host\"\n" +
+        " ,\"separator\" : \",\"\n" +
+        " }\n" +
+        " ,\"extractor\" : \"CSV\"\n" +
+        " }";
+
+    final Map<ImmutableBytesWritable, Put> puts = new HashMap<>();
+    // Override write() so the mapper can run without a real MapReduce context.
+    BulkLoadMapper mapper = new BulkLoadMapper() {
+      @Override
+      protected void write(ImmutableBytesWritable key, Put value, Context context) throws IOException, InterruptedException {
+        puts.put(key, value);
+      }
+    };
+    Configuration conf = new Configuration();
+    conf.set(BulkLoadMapper.COLUMN_FAMILY_KEY, "cf");
+    conf.set(BulkLoadMapper.CONFIG_KEY, extractorConfig);
+    conf.set(BulkLoadMapper.LAST_SEEN_KEY, "0");
+    mapper.initialize(conf);
+    {
+      // A '#'-prefixed line must not produce a put.
+      mapper.map(null, new Text("#google.com,1,foo"), null);
+      Assert.assertEquals(0, puts.size());
+    }
+    {
+      mapper.map(null, new Text("google.com,1,foo"), null);
+      Assert.assertEquals(1, puts.size());
+      ThreatIntelKey expectedKey = new ThreatIntelKey();
+      expectedKey.indicator = "google.com";
+      Put put = puts.get(new ImmutableBytesWritable(expectedKey.toBytes()));
+      // The put for the expected row key must exist before it is decoded.
+      Assert.assertNotNull(put);
+      Map.Entry<ThreatIntelResults, Long> results = Converter.INSTANCE.fromPut(put, "cf");
+      // JUnit's assertEquals takes (expected, actual).
+      Assert.assertEquals(0L, (long) results.getValue());
+      Assert.assertEquals("google.com", results.getKey().getKey().indicator);
+      Assert.assertEquals(2, results.getKey().getValue().size());
+      Assert.assertEquals("foo", results.getKey().getValue().get("meta"));
+      Assert.assertEquals("google.com", results.getKey().getValue().get("host"));
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/HBaseUtil.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/HBaseUtil.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/HBaseUtil.java
new file mode 100644
index 0000000..d243a65
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/HBaseUtil.java
@@ -0,0 +1,58 @@
+package org.apache.metron.dataloads.hbase.mr;
+
+import com.google.common.base.Joiner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+
+import java.io.*;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test helpers for starting and stopping an HBase mini cluster and for reading and writing small
+ * text files on a Hadoop {@link FileSystem}.
+ *
+ * <p>Created by cstella on 2/5/16.
+ */
+public enum HBaseUtil {
+  INSTANCE;
+
+  /**
+   * Starts a single-node HBase mini cluster, optionally with a mini MapReduce cluster.
+   *
+   * @param startMRCluster whether to also start a mini MapReduce cluster
+   * @return the testing utility (key) paired with the configuration it was built from (value)
+   */
+  public Map.Entry<HBaseTestingUtility,Configuration> create(boolean startMRCluster) throws Exception {
+    Configuration config = HBaseConfiguration.create();
+    // Force both master and region server hostnames to localhost.
+    config.set("hbase.master.hostname", "localhost");
+    config.set("hbase.regionserver.hostname", "localhost");
+    HBaseTestingUtility testUtil = new HBaseTestingUtility(config);
+
+    testUtil.startMiniCluster(1);
+    if (startMRCluster) {
+      testUtil.startMiniMapReduceCluster();
+    }
+    return new AbstractMap.SimpleEntry<>(testUtil, config);
+  }
+
+  /**
+   * Writes {@code contents} to {@code filename} as UTF-8, overwriting any existing file.
+   *
+   * @throws IOException if the file cannot be created or written
+   */
+  public void writeFile(String contents, Path filename, FileSystem fs) throws IOException {
+    // A plain Writer reports write failures; PrintWriter would silently swallow them.
+    // try-with-resources closes the underlying stream even when writing fails.
+    try (Writer writer = new OutputStreamWriter(fs.create(filename, true), "UTF-8")) {
+      writer.write(contents);
+    }
+  }
+
+  /**
+   * Reads a UTF-8 text file.
+   *
+   * @return the file's lines joined with {@code '\n'}, with no trailing newline
+   * @throws IOException if the file cannot be opened or read
+   */
+  public String readFile(FileSystem fs, Path filename) throws IOException {
+    List<String> contents = new ArrayList<>();
+    try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(filename), "UTF-8"))) {
+      for (String line = br.readLine(); line != null; line = br.readLine()) {
+        contents.add(line);
+      }
+    }
+    return Joiner.on('\n').join(contents);
+  }
+
+  /** Shuts down the mini MapReduce and HBase clusters, then cleans up the test directory. */
+  public void teardown(HBaseTestingUtility testUtil) throws Exception {
+    testUtil.shutdownMiniMapReduceCluster();
+    testUtil.shutdownMiniCluster();
+    testUtil.cleanupTestDir();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
new file mode 100644
index 0000000..3979224
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
@@ -0,0 +1,124 @@
+package org.apache.metron.dataloads.hbase.mr;
+
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.metron.dataloads.LeastRecentlyUsedPruner;
+import org.apache.metron.reference.lookup.LookupKey;
+import org.apache.metron.reference.lookup.accesstracker.AccessTrackerUtil;
+import org.apache.metron.reference.lookup.accesstracker.BloomAccessTracker;
+import org.apache.metron.reference.lookup.accesstracker.PersistentAccessTracker;
+import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.threatintel.hbase.Converter;
+import org.apache.metron.threatintel.hbase.ThreatIntelLookup;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Integration test for {@link LeastRecentlyUsedPruner}.
+ *
+ * <p>Ten keys are written to a threat intel table and looked up through
+ * {@link ThreatIntelLookup}. One more key is written but never looked up. The pruning job is
+ * then expected to delete only that last key.
+ */
+public class LeastRecentlyUsedPrunerIntegrationTest {
+  /** The test util. */
+  private HBaseTestingUtility testUtil;
+
+  /** The threat intel table that gets pruned. */
+  private HTable testTable;
+  /** The table the persistent access tracker writes to. */
+  private HTable atTable;
+  String tableName = "malicious_domains";
+  String cf = "cf";
+  String atTableName = "access_trackers";
+  String atCF= "cf";
+  Configuration config = null;
+
+  @Before
+  public void setup() throws Exception {
+    Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true);
+    config = kv.getValue();
+    testUtil = kv.getKey();
+    testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
+    atTable = testUtil.createTable(Bytes.toBytes(atTableName), Bytes.toBytes(atCF));
+  }
+
+  @After
+  public void teardown() throws Exception {
+    HBaseUtil.INSTANCE.teardown(testUtil);
+  }
+
+  /** Returns ThreatIntelKeys named {@code key-<start>} through {@code key-<end - 1>}. */
+  public List<LookupKey> getKeys(int start, int end) {
+    List<LookupKey> keys = new ArrayList<>();
+    for(int i = start;i < end;++i) {
+      keys.add(new ThreatIntelKey("key-" + i));
+    }
+    return keys;
+  }
+
+  /**
+   * Writes a row for {@code key} into {@link #testTable} holding the single value
+   * {@code k=dummy} at timestamp 1.
+   *
+   * <p>A plain HashMap replaces double-brace initialization. That idiom creates an anonymous
+   * subclass capturing the test instance.
+   */
+  private void putDummyValue(LookupKey key) throws Exception {
+    HashMap<String, String> value = new HashMap<>();
+    value.put("k", "dummy");
+    testTable.put(Converter.INSTANCE.toPut(cf, (ThreatIntelKey) key, value, 1L));
+  }
+
+  @Test
+  public void test() throws Exception {
+    long ts = System.currentTimeMillis();
+    BloomAccessTracker bat = new BloomAccessTracker("tracker1", 100, 0.03);
+    PersistentAccessTracker pat = new PersistentAccessTracker(tableName, "0", atTable, atCF, bat, 0L);
+    ThreatIntelLookup lookup = new ThreatIntelLookup(testTable, cf, pat);
+    List<LookupKey> goodKeysHalf = getKeys(0, 5);
+    List<LookupKey> goodKeysOtherHalf = getKeys(5, 10);
+    Iterable<LookupKey> goodKeys = Iterables.concat(goodKeysHalf, goodKeysOtherHalf);
+    List<LookupKey> badKey = getKeys(10, 11);
+    for(LookupKey k : goodKeysHalf) {
+      putDummyValue(k);
+      Assert.assertTrue(lookup.exists((ThreatIntelKey)k, testTable, true));
+    }
+    pat.persist(true);
+    for(LookupKey k : goodKeysOtherHalf) {
+      putDummyValue(k);
+      Assert.assertTrue(lookup.exists((ThreatIntelKey)k, testTable, true));
+    }
+    testUtil.flush();
+    // Presumably persist(true) writes out and resets the in-memory tracker, so the first half is
+    // no longer reported as seen while the second half still is -- confirm against PersistentAccessTracker.
+    Assert.assertFalse(lookup.getAccessTracker().hasSeen(goodKeysHalf.get(0)));
+    for(LookupKey k : goodKeysOtherHalf) {
+      Assert.assertTrue(lookup.getAccessTracker().hasSeen(k));
+    }
+    pat.persist(true);
+    // The bad key is written but never looked up, so the tracker must not report it as seen.
+    putDummyValue(badKey.get(0));
+    testUtil.flush();
+    Assert.assertFalse(lookup.getAccessTracker().hasSeen(badKey.get(0)));
+
+    Job job = LeastRecentlyUsedPruner.createJob(config, tableName, cf, atTableName, atCF, ts);
+    Assert.assertTrue(job.waitForCompletion(true));
+    for(LookupKey k : goodKeys) {
+      Assert.assertTrue(lookup.exists((ThreatIntelKey)k, testTable, true));
+    }
+    for(LookupKey k : badKey) {
+      Assert.assertFalse(lookup.exists((ThreatIntelKey)k, testTable, true));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataServices/src/main/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataServices/src/main/resources/hbase-site.xml b/metron-streaming/Metron-DataServices/src/main/resources/hbase-site.xml
deleted file mode 100644
index 5c3c819..0000000
--- a/metron-streaming/Metron-DataServices/src/main/resources/hbase-site.xml
+++ /dev/null
@@ -1,127 +0,0 @@
-<!--Tue Apr 1 18:16:39 2014-->
- <configuration>
- <property>
- <name>hbase.tmp.dir</name>
- <value>/disk/h/hbase</value>
- </property>
- <property>
- <name>hbase.hregion.memstore.chunkpool.maxsize</name>
- <value>0.5</value>
- </property>
- <property>
- <name>hbase.regionserver.codecs</name>
- <value>lzo,gz,snappy</value>
- </property>
- <property>
- <name>hbase.hstore.flush.retries.number</name>
- <value>120</value>
- </property>
- <property>
- <name>hbase.client.keyvalue.maxsize</name>
- <value>10485760</value>
- </property>
- <property>
- <name>hbase.rootdir</name>
- <value>hdfs://nn1:8020/apps/hbase/data</value>
- </property>
- <property>
- <name>hbase.defaults.for.version.skip</name>
- <value>true</value>
- </property>
- <property>
- <name>hbase.client.scanner.caching</name>
- <value>100</value>
- </property>
- <property>
- <name>hbase.superuser</name>
- <value>hbase</value>
- </property>
- <property>
- <name>hfile.block.cache.size</name>
- <value>0.40</value>
- </property>
- <property>
- <name>hbase.regionserver.checksum.verify</name>
- <value>true</value>
- </property>
- <property>
- <name>hbase.hregion.memstore.mslab.enabled</name>
- <value>true</value>
- </property>
- <property>
- <name>hbase.hregion.max.filesize</name>
- <value>107374182400</value>
- </property>
- <property>
- <name>hbase.cluster.distributed</name>
- <value>true</value>
- </property>
- <property>
- <name>zookeeper.session.timeout</name>
- <value>30000</value>
- </property>
- <property>
- <name>zookeeper.znode.parent</name>
- <value>/hbase-unsecure</value>
- </property>
- <property>
- <name>hbase.regionserver.global.memstore.lowerLimit</name>
- <value>0.38</value>
- </property>
- <property>
- <name>hbase.regionserver.handler.count</name>
- <value>240</value>
- </property>
- <property>
- <name>hbase.hregion.memstore.mslab.chunksize</name>
- <value>8388608</value>
- </property>
- <property>
- <name>hbase.zookeeper.quorum</name>
- <value>zkpr1,zkpr2,zkpr3</value>
- </property>
- <property>
- <name>hbase.zookeeper.useMulti</name>
- <value>true</value>
- </property>
- <property>
- <name>hbase.hregion.majorcompaction</name>
- <value>86400000</value>
- </property>
- <property>
- <name>hbase.hstore.blockingStoreFiles</name>
- <value>200</value>
- </property>
- <property>
- <name>hbase.zookeeper.property.clientPort</name>
- <value>2181</value>
- </property>
- <property>
- <name>hbase.hregion.memstore.flush.size</name>
- <value>134217728</value>
- </property>
- <property>
- <name>hbase.security.authorization</name>
- <value>false</value>
- </property>
- <property>
- <name>hbase.regionserver.global.memstore.upperLimit</name>
- <value>0.4</value>
- </property>
- <property>
- <name>hbase.hstore.compactionThreshold</name>
- <value>4</value>
- </property>
- <property>
- <name>hbase.hregion.memstore.block.multiplier</name>
- <value>8</value>
- </property>
- <property>
- <name>hbase.security.authentication</name>
- <value>simple</value>
- </property>
- <property>
- <name>dfs.client.read.shortcircuit</name>
- <value>true</value>
- </property>
- </configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/pom.xml b/metron-streaming/Metron-EnrichmentAdapters/pom.xml
index a37a032..45d068f 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/pom.xml
+++ b/metron-streaming/Metron-EnrichmentAdapters/pom.xml
@@ -10,156 +10,160 @@
the specific language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.metron</groupId>
- <artifactId>Metron-Streaming</artifactId>
- <version>0.6BETA</version>
- </parent>
- <artifactId>Metron-EnrichmentAdapters</artifactId>
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Streaming</artifactId>
+ <version>0.6BETA</version>
+ </parent>
+ <artifactId>Metron-EnrichmentAdapters</artifactId>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <mysql.version>5.1.31</mysql.version>
- <slf4j.version>1.7.7</slf4j.version>
- <hbase.client.version>0.96.1-hadoop2</hbase.client.version>
- <storm.hdfs.version>0.1.2</storm.hdfs.version>
- <guava.version>${global_guava_version}</guava.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.metron</groupId>
- <artifactId>Metron-Common</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>${mysql.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <version>${hbase.client.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>${global_hadoop_version}</version>
- <exclusions>
- <exclusion>
- <artifactId>servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${global_storm_version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <artifactId>servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${global_hadoop_version}</version>
- <exclusions>
- <exclusion>
- <artifactId>servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>${global_junit_version}</version>
- </dependency>
- <dependency>
- <groupId>commons-validator</groupId>
- <artifactId>commons-validator</artifactId>
- <version>1.4.0</version>
- </dependency>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <mysql.version>5.1.31</mysql.version>
+ <slf4j.version>1.7.7</slf4j.version>
+ <!-- NOTE(review): storm.hdfs.version is not referenced elsewhere in this POM; confirm it is unused before removing. -->
+ <storm.hdfs.version>0.1.2</storm.hdfs.version>
+ <guava.version>${global_guava_version}</guava.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Common</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>${mysql.version}</version>
+ </dependency>
+ <!-- hbase-client is provided-scoped: it is not packaged with this module and must be on the
+      runtime classpath (presumably supplied by the Storm/HBase deployment - confirm). -->
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${global_hbase_version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${global_hadoop_version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- NOTE(review): log4j-over-slf4j is presumably excluded to avoid clashing with a real log4j
+      jar/binding pulled in elsewhere (hadoop-*); confirm the intended logging setup. -->
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${global_storm_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${global_hadoop_version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- NOTE(review): junit has no <scope>test</scope>, so it is on this module's compile and runtime
+      classpath; confirm main code needs it before narrowing the scope. -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${global_junit_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-validator</groupId>
+ <artifactId>commons-validator</artifactId>
+ <version>1.4.0</version>
+ </dependency>
- </dependencies>
- <reporting>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <systemProperties>
- <property>
- <name>mode</name>
- <value>global</value>
- </property>
- </systemProperties>
- </configuration>
- </plugin>
- <!-- Normally, dependency report takes time, skip it -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-project-info-reports-plugin</artifactId>
- <version>2.7</version>
+ </dependencies>
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemProperties>
+ <property>
+ <name>mode</name>
+ <value>global</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ <!-- Normally, dependency report takes time, skip it -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-project-info-reports-plugin</artifactId>
+ <version>2.7</version>
- <configuration>
- <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>emma-maven-plugin</artifactId>
- <version>1.0-alpha-3</version>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-pmd-plugin</artifactId>
- <configuration>
- <targetJdk>1.7</targetJdk>
- </configuration>
- </plugin>
- </plugins>
- </reporting>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
+ <configuration>
+ <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>emma-maven-plugin</artifactId>
+ <version>1.0-alpha-3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-pmd-plugin</artifactId>
+ <configuration>
+ <targetJdk>1.7</targetJdk>
+ </configuration>
+ </plugin>
+ </plugins>
+ </reporting>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
index bfe9ef6..2ebf1b0 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
@@ -53,6 +53,11 @@ public class CIFHbaseAdapter implements EnrichmentAdapter<String>,Serializable {
private static final Logger LOGGER = Logger
.getLogger(CIFHbaseAdapter.class);
+ /**
+  * No-op: this adapter does not record which values are looked up.
+  */
+ @Override
+ public void logAccess(String value) {
+
+ }
+
public JSONObject enrich(String metadata) {
JSONObject output = new JSONObject();
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
index 89721a7..93ec2ef 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
@@ -11,6 +11,11 @@ public class GeoAdapter extends JdbcAdapter {
private InetAddressValidator ipvalidator = new InetAddressValidator();
+ /**
+  * No-op: this adapter does not record which values are looked up.
+  */
+ @Override
+ public void logAccess(String value) {
+
+ }
+
@SuppressWarnings("unchecked")
@Override
public JSONObject enrich(String value) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
index 260d878..f4822de 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
@@ -35,6 +35,11 @@ public class HostFromJSONListAdapter extends AbstractHostAdapter {
return false;
}
+ /**
+  * No-op: this adapter does not record which values are looked up.
+  */
+ @Override
+ public void logAccess(String value) {
+
+ }
+
@SuppressWarnings("unchecked")
@Override
public JSONObject enrich(String metadata) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
index 4e4586e..4ae0329 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
@@ -32,7 +32,7 @@ public class HostFromPropertiesFileAdapter extends AbstractHostAdapter {
}
@Override
- public boolean initializeAdapter()
+ public boolean initializeAdapter()
{
if(_known_hosts.size() > 0)
@@ -41,6 +41,11 @@ public class HostFromPropertiesFileAdapter extends AbstractHostAdapter {
return false;
}
+ /**
+  * No-op: this adapter does not record which values are looked up.
+  */
+ @Override
+ public void logAccess(String value) {
+
+ }
+
@SuppressWarnings("unchecked")
@Override
public JSONObject enrich(String metadata) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
index ca4f8ea..83cb856 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
@@ -32,8 +32,8 @@ public abstract class JdbcAdapter implements EnrichmentAdapter<String>,
if (!InetAddress.getByName(host).isReachable(500)) {
throw new Exception("Unable to reach host " + host);
}
- Class.forName(config.getClassName());
- connection = DriverManager.getConnection(config.getJdbcUrl());
+ Class.forName(this.config.getClassName());
+ connection = DriverManager.getConnection(this.config.getJdbcUrl());
connection.setReadOnly(true);
if (!connection.isValid(0))
throw new Exception("Invalid connection string....");
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
index cfd47c8..fde0cbd 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
@@ -19,9 +19,7 @@ package org.apache.metron.enrichment.adapters.threat;
import java.io.IOException;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
@@ -36,7 +34,6 @@ import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.log4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +59,11 @@ public class ThreatHbaseAdapter implements EnrichmentAdapter<String>,
private static final Logger LOGGER = Logger
.getLogger(ThreatHbaseAdapter.class);
+ /**
+  * No-op: this adapter does not record which values are looked up.
+  */
+ @Override
+ public void logAccess(String value) {
+
+ }
+
public JSONObject enrich(String metadata) {
JSONObject output = new JSONObject();
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
index bbcd0bd..ce2efee 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
@@ -19,7 +19,6 @@ package org.apache.metron.enrichment.adapters.whois;
import java.io.IOException;
import java.io.Serializable;
-import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -32,7 +31,6 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
import org.json.simple.JSONObject;
-import com.google.common.base.Joiner;
import org.apache.metron.tldextractor.BasicTldExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,6 +94,11 @@ public class WhoisHBaseAdapter implements EnrichmentAdapter<String>,
}
+ /**
+  * No-op: this adapter does not record which values are looked up.
+  */
+ @Override
+ public void logAccess(String value) {
+
+ }
+
@SuppressWarnings({ "unchecked", "deprecation" })
public JSONObject enrich(String metadataIn) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
index f16c2fb..2f530a9 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -19,6 +19,7 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
protected List<Enrichment> enrichments;
+ protected String type = "enrichment";
/**
* @param enrichments A class for sending tuples to enrichment bolt
* @return Instance of this class
@@ -28,6 +29,11 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
return this;
}
+ /**
+  * Sets the key under which {@code joinValues} stores the joined stream values in the output
+  * message (defaults to {@code "enrichment"}).
+  *
+  * @param type key for the joined values in the output message
+  * @return this bolt, for chaining
+  */
+ public EnrichmentJoinBolt withType(String type) {
+ this.type = type;
+ return this;
+ }
+
@Override
public void prepare(Map map, TopologyContext topologyContext) {
@@ -46,12 +52,17 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
@Override
public JSONObject joinValues(Map<String, JSONObject> streamValueMap) {
JSONObject message = new JSONObject();
- message.put("message", streamValueMap.get("message"));
+ // If the incoming "message" value already has a "message" key (presumably the output of an
+ // earlier join stage), reuse it and add this join's results next to it; otherwise wrap it.
+ // NOTE(review): the reuse branch mutates and returns the JSONObject held in streamValueMap,
+ // and a missing "message" entry would throw NullPointerException here.
+ if(streamValueMap.get("message").containsKey("message")) {
+ message = streamValueMap.get("message");
+ }
+ else {
+ message.put("message", streamValueMap.get("message"));
+ }
JSONObject enrichment = new JSONObject();
for(String streamId: getStreamIds()) {
enrichment.put(streamId, streamValueMap.get(streamId));
}
- message.put("enrichment", enrichment);
+ // Stored under the configured type key ("enrichment" unless withType was called).
+ message.put(type, enrichment);
return message;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 1563652..72c7f51 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -21,9 +21,11 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import backtype.storm.topology.base.BaseRichBolt;
+import com.google.common.base.Splitter;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Iterables;
import org.apache.metron.domain.Enrichment;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
import org.json.simple.JSONObject;
@@ -79,8 +81,7 @@ public class GenericEnrichmentBolt extends BaseRichBolt {
* @return Instance of this class
*/
- public GenericEnrichmentBolt withEnrichment
- (Enrichment<EnrichmentAdapter> enrichment) {
+ public GenericEnrichmentBolt withEnrichment(Enrichment<EnrichmentAdapter> enrichment) {
this.streamId = enrichment.getName();
this.enrichment = enrichment;
this.adapter = this.enrichment.getAdapter();
@@ -132,18 +133,17 @@ public class GenericEnrichmentBolt extends BaseRichBolt {
.build(loader);
boolean success = adapter.initializeAdapter();
if (!success) {
- LOG.error("[Metron] EnrichmentBolt could not initialize adapter");
+ LOG.error("[Metron] EnrichmentSplitterBolt could not initialize adapter");
throw new IllegalStateException("Could not initialize adapter...");
}
}
@Override
- public void declareOutputFields(OutputFieldsDeclarer declearer) {
- declearer.declareStream(streamId, new Fields("key", "message"));
- declearer.declareStream("error", new Fields("message"));
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // Enriched results are emitted on the stream named after this enrichment (streamId) as
+ // (key, message); the separate "error" stream carries a single "message" field.
+ declarer.declareStream(streamId, new Fields("key", "message"));
+ declarer.declareStream("error", new Fields("message"));
}
-
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
@@ -159,18 +159,19 @@ public class GenericEnrichmentBolt extends BaseRichBolt {
JSONObject enrichedField = new JSONObject();
String value = (String) rawMessage.get(field);
if (value != null && value.length() != 0) {
+ adapter.logAccess(value);
enrichedField = cache.getUnchecked(value);
if (enrichedField == null)
throw new Exception("[Metron] Could not enrich string: "
+ value);
}
- enrichedMessage.put(field, enrichedField);
+ enrichedMessage.put(Iterables.getLast(Splitter.on('/').split(field)), enrichedField);
}
if (!enrichedMessage.isEmpty()) {
collector.emit(streamId, new Values(key, enrichedMessage));
}
} catch (Exception e) {
- LOG.error("[Metron] Unable to enrich message: " + rawMessage);
+ LOG.error("[Metron] Unable to enrich message: " + rawMessage, e);
JSONObject error = ErrorGenerator.generateErrorMessage("Enrichment problem: " + rawMessage, e);
if (key != null) {
collector.emit(streamId, new Values(key, enrichedMessage));
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
new file mode 100644
index 0000000..3a6b4cb
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
@@ -0,0 +1,156 @@
+package org.apache.metron.threatintel;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.reference.lookup.Lookup;
+import org.apache.metron.reference.lookup.accesstracker.BloomAccessTracker;
+import org.apache.metron.reference.lookup.accesstracker.PersistentAccessTracker;
+import org.apache.metron.threatintel.hbase.ThreatIntelLookup;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Enrichment adapter that marks a value as a threat when it exists in an HBase-backed
+ * threat intelligence table.
+ *
+ * <p>Accesses reported through {@link #logAccess(String)} are recorded by a
+ * {@link PersistentAccessTracker} layered over a {@link BloomAccessTracker}; both are
+ * created in {@link #initializeAdapter()}.
+ */
+public class ThreatIntelAdapter implements EnrichmentAdapter<String>, Serializable {
+  protected static final Logger _LOG = LoggerFactory.getLogger(ThreatIntelAdapter.class);
+
+  /** Table, column family and access-tracker settings; must be set before initialization. */
+  protected ThreatIntelConfig config;
+
+  /**
+   * Built by {@link #initializeAdapter()} around HBase table handles. Transient because
+   * HBase table handles are not meant to be serialized: a deserialized adapter must be
+   * re-initialized before use.
+   */
+  protected transient ThreatIntelLookup lookup;
+
+  /** Creates an unconfigured adapter; call {@link #withConfig(ThreatIntelConfig)} before use. */
+  public ThreatIntelAdapter() {
+  }
+
+  /**
+   * Creates an adapter bound to {@code config}.
+   *
+   * @param config threat intel table and access-tracker settings
+   */
+  public ThreatIntelAdapter(ThreatIntelConfig config) {
+    // Assign directly: calling the overridable withConfig() from a constructor is unsafe.
+    this.config = config;
+  }
+
+  /**
+   * Sets the configuration used by {@link #initializeAdapter()} and {@link #enrich(String)}.
+   *
+   * @param config threat intel table and access-tracker settings
+   * @return this adapter, for chaining
+   */
+  public ThreatIntelAdapter withConfig(ThreatIntelConfig config) {
+    this.config = config;
+    return this;
+  }
+
+  /**
+   * Records an access of {@code value} with the lookup's access tracker.
+   * {@link #initializeAdapter()} must have been called first.
+   */
+  @Override
+  public void logAccess(String value) {
+    lookup.getAccessTracker().logAccess(new ThreatIntelKey(value));
+  }
+
+  /**
+   * Checks whether {@code value} is present in the configured threat intel table.
+   *
+   * @param value the value to look up
+   * @return an object mapping {@code "threat_source"} to the threat intel table name when
+   *     the value is found, otherwise an empty object
+   * @throws RuntimeException if the HBase lookup fails with an {@link IOException}
+   */
+  @Override
+  public JSONObject enrich(String value) {
+    JSONObject enriched = new JSONObject();
+    boolean isThreat;
+    try {
+      isThreat = lookup.exists(new ThreatIntelKey(value), lookup.getTable(), false);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to retrieve value", e);
+    }
+    if (isThreat) {
+      enriched.put("threat_source", config.getHBaseTable());
+      _LOG.trace("Enriched value => {}", enriched);
+    }
+    return enriched;
+  }
+
+  /**
+   * Obtains the access-tracker table and the threat intel table from the configured table
+   * provider ({@code config.getProvider()}) and builds the {@link ThreatIntelLookup}.
+   *
+   * @return {@code true} once the lookup has been created
+   * @throws IllegalStateException wrapping any {@link IOException} raised while building the tracker or lookup
+   */
+  @Override
+  public boolean initializeAdapter() {
+    String hbaseTable = config.getHBaseTable();
+    int expectedInsertions = config.getExpectedInsertions();
+    double falsePositives = config.getFalsePositiveRate();
+    String trackerHBaseTable = config.getTrackerHBaseTable();
+    String trackerHBaseCF = config.getTrackerHBaseCF();
+    long millisecondsBetweenPersists = config.getMillisecondsBetweenPersists();
+    BloomAccessTracker bat = new BloomAccessTracker(hbaseTable, expectedInsertions, falsePositives);
+    Configuration hbaseConfig = HBaseConfiguration.create();
+    try {
+      // Each adapter instance registers under a fresh random tracker name
+      // (presumably so parallel instances do not overwrite each other -- confirm).
+      PersistentAccessTracker accessTracker = new PersistentAccessTracker(
+          hbaseTable,
+          UUID.randomUUID().toString(),
+          config.getProvider().getTable(hbaseConfig, trackerHBaseTable),
+          trackerHBaseCF,
+          bat,
+          millisecondsBetweenPersists);
+      // NOTE(review): if obtaining the lookup table fails, the tracker table obtained above
+      // is not closed.
+      lookup = new ThreatIntelLookup(
+          config.getProvider().getTable(hbaseConfig, hbaseTable),
+          config.getHBaseCF(),
+          accessTracker);
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to initialize ThreatIntelAdapter", e);
+    }
+    return true;
+  }
+
+  /**
+   * Closes the lookup. Does nothing if the adapter was never initialized, so calling this
+   * on an adapter whose initialization failed is safe.
+   *
+   * @throws RuntimeException if closing the lookup fails
+   */
+  @Override
+  public void cleanup() {
+    if (lookup == null) {
+      return;
+    }
+    try {
+      lookup.close();
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to cleanup access tracker", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelConfig.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelConfig.java
new file mode 100644
index 0000000..cb1f5e3
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelConfig.java
@@ -0,0 +1,147 @@
+package org.apache.metron.threatintel;
+
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Serializable settings for {@link ThreatIntelAdapter}: the HBase table and column family
+ * holding threat intel data, the table and column family used for access tracking, the
+ * access tracker's bloom parameters and persistence interval, and the {@link TableProvider}
+ * used to obtain HBase tables.
+ */
+public class ThreatIntelConfig implements Serializable {
+  /** Number of milliseconds in one hour. */
+  public static final long MS_IN_HOUR = 1000L * 60 * 60;
+  private String hBaseTable;
+  private String hBaseCF;
+  private double falsePositiveRate = 0.03;
+  private int expectedInsertions = 100000;
+  private String trackerHBaseTable;
+  private String trackerHBaseCF;
+  /** Defaults to two hours. */
+  private long millisecondsBetweenPersists = 2 * MS_IN_HOUR;
+  private TableProvider provider = new HTableProvider();
+
+  /** @return name of the HBase table holding threat intel data */
+  public String getHBaseTable() {
+    return hBaseTable;
+  }
+
+  /** @return expected number of insertions for the bloom access tracker (default 100000) */
+  public int getExpectedInsertions() {
+    return expectedInsertions;
+  }
+
+  /** @return false positive rate for the bloom access tracker (default 0.03) */
+  public double getFalsePositiveRate() {
+    return falsePositiveRate;
+  }
+
+  /** @return name of the HBase table used by the persistent access tracker */
+  public String getTrackerHBaseTable() {
+    return trackerHBaseTable;
+  }
+
+  /** @return column family used in the access tracker table */
+  public String getTrackerHBaseCF() {
+    return trackerHBaseCF;
+  }
+
+  /** @return interval between access tracker persists, in milliseconds (default two hours) */
+  public long getMillisecondsBetweenPersists() {
+    return millisecondsBetweenPersists;
+  }
+
+  /** @return column family of the threat intel table */
+  public String getHBaseCF() {
+    return hBaseCF;
+  }
+
+  /** @return provider used to obtain HBase tables (default {@link HTableProvider}) */
+  public TableProvider getProvider() {
+    return provider;
+  }
+
+  /**
+   * Selects the {@link TableProvider} implementation by fully qualified class name.
+   *
+   * <p>A null or empty name, or one starting with {@code '$'} (presumably an unresolved
+   * {@code ${...}} property placeholder -- confirm), selects the default {@link HTableProvider}.
+   *
+   * @param connectorImpl fully qualified name of a {@link TableProvider} implementation
+   *     with a public no-argument constructor
+   * @return this config, for chaining
+   * @throws IllegalStateException if the class cannot be loaded, does not implement
+   *     {@link TableProvider}, or cannot be instantiated
+   */
+  public ThreatIntelConfig withProviderImpl(String connectorImpl) {
+    if (connectorImpl == null || connectorImpl.length() == 0 || connectorImpl.charAt(0) == '$') {
+      provider = new HTableProvider();
+      return this;
+    }
+    try {
+      // asSubclass rejects non-TableProvider classes before any constructor runs,
+      // instead of relying on an unchecked cast.
+      Class<? extends TableProvider> clazz = Class.forName(connectorImpl).asSubclass(TableProvider.class);
+      provider = clazz.getConstructor().newInstance();
+    } catch (ClassCastException e) {
+      throw new IllegalStateException("Unable to instantiate connector: " + connectorImpl + " is not a TableProvider", e);
+    } catch (InstantiationException e) {
+      throw new IllegalStateException("Unable to instantiate connector.", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException("Unable to instantiate connector: illegal access", e);
+    } catch (InvocationTargetException e) {
+      throw new IllegalStateException("Unable to instantiate connector", e);
+    } catch (NoSuchMethodException e) {
+      throw new IllegalStateException("Unable to instantiate connector: no such method", e);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException("Unable to instantiate connector: class not found", e);
+    }
+    return this;
+  }
+
+  /** Sets the HBase table used by the persistent access tracker. */
+  public ThreatIntelConfig withTrackerHBaseTable(String hBaseTable) {
+    this.trackerHBaseTable = hBaseTable;
+    return this;
+  }
+
+  /** Sets the column family used in the access tracker table. */
+  public ThreatIntelConfig withTrackerHBaseCF(String cf) {
+    this.trackerHBaseCF = cf;
+    return this;
+  }
+
+  /** Sets the HBase table holding threat intel data. */
+  public ThreatIntelConfig withHBaseTable(String hBaseTable) {
+    this.hBaseTable = hBaseTable;
+    return this;
+  }
+
+  /** Sets the column family of the threat intel table. */
+  public ThreatIntelConfig withHBaseCF(String cf) {
+    this.hBaseCF = cf;
+    return this;
+  }
+
+  /** Sets the false positive rate for the bloom access tracker. */
+  public ThreatIntelConfig withFalsePositiveRate(double falsePositiveRate) {
+    this.falsePositiveRate = falsePositiveRate;
+    return this;
+  }
+
+  /** Sets the expected number of insertions for the bloom access tracker. */
+  public ThreatIntelConfig withExpectedInsertions(int expectedInsertions) {
+    this.expectedInsertions = expectedInsertions;
+    return this;
+  }
+
+  /** Sets the interval between access tracker persists, in milliseconds. */
+  public ThreatIntelConfig withMillisecondsBetweenPersists(long millisecondsBetweenPersists) {
+    this.millisecondsBetweenPersists = millisecondsBetweenPersists;
+    return this;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/resources/hbase-site.xml b/metron-streaming/Metron-EnrichmentAdapters/src/main/resources/hbase-site.xml
deleted file mode 100644
index 8d812a9..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/resources/hbase-site.xml
+++ /dev/null
@@ -1,131 +0,0 @@
-<!--Tue Apr 1 18:16:39 2014-->
- <configuration>
- <property>
- <name>hbase.tmp.dir</name>
- <value>/disk/h/hbase</value>
- </property>
- <property>
- <name>hbase.hregion.memstore.chunkpool.maxsize</name>
- <value>0.5</value>
- </property>
- <property>
- <name>hbase.regionserver.codecs</name>
- <value>lzo,gz,snappy</value>
- </property>
- <property>
- <name>hbase.hstore.flush.retries.number</name>
- <value>120</value>
- </property>
- <property>
- <name>hbase.client.keyvalue.maxsize</name>
- <value>10485760</value>
- </property>
- <property>
- <name>hbase.rootdir</name>
- <value>hdfs://nn1:8020/apps/hbase/data</value>
- </property>
- <property>
- <name>hbase.defaults.for.version.skip</name>
- <value>true</value>
- </property>
- <property>
- <name>hbase.client.scanner.caching</name>
- <value>100</value>
- </property>
- <property>
- <name>hbase.superuser</name>
- <value>hbase</value>
- </property>
- <property>
- <name>hfile.block.cache.size</name>
- <value>0.40</value>
- </property>
- <property>
- <name>hbase.regionserver.checksum.verify</name>
- <value>true</value>
- </property>
- <property>
- <name>hbase.hregion.memstore.mslab.enabled</name>
- <value>true</value>
- </property>
- <property>
- <name>hbase.hregion.max.filesize</name>
- <value>107374182400</value>
- </property>
- <property>
- <name>hbase.cluster.distributed</name>
- <value>true</value>
- </property>
- <property>
- <name>zookeeper.session.timeout</name>
- <value>30000</value>
- </property>
- <property>
- <name>zookeeper.znode.parent</name>
- <value>/hbase-unsecure</value>
- </property>
- <property>
- <name>hbase.regionserver.global.memstore.lowerLimit</name>
- <value>0.38</value>
- </property>
- <property>
- <name>hbase.regionserver.handler.count</name>
- <value>240</value>
- </property>
- <property>
- <name>hbase.hregion.memstore.mslab.chunksize</name>
- <value>8388608</value>
- </property>
- <property>
- <name>hbase.zookeeper.quorum</name>
- <value>zkpr1,zkpr2,zkpr3</value>
- </property>
- <property>
- <name>hbase.zookeeper.useMulti</name>
- <value>true</value>
- </property>
- <property>
- <name>hbase.hregion.majorcompaction</name>
- <value>86400000</value>
- </property>
- <property>
- <name>hbase.hstore.blockingStoreFiles</name>
- <value>200</value>
- </property>
- <property>
- <name>hbase.zookeeper.property.clientPort</name>
- <value>2181</value>
- </property>
- <property>
- <name>hbase.hregion.memstore.flush.size</name>
- <value>134217728</value>
- </property>
- <property>
- <name>hbase.security.authorization</name>
- <value>false</value>
- </property>
- <property>
- <name>hbase.regionserver.global.memstore.upperLimit</name>
- <value>0.4</value>
- </property>
- <property>
- <name>hbase.hstore.compactionThreshold</name>
- <value>4</value>
- </property>
- <property>
- <name>hbase.hregion.memstore.block.multiplier</name>
- <value>8</value>
- </property>
- <property>
- <name>hbase.security.authentication</name>
- <value>simple</value>
- </property>
- <property>
- <name>dfs.client.read.shortcircuit</name>
- <value>true</value>
- </property>
- <property>
- <name>dfs.domain.socket.path</name>
- <value>/var/run/hdfs/dn_socket</value>
- </property>
- </configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java
index 4a39edb..eb4491d 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java
@@ -21,8 +21,8 @@ package org.apache.metron.enrichment.adapters.cif;
import java.net.InetAddress;
import java.util.Properties;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
import org.apache.metron.test.AbstractTestContext;
-import org.apache.metron.enrichment.adapters.cif.CIFHbaseAdapter;
import org.junit.Assert;
@@ -121,7 +121,7 @@ public class CIFHbaseAdapterTest extends AbstractTestContext {
}
/**
- * Test method for {@link org.apache.metron.enrichment.adapters.cif.CIFHbaseAdapter#initializeAdapter()}.
+ * Test method for {@link EnrichmentAdapter#initializeAdapter()}.
*/
public void testInitializeAdapter() {
if(skipTests(this.getMode())){