You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/21 18:05:53 UTC

[11/43] incubator-metron git commit: METRON-50: Ingest threat intel data from Taxii feeds closes apache/incubator-metron#29

METRON-50: Ingest threat intel data from Taxii feeds closes apache/incubator-metron#29


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/0e1055aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/0e1055aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/0e1055aa

Branch: refs/heads/Metron_0.1BETA
Commit: 0e1055aafe78e428b0ad1fc085589a202cf14206
Parents: a712fe6
Author: cstella <ce...@gmail.com>
Authored: Thu Feb 25 11:24:56 2016 -0500
Committer: cstella <ce...@gmail.com>
Committed: Thu Feb 25 11:24:56 2016 -0500

----------------------------------------------------------------------
 NOTICE                                          |    3 +
 metron-streaming/Metron-Common/pom.xml          |    6 +
 .../hbase/converters/AbstractConverter.java     |   93 +
 .../metron/hbase/converters/HbaseConverter.java |   40 +
 .../threatintel/ThreatIntelConverter.java       |   39 +
 .../converters/threatintel/ThreatIntelKey.java  |   89 +
 .../threatintel/ThreatIntelValue.java           |  110 +
 .../metron/reference/lookup/LookupKV.java       |   65 +
 .../metron/reference/lookup/LookupKey.java      |    1 +
 .../metron/reference/lookup/LookupValue.java    |   28 +
 .../metron/threatintel/ThreatIntelKey.java      |   89 -
 .../metron/threatintel/ThreatIntelResults.java  |    2 +
 .../metron/threatintel/hbase/Converter.java     |   96 -
 .../threatintel/hbase/ThreatIntelLookup.java    |   24 +-
 metron-streaming/Metron-DataLoads/pom.xml       |  432 +--
 .../src/main/bash/threatintel_bulk_load.sh      |    2 +-
 .../src/main/bash/threatintel_bulk_prune.sh     |    2 +-
 .../src/main/bash/threatintel_taxii_load.sh     |   39 +
 .../dataloads/LeastRecentlyUsedPruner.java      |  221 --
 .../metron/dataloads/ThreatIntelBulkLoader.java |  213 --
 .../dataloads/bulk/LeastRecentlyUsedPruner.java |  221 ++
 .../dataloads/bulk/ThreatIntelBulkLoader.java   |  230 ++
 .../metron/dataloads/extractor/Extractor.java   |    4 +-
 .../dataloads/extractor/csv/CSVExtractor.java   |   20 +-
 .../extractor/csv/LookupConverter.java          |   30 +
 .../extractor/csv/LookupConverters.java         |   67 +
 .../dataloads/extractor/stix/StixExtractor.java |   44 +-
 .../extractor/stix/types/AddressHandler.java    |   18 +-
 .../extractor/stix/types/DomainHandler.java     |   26 +-
 .../extractor/stix/types/HostnameHandler.java   |   17 +-
 .../extractor/stix/types/ObjectTypeHandler.java |    3 +-
 .../dataloads/hbase/mr/BulkLoadMapper.java      |   21 +-
 .../metron/dataloads/hbase/mr/PrunerMapper.java |    5 +
 .../metron/dataloads/taxii/ConnectionType.java  |   23 +
 .../metron/dataloads/taxii/TableInfo.java       |   72 +
 .../dataloads/taxii/TaxiiConnectionConfig.java  |  196 ++
 .../metron/dataloads/taxii/TaxiiHandler.java    |  403 +++
 .../metron/dataloads/taxii/TaxiiLoader.java     |  180 ++
 .../dataloads/extractor/ExtractorTest.java      |   24 +-
 .../extractor/csv/CSVExtractorTest.java         |   17 +-
 .../extractor/stix/StixExtractorTest.java       |   25 +-
 .../dataloads/hbase/HBaseConverterTest.java     |   67 -
 .../hbase/HBaseThreatIntelConverterTest.java    |   76 +
 .../hbase/mr/BulkLoadMapperIntegrationTest.java |   24 +-
 .../dataloads/hbase/mr/BulkLoadMapperTest.java  |   19 +-
 .../LeastRecentlyUsedPrunerIntegrationTest.java |   32 +-
 .../dataloads/taxii/MockTaxiiService.java       |   99 +
 .../dataloads/taxii/TaxiiIntegrationTest.java   |  121 +
 .../resources/taxii-messages/message.discovery  |   21 +
 .../test/resources/taxii-messages/messages.poll | 2914 ++++++++++++++++++
 .../metron/threatintel/ThreatIntelAdapter.java  |    5 +-
 metron-streaming/Metron-Indexing/pom.xml        |    8 +-
 metron-streaming/Metron-Testing/pom.xml         |   88 +
 .../metron/integration/util/UnitTestHelper.java |   84 +
 .../util/integration/ComponentRunner.java       |  130 +
 .../util/integration/InMemoryComponent.java     |   23 +
 .../integration/util/integration/Processor.java |   23 +
 .../util/integration/ReadinessState.java        |   22 +
 .../integration/UnableToStartException.java     |   27 +
 .../components/ElasticSearchComponent.java      |  188 ++
 .../components/FluxTopologyComponent.java       |  132 +
 .../integration/util/mock/MockHTable.java       |  672 ++++
 metron-streaming/Metron-Topologies/pom.xml      |    6 +
 .../integration/pcap/PcapIntegrationTest.java   |   29 +-
 .../metron/integration/util/UnitTestHelper.java |   84 -
 .../util/integration/ComponentRunner.java       |  127 -
 .../util/integration/InMemoryComponent.java     |   23 -
 .../integration/util/integration/Processor.java |   23 -
 .../util/integration/ReadinessState.java        |   22 -
 .../integration/UnableToStartException.java     |   27 -
 .../components/ElasticSearchComponent.java      |  188 --
 .../components/FluxTopologyComponent.java       |  132 -
 .../integration/util/mock/MockHTable.java       |  673 ----
 .../util/threatintel/ThreatIntelHelper.java     |   12 +-
 metron-streaming/pom.xml                        |    1 +
 pom.xml                                         |    2 +
 76 files changed, 7034 insertions(+), 2330 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index 9505518..ec99233 100644
