You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2016/04/26 16:46:19 UTC

[31/51] [partial] incubator-metron git commit: METRON-113 Project Reorganization (merrimanr) closes apache/incubator-metron#88

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/stix/StixExtractorTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/stix/StixExtractorTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/stix/StixExtractorTest.java
new file mode 100644
index 0000000..dba57dd
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/stix/StixExtractorTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.stix;
+
+import com.google.common.collect.Iterables;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StixExtractorTest {
+    // Sample STIX 1.2 package (IP watchlist). Its single Address_Value lists three IPv4
+    // addresses joined by the CybOX list delimiter "##comma##"; the test below expects the
+    // extractor to emit one indicator per address, in document order.
+    /**
+         <!--
+         STIX IP Watchlist Example
+
+         Copyright (c) 2015, The MITRE Corporation. All rights reserved.
+         The contents of this file are subject to the terms of the STIX License located at http://stix.mitre.org/about/termsofuse.html.
+
+         This example demonstrates a simple usage of STIX to represent a list of IP address indicators (watchlist of IP addresses). Cyber operations and malware analysis centers often share a list of suspected malicious IP addresses with information about what those IPs might indicate. This STIX package represents a list of three IP addresses with a short dummy description of what they represent.
+
+         It demonstrates the use of:
+
+         * STIX Indicators
+         * CybOX within STIX
+         * The CybOX Address Object (IP)
+         * CybOX Patterns (apply_condition="ANY")
+         * Controlled vocabularies
+
+         Created by Mark Davidson
+         -->
+         <stix:STIX_Package
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns:stix="http://stix.mitre.org/stix-1"
+         xmlns:indicator="http://stix.mitre.org/Indicator-2"
+         xmlns:cybox="http://cybox.mitre.org/cybox-2"
+         xmlns:AddressObject="http://cybox.mitre.org/objects#AddressObject-2"
+         xmlns:cyboxVocabs="http://cybox.mitre.org/default_vocabularies-2"
+         xmlns:stixVocabs="http://stix.mitre.org/default_vocabularies-1"
+         xmlns:example="http://example.com/"
+         id="example:STIXPackage-33fe3b22-0201-47cf-85d0-97c02164528d"
+         timestamp="2014-05-08T09:00:00.000000Z"
+         version="1.2">
+         <stix:STIX_Header>
+         <stix:Title>Example watchlist that contains IP information.</stix:Title>
+         <stix:Package_Intent xsi:type="stixVocabs:PackageIntentVocab-1.0">Indicators - Watchlist</stix:Package_Intent>
+         </stix:STIX_Header>
+         <stix:Indicators>
+         <stix:Indicator xsi:type="indicator:IndicatorType" id="example:Indicator-33fe3b22-0201-47cf-85d0-97c02164528d" timestamp="2014-05-08T09:00:00.000000Z">
+         <indicator:Type xsi:type="stixVocabs:IndicatorTypeVocab-1.1">IP Watchlist</indicator:Type>
+         <indicator:Description>Sample IP Address Indicator for this watchlist. This contains one indicator with a set of three IP addresses in the watchlist.</indicator:Description>
+         <indicator:Observable  id="example:Observable-1c798262-a4cd-434d-a958-884d6980c459">
+         <cybox:Object id="example:Object-1980ce43-8e03-490b-863a-ea404d12242e">
+         <cybox:Properties xsi:type="AddressObject:AddressObjectType" category="ipv4-addr">
+         <AddressObject:Address_Value condition="Equals" apply_condition="ANY">10.0.0.0##comma##10.0.0.1##comma##10.0.0.2</AddressObject:Address_Value>
+         </cybox:Properties>
+         </cybox:Object>
+         </indicator:Observable>
+         </stix:Indicator>
+         </stix:Indicators>
+         </stix:STIX_Package>
+         */
+    @Multiline
+    private static String stixDoc;
+
+    // Extractor config that only accepts the IPV_4_ADDR address category.
+    /**
+    {
+        "config" : {
+             "stix_address_categories" : "IPV_4_ADDR"
+        }
+        ,"extractor" : "STIX"
+    }
+    */
+    @Multiline
+    private static String stixConfigOnlyIPV4;
+    // Extractor config that only accepts IPV_6_ADDR; the sample document has no IPv6 addresses.
+    /**
+    {
+        "config" : {
+             "stix_address_categories" : "IPV_6_ADDR"
+        }
+        ,"extractor" : "STIX"
+    }
+    */
+    @Multiline
+    private static String stixConfigOnlyIPV6;
+    // Extractor config with no address-category filter.
+    /**
+    {
+        "config" : {
+        }
+        ,"extractor" : "STIX"
+    }
+    */
+    @Multiline
+    private static String stixConfig;
+
+    /**
+     * Loads an extractor from {@code config}, runs it over {@link #stixDoc}, and asserts that the
+     * extracted keys carry exactly the {@code expected} indicators, in order.
+     */
+    private static void assertIndicators(String config, String... expected) throws Exception {
+        Extractor extractor = ExtractorHandler.load(config).getExtractor();
+        Iterable<LookupKV> results = extractor.extract(stixDoc);
+        Assert.assertEquals(expected.length, Iterables.size(results));
+        for (int i = 0; i < expected.length; ++i) {
+            EnrichmentKey key = (EnrichmentKey) Iterables.get(results, i).getKey();
+            Assert.assertEquals(expected[i], key.indicator);
+        }
+    }
+
+    @Test
+    public void testStixAddresses() throws Exception {
+        assertIndicators(stixConfigOnlyIPV4, "10.0.0.0", "10.0.0.1", "10.0.0.2");
+        // Without a category filter the same IPv4 addresses are extracted.
+        assertIndicators(stixConfig, "10.0.0.0", "10.0.0.1", "10.0.0.2");
+        // The document holds no IPv6 addresses, so nothing is extracted.
+        assertIndicators(stixConfigOnlyIPV6);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
new file mode 100644
index 0000000..28b3e26
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.hbase;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.metron.enrichment.converter.HbaseConverter;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+
+/**
+ * Tests for {@link EnrichmentConverter}: key serialization, round trips through HBase
+ * {@link Put} and {@link Result} objects, and the row key of a {@link Get}.
+ */
+public class HBaseEnrichmentConverterTest {
+    private final EnrichmentKey key = new EnrichmentKey("domain", "google");
+    private final EnrichmentValue value = new EnrichmentValue(metadata());
+    /** The key/value pair each round trip is expected to reproduce. */
+    private final LookupKV<EnrichmentKey, EnrichmentValue> expected = new LookupKV<>(key, value);
+
+    /** Builds the sample metadata without a double-brace (anonymous subclass) initializer. */
+    private static HashMap<String, String> metadata() {
+        HashMap<String, String> metadata = new HashMap<>();
+        metadata.put("foo", "bar");
+        metadata.put("grok", "baz");
+        return metadata;
+    }
+
+    @Test
+    public void testKeySerialization() {
+        byte[] serialized = key.toBytes();
+
+        EnrichmentKey deserialized = new EnrichmentKey();
+        deserialized.fromBytes(serialized);
+        Assert.assertEquals(key, deserialized);
+    }
+
+    @Test
+    public void testPut() throws IOException {
+        HbaseConverter<EnrichmentKey, EnrichmentValue> converter = new EnrichmentConverter();
+        Put put = converter.toPut("cf", key, value);
+        LookupKV<EnrichmentKey, EnrichmentValue> converted = converter.fromPut(put, "cf");
+        Assert.assertEquals(expected, converted);
+    }
+
+    @Test
+    public void testResult() throws IOException {
+        HbaseConverter<EnrichmentKey, EnrichmentValue> converter = new EnrichmentConverter();
+        Result r = converter.toResult("cf", key, value);
+        LookupKV<EnrichmentKey, EnrichmentValue> converted = converter.fromResult(r, "cf");
+        Assert.assertEquals(expected, converted);
+    }
+
+    @Test
+    public void testGet() throws Exception {
+        HbaseConverter<EnrichmentKey, EnrichmentValue> converter = new EnrichmentConverter();
+        Get get = converter.toGet("cf", key);
+        Assert.assertArrayEquals(key.toBytes(), get.getRow());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java
new file mode 100644
index 0000000..626c98e
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.hbase.mr;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.metron.dataloads.bulk.ThreatIntelBulkLoader;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Runs the {@link ThreatIntelBulkLoader} MapReduce job against HBase and MapReduce mini
+ * clusters and verifies that one CSV line is loaded as one enrichment row.
+ */
+public class BulkLoadMapperIntegrationTest {
+  /** The test util. */
+  private HBaseTestingUtility testUtil;
+
+  /** The test table. */
+  private HTable testTable;
+  private final String tableName = "malicious_domains";
+  private final String cf = "cf";
+  private Configuration config;
+  @Before
+  public void setup() throws Exception {
+    Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true);
+    config = kv.getValue();
+    testUtil = kv.getKey();
+    testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
+  }
+
+  @After
+  public void teardown() throws Exception {
+    // Release the table's client resources before the clusters are shut down.
+    try {
+      if (testTable != null) {
+        testTable.close();
+      }
+    } finally {
+      HBaseUtil.INSTANCE.teardown(testUtil);
+    }
+  }
+ /**
+         {
+            "config" : {
+                        "columns" : {
+                                "host" : 0
+                                ,"meta" : 2
+                                    }
+                       ,"indicator_column" : "host"
+                       ,"separator" : ","
+                       ,"type" : "threat"
+                       }
+            ,"extractor" : "CSV"
+         }
+         */
+  @Multiline
+  private static String extractorConfig;
+
+  @Test
+  public void test() throws IOException, ClassNotFoundException, InterruptedException {
+    Assert.assertNotNull(testTable);
+    FileSystem fs = FileSystem.get(config);
+    // Per extractorConfig: column 0 is "host" (the indicator), column 2 is "meta", column 1 is unmapped.
+    String contents = "google.com,1,foo";
+    EnrichmentConverter converter = new EnrichmentConverter();
+    HBaseUtil.INSTANCE.writeFile(contents, new Path("input.csv"), fs);
+    Job job = ThreatIntelBulkLoader.createJob(config, "input.csv", tableName, cf, extractorConfig, 0L, converter);
+    Assert.assertTrue(job.waitForCompletion(true));
+
+    List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
+    try (ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf))) {
+      for (Result r : scanner) {
+        results.add(converter.fromResult(r, cf));
+      }
+    }
+    Assert.assertEquals(1, results.size());
+    LookupKV<EnrichmentKey, EnrichmentValue> loaded = results.get(0);
+    Assert.assertEquals("google.com", loaded.getKey().indicator);
+    Assert.assertEquals("threat", loaded.getKey().type);
+    Assert.assertEquals(2, loaded.getValue().getMetadata().size());
+    Assert.assertEquals("foo", loaded.getValue().getMetadata().get("meta"));
+    Assert.assertEquals("google.com", loaded.getValue().getMetadata().get("host"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperTest.java
new file mode 100644
index 0000000..82233cf
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.hbase.mr;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Unit test for {@link BulkLoadMapper}: calls {@code map} directly and captures the emitted
+ * {@link Put}s in a map instead of running a MapReduce job.
+ */
+public class BulkLoadMapperTest {
+    /**
+         {
+            "config" : {
+                        "columns" : {
+                                "host" : 0
+                                ,"meta" : 2
+                                    }
+                       ,"indicator_column" : "host"
+                       ,"type" : "threat"
+                       ,"separator" : ","
+                       }
+            ,"extractor" : "CSV"
+         }
+         */
+    @Multiline
+    private static String extractorConfig;
+    @Test
+    public void testMapper() throws IOException, InterruptedException {
+        final Map<ImmutableBytesWritable, Put> puts = new HashMap<>();
+        // Capture writes in a map rather than sending them to a real MapReduce context.
+        BulkLoadMapper mapper = new BulkLoadMapper() {
+            @Override
+            protected void write(ImmutableBytesWritable key, Put value, Context context) throws IOException, InterruptedException {
+                puts.put(key, value);
+            }
+        };
+        Configuration conf = new Configuration();
+        conf.set(BulkLoadMapper.COLUMN_FAMILY_KEY, "cf");
+        conf.set(BulkLoadMapper.CONFIG_KEY, extractorConfig);
+        conf.set(BulkLoadMapper.LAST_SEEN_KEY, "0");
+        conf.set(BulkLoadMapper.CONVERTER_KEY, EnrichmentConverter.class.getName());
+        mapper.initialize(conf);
+        {
+            // A '#'-prefixed (comment) line must not produce a Put.
+            mapper.map(null, new Text("#google.com,1,foo"), null);
+            Assert.assertEquals(0, puts.size());
+        }
+        {
+            mapper.map(null, new Text("google.com,1,foo"), null);
+            Assert.assertEquals(1, puts.size());
+            EnrichmentKey expectedKey = new EnrichmentKey("threat", "google.com");
+            Put put = puts.get(new ImmutableBytesWritable(expectedKey.toBytes()));
+            Assert.assertNotNull("no Put emitted for the expected row key", put);
+            EnrichmentConverter converter = new EnrichmentConverter();
+            LookupKV<EnrichmentKey, EnrichmentValue> results = converter.fromPut(put, "cf");
+            Assert.assertEquals("google.com", results.getKey().indicator);
+            Assert.assertEquals(2, results.getValue().getMetadata().size());
+            Assert.assertEquals("foo", results.getValue().getMetadata().get("meta"));
+            Assert.assertEquals("google.com", results.getValue().getMetadata().get("host"));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/HBaseUtil.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/HBaseUtil.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/HBaseUtil.java
new file mode 100644
index 0000000..c9c6424
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/HBaseUtil.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.hbase.mr;
+
+import com.google.common.base.Joiner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test helper for starting and stopping HBase (and optionally MapReduce) mini clusters and for
+ * reading and writing small text files on a Hadoop {@link FileSystem}.
+ */
+public enum HBaseUtil {
+    INSTANCE;
+
+    /**
+     * Starts an HBase mini cluster with one slave node, with master and region server bound to localhost.
+     *
+     * @param startMRCluster whether to also start a mini MapReduce cluster
+     * @return the testing utility paired with the configuration that was passed to it
+     */
+    public Map.Entry<HBaseTestingUtility,Configuration> create(boolean startMRCluster) throws Exception {
+        Configuration config = HBaseConfiguration.create();
+        config.set("hbase.master.hostname", "localhost");
+        config.set("hbase.regionserver.hostname", "localhost");
+        HBaseTestingUtility testUtil = new HBaseTestingUtility(config);
+
+        testUtil.startMiniCluster(1);
+        if(startMRCluster) {
+            testUtil.startMiniMapReduceCluster();
+        }
+        return new AbstractMap.SimpleEntry<>(testUtil, config);
+    }
+
+    /**
+     * Writes {@code contents} as UTF-8 to {@code filename}, overwriting any existing file.
+     */
+    public void writeFile(String contents, Path filename, FileSystem fs) throws IOException {
+        // UTF-8 matches how Hadoop's Text-based input formats decode lines.
+        try (Writer writer = new OutputStreamWriter(fs.create(filename, true), StandardCharsets.UTF_8)) {
+            writer.write(contents);
+        }
+    }
+
+    /**
+     * Reads {@code filename} as UTF-8 and returns its lines joined with a single newline
+     * (original line terminators are normalized; no trailing newline is added).
+     */
+    public String readFile(FileSystem fs, Path filename) throws IOException {
+        List<String> contents = new ArrayList<>();
+        try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(filename), StandardCharsets.UTF_8))) {
+            for (String line = br.readLine(); line != null; line = br.readLine()) {
+                contents.add(line);
+            }
+        }
+        return Joiner.on('\n').join(contents);
+    }
+
+    /**
+     * Shuts down the mini clusters and cleans up the test directory. A {@code null} utility
+     * (e.g. when {@link #create(boolean)} failed in setup) is ignored so that an NPE here does
+     * not mask the original failure.
+     */
+    public void teardown(HBaseTestingUtility testUtil) throws Exception {
+        if (testUtil == null) {
+            return;
+        }
+        testUtil.shutdownMiniMapReduceCluster();
+        testUtil.shutdownMiniCluster();
+        testUtil.cleanupTestDir();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
new file mode 100644
index 0000000..65befe3
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.hbase.mr;
+
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.metron.dataloads.bulk.LeastRecentlyUsedPruner;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.EnrichmentLookup;
+import org.apache.metron.enrichment.lookup.LookupKey;
+import org.apache.metron.enrichment.lookup.accesstracker.BloomAccessTracker;
+import org.apache.metron.enrichment.lookup.accesstracker.PersistentAccessTracker;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Integration test for {@link LeastRecentlyUsedPruner}: keys looked up and recorded by a persisted
+ * access tracker must survive the pruning job, while a key that was written but never looked up is removed.
+ */
+public class LeastRecentlyUsedPrunerIntegrationTest {
+    /** The test util. */
+    private HBaseTestingUtility testUtil;
+
+    /** The test table. */
+    private HTable testTable;
+    /** Table the persistent access tracker writes to. */
+    private HTable atTable;
+    private final String tableName = "malicious_domains";
+    private final String cf = "cf";
+    private final String atTableName = "access_trackers";
+    private final String atCF = "cf";
+    private Configuration config;
+
+    @Before
+    public void setup() throws Exception {
+        Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true);
+        config = kv.getValue();
+        testUtil = kv.getKey();
+        testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
+        atTable = testUtil.createTable(Bytes.toBytes(atTableName), Bytes.toBytes(atCF));
+    }
+
+    @After
+    public void teardown() throws Exception {
+        // Release table client resources before the clusters are shut down.
+        try {
+            if (testTable != null) {
+                testTable.close();
+            }
+            if (atTable != null) {
+                atTable.close();
+            }
+        } finally {
+            HBaseUtil.INSTANCE.teardown(testUtil);
+        }
+    }
+
+    /** Returns enrichment keys of type "type" with indicators "key-start" up to "key-(end-1)". */
+    public List<LookupKey> getKeys(int start, int end) {
+        List<LookupKey> keys = new ArrayList<>();
+        for (int i = start; i < end; ++i) {
+            keys.add(new EnrichmentKey("type", "key-" + i));
+        }
+        return keys;
+    }
+
+    /** Writes {@code key} to the enrichment table with the single metadata entry k=dummy. */
+    private void putDummy(EnrichmentConverter converter, LookupKey key) throws Exception {
+        HashMap<String, String> metadata = new HashMap<>();
+        metadata.put("k", "dummy");
+        testTable.put(converter.toPut(cf, (EnrichmentKey) key, new EnrichmentValue(metadata)));
+    }
+
+    @Test
+    public void test() throws Exception {
+        long ts = System.currentTimeMillis();
+        BloomAccessTracker bat = new BloomAccessTracker("tracker1", 100, 0.03);
+        PersistentAccessTracker pat = new PersistentAccessTracker(tableName, "0", atTable, atCF, bat, 0L);
+        EnrichmentLookup lookup = new EnrichmentLookup(testTable, cf, pat);
+        List<LookupKey> goodKeysHalf = getKeys(0, 5);
+        List<LookupKey> goodKeysOtherHalf = getKeys(5, 10);
+        Iterable<LookupKey> goodKeys = Iterables.concat(goodKeysHalf, goodKeysOtherHalf);
+        List<LookupKey> badKey = getKeys(10, 11);
+        EnrichmentConverter converter = new EnrichmentConverter();
+
+        // Write and look up the first half of the good keys, then persist the tracker.
+        for (LookupKey k : goodKeysHalf) {
+            putDummy(converter, k);
+            Assert.assertTrue(lookup.exists((EnrichmentKey) k, testTable, true));
+        }
+        pat.persist(true);
+        // Write and look up the second half after that persist.
+        for (LookupKey k : goodKeysOtherHalf) {
+            putDummy(converter, k);
+            Assert.assertTrue(lookup.exists((EnrichmentKey) k, testTable, true));
+        }
+        testUtil.flush();
+        // The test expects persist(true) to reset the in-memory tracker: keys looked up
+        // before it are no longer reported as seen, keys looked up after it are.
+        Assert.assertFalse(lookup.getAccessTracker().hasSeen(goodKeysHalf.get(0)));
+        for (LookupKey k : goodKeysOtherHalf) {
+            Assert.assertTrue(lookup.getAccessTracker().hasSeen(k));
+        }
+        pat.persist(true);
+
+        // The bad key is written but never looked up, so no tracker should record it.
+        putDummy(converter, badKey.get(0));
+        testUtil.flush();
+        Assert.assertFalse(lookup.getAccessTracker().hasSeen(badKey.get(0)));
+
+        // NOTE(review): ts presumably bounds which persisted trackers the pruner consults --
+        // confirm against LeastRecentlyUsedPruner.createJob.
+        Job job = LeastRecentlyUsedPruner.createJob(config, tableName, cf, atTableName, atCF, ts);
+        Assert.assertTrue(job.waitForCompletion(true));
+        // Keys recorded by a persisted tracker survive pruning; the never-accessed key is removed.
+        for (LookupKey k : goodKeys) {
+            Assert.assertTrue(lookup.exists((EnrichmentKey) k, testTable, true));
+        }
+        for (LookupKey k : badKey) {
+            Assert.assertFalse(lookup.exists((EnrichmentKey) k, testTable, true));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/MockTaxiiService.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/MockTaxiiService.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/MockTaxiiService.java
new file mode 100644
index 0000000..bc1b3b7
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/MockTaxiiService.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.taxii;
+
+
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import org.apache.commons.io.FileUtils;
+import org.apache.metron.test.utils.UnitTestHelper;
+
+import javax.ws.rs.ApplicationPath;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.*;
+import javax.ws.rs.ext.RuntimeDelegate;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+@Path("/")
+public class MockTaxiiService {
+    /** Value of the {@code x-taxii-content-type} header sent with every response. */
+    private static final String TAXII_CONTENT_TYPE = "urn:taxii.mitre.org:message:xml:1.1";
+
+    /**
+     * Discovery response served by {@link #getDiscovery()}. Until {@link #start(int)} runs it still
+     * contains the literal {@code PORT} placeholder.
+     */
+    static String discoveryMsg;
+    /** Canned poll response served by {@link #getData()}; tests also read it directly. */
+    static String pollMsg;
+    /**
+     * Discovery response exactly as read from disk, {@code PORT} placeholder intact. Kept separately
+     * so every {@link #start(int)} substitutes from the original rather than an already-rewritten copy.
+     */
+    private static String discoveryMsgTemplate;
+
+    static {
+        try {
+            String baseDir = UnitTestHelper.findDir("taxii-messages");
+            // Explicit charset so the fixtures decode the same way on every platform.
+            discoveryMsgTemplate = FileUtils.readFileToString(new File(new File(baseDir), "message.discovery"), "UTF-8");
+            discoveryMsg = discoveryMsgTemplate;
+            pollMsg = FileUtils.readFileToString(new File(new File(baseDir), "messages.poll"), "UTF-8");
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to read TAXII test messages (message.discovery / messages.poll)", e);
+        }
+    }
+
+    /** Answers a POST to the discovery endpoint with the port-substituted discovery response. */
+    @POST
+    @Path("/taxii-discovery-service")
+    public Response getDiscovery() {
+        return Response.ok(discoveryMsg, MediaType.APPLICATION_XML_TYPE)
+                .header("x-taxii-content-type", TAXII_CONTENT_TYPE)
+                .build();
+    }
+
+    /** Answers a POST to the data (poll) endpoint with the canned poll response. */
+    @POST
+    @Path("/taxii-data")
+    public Response getData() {
+        return Response.ok(pollMsg, MediaType.APPLICATION_XML_TYPE)
+                .header("x-taxii-content-type", TAXII_CONTENT_TYPE)
+                .build();
+    }
+
+    /** JAX-RS application exposing {@link MockTaxiiService} as its only resource class. */
+    @ApplicationPath("rs")
+    public static class ApplicationConfig extends Application {
+        private final Set<Class<?>> classes;
+
+        public ApplicationConfig() {
+            HashSet<Class<?>> c = new HashSet<>();
+            c.add(MockTaxiiService.class);
+            classes = Collections.unmodifiableSet(c);
+        }
+
+        @Override
+        public Set<Class<?>> getClasses() {
+            return classes;
+        }
+    }
+
+    /** The running server; {@code null} before {@link #start(int)} and after {@link #shutdown()}. */
+    private static HttpServer server;
+
+    /**
+     * Starts the mock TAXII service on the given port (bound to the wildcard address) and rewrites
+     * the discovery response so the service addresses it advertises point at that port.
+     *
+     * @param port the port to listen on
+     * @throws IOException if the HTTP server cannot be created or bound
+     */
+    public static void start(int port) throws IOException {
+        URI uri = UriBuilder.fromUri("http://localhost/").port(port).build();
+        server = HttpServer.create(new InetSocketAddress(uri.getPort()), 0);
+        HttpHandler handler = RuntimeDelegate.getInstance().createEndpoint(new ApplicationConfig(), HttpHandler.class);
+        server.createContext(uri.getPath(), handler);
+        // Substitute from the untouched template. Rewriting discoveryMsg in place would bake in the
+        // first port, so a later start on a different port would advertise the stale one.
+        discoveryMsg = discoveryMsgTemplate.replace("PORT", String.valueOf(uri.getPort()));
+        server.start();
+    }
+
+    /** Stops the server immediately if it is running; a no-op when it was never started. */
+    public static void shutdown() {
+        if (server != null) {
+            server.stop(0);
+            server = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java
new file mode 100644
index 0000000..f0d9178
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.nonbulk.taxii;
+
+import com.google.common.base.Splitter;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.metron.dataloads.extractor.stix.StixExtractor;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.test.mock.MockHTable;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+public class TaxiiIntegrationTest {
+
+    /** Brings up the mock TAXII endpoint on the port the connection config below points at. */
+    @Before
+    public void setup() throws IOException {
+        MockTaxiiService.start(8282);
+    }
+
+    /** Stops the mock endpoint and drops any cached mock HBase tables. */
+    @After
+    public void teardown() {
+        MockTaxiiService.shutdown();
+        MockHTable.Provider.clear();
+    }
+
+    /**
+         {
+            "endpoint" : "http://localhost:8282/taxii-discovery-service"
+           ,"type" : "DISCOVER"
+           ,"collection" : "guest.Abuse_ch"
+           ,"table" : "threat_intel"
+           ,"columnFamily" : "cf"
+           ,"allowedIndicatorTypes" : [ "domainname:FQDN", "address:IPV_4_ADDR" ]
+         }
+    */
+    @Multiline
+    static String taxiiConnectionConfig;
+
+    @Test
+    public void testTaxii() throws Exception {
+        final MockHTable.Provider provider = new MockHTable.Provider();
+        final Configuration hbaseConfig = HBaseConfiguration.create();
+        final TaxiiHandler handler = new TaxiiHandler(TaxiiConnectionConfig.load(taxiiConnectionConfig), new StixExtractor(), hbaseConfig) {
+            @Override
+            protected synchronized HTableInterface createHTable(String tableInfo) throws IOException {
+                // Whatever table the handler asks for, hand back the in-memory mock.
+                return provider.addToCache("threat_intel", "cf");
+            }
+        };
+        handler.run();
+
+        final Set<String> domains = indicatorsInTable(provider, hbaseConfig, "domainname:FQDN");
+        Assert.assertTrue(domains.contains("www.office-112.com"));
+        final int expectedDomainCount = numStringsMatch(MockTaxiiService.pollMsg, "DomainNameObj:Value condition=\"Equals\"");
+        Assert.assertEquals(expectedDomainCount, domains.size());
+
+        final Set<String> addresses = indicatorsInTable(provider, hbaseConfig, "address:IPV_4_ADDR");
+        Assert.assertTrue(addresses.contains("94.102.53.142"));
+        final int expectedAddressCount = numStringsMatch(MockTaxiiService.pollMsg, "AddressObj:Address_Value condition=\"Equal\"");
+        Assert.assertEquals(expectedAddressCount, addresses.size());
+
+        MockHTable.Provider.clear();
+    }
+
+    /**
+     * Looks up the mock {@code threat_intel} table and returns the indicators of the given type
+     * recorded in its put log under column family {@code cf}.
+     */
+    private static Set<String> indicatorsInTable(MockHTable.Provider provider, Configuration config, String indicatorType) throws Exception {
+        final MockHTable mockTable = (MockHTable) provider.getTable(config, "threat_intel");
+        return getIndicators(indicatorType, mockTable.getPutLog(), "cf");
+    }
+
+    /** Counts the newline-separated lines of {@code xmlBundle} that contain {@code text}. */
+    private static int numStringsMatch(String xmlBundle, String text) {
+        int matches = 0;
+        for (String candidate : Splitter.on("\n").split(xmlBundle)) {
+            matches += candidate.contains(text) ? 1 : 0;
+        }
+        return matches;
+    }
+
+    /**
+     * Decodes every put with an {@link EnrichmentConverter} and collects the indicator of each key
+     * whose type equals {@code indicatorType}.
+     */
+    private static Set<String> getIndicators(String indicatorType, Iterable<Put> puts, String cf) throws IOException {
+        final EnrichmentConverter converter = new EnrichmentConverter();
+        final Set<String> indicators = new HashSet<>();
+        for (Put put : puts) {
+            final LookupKV<EnrichmentKey, EnrichmentValue> decoded = converter.fromPut(put, cf);
+            final EnrichmentKey key = decoded.getKey();
+            if (key.type.equals(indicatorType)) {
+                indicators.add(key.indicator);
+            }
+        }
+        return indicators;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/resources/log4j.properties b/metron-platform/metron-data-management/src/test/resources/log4j.properties
new file mode 100644
index 0000000..0d50388
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/resources/log4j.properties
@@ -0,0 +1,24 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+# Root logger: only ERROR and above are emitted, and they go to the "stdout" appender defined below.
+log4j.rootLogger=ERROR, stdout
+
+# "stdout" appender: a ConsoleAppender writing to System.out. Each line holds a timestamp, the
+# padded level, the last segment of the logger name, the source line number, and the message.
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/test/resources/taxii-messages/message.discovery
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/resources/taxii-messages/message.discovery b/metron-platform/metron-data-management/src/test/resources/taxii-messages/message.discovery
new file mode 100644
index 0000000..2f7e788
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/resources/taxii-messages/message.discovery
@@ -0,0 +1,21 @@
+<taxii_11:Discovery_Response in_response_to="urn:uuid:695149bd-72ab-41dd-a7e0-7d0b2a73e81f" message_id="65257" xmlns:xmldsig="http://www.w3.org/2000/09/xmldsig#" xmlns:taxii_11="http://taxii.mitre.org/messages/taxii_xml_binding-1.1">
+    <taxii_11:Service_Instance service_type="DISCOVERY" available="true" service_version="urn:taxii.mitre.org:services:1.1">
+        <taxii_11:Protocol_Binding>urn:taxii.mitre.org:protocol:https:1.0</taxii_11:Protocol_Binding>
+        <taxii_11:Address>http://localhost:PORT/taxii-data</taxii_11:Address>
+        <taxii_11:Message_Binding>urn:taxii.mitre.org:message:xml:1.1</taxii_11:Message_Binding>
+        <taxii_11:Message></taxii_11:Message>
+    </taxii_11:Service_Instance>
+    <taxii_11:Service_Instance service_type="COLLECTION_MANAGEMENT" available="true" service_version="urn:taxii.mitre.org:services:1.1">
+        <taxii_11:Protocol_Binding>urn:taxii.mitre.org:protocol:https:1.0</taxii_11:Protocol_Binding>
+        <taxii_11:Address>http://localhost:PORT/taxii-collections</taxii_11:Address>
+        <taxii_11:Message_Binding>urn:taxii.mitre.org:message:xml:1.1</taxii_11:Message_Binding>
+        <taxii_11:Message></taxii_11:Message>
+    </taxii_11:Service_Instance>
+    <taxii_11:Service_Instance service_type="POLL" available="true" service_version="urn:taxii.mitre.org:services:1.1">
+        <taxii_11:Protocol_Binding>urn:taxii.mitre.org:protocol:https:1.0</taxii_11:Protocol_Binding>
+        <taxii_11:Address>http://localhost:PORT/taxii-data</taxii_11:Address>
+        <taxii_11:Message_Binding>urn:taxii.mitre.org:message:xml:1.1</taxii_11:Message_Binding>
+        <taxii_11:Message></taxii_11:Message>
+    </taxii_11:Service_Instance>
+</taxii_11:Discovery_Response>
+