--- a/NOTICE
+++ b/NOTICE
@@ -3,3 +3,6 @@
 
    This product includes software developed at
    The Apache Software Foundation (http://www.apache.org/).
+
+   This product includes Jersey Client: https://jersey.java.net/
+   License: Common Development and Distribution License (CDDL) v1.0 (https://glassfish.dev.java.net/public/CDDLv1.0.html)

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/pom.xml b/metron-streaming/Metron-Common/pom.xml
index c9918a2..57a58d7 100644
--- a/metron-streaming/Metron-Common/pom.xml
+++ b/metron-streaming/Metron-Common/pom.xml
@@ -132,6 +132,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <version>${global_hbase_version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-client</artifactId>
             <version>${global_hbase_version}</version>
             <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/AbstractConverter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/AbstractConverter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/AbstractConverter.java
new file mode 100644
index 0000000..9072c22
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/AbstractConverter.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.hbase.converters;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.reference.lookup.LookupKV;
+import org.apache.metron.reference.lookup.LookupKey;
+import org.apache.metron.reference.lookup.LookupValue;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.*;
+
+
+public abstract class AbstractConverter<KEY_T extends LookupKey, VALUE_T extends LookupValue> implements HbaseConverter<KEY_T,VALUE_T> {
+    public static Function<Cell, Map.Entry<byte[], byte[]>> CELL_TO_ENTRY  = new Function<Cell, Map.Entry<byte[], byte[]>>() {
+
+        @Nullable
+        @Override
+        public Map.Entry<byte[], byte[]> apply(@Nullable Cell cell) {
+            return new AbstractMap.SimpleEntry<byte[], byte[]>(cell.getQualifier(), cell.getValue());
+        }
+    };
+    @Override
+    public Put toPut(String columnFamily, KEY_T key, VALUE_T values) throws IOException {
+        Put put = new Put(key.toBytes());
+        byte[] cf = Bytes.toBytes(columnFamily);
+        for(Map.Entry<byte[], byte[]> kv : values.toColumns()) {
+            put.add(cf, kv.getKey(), kv.getValue());
+        }
+        return put;
+    }
+
+    public LookupKV<KEY_T, VALUE_T> fromPut(Put put, String columnFamily, KEY_T key, VALUE_T value) throws IOException {
+        key.fromBytes(put.getRow());
+        byte[] cf = Bytes.toBytes(columnFamily);
+        value.fromColumns(Iterables.transform(put.getFamilyCellMap().get(cf), CELL_TO_ENTRY));
+        return new LookupKV<>(key, value);
+    }
+
+    @Override
+    public Result toResult(String columnFamily, KEY_T key, VALUE_T values) throws IOException {
+        Put put = toPut(columnFamily, key, values);
+        return Result.create(put.getFamilyCellMap().get(Bytes.toBytes(columnFamily)));
+    }
+
+    public LookupKV<KEY_T, VALUE_T> fromResult(Result result, String columnFamily, KEY_T key, VALUE_T value) throws IOException {
+        key.fromBytes(result.getRow());
+        byte[] cf = Bytes.toBytes(columnFamily);
+        NavigableMap<byte[], byte[]> cols = result.getFamilyMap(cf);
+        value.fromColumns(cols.entrySet());
+        return new LookupKV<>(key, value);
+    }
+    @Override
+    public Get toGet(String columnFamily, KEY_T key) {
+        Get ret = new Get(key.toBytes());
+        ret.addFamily(Bytes.toBytes(columnFamily));
+        return ret;
+    }
+
+    public static Iterable<Map.Entry<byte[], byte[]>> toEntries(byte[]... kvs) {
+        if(kvs.length % 2 != 0)  {
+            throw new IllegalStateException("Must be an even size");
+        }
+        List<Map.Entry<byte[], byte[]>> ret = new ArrayList<>(kvs.length/2);
+        for(int i = 0;i < kvs.length;i += 2) {
+            ret.add(new AbstractMap.SimpleImmutableEntry<>(kvs[i], kvs[i+1])) ;
+        }
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/HbaseConverter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/HbaseConverter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/HbaseConverter.java
new file mode 100644
index 0000000..449d9cf
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/HbaseConverter.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.hbase.converters;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.metron.reference.lookup.LookupKV;
+import org.apache.metron.reference.lookup.LookupKey;
+import org.apache.metron.reference.lookup.LookupValue;
+
+import java.io.IOException;
+
+public interface HbaseConverter<KEY_T extends LookupKey, VALUE_T extends LookupValue> {
+    Put toPut(String columnFamily, KEY_T key, VALUE_T values) throws IOException;
+
+    LookupKV<KEY_T, VALUE_T> fromPut(Put put, String columnFamily) throws IOException;
+
+    Result toResult(String columnFamily, KEY_T key, VALUE_T values) throws IOException;
+
+    LookupKV<KEY_T, VALUE_T> fromResult(Result result, String columnFamily) throws IOException;
+
+    Get toGet(String columnFamily, KEY_T key);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelConverter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelConverter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelConverter.java
new file mode 100644
index 0000000..d534a52
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelConverter.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.hbase.converters.threatintel;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.metron.hbase.converters.AbstractConverter;
+import org.apache.metron.reference.lookup.LookupKV;
+
+import java.io.IOException;
+
+public class ThreatIntelConverter extends AbstractConverter<ThreatIntelKey, ThreatIntelValue> {
+
+    @Override
+    public LookupKV<ThreatIntelKey, ThreatIntelValue> fromPut(Put put, String columnFamily) throws IOException {
+        return fromPut(put, columnFamily, new ThreatIntelKey(), new ThreatIntelValue());
+    }
+
+    @Override
+    public LookupKV<ThreatIntelKey, ThreatIntelValue> fromResult(Result result, String columnFamily) throws IOException {
+        return fromResult(result, columnFamily, new ThreatIntelKey(), new ThreatIntelValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelKey.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelKey.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelKey.java
new file mode 100644
index 0000000..3d898d9
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelKey.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase.converters.threatintel;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.reference.lookup.LookupKey;
+
+public class ThreatIntelKey implements LookupKey{
+    private static final int SEED = 0xDEADBEEF;
+    private static final int HASH_PREFIX_SIZE=16;
+    ThreadLocal<HashFunction> hFunction= new ThreadLocal<HashFunction>() {
+        @Override
+        protected HashFunction initialValue() {
+            return Hashing.murmur3_128(SEED);
+        }
+    };
+    public ThreatIntelKey() {
+
+    }
+    public ThreatIntelKey(String indicator) {
+        this.indicator = indicator;
+    }
+
+    public String indicator;
+
+    @Override
+    public byte[] toBytes() {
+        byte[] indicatorBytes = Bytes.toBytes(indicator);
+        Hasher hasher = hFunction.get().newHasher();
+        hasher.putBytes(Bytes.toBytes(indicator));
+        byte[] prefix = hasher.hash().asBytes();
+        byte[] val = new byte[indicatorBytes.length + prefix.length];
+        int pos = 0;
+        for(int i = 0;pos < prefix.length;++pos,++i) {
+            val[pos] = prefix[i];
+        }
+        for(int i = 0;i < indicatorBytes.length;++pos,++i) {
+            val[pos] = indicatorBytes[i];
+        }
+        return val;
+    }
+
+    @Override
+    public void fromBytes(byte[] row) {
+        ThreatIntelKey key = this;
+        key.indicator = Bytes.toString(row, HASH_PREFIX_SIZE, row.length - HASH_PREFIX_SIZE);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ThreatIntelKey that = (ThreatIntelKey) o;
+
+        return indicator != null ? indicator.equals(that.indicator) : that.indicator == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        return indicator != null ? indicator.hashCode() : 0;
+    }
+
+    @Override
+    public String toString() {
+        return "ThreatIntelKey{" +
+                "indicator='" + indicator + '\'' +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelValue.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelValue.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelValue.java
new file mode 100644
index 0000000..97f0762
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelValue.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.hbase.converters.threatintel;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.hbase.converters.AbstractConverter;
+import org.apache.metron.reference.lookup.LookupValue;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class ThreatIntelValue implements LookupValue {
+   private static final ThreadLocal<ObjectMapper> _mapper = new ThreadLocal<ObjectMapper>() {
+             @Override
+             protected ObjectMapper initialValue() {
+                return new ObjectMapper();
+             }
+    };
+    public static final String VALUE_COLUMN_NAME = "v";
+    public static final byte[] VALUE_COLUMN_NAME_B = Bytes.toBytes(VALUE_COLUMN_NAME);
+    public static final String LAST_SEEN_COLUMN_NAME = "t";
+    public static final byte[] LAST_SEEN_COLUMN_NAME_B = Bytes.toBytes(LAST_SEEN_COLUMN_NAME);
+
+    private Map<String, String> metadata = null;
+
+    public ThreatIntelValue()
+    {
+
+    }
+
+    public ThreatIntelValue(Map<String, String> metadata) {
+        this.metadata = metadata;
+    }
+
+
+
+    public Map<String, String> getMetadata() {
+        return metadata;
+    }
+
+    @Override
+    public Iterable<Map.Entry<byte[], byte[]>> toColumns() {
+        return AbstractConverter.toEntries( VALUE_COLUMN_NAME_B, Bytes.toBytes(valueToString(metadata))
+                                  );
+    }
+
+    @Override
+    public void fromColumns(Iterable<Map.Entry<byte[], byte[]>> values) {
+        for(Map.Entry<byte[], byte[]> cell : values) {
+            if(Bytes.equals(cell.getKey(), VALUE_COLUMN_NAME_B)) {
+                metadata = stringToValue(Bytes.toString(cell.getValue()));
+            }
+        }
+    }
+    public Map<String, String> stringToValue(String s){
+        try {
+            return _mapper.get().readValue(s, new TypeReference<Map<String, String>>(){});
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to convert string to metadata: " + s);
+        }
+    }
+    public String valueToString(Map<String, String> value) {
+        try {
+            return _mapper.get().writeValueAsString(value);
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to convert metadata to string: " + value);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ThreatIntelValue that = (ThreatIntelValue) o;
+
+        return getMetadata() != null ? getMetadata().equals(that.getMetadata()) : that.getMetadata() == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        return getMetadata() != null ? getMetadata().hashCode() : 0;
+    }
+
+    @Override
+    public String toString() {
+        return "ThreatIntelValue{" +
+                "metadata=" + metadata +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKV.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKV.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKV.java
new file mode 100644
index 0000000..eb2b552
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKV.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.reference.lookup;
+
+import java.io.Serializable;
+
+public class LookupKV<KEY_T extends LookupKey, VALUE_T extends LookupValue> implements Serializable {
+    private KEY_T key;
+    private VALUE_T value;
+    public LookupKV(KEY_T key, VALUE_T value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    public KEY_T getKey() {
+        return key;
+    }
+
+    public VALUE_T getValue() {
+        return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        LookupKV<?, ?> lookupKV = (LookupKV<?, ?>) o;
+
+        if (key != null ? !key.equals(lookupKV.key) : lookupKV.key != null) return false;
+        return value != null ? value.equals(lookupKV.value) : lookupKV.value == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = key != null ? key.hashCode() : 0;
+        result = 31 * result + (value != null ? value.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "LookupKV{" +
+                "key=" + key +
+                ", value=" + value +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKey.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKey.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKey.java
index 8aecd64..c51ed9f 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKey.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKey.java
@@ -19,4 +19,5 @@ package org.apache.metron.reference.lookup;
 
 public interface LookupKey {
     byte[] toBytes();
+    void fromBytes(byte[] in);
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupValue.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupValue.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupValue.java
new file mode 100644
index 0000000..448f4c9
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupValue.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.metron.reference.lookup;
+
+import java.util.Map;
+import java.util.NavigableMap;
+
+public interface LookupValue {
+    Iterable<Map.Entry<byte[], byte[]>> toColumns();
+    void fromColumns(Iterable<Map.Entry<byte[], byte[]>> values);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelKey.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelKey.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelKey.java
deleted file mode 100644
index 8e90bc7..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelKey.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.threatintel;
-
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.reference.lookup.LookupKey;
-
-public class ThreatIntelKey implements LookupKey{
-    private static final int SEED = 0xDEADBEEF;
-    private static final int HASH_PREFIX_SIZE=16;
-    ThreadLocal<HashFunction> hFunction= new ThreadLocal<HashFunction>() {
-        @Override
-        protected HashFunction initialValue() {
-            return Hashing.murmur3_128(SEED);
-        }
-    };
-    public ThreatIntelKey() {
-
-    }
-    public ThreatIntelKey(String indicator) {
-        this.indicator = indicator;
-    }
-
-    public String indicator;
-
-    @Override
-    public byte[] toBytes() {
-        byte[] indicatorBytes = Bytes.toBytes(indicator);
-        Hasher hasher = hFunction.get().newHasher();
-        hasher.putBytes(Bytes.toBytes(indicator));
-        byte[] prefix = hasher.hash().asBytes();
-        byte[] val = new byte[indicatorBytes.length + prefix.length];
-        int pos = 0;
-        for(int i = 0;pos < prefix.length;++pos,++i) {
-            val[pos] = prefix[i];
-        }
-        for(int i = 0;i < indicatorBytes.length;++pos,++i) {
-            val[pos] = indicatorBytes[i];
-        }
-        return val;
-    }
-
-    public static ThreatIntelKey fromBytes(byte[] row) {
-        ThreatIntelKey key = new ThreatIntelKey();
-        key.indicator = Bytes.toString(row, HASH_PREFIX_SIZE, row.length - HASH_PREFIX_SIZE);
-        return key;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        ThreatIntelKey that = (ThreatIntelKey) o;
-
-        return indicator != null ? indicator.equals(that.indicator) : that.indicator == null;
-
-    }
-
-    @Override
-    public int hashCode() {
-        return indicator != null ? indicator.hashCode() : 0;
-    }
-
-    @Override
-    public String toString() {
-        return "ThreatIntelKey{" +
-                "indicator='" + indicator + '\'' +
-                '}';
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelResults.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelResults.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelResults.java
index 08f1cdc..8186b38 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelResults.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelResults.java
@@ -17,6 +17,8 @@
  */
 package org.apache.metron.threatintel;
 
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
+
 import java.util.HashMap;
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/Converter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/Converter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/Converter.java
deleted file mode 100644
index 89a846a..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/Converter.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.threatintel.hbase;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.threatintel.ThreatIntelKey;
-import org.apache.metron.threatintel.ThreatIntelResults;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-
-import java.io.IOException;
-import java.util.*;
-
-public enum Converter {
-    INSTANCE;
-    public static final String VALUE_COLUMN_NAME = "v";
-    public static final byte[] VALUE_COLUMN_NAME_B = Bytes.toBytes(VALUE_COLUMN_NAME);
-    public static final String LAST_SEEN_COLUMN_NAME = "t";
-    public static final byte[] LAST_SEEN_COLUMN_NAME_B = Bytes.toBytes(LAST_SEEN_COLUMN_NAME);
-    private static final ThreadLocal<ObjectMapper> _mapper = new ThreadLocal<ObjectMapper>() {
-             @Override
-             protected ObjectMapper initialValue() {
-                return new ObjectMapper();
-             }
-    };
-    public Put toPut(String columnFamily, ThreatIntelKey key, Map<String, String> value, Long lastSeenTimestamp) throws IOException {
-        Put put = new Put(key.toBytes());
-        byte[] cf = Bytes.toBytes(columnFamily);
-        put.add(cf,VALUE_COLUMN_NAME_B, Bytes.toBytes(valueToString(value)));
-        put.add(cf, LAST_SEEN_COLUMN_NAME_B, Bytes.toBytes(lastSeenTimestamp));
-        return put;
-    }
-
-    public Map.Entry<ThreatIntelResults, Long> fromPut(Put put, String columnFamily) throws IOException {
-        ThreatIntelKey key = ThreatIntelKey.fromBytes(put.getRow());
-        Map<String, String> value = null;
-        Long lastSeen = null;
-        byte[] cf = Bytes.toBytes(columnFamily);
-        List<Cell> cells = put.getFamilyCellMap().get(cf);
-        for(Cell cell : cells) {
-            if(Bytes.equals(cell.getQualifier(), VALUE_COLUMN_NAME_B)) {
-                value = stringToValue(Bytes.toString(cell.getValue()));
-            }
-            else if(Bytes.equals(cell.getQualifier(), LAST_SEEN_COLUMN_NAME_B)) {
-               lastSeen = Bytes.toLong(cell.getValue());
-            }
-        }
-        return new AbstractMap.SimpleEntry<>(new ThreatIntelResults(key, value), lastSeen);
-    }
-
-    public Result toResult(String columnFamily, ThreatIntelKey key, Map<String, String> value, Long lastSeenTimestamp) throws IOException {
-        Put put = toPut(columnFamily, key, value, lastSeenTimestamp);
-        return Result.create(put.getFamilyCellMap().get(Bytes.toBytes(columnFamily)));
-    }
-
-    public Map.Entry<ThreatIntelResults, Long> fromResult(Result result, String columnFamily) throws IOException {
-        ThreatIntelKey key = ThreatIntelKey.fromBytes(result.getRow());
-        byte[] cf = Bytes.toBytes(columnFamily);
-        NavigableMap<byte[], byte[]> cols = result.getFamilyMap(cf);
-        Map<String, String> value = stringToValue(Bytes.toString(cols.get(VALUE_COLUMN_NAME_B)));
-        ThreatIntelResults results = new ThreatIntelResults(key, value);
-        return new AbstractMap.SimpleEntry<>(results, Bytes.toLong(cols.get(LAST_SEEN_COLUMN_NAME_B)));
-    }
-
-    public Get toGet(String columnFamily, ThreatIntelKey key) {
-        Get ret = new Get(key.toBytes());
-        ret.addFamily(Bytes.toBytes(columnFamily));
-        return ret;
-    }
-
-    public Map<String, String> stringToValue(String s) throws IOException {
-        return _mapper.get().readValue(s, new TypeReference<Map<String, String>>(){});
-    }
-    public String valueToString(Map<String, String> value) throws IOException {
-        return _mapper.get().writeValueAsString(value);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/ThreatIntelLookup.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/ThreatIntelLookup.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/ThreatIntelLookup.java
index c7997fe..13efc42 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/ThreatIntelLookup.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/ThreatIntelLookup.java
@@ -17,36 +17,36 @@
  */
 package org.apache.metron.threatintel.hbase;
 
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.hbase.converters.HbaseConverter;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelConverter;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
 import org.apache.metron.reference.lookup.Lookup;
-import org.apache.metron.reference.lookup.LookupKey;
+import org.apache.metron.reference.lookup.LookupKV;
 import org.apache.metron.reference.lookup.accesstracker.AccessTracker;
-import org.apache.metron.reference.lookup.handler.Handler;
-import org.apache.metron.threatintel.ThreatIntelKey;
-import org.apache.metron.threatintel.ThreatIntelResults;
 
-import java.io.Closeable;
 import java.io.IOException;
-import java.util.Map;
 
-public class ThreatIntelLookup extends Lookup<HTableInterface, ThreatIntelKey, Map.Entry<ThreatIntelResults, Long>> implements AutoCloseable {
 
+public class ThreatIntelLookup extends Lookup<HTableInterface, ThreatIntelKey, LookupKV<ThreatIntelKey,ThreatIntelValue>> implements AutoCloseable {
 
 
-    public static class Handler implements org.apache.metron.reference.lookup.handler.Handler<HTableInterface, ThreatIntelKey, Map.Entry<ThreatIntelResults, Long>> {
+
+    public static class Handler implements org.apache.metron.reference.lookup.handler.Handler<HTableInterface,ThreatIntelKey,LookupKV<ThreatIntelKey,ThreatIntelValue>> {
         String columnFamily;
+        HbaseConverter<ThreatIntelKey, ThreatIntelValue> converter = new ThreatIntelConverter();
         public Handler(String columnFamily) {
             this.columnFamily = columnFamily;
         }
         @Override
         public boolean exists(ThreatIntelKey key, HTableInterface table, boolean logAccess) throws IOException {
-            return table.exists(Converter.INSTANCE.toGet(columnFamily, key));
+            return table.exists(converter.toGet(columnFamily, key));
         }
 
         @Override
-        public Map.Entry<ThreatIntelResults,Long> get(ThreatIntelKey key, HTableInterface table, boolean logAccess) throws IOException {
-            return Converter.INSTANCE.fromResult(table.get(Converter.INSTANCE.toGet(columnFamily, key)), columnFamily);
+        public LookupKV<ThreatIntelKey, ThreatIntelValue> get(ThreatIntelKey key, HTableInterface table, boolean logAccess) throws IOException {
+            return converter.fromResult(table.get(converter.toGet(columnFamily, key)), columnFamily);
         }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/pom.xml b/metron-streaming/Metron-DataLoads/pom.xml
index d3eef64..5cac558 100644
--- a/metron-streaming/Metron-DataLoads/pom.xml
+++ b/metron-streaming/Metron-DataLoads/pom.xml
@@ -13,232 +13,270 @@
   -->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.metron</groupId>
-    <artifactId>Metron-Streaming</artifactId>
-    <version>0.1BETA</version>
-  </parent>
-  <artifactId>Metron-DataLoads</artifactId>
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-  </properties>
-  <dependencies>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <version>${global_hbase_guava_version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.mitre</groupId>
-      <artifactId>stix</artifactId>
-      <version>1.2.0.2</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.metron</groupId>
-      <artifactId>Metron-Common</artifactId>
-      <version>${project.parent.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>com.googlecode.disruptor</groupId>
-          <artifactId>disruptor</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <!--dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-            <version>${global_storm_version}</version>
-            <scope>provided</scope>
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>Metron-Streaming</artifactId>
+        <version>0.1BETA</version>
+    </parent>
+    <artifactId>Metron-DataLoads</artifactId>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <httpcore.version>4.3.2</httpcore.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${global_hbase_guava_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.xml.bind</groupId>
+            <artifactId>jaxb-api</artifactId>
+            <version>2.2.11</version>
+        </dependency>
+        <dependency>
+            <groupId>net.sf.saxon</groupId>
+            <artifactId>Saxon-HE</artifactId>
+            <version>9.5.1-5</version>
             <exclusions>
                 <exclusion>
-                    <artifactId>servlet-api</artifactId>
-                    <groupId>javax.servlet</groupId>
+                    <groupId>javax.xml.bind</groupId>
+                    <artifactId>jaxb-api</artifactId>
                 </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.xml.bind</groupId>
+            <artifactId>jaxb-impl</artifactId>
+            <version>2.2.5-2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mitre</groupId>
+            <artifactId>stix</artifactId>
+            <version>1.2.0.2</version>
+            <exclusions>
                 <exclusion>
-                    <artifactId>log4j-over-slf4j</artifactId>
-                    <groupId>org.slf4j</groupId>
+                    <groupId>javax.xml.bind</groupId>
+                    <artifactId>jaxb-api</artifactId>
                 </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>Metron-Common</artifactId>
+            <version>${project.parent.version}</version>
+            <exclusions>
                 <exclusion>
                     <groupId>com.googlecode.disruptor</groupId>
                     <artifactId>disruptor</artifactId>
                 </exclusion>
             </exclusions>
-        </dependency-->
+        </dependency>
         <dependency>
-          <groupId>org.apache.hbase</groupId>
-          <artifactId>hbase-client</artifactId>
-          <version>${global_hbase_version}</version>
-          <scope>provided</scope>
-          <exclusions>
-            <exclusion>
-              <artifactId>log4j</artifactId>
-              <groupId>log4j</groupId>
-            </exclusion>
-          </exclusions>
+            <groupId>org.mitre.taxii</groupId>
+            <artifactId>taxii</artifactId>
+            <version>1.1.0.1</version>
+            <!--scope>provided</scope-->
         </dependency>
         <dependency>
-          <groupId>org.apache.hbase</groupId>
-          <artifactId>hbase-server</artifactId>
-          <version>${global_hbase_version}</version>
-          <scope>provided</scope>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <version>${global_hbase_version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
-          <groupId>com.opencsv</groupId>
-          <artifactId>opencsv</artifactId>
-          <version>${global_opencsv_version}</version>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${global_hbase_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>log4j</artifactId>
+                    <groupId>log4j</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
-
         <dependency>
-          <groupId>org.apache.hbase</groupId>
-          <artifactId>hbase-testing-util</artifactId>
-          <version>${global_hbase_version}</version>
-          <scope>test</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>org.slf4j</groupId>
-              <artifactId>slf4j-log4j12</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
-        <!--dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>${global_guava_version}</version>
-        </dependency-->
-
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <version>${global_hbase_version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
-          <groupId>org.slf4j</groupId>
-          <artifactId>log4j-over-slf4j</artifactId>
-          <version>${global_slf4j_version}</version>
-          <scope>test</scope>
+            <groupId>com.opencsv</groupId>
+            <artifactId>opencsv</artifactId>
+            <version>${global_opencsv_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+            <version>${httpcore.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpcore.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-testing-util</artifactId>
+            <version>${global_hbase_version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-client</artifactId>
+            <version>1.19</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <version>${global_slf4j_version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>Metron-Testing</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
         </dependency>
 
-
-      </dependencies>
-      <build>
+    </dependencies>
+    <build>
         <resources>
-          <resource>
-            <directory>src</directory>
-            <excludes>
-              <exclude>**/*.java</exclude>
-            </excludes>
-          </resource>
+            <resource>
+                <directory>src</directory>
+                <excludes>
+                    <exclude>**/*.java</exclude>
+                </excludes>
+            </resource>
         </resources>
         <plugins>
-          <plugin>
-            <!-- Separates the unit tests from the integration tests. -->
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-surefire-plugin</artifactId>
-            <version>2.12.4</version>
-            <configuration>
-              <!-- Skip the default running of this plug-in (or everything is run twice...see below) -->
-              <argLine>-Xmx2048m -XX:MaxPermSize=256m</argLine>
-              <skip>true</skip>
-              <!-- Show 100% of the lines from the stack trace (doesn't work) -->
-              <trimStackTrace>false</trimStackTrace>
-
-            </configuration>
-            <executions>
-              <execution>
-                <id>unit-tests</id>
-                <phase>test</phase>
-                <goals>
-                  <goal>test</goal>
-                </goals>
+            <plugin>
+                <!-- Separates the unit tests from the integration tests. -->
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.12.4</version>
                 <configuration>
-                  <!-- Never skip running the tests when the test phase is invoked -->
-                  <skip>false</skip>
-                  <includes>
-                    <!-- Include unit tests within integration-test phase. -->
-                    <include>**/*Test.java</include>
-                  </includes>
-                  <excludes>
-                    <!-- Exclude integration tests within (unit) test phase. -->
-                    <exclude>**/*IntegrationTest.java</exclude>
-                  </excludes>
+                    <!-- Skip the default running of this plug-in (or everything is run twice...see below) -->
+                    <argLine>-Xmx2048m -XX:MaxPermSize=256m</argLine>
+                    <skip>true</skip>
+                    <!-- Show 100% of the lines from the stack trace (doesn't work) -->
+                    <trimStackTrace>false</trimStackTrace>
+
                 </configuration>
-              </execution>
-              <execution>
-                <id>integration-tests</id>
-                <phase>integration-test</phase>
-                <goals>
-                  <goal>test</goal>
-                </goals>
+                <executions>
+                    <execution>
+                        <id>unit-tests</id>
+                        <phase>test</phase>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <!-- Never skip running the tests when the test phase is invoked -->
+                            <skip>false</skip>
+                            <includes>
+                                <!-- Include unit tests within integration-test phase. -->
+                                <include>**/*Test.java</include>
+                            </includes>
+                            <excludes>
+                                <!-- Exclude integration tests within (unit) test phase. -->
+                                <exclude>**/*IntegrationTest.java</exclude>
+                            </excludes>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>integration-tests</id>
+                        <phase>integration-test</phase>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <!-- Never skip running the tests when the integration-test phase is invoked -->
+                            <skip>false</skip>
+                            <includes>
+                                <!-- Include integration tests within integration-test phase. -->
+                                <include>**/*IntegrationTest.java</include>
+                            </includes>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
                 <configuration>
-                  <!-- Never skip running the tests when the integration-test phase is invoked -->
-                  <skip>false</skip>
-                  <includes>
-                    <!-- Include integration tests within integration-test phase. -->
-                    <include>**/*IntegrationTest.java</include>
-                  </includes>
+                    <source>1.7</source>
+                    <target>1.7</target>
                 </configuration>
-              </execution>
-            </executions>
-          </plugin>
-
-          <plugin>
-            <artifactId>maven-compiler-plugin</artifactId>
-            <version>3.1</version>
-            <configuration>
-              <source>1.7</source>
-              <target>1.7</target>
-            </configuration>
-          </plugin>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-shade-plugin</artifactId>
-            <version>2.3</version>
-            <executions>
-              <execution>
-                <phase>package</phase>
-                <goals>
-                  <goal>shade</goal>
-                </goals>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.3</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.google.common</pattern>
+                                    <shadedPattern>org.apache.metron.guava.dataload</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.http</pattern>
+                                    <shadedPattern>org.apache.metron.httpcore.dataload</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <!--minimizeJar>true</minimizeJar-->
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>classworlds:classworlds</exclude>
+                                    <exclude>junit:junit</exclude>
+                                    <exclude>jmock:*</exclude>
+                                    <exclude>*:xml-apis</exclude>
+                                    <exclude>*slf4j*</exclude>
+                                    <exclude>org.apache.maven:lib:tests</exclude>
+                                    <exclude>log4j:log4j:jar:</exclude>
+                                    <exclude>*:hbase:*</exclude>
+                                    <exclude>org.apache.hadoop.yarn.util.package-info*</exclude>
+                                </excludes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
                 <configuration>
-                  <relocations>
-                    <relocation>
-                      <pattern>com.google.common</pattern>
-                      <shadedPattern>org.apache.metron.guava.dataload</shadedPattern>
-                    </relocation>
-                  </relocations>
-                  <minimizeJar>true</minimizeJar>
-                  <artifactSet>
-                    <excludes>
-                      <exclude>classworlds:classworlds</exclude>
-                      <exclude>junit:junit</exclude>
-                      <exclude>jmock:*</exclude>
-                      <exclude>*:xml-apis</exclude>
-                      <exclude>*slf4j*</exclude>
-                      <exclude>org.apache.maven:lib:tests</exclude>
-                      <exclude>log4j:log4j:jar:</exclude>
-                      <exclude>*:hbase:*</exclude>
-                      <exclude>org.apache.hadoop.yarn.util.package-info*</exclude>
-                    </excludes>
-                  </artifactSet>
+                    <descriptor>src/main/assembly/assembly.xml</descriptor>
                 </configuration>
-              </execution>
-            </executions>
-          </plugin>
-          <plugin>
-            <artifactId>maven-assembly-plugin</artifactId>
-            <configuration>
-              <descriptor>src/main/assembly/assembly.xml</descriptor>
-            </configuration>
-            <executions>
-              <execution>
-                <id>make-assembly</id> <!-- this is used for inheritance merges -->
-                <phase>package</phase> <!-- bind to the packaging phase -->
-                <goals>
-                  <goal>single</goal>
-                </goals>
-              </execution>
-            </executions>
-          </plugin>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
+                        <phase>package</phase> <!-- bind to the packaging phase -->
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
 
         </plugins>
-      </build>
+    </build>
 
-    </project>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_load.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_load.sh b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_load.sh
index 0b7a83c..32b07f3 100755
--- a/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_load.sh
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_load.sh
@@ -35,4 +35,4 @@ for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
   fi
 done
 export HADOOP_CLASSPATH
-hadoop jar /usr/metron/0.1BETA/lib/Metron-DataLoads-0.1BETA.jar org.apache.metron.dataloads.ThreatIntelBulkLoader -libjars ${LIBJARS} "$@"
+hadoop jar /usr/metron/0.1BETA/lib/Metron-DataLoads-0.1BETA.jar org.apache.metron.dataloads.bulk.ThreatIntelBulkLoader -libjars ${LIBJARS} "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_prune.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_prune.sh b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_prune.sh
index ee230c2..88cd946 100755
--- a/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_prune.sh
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_prune.sh
@@ -34,4 +34,4 @@ for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
   fi
 done
 export HADOOP_CLASSPATH
-hadoop jar /usr/metron/0.1BETA/lib/Metron-DataLoads-0.1BETA.jar org.apache.metron.dataloads.LeastRecentlyUsedPruner -libjars ${LIBJARS} "$@"
+hadoop jar /usr/metron/0.1BETA/lib/Metron-DataLoads-0.1BETA.jar org.apache.metron.dataloads.bulk.LeastRecentlyUsedPruner -libjars ${LIBJARS} "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_taxii_load.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_taxii_load.sh b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_taxii_load.sh
new file mode 100755
index 0000000..77011fb
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_taxii_load.sh
@@ -0,0 +1,39 @@
+#!/bin/bash
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# 
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
+CP=/usr/metron/0.1BETA/lib/Metron-DataLoads-0.1BETA.jar:/usr/metron/0.1BETA/lib/taxii-1.1.0.1.jar:`${HBASE_HOME}/bin/hbase classpath`
+HADOOP_CLASSPATH=$(echo $CP )
+for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
+  if [ -f $jar ];then
+    LIBJARS="$jar,$LIBJARS"
+  fi
+done
+export HADOOP_CLASSPATH
+hadoop jar /usr/metron/0.1BETA/lib/Metron-DataLoads-0.1BETA.jar org.apache.metron.dataloads.taxii.TaxiiLoader "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/LeastRecentlyUsedPruner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/LeastRecentlyUsedPruner.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/LeastRecentlyUsedPruner.java
deleted file mode 100644
index 4ee6e2f..0000000
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/LeastRecentlyUsedPruner.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.dataloads;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import org.apache.commons.cli.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.metron.dataloads.hbase.mr.PrunerMapper;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-public class LeastRecentlyUsedPruner {
-    private static abstract class OptionHandler implements Function<String, Option> {}
-    private enum BulkLoadOptions {
-        HELP("h", new OptionHandler() {
-
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                return new Option(s, "help", false, "Generate Help screen");
-            }
-        }), TABLE("t", new OptionHandler() {
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                Option o = new Option(s, "table", true, "HBase table to prune");
-                o.setRequired(true);
-                o.setArgName("HBASE_TABLE");
-                return o;
-            }
-        }), COLUMN_FAMILY("f", new OptionHandler() {
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                Option o = new Option(s, "column_family", true, "Column family of the HBase table to prune");
-                o.setRequired(false);
-                o.setArgName("CF_NAME");
-                return o;
-            }
-        })
-        ,AS_OF_TIME("a", new OptionHandler() {
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                Option o = new Option(s, "as_of", true, "The earliest access tracker you want to use.");
-                o.setArgName("datetime");
-                o.setRequired(true);
-                return o;
-            }
-        })
-        ,AS_OF_TIME_FORMAT("t", new OptionHandler() {
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                String defaultFormat = new SimpleDateFormat().toLocalizedPattern();
-                Option o = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option) (Default is: " + defaultFormat + ")");
-                o.setArgName("format");
-                o.setRequired(false);
-                return o;
-            }
-        })
-        ,ACCESS_TABLE("u", new OptionHandler() {
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                Option o = new Option(s, "access_table", true, "HBase table containing the access trackers.");
-                o.setRequired(true);
-                o.setArgName("HBASE_TABLE");
-                return o;
-            }
-        }), ACCESS_COLUMN_FAMILY("z", new OptionHandler() {
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                Option o = new Option(s, "access_column_family", true, "Column family of the HBase table containing the access trackers");
-                o.setRequired(true);
-                o.setArgName("CF_NAME");
-                return o;
-            }
-        });
-        Option option;
-        String shortCode;
-        BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
-            this.shortCode = shortCode;
-            this.option = optionHandler.apply(shortCode);
-        }
-
-        public boolean has(CommandLine cli) {
-            return cli.hasOption(shortCode);
-        }
-
-        public String get(CommandLine cli) {
-            return cli.getOptionValue(shortCode);
-        }
-        private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
-            Date d = getFormat(cli).parse(BulkLoadOptions.AS_OF_TIME.get(cli));
-            return d.getTime();
-        }
-
-        private static DateFormat getFormat(CommandLine cli) {
-            DateFormat format = new SimpleDateFormat();
-            if (BulkLoadOptions.AS_OF_TIME_FORMAT.has(cli)) {
-                 format = new SimpleDateFormat(BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli));
-            }
-            return format;
-        }
-
-        public static CommandLine parse(CommandLineParser parser, String[] args) {
-            try {
-                CommandLine cli = parser.parse(getOptions(), args);
-                if(BulkLoadOptions.HELP.has(cli)) {
-                    printHelp();
-                    System.exit(0);
-                }
-                return cli;
-            } catch (ParseException e) {
-                System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
-                e.printStackTrace(System.err);
-                printHelp();
-                System.exit(-1);
-                return null;
-            }
-        }
-
-        public static void printHelp() {
-            HelpFormatter formatter = new HelpFormatter();
-            formatter.printHelp( "ThreatIntelBulkLoader", getOptions());
-        }
-
-        public static Options getOptions() {
-            Options ret = new Options();
-            for(BulkLoadOptions o : BulkLoadOptions.values()) {
-               ret.addOption(o.option);
-            }
-            return ret;
-        }
-    }
-
-    public static void setupHBaseJob(Job job, String sourceTable, String cf) throws IOException {
-        Scan scan = new Scan();
-        if(cf != null) {
-            scan.addFamily(Bytes.toBytes(cf));
-        }
-        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
-        scan.setCacheBlocks(false);  // don't set to true for MR jobs
-// set other scan attrs
-
-        TableMapReduceUtil.initTableMapperJob(
-                sourceTable,      // input table
-                scan,	          // Scan instance to control CF and attribute selection
-                PrunerMapper.class,   // mapper class
-                null,	          // mapper output key
-                null,	          // mapper output value
-                job);
-        TableMapReduceUtil.initTableReducerJob(
-                sourceTable,      // output table
-                null,             // reducer class
-                job);
-    }
-
-    public static Job createJob( Configuration conf
-                               , String table
-                               , String cf
-                               , String accessTrackerTable
-                               , String accessTrackerColumnFamily
-                               , Long ts
-                               ) throws IOException
-    {
-        Job job = new Job(conf);
-        job.setJobName("LeastRecentlyUsedPruner: Pruning " +  table + ":" + cf + " since " + new SimpleDateFormat().format(new Date(ts)));
-        System.out.println("Configuring " + job.getJobName());
-        job.setJarByClass(LeastRecentlyUsedPruner.class);
-        job.getConfiguration().setLong(PrunerMapper.TIMESTAMP_CONF, ts);
-        job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_NAME_CONF, table);
-        job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_CF_CONF, accessTrackerColumnFamily);
-        job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_TABLE_CONF, accessTrackerTable);
-        setupHBaseJob(job, table, cf);
-        job.setNumReduceTasks(0);
-        return job;
-    }
-
-    public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException {
-        Configuration conf = HBaseConfiguration.create();
-        String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
-
-        CommandLine cli = BulkLoadOptions.parse(new PosixParser(), otherArgs);
-        Long ts = BulkLoadOptions.getTimestamp(cli);
-        String table = BulkLoadOptions.TABLE.get(cli);
-        String cf = BulkLoadOptions.COLUMN_FAMILY.get(cli);
-        String accessTrackerTable = BulkLoadOptions.ACCESS_TABLE.get(cli);
-        String accessTrackerCF = BulkLoadOptions.ACCESS_COLUMN_FAMILY.get(cli);
-        Job job = createJob(conf, table, cf, accessTrackerTable, accessTrackerCF, ts);
-        System.exit(job.waitForCompletion(true) ? 0 : 1);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelBulkLoader.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelBulkLoader.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelBulkLoader.java
deleted file mode 100644
index 246924c..0000000
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelBulkLoader.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.dataloads;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.io.Files;
-import org.apache.commons.cli.*;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.metron.dataloads.extractor.ExtractorHandler;
-import org.apache.metron.dataloads.hbase.mr.BulkLoadMapper;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.text.*;
-import java.util.Date;
-
-public class ThreatIntelBulkLoader  {
-    private static abstract class OptionHandler implements Function<String, Option> {}
-    private enum BulkLoadOptions {
-        HELP("h", new OptionHandler() {
-
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                return new Option(s, "help", false, "Generate Help screen");
-            }
-        })
-        ,TABLE("t", new OptionHandler() {
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                Option o = new Option(s, "table", true, "HBase table to import data into");
-                o.setRequired(true);
-                o.setArgName("HBASE_TABLE");
-                return o;
-            }
-        })
-        ,COLUMN_FAMILY("f", new OptionHandler() {
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                Option o = new Option(s, "column_family", true, "Column family of the HBase table to import into");
-                o.setRequired(true);
-                o.setArgName("CF_NAME");
-                return o;
-            }
-        })
-        ,EXTRACTOR_CONFIG("e", new OptionHandler() {
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
-                o.setArgName("JSON_FILE");
-                o.setRequired(true);
-                return o;
-            }
-        })
-        ,INPUT_DATA("i", new OptionHandler() {
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                Option o = new Option(s, "input", true, "Input directory in HDFS for the data to import into HBase");
-                o.setArgName("DIR");
-                o.setRequired(true);
-                return o;
-            }
-        })
-        ,AS_OF_TIME("a", new OptionHandler() {
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                Option o = new Option(s, "as_of", true, "The last read timestamp to mark the records with (omit for time of execution)");
-                o.setArgName("datetime");
-                o.setRequired(false);
-                return o;
-            }
-        })
-        ,AS_OF_TIME_FORMAT("t", new OptionHandler() {
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                Option o = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option)");
-                o.setArgName("format");
-                o.setRequired(false);
-                return o;
-            }
-        })
-        ;
-        Option option;
-        String shortCode;
-        BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
-            this.shortCode = shortCode;
-            this.option = optionHandler.apply(shortCode);
-        }
-
-        public boolean has(CommandLine cli) {
-            return cli.hasOption(shortCode);
-        }
-
-        public String get(CommandLine cli) {
-            return cli.getOptionValue(shortCode);
-        }
-
-        public static CommandLine parse(CommandLineParser parser, String[] args) {
-            try {
-                CommandLine cli = parser.parse(getOptions(), args);
-                if(ThreatIntelBulkLoader.BulkLoadOptions.HELP.has(cli)) {
-                    printHelp();
-                    System.exit(0);
-                }
-                return cli;
-            } catch (ParseException e) {
-                System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
-                e.printStackTrace(System.err);
-                printHelp();
-                System.exit(-1);
-                return null;
-            }
-        }
-
-        public static void printHelp() {
-            HelpFormatter formatter = new HelpFormatter();
-            formatter.printHelp( "ThreatIntelBulkLoader", getOptions());
-        }
-
-        public static Options getOptions() {
-            Options ret = new Options();
-            for(BulkLoadOptions o : BulkLoadOptions.values()) {
-               ret.addOption(o.option);
-            }
-            return ret;
-        }
-    }
-    private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
-        if(BulkLoadOptions.AS_OF_TIME.has(cli)) {
-            if(!BulkLoadOptions.AS_OF_TIME_FORMAT.has(cli)) {
-                throw new IllegalStateException("Unable to proceed: Specified as_of_time without an associated format.");
-            }
-            else {
-                DateFormat format = new SimpleDateFormat(BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli));
-                Date d = format.parse(BulkLoadOptions.AS_OF_TIME.get(cli));
-                return d.getTime();
-            }
-        }
-        else {
-            return System.currentTimeMillis();
-        }
-    }
-    private static String readExtractorConfig(File configFile) throws IOException {
-        return Joiner.on("\n").join(Files.readLines(configFile, Charset.defaultCharset()));
-    }
-
-    public static Job createJob(Configuration conf, String input, String table, String cf, String extractorConfigContents, long ts) throws IOException {
-        Job job = new Job(conf);
-        job.setJobName("ThreatIntelBulkLoader: " + input + " => " +  table + ":" + cf);
-        System.out.println("Configuring " + job.getJobName());
-        job.setJarByClass(ThreatIntelBulkLoader.class);
-        job.setMapperClass(org.apache.metron.dataloads.hbase.mr.BulkLoadMapper.class);
-        job.setOutputFormatClass(TableOutputFormat.class);
-        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
-        job.getConfiguration().set(BulkLoadMapper.COLUMN_FAMILY_KEY, cf);
-        job.getConfiguration().set(BulkLoadMapper.CONFIG_KEY, extractorConfigContents);
-        job.getConfiguration().set(BulkLoadMapper.LAST_SEEN_KEY, "" + ts);
-        job.setOutputKeyClass(ImmutableBytesWritable.class);
-        job.setOutputValueClass(Put.class);
-        job.setNumReduceTasks(0);
-        ExtractorHandler handler = ExtractorHandler.load(extractorConfigContents);
-        handler.getInputFormatHandler().set(job, new Path(input), handler.getConfig());
-        return job;
-    }
-
-    public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException {
-        Configuration conf = HBaseConfiguration.create();
-        String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
-
-        CommandLine cli = BulkLoadOptions.parse(new PosixParser(), otherArgs);
-        Long ts = getTimestamp(cli);
-        String input = BulkLoadOptions.INPUT_DATA.get(cli);
-        String table = BulkLoadOptions.TABLE.get(cli);
-        String cf = BulkLoadOptions.COLUMN_FAMILY.get(cli);
-        String extractorConfigContents = readExtractorConfig(new File(BulkLoadOptions.EXTRACTOR_CONFIG.get(cli)));
-        Job job = createJob(conf, input, table, cf, extractorConfigContents, ts);
-        System.out.println(conf);
-        System.exit(job.waitForCompletion(true) ? 0 : 1);
-    }
-}