You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/02/16 18:10:20 UTC

[4/5] incubator-metron git commit: METRON-35 Implement threat intelligence message enrichment closes apache/incubator-metron#22

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelKey.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelKey.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelKey.java
new file mode 100644
index 0000000..b13e780
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelKey.java
@@ -0,0 +1,75 @@
+package org.apache.metron.threatintel;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.reference.lookup.LookupKey;
+
+/**
+ * Created by cstella on 2/2/16.
+ */
+public class ThreatIntelKey implements LookupKey{
+    private static final int SEED = 0xDEADBEEF;
+    private static final int HASH_PREFIX_SIZE=16;
+    ThreadLocal<HashFunction> hFunction= new ThreadLocal<HashFunction>() {
+        @Override
+        protected HashFunction initialValue() {
+            return Hashing.murmur3_128(SEED);
+        }
+    };
+    public ThreatIntelKey() {
+
+    }
+    public ThreatIntelKey(String indicator) {
+        this.indicator = indicator;
+    }
+
+    public String indicator;
+
+    @Override
+    public byte[] toBytes() {
+        byte[] indicatorBytes = Bytes.toBytes(indicator);
+        Hasher hasher = hFunction.get().newHasher();
+        hasher.putBytes(Bytes.toBytes(indicator));
+        byte[] prefix = hasher.hash().asBytes();
+        byte[] val = new byte[indicatorBytes.length + prefix.length];
+        int pos = 0;
+        for(int i = 0;pos < prefix.length;++pos,++i) {
+            val[pos] = prefix[i];
+        }
+        for(int i = 0;i < indicatorBytes.length;++pos,++i) {
+            val[pos] = indicatorBytes[i];
+        }
+        return val;
+    }
+
+    public static ThreatIntelKey fromBytes(byte[] row) {
+        ThreatIntelKey key = new ThreatIntelKey();
+        key.indicator = Bytes.toString(row, HASH_PREFIX_SIZE, row.length - HASH_PREFIX_SIZE);
+        return key;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ThreatIntelKey that = (ThreatIntelKey) o;
+
+        return indicator != null ? indicator.equals(that.indicator) : that.indicator == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        return indicator != null ? indicator.hashCode() : 0;
+    }
+
+    @Override
+    public String toString() {
+        return "ThreatIntelKey{" +
+                "indicator='" + indicator + '\'' +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelResults.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelResults.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelResults.java
new file mode 100644
index 0000000..bfe20b2
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelResults.java
@@ -0,0 +1,55 @@
+package org.apache.metron.threatintel;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/3/16.
+ */
+public class ThreatIntelResults {
+    private ThreatIntelKey key;
+    private Map<String, String> value;
+    public ThreatIntelResults() {
+        key = new ThreatIntelKey();
+        value = new HashMap<>();
+    }
+    public ThreatIntelResults(ThreatIntelKey key, Map<String, String> value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    public ThreatIntelKey getKey() {
+        return key;
+    }
+
+    public Map<String, String> getValue() {
+        return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ThreatIntelResults that = (ThreatIntelResults) o;
+
+        if (getKey() != null ? !getKey().equals(that.getKey()) : that.getKey() != null) return false;
+        return getValue() != null ? getValue().equals(that.getValue()) : that.getValue() == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = getKey() != null ? getKey().hashCode() : 0;
+        result = 31 * result + (getValue() != null ? getValue().hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "ThreatIntelResults{" +
+                "key=" + key +
+                ", value=" + value +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/Converter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/Converter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/Converter.java
new file mode 100644
index 0000000..a3d94e9
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/Converter.java
@@ -0,0 +1,82 @@
+package org.apache.metron.threatintel.hbase;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Created by cstella on 2/2/16.
+ */
+public enum Converter {
+    INSTANCE;
+    public static final String VALUE_COLUMN_NAME = "v";
+    public static final byte[] VALUE_COLUMN_NAME_B = Bytes.toBytes(VALUE_COLUMN_NAME);
+    public static final String LAST_SEEN_COLUMN_NAME = "t";
+    public static final byte[] LAST_SEEN_COLUMN_NAME_B = Bytes.toBytes(LAST_SEEN_COLUMN_NAME);
+    private static final ThreadLocal<ObjectMapper> _mapper = new ThreadLocal<ObjectMapper>() {
+             @Override
+             protected ObjectMapper initialValue() {
+                return new ObjectMapper();
+             }
+    };
+    public Put toPut(String columnFamily, ThreatIntelKey key, Map<String, String> value, Long lastSeenTimestamp) throws IOException {
+        Put put = new Put(key.toBytes());
+        byte[] cf = Bytes.toBytes(columnFamily);
+        put.add(cf,VALUE_COLUMN_NAME_B, Bytes.toBytes(valueToString(value)));
+        put.add(cf, LAST_SEEN_COLUMN_NAME_B, Bytes.toBytes(lastSeenTimestamp));
+        return put;
+    }
+
+    public Map.Entry<ThreatIntelResults, Long> fromPut(Put put, String columnFamily) throws IOException {
+        ThreatIntelKey key = ThreatIntelKey.fromBytes(put.getRow());
+        Map<String, String> value = null;
+        Long lastSeen = null;
+        byte[] cf = Bytes.toBytes(columnFamily);
+        List<Cell> cells = put.getFamilyCellMap().get(cf);
+        for(Cell cell : cells) {
+            if(Bytes.equals(cell.getQualifier(), VALUE_COLUMN_NAME_B)) {
+                value = stringToValue(Bytes.toString(cell.getValue()));
+            }
+            else if(Bytes.equals(cell.getQualifier(), LAST_SEEN_COLUMN_NAME_B)) {
+               lastSeen = Bytes.toLong(cell.getValue());
+            }
+        }
+        return new AbstractMap.SimpleEntry<>(new ThreatIntelResults(key, value), lastSeen);
+    }
+
+    public Result toResult(String columnFamily, ThreatIntelKey key, Map<String, String> value, Long lastSeenTimestamp) throws IOException {
+        Put put = toPut(columnFamily, key, value, lastSeenTimestamp);
+        return Result.create(put.getFamilyCellMap().get(Bytes.toBytes(columnFamily)));
+    }
+
+    public Map.Entry<ThreatIntelResults, Long> fromResult(Result result, String columnFamily) throws IOException {
+        ThreatIntelKey key = ThreatIntelKey.fromBytes(result.getRow());
+        byte[] cf = Bytes.toBytes(columnFamily);
+        NavigableMap<byte[], byte[]> cols = result.getFamilyMap(cf);
+        Map<String, String> value = stringToValue(Bytes.toString(cols.get(VALUE_COLUMN_NAME_B)));
+        ThreatIntelResults results = new ThreatIntelResults(key, value);
+        return new AbstractMap.SimpleEntry<>(results, Bytes.toLong(cols.get(LAST_SEEN_COLUMN_NAME_B)));
+    }
+
+    public Get toGet(String columnFamily, ThreatIntelKey key) {
+        Get ret = new Get(key.toBytes());
+        ret.addFamily(Bytes.toBytes(columnFamily));
+        return ret;
+    }
+
+    public Map<String, String> stringToValue(String s) throws IOException {
+        return _mapper.get().readValue(s, new TypeReference<Map<String, String>>(){});
+    }
+    public String valueToString(Map<String, String> value) throws IOException {
+        return _mapper.get().writeValueAsString(value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/ThreatIntelLookup.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/ThreatIntelLookup.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/ThreatIntelLookup.java
new file mode 100644
index 0000000..db4d2fd
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/ThreatIntelLookup.java
@@ -0,0 +1,60 @@
+package org.apache.metron.threatintel.hbase;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.reference.lookup.Lookup;
+import org.apache.metron.reference.lookup.LookupKey;
+import org.apache.metron.reference.lookup.accesstracker.AccessTracker;
+import org.apache.metron.reference.lookup.handler.Handler;
+import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.threatintel.ThreatIntelResults;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/5/16.
+ */
+public class ThreatIntelLookup extends Lookup<HTableInterface, ThreatIntelKey, Map.Entry<ThreatIntelResults, Long>> implements AutoCloseable {
+
+
+
+    public static class Handler implements org.apache.metron.reference.lookup.handler.Handler<HTableInterface, ThreatIntelKey, Map.Entry<ThreatIntelResults, Long>> {
+        String columnFamily;
+        public Handler(String columnFamily) {
+            this.columnFamily = columnFamily;
+        }
+        @Override
+        public boolean exists(ThreatIntelKey key, HTableInterface table, boolean logAccess) throws IOException {
+            return table.exists(Converter.INSTANCE.toGet(columnFamily, key));
+        }
+
+        @Override
+        public Map.Entry<ThreatIntelResults,Long> get(ThreatIntelKey key, HTableInterface table, boolean logAccess) throws IOException {
+            return Converter.INSTANCE.fromResult(table.get(Converter.INSTANCE.toGet(columnFamily, key)), columnFamily);
+        }
+
+
+        @Override
+        public void close() throws Exception {
+
+        }
+    }
+    private HTableInterface table;
+    public ThreatIntelLookup(HTableInterface table, String columnFamily, AccessTracker tracker) {
+        this.table = table;
+        this.setLookupHandler(new Handler(columnFamily));
+        this.setAccessTracker(tracker);
+    }
+
+    public HTableInterface getTable() {
+        return table;
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        table.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/pom.xml b/metron-streaming/Metron-DataLoads/pom.xml
index 1e0ea0f..c2a76fa 100644
--- a/metron-streaming/Metron-DataLoads/pom.xml
+++ b/metron-streaming/Metron-DataLoads/pom.xml
@@ -1,106 +1,241 @@
 <?xml version="1.0" encoding="UTF-8"?><!-- Licensed to the Apache Software 
-	Foundation (ASF) under one or more contributor license agreements. See the 
-	NOTICE file distributed with this work for additional information regarding 
-	copyright ownership. The ASF licenses this file to You under the Apache License, 
-	Version 2.0 (the "License"); you may not use this file except in compliance 
-	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
-	Unless required by applicable law or agreed to in writing, software distributed 
-	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
-	OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
-	the specific language governing permissions and limitations under the License. -->
+  Foundation (ASF) under one or more contributor license agreements. See the 
+  NOTICE file distributed with this work for additional information regarding 
+  copyright ownership. The ASF licenses this file to You under the Apache License, 
+  Version 2.0 (the "License"); you may not use this file except in compliance 
+  with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
+  Unless required by applicable law or agreed to in writing, software distributed 
+  under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+  OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+  the specific language governing permissions and limitations under the License. -->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<groupId>org.apache.metron</groupId>
-		<artifactId>Metron-Streaming</artifactId>
-		<version>0.6BETA</version>
-	</parent>
-	<artifactId>Metron-DataLoads</artifactId>
-	<properties>
-		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>	
-	</properties>
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.metron</groupId>
-			<artifactId>Metron-Common</artifactId>
-			<version>${project.parent.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.storm</groupId>
-			<artifactId>storm-core</artifactId>
-			<version>${global_storm_version}</version>
-			<scope>provided</scope>
-			<exclusions>
-				<exclusion>
-			  	 <artifactId>servlet-api</artifactId>
-			   	 <groupId>javax.servlet</groupId>
-			  </exclusion>
-			</exclusions>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hbase</groupId>
-			<artifactId>hbase-client</artifactId>
-			<version>${global_hbase_version}</version>
-			<exclusions>
-				<exclusion>
-					<artifactId>log4j</artifactId>
-					<groupId>log4j</groupId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-	</dependencies>
-	<build>
-		<sourceDirectory>src</sourceDirectory>
-		<resources>
-			<resource>
-				<directory>src</directory>
-				<excludes>
-					<exclude>**/*.java</exclude>
-				</excludes>
-			</resource>
-		</resources>
-		<plugins>
-			<plugin>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<version>3.1</version>
-				<configuration>
-					<source>1.7</source>
-					<target>1.7</target>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<version>2.3</version>
-				<executions>
-					<execution>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-						<minimizeJar>true</minimizeJar>
-							<artifactSet>
-								<excludes>
-									<exclude>classworlds:classworlds</exclude>
-									<exclude>junit:junit</exclude>
-									<exclude>jmock:*</exclude>
-									<exclude>*:xml-apis</exclude>
-									<exclude>*slf4j*</exclude>
-									<exclude>org.apache.maven:lib:tests</exclude>
-									<exclude>log4j:log4j:jar:</exclude>
-									<exclude>*:hbase:*</exclude>
-									<exclude>org.apache.hadoop.yarn.util.package-info*</exclude>
-								</excludes>
-							</artifactSet>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.metron</groupId>
+    <artifactId>Metron-Streaming</artifactId>
+    <version>0.6BETA</version>
+  </parent>
+  <artifactId>Metron-DataLoads</artifactId>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${global_hbase_guava_version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mitre</groupId>
+      <artifactId>stix</artifactId>
+      <version>1.2.0.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.metron</groupId>
+      <artifactId>Metron-Common</artifactId>
+      <version>${project.parent.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.googlecode.disruptor</groupId>
+          <artifactId>disruptor</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <!--dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${global_storm_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.googlecode.disruptor</groupId>
+                    <artifactId>disruptor</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency-->
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-client</artifactId>
+          <version>${global_hbase_version}</version>
+          <scope>provided</scope>
+          <exclusions>
+            <exclusion>
+              <artifactId>log4j</artifactId>
+              <groupId>log4j</groupId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-server</artifactId>
+          <version>${global_hbase_version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>com.opencsv</groupId>
+          <artifactId>opencsv</artifactId>
+          <version>${global_opencsv_version}</version>
+        </dependency>
 
-</project>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-testing-util</artifactId>
+          <version>${global_hbase_version}</version>
+          <scope>test</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.slf4j</groupId>
+              <artifactId>slf4j-log4j12</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <!--dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${global_guava_version}</version>
+        </dependency-->
+
+        <dependency>
+          <groupId>org.slf4j</groupId>
+          <artifactId>log4j-over-slf4j</artifactId>
+          <version>${global_slf4j_version}</version>
+          <scope>test</scope>
+        </dependency>
+
+
+      </dependencies>
+      <build>
+        <resources>
+          <resource>
+            <directory>src</directory>
+            <excludes>
+              <exclude>**/*.java</exclude>
+            </excludes>
+          </resource>
+        </resources>
+        <plugins>
+          <plugin>
+            <!-- Separates the unit tests from the integration tests. -->
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <version>2.12.4</version>
+            <configuration>
+              <!-- Skip the default running of this plug-in (or everything is run twice...see below) -->
+              <argLine>-Xmx2048m -XX:MaxPermSize=256m</argLine>
+              <skip>true</skip>
+              <!-- Show 100% of the lines from the stack trace (doesn't work) -->
+              <trimStackTrace>false</trimStackTrace>
+
+            </configuration>
+            <executions>
+              <execution>
+                <id>unit-tests</id>
+                <phase>test</phase>
+                <goals>
+                  <goal>test</goal>
+                </goals>
+                <configuration>
+                  <!-- Never skip running the tests when the test phase is invoked -->
+                  <skip>false</skip>
+                  <includes>
+                    <!-- Include unit tests within integration-test phase. -->
+                    <include>**/*Test.java</include>
+                  </includes>
+                  <excludes>
+                    <!-- Exclude integration tests within (unit) test phase. -->
+                    <exclude>**/*IntegrationTest.java</exclude>
+                  </excludes>
+                </configuration>
+              </execution>
+              <execution>
+                <id>integration-tests</id>
+                <phase>integration-test</phase>
+                <goals>
+                  <goal>test</goal>
+                </goals>
+                <configuration>
+                  <!-- Never skip running the tests when the integration-test phase is invoked -->
+                  <skip>false</skip>
+                  <includes>
+                    <!-- Include integration tests within integration-test phase. -->
+                    <include>**/*IntegrationTest.java</include>
+                  </includes>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+
+          <plugin>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <version>3.1</version>
+            <configuration>
+              <source>1.7</source>
+              <target>1.7</target>
+            </configuration>
+          </plugin>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-shade-plugin</artifactId>
+            <version>2.3</version>
+            <executions>
+              <execution>
+                <phase>package</phase>
+                <goals>
+                  <goal>shade</goal>
+                </goals>
+                <configuration>
+                  <relocations>
+                    <relocation>
+                      <pattern>com.google.common</pattern>
+                      <shadedPattern>org.apache.metron.guava.dataload</shadedPattern>
+                    </relocation>
+                  </relocations>
+                  <minimizeJar>true</minimizeJar>
+                  <artifactSet>
+                    <excludes>
+                      <exclude>classworlds:classworlds</exclude>
+                      <exclude>junit:junit</exclude>
+                      <exclude>jmock:*</exclude>
+                      <exclude>*:xml-apis</exclude>
+                      <exclude>*slf4j*</exclude>
+                      <exclude>org.apache.maven:lib:tests</exclude>
+                      <exclude>log4j:log4j:jar:</exclude>
+                      <exclude>*:hbase:*</exclude>
+                      <exclude>org.apache.hadoop.yarn.util.package-info*</exclude>
+                    </excludes>
+                  </artifactSet>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+          <plugin>
+            <artifactId>maven-assembly-plugin</artifactId>
+            <configuration>
+              <descriptor>src/main/assembly/assembly.xml</descriptor>
+            </configuration>
+            <executions>
+              <execution>
+                <id>make-assembly</id> <!-- this is used for inheritance merges -->
+                <phase>package</phase> <!-- bind to the packaging phase -->
+                <goals>
+                  <goal>single</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+
+        </plugins>
+      </build>
+
+    </project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/assembly/assembly.xml b/metron-streaming/Metron-DataLoads/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..44d7216
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/assembly/assembly.xml
@@ -0,0 +1,28 @@
+<assembly>
+  <id>archive</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${project.basedir}/src/main/bash</directory>
+      <outputDirectory>/bin</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+      <excludes>
+        <exclude>**/*.formatted</exclude>
+        <exclude>**/*.filtered</exclude>
+      </excludes>
+      <fileMode>0755</fileMode>
+      <lineEnding>unix</lineEnding>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/target</directory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+      <outputDirectory>/lib</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+    </fileSet>
+  </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_load.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_load.sh b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_load.sh
new file mode 100755
index 0000000..430c48c
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_load.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
+HADOOP_CLASSPATH=${HBASE_HOME}/lib/hbase-server.jar:`${HBASE_HOME}/bin/hbase classpath`
+for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
+  if [ -f $jar ];then
+    LIBJARS="$jar,$LIBJARS"
+  fi
+done
+export HADOOP_CLASSPATH
+hadoop jar /usr/metron/0.6BETA/lib/Metron-DataLoads-0.6BETA.jar org.apache.metron.dataloads.ThreatIntelBulkLoader -libjars ${LIBJARS} "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_prune.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_prune.sh b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_prune.sh
new file mode 100755
index 0000000..e21c9e1
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_prune.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
+HADOOP_CLASSPATH=${HBASE_HOME}/lib/hbase-server.jar:`${HBASE_HOME}/bin/hbase classpath`
+for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
+  if [ -f $jar ];then
+    LIBJARS="$jar,$LIBJARS"
+  fi
+done
+export HADOOP_CLASSPATH
+hadoop jar /usr/metron/0.6BETA/lib/Metron-DataLoads-0.6BETA.jar org.apache.metron.dataloads.LeastRecentlyUsedPruner -libjars ${LIBJARS} "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/LeastRecentlyUsedPruner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/LeastRecentlyUsedPruner.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/LeastRecentlyUsedPruner.java
new file mode 100644
index 0000000..899e13d
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/LeastRecentlyUsedPruner.java
@@ -0,0 +1,207 @@
+package org.apache.metron.dataloads;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.dataloads.hbase.mr.PrunerMapper;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * Created by cstella on 2/5/16.
+ */
+public class LeastRecentlyUsedPruner {
+    private static abstract class OptionHandler implements Function<String, Option> {}
+    private enum BulkLoadOptions {
+        HELP("h", new OptionHandler() {
+
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                return new Option(s, "help", false, "Generate Help screen");
+            }
+        }), TABLE("t", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "table", true, "HBase table to prune");
+                o.setRequired(true);
+                o.setArgName("HBASE_TABLE");
+                return o;
+            }
+        }), COLUMN_FAMILY("f", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "column_family", true, "Column family of the HBase table to prune");
+                o.setRequired(false);
+                o.setArgName("CF_NAME");
+                return o;
+            }
+        })
+        ,AS_OF_TIME("a", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "as_of", true, "The earliest access tracker you want to use.");
+                o.setArgName("datetime");
+                o.setRequired(true);
+                return o;
+            }
+        })
+        ,AS_OF_TIME_FORMAT("t", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                String defaultFormat = new SimpleDateFormat().toLocalizedPattern();
+                Option o = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option) (Default is: " + defaultFormat + ")");
+                o.setArgName("format");
+                o.setRequired(false);
+                return o;
+            }
+        })
+        ,ACCESS_TABLE("u", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "access_table", true, "HBase table containing the access trackers.");
+                o.setRequired(true);
+                o.setArgName("HBASE_TABLE");
+                return o;
+            }
+        }), ACCESS_COLUMN_FAMILY("z", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "access_column_family", true, "Column family of the HBase table containing the access trackers");
+                o.setRequired(true);
+                o.setArgName("CF_NAME");
+                return o;
+            }
+        });
+        Option option;
+        String shortCode;
+        BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
+            this.shortCode = shortCode;
+            this.option = optionHandler.apply(shortCode);
+        }
+
+        public boolean has(CommandLine cli) {
+            return cli.hasOption(shortCode);
+        }
+
+        public String get(CommandLine cli) {
+            return cli.getOptionValue(shortCode);
+        }
+        private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
+            Date d = getFormat(cli).parse(BulkLoadOptions.AS_OF_TIME.get(cli));
+            return d.getTime();
+        }
+
+        private static DateFormat getFormat(CommandLine cli) {
+            DateFormat format = new SimpleDateFormat();
+            if (BulkLoadOptions.AS_OF_TIME_FORMAT.has(cli)) {
+                 format = new SimpleDateFormat(BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli));
+            }
+            return format;
+        }
+
+        public static CommandLine parse(CommandLineParser parser, String[] args) {
+            try {
+                CommandLine cli = parser.parse(getOptions(), args);
+                if(BulkLoadOptions.HELP.has(cli)) {
+                    printHelp();
+                    System.exit(0);
+                }
+                return cli;
+            } catch (ParseException e) {
+                System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+                e.printStackTrace(System.err);
+                printHelp();
+                System.exit(-1);
+                return null;
+            }
+        }
+
+        public static void printHelp() {
+            HelpFormatter formatter = new HelpFormatter();
+            formatter.printHelp( "ThreatIntelBulkLoader", getOptions());
+        }
+
+        public static Options getOptions() {
+            Options ret = new Options();
+            for(BulkLoadOptions o : BulkLoadOptions.values()) {
+               ret.addOption(o.option);
+            }
+            return ret;
+        }
+    }
+
+    public static void setupHBaseJob(Job job, String sourceTable, String cf) throws IOException {
+        Scan scan = new Scan();
+        if(cf != null) {
+            scan.addFamily(Bytes.toBytes(cf));
+        }
+        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
+        scan.setCacheBlocks(false);  // don't set to true for MR jobs
+// set other scan attrs
+
+        TableMapReduceUtil.initTableMapperJob(
+                sourceTable,      // input table
+                scan,	          // Scan instance to control CF and attribute selection
+                PrunerMapper.class,   // mapper class
+                null,	          // mapper output key
+                null,	          // mapper output value
+                job);
+        TableMapReduceUtil.initTableReducerJob(
+                sourceTable,      // output table
+                null,             // reducer class
+                job);
+    }
+
+    public static Job createJob( Configuration conf
+                               , String table
+                               , String cf
+                               , String accessTrackerTable
+                               , String accessTrackerColumnFamily
+                               , Long ts
+                               ) throws IOException
+    {
+        Job job = new Job(conf);
+        job.setJobName("LeastRecentlyUsedPruner: Pruning " +  table + ":" + cf + " since " + new SimpleDateFormat().format(new Date(ts)));
+        System.out.println("Configuring " + job.getJobName());
+        job.setJarByClass(LeastRecentlyUsedPruner.class);
+        job.getConfiguration().setLong(PrunerMapper.TIMESTAMP_CONF, ts);
+        job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_NAME_CONF, table);
+        job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_CF_CONF, accessTrackerColumnFamily);
+        job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_TABLE_CONF, accessTrackerTable);
+        setupHBaseJob(job, table, cf);
+        job.setNumReduceTasks(0);
+        return job;
+    }
+
+    public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException {
+        Configuration conf = HBaseConfiguration.create();
+        String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+
+        CommandLine cli = BulkLoadOptions.parse(new PosixParser(), otherArgs);
+        Long ts = BulkLoadOptions.getTimestamp(cli);
+        String table = BulkLoadOptions.TABLE.get(cli);
+        String cf = BulkLoadOptions.COLUMN_FAMILY.get(cli);
+        String accessTrackerTable = BulkLoadOptions.ACCESS_TABLE.get(cli);
+        String accessTrackerCF = BulkLoadOptions.ACCESS_COLUMN_FAMILY.get(cli);
+        Job job = createJob(conf, table, cf, accessTrackerTable, accessTrackerCF, ts);
+        System.exit(job.waitForCompletion(true) ? 0 : 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelBulkLoader.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelBulkLoader.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelBulkLoader.java
new file mode 100644
index 0000000..d83b35a
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelBulkLoader.java
@@ -0,0 +1,199 @@
+package org.apache.metron.dataloads;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.io.Files;
+import org.apache.commons.cli.*;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.hbase.mr.BulkLoadMapper;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.text.*;
+import java.util.Date;
+
+/**
+ * Created by cstella on 2/2/16.
+ */
+public class ThreatIntelBulkLoader  {
+    private static abstract class OptionHandler implements Function<String, Option> {}
+    private enum BulkLoadOptions {
+        HELP("h", new OptionHandler() {
+
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                return new Option(s, "help", false, "Generate Help screen");
+            }
+        })
+        ,TABLE("t", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "table", true, "HBase table to import data into");
+                o.setRequired(true);
+                o.setArgName("HBASE_TABLE");
+                return o;
+            }
+        })
+        ,COLUMN_FAMILY("f", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "column_family", true, "Column family of the HBase table to import into");
+                o.setRequired(true);
+                o.setArgName("CF_NAME");
+                return o;
+            }
+        })
+        ,EXTRACTOR_CONFIG("e", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
+                o.setArgName("JSON_FILE");
+                o.setRequired(true);
+                return o;
+            }
+        })
+        ,INPUT_DATA("i", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "input", true, "Input directory in HDFS for the data to import into HBase");
+                o.setArgName("DIR");
+                o.setRequired(true);
+                return o;
+            }
+        })
+        ,AS_OF_TIME("a", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "as_of", true, "The last read timestamp to mark the records with (omit for time of execution)");
+                o.setArgName("datetime");
+                o.setRequired(false);
+                return o;
+            }
+        })
+        ,AS_OF_TIME_FORMAT("t", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option)");
+                o.setArgName("format");
+                o.setRequired(false);
+                return o;
+            }
+        })
+        ;
+        Option option;
+        String shortCode;
+        BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
+            this.shortCode = shortCode;
+            this.option = optionHandler.apply(shortCode);
+        }
+
+        public boolean has(CommandLine cli) {
+            return cli.hasOption(shortCode);
+        }
+
+        public String get(CommandLine cli) {
+            return cli.getOptionValue(shortCode);
+        }
+
+        public static CommandLine parse(CommandLineParser parser, String[] args) {
+            try {
+                CommandLine cli = parser.parse(getOptions(), args);
+                if(ThreatIntelBulkLoader.BulkLoadOptions.HELP.has(cli)) {
+                    printHelp();
+                    System.exit(0);
+                }
+                return cli;
+            } catch (ParseException e) {
+                System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+                e.printStackTrace(System.err);
+                printHelp();
+                System.exit(-1);
+                return null;
+            }
+        }
+
+        public static void printHelp() {
+            HelpFormatter formatter = new HelpFormatter();
+            formatter.printHelp( "ThreatIntelBulkLoader", getOptions());
+        }
+
+        public static Options getOptions() {
+            Options ret = new Options();
+            for(BulkLoadOptions o : BulkLoadOptions.values()) {
+               ret.addOption(o.option);
+            }
+            return ret;
+        }
+    }
+    private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
+        if(BulkLoadOptions.AS_OF_TIME.has(cli)) {
+            if(!BulkLoadOptions.AS_OF_TIME_FORMAT.has(cli)) {
+                throw new IllegalStateException("Unable to proceed: Specified as_of_time without an associated format.");
+            }
+            else {
+                DateFormat format = new SimpleDateFormat(BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli));
+                Date d = format.parse(BulkLoadOptions.AS_OF_TIME.get(cli));
+                return d.getTime();
+            }
+        }
+        else {
+            return System.currentTimeMillis();
+        }
+    }
+    private static String readExtractorConfig(File configFile) throws IOException {
+        return Joiner.on("\n").join(Files.readLines(configFile, Charset.defaultCharset()));
+    }
+
+    public static Job createJob(Configuration conf, String input, String table, String cf, String extractorConfigContents, long ts) throws IOException {
+        Job job = new Job(conf);
+        job.setJobName("ThreatIntelBulkLoader: " + input + " => " +  table + ":" + cf);
+        System.out.println("Configuring " + job.getJobName());
+        job.setJarByClass(ThreatIntelBulkLoader.class);
+        job.setMapperClass(org.apache.metron.dataloads.hbase.mr.BulkLoadMapper.class);
+        job.setOutputFormatClass(TableOutputFormat.class);
+        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
+        job.getConfiguration().set(BulkLoadMapper.COLUMN_FAMILY_KEY, cf);
+        job.getConfiguration().set(BulkLoadMapper.CONFIG_KEY, extractorConfigContents);
+        job.getConfiguration().set(BulkLoadMapper.LAST_SEEN_KEY, "" + ts);
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(Put.class);
+        job.setNumReduceTasks(0);
+        ExtractorHandler handler = ExtractorHandler.load(extractorConfigContents);
+        handler.getInputFormatHandler().set(job, new Path(input), handler.getConfig());
+        return job;
+    }
+
+    public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException {
+        Configuration conf = HBaseConfiguration.create();
+        String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+
+        CommandLine cli = BulkLoadOptions.parse(new PosixParser(), otherArgs);
+        Long ts = getTimestamp(cli);
+        String input = BulkLoadOptions.INPUT_DATA.get(cli);
+        String table = BulkLoadOptions.TABLE.get(cli);
+        String cf = BulkLoadOptions.COLUMN_FAMILY.get(cli);
+        String extractorConfigContents = readExtractorConfig(new File(BulkLoadOptions.EXTRACTOR_CONFIG.get(cli)));
+        Job job = createJob(conf, input, table, cf, extractorConfigContents, ts);
+        System.out.println(conf);
+        System.exit(job.waitForCompletion(true) ? 0 : 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
new file mode 100644
index 0000000..4fcfb15
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
@@ -0,0 +1,14 @@
+package org.apache.metron.dataloads.extractor;
+
+import org.apache.metron.threatintel.ThreatIntelResults;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/2/16.
+ */
+public interface Extractor {
+    Iterable<ThreatIntelResults> extract(String line) throws IOException;
+    void initialize(Map<String, Object> config);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCreator.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCreator.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCreator.java
new file mode 100644
index 0000000..fdc619e
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCreator.java
@@ -0,0 +1,10 @@
+package org.apache.metron.dataloads.extractor;
+
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/2/16.
+ */
+public interface ExtractorCreator {
+    Extractor create();
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
new file mode 100644
index 0000000..cc63c14
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
@@ -0,0 +1,73 @@
+package org.apache.metron.dataloads.extractor;
+
+import org.apache.metron.dataloads.extractor.inputformat.Formats;
+import org.apache.metron.dataloads.extractor.inputformat.InputFormatHandler;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/2/16.
+ */
+public class ExtractorHandler {
+    final static ObjectMapper _mapper = new ObjectMapper();
+    private Map<String, Object> config;
+    private Extractor extractor;
+    private InputFormatHandler inputFormatHandler = Formats.BY_LINE;
+
+    public Map<String, Object> getConfig() {
+        return config;
+    }
+
+    public void setConfig(Map<String, Object> config) {
+        this.config = config;
+    }
+
+    public InputFormatHandler getInputFormatHandler() {
+        return inputFormatHandler;
+    }
+
+    public void setInputFormatHandler(String handler) {
+        try {
+            this.inputFormatHandler= Formats.create(handler);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalStateException("Unable to create an inputformathandler", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalStateException("Unable to create an inputformathandler", e);
+        } catch (InstantiationException e) {
+            throw new IllegalStateException("Unable to create an inputformathandler", e);
+        }
+    }
+
+    public Extractor getExtractor() {
+        return extractor;
+    }
+    public void setExtractor(String extractor) {
+        try {
+            this.extractor = Extractors.create(extractor);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalStateException("Unable to create an extractor", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalStateException("Unable to create an extractor", e);
+        } catch (InstantiationException e) {
+            throw new IllegalStateException("Unable to create an extractor", e);
+        }
+    }
+
+    public static synchronized ExtractorHandler load(InputStream is) throws IOException {
+        ExtractorHandler ret = _mapper.readValue(is, ExtractorHandler.class);
+        ret.getExtractor().initialize(ret.getConfig());
+        return ret;
+    }
+    public static synchronized ExtractorHandler load(String s, Charset c) throws IOException {
+        return load( new ByteArrayInputStream(s.getBytes(c)));
+    }
+    public static synchronized ExtractorHandler load(String s) throws IOException {
+        return load( s, Charset.defaultCharset());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java
new file mode 100644
index 0000000..b11ccd9
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java
@@ -0,0 +1,44 @@
+package org.apache.metron.dataloads.extractor;
+
+import org.apache.metron.dataloads.extractor.csv.CSVExtractor;
+import org.apache.metron.dataloads.extractor.stix.StixExtractor;
+
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/2/16.
+ */
+public enum Extractors implements ExtractorCreator {
+    CSV(new ExtractorCreator() {
+
+        @Override
+        public Extractor create() {
+            return new CSVExtractor();
+        }
+    })
+    ,STIX(new ExtractorCreator() {
+        @Override
+        public Extractor create() {
+            return new StixExtractor();
+        }
+    })
+    ;
+    ExtractorCreator _creator;
+    Extractors(ExtractorCreator creator) {
+        this._creator = creator;
+    }
+    @Override
+    public Extractor create() {
+        return _creator.create();
+    }
+    public static Extractor create(String extractorName) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+        try {
+            ExtractorCreator ec = Extractors.valueOf(extractorName);
+            return ec.create();
+        }
+        catch(IllegalArgumentException iae) {
+            Extractor ex = (Extractor) Class.forName(extractorName).newInstance();
+            return ex;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
new file mode 100644
index 0000000..ee85a58
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
@@ -0,0 +1,99 @@
+package org.apache.metron.dataloads.extractor.csv;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.threatintel.ThreatIntelResults;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Created by cstella on 2/2/16.
+ */
+public class CSVExtractor implements Extractor {
+    public static final String COLUMNS_KEY="columns";
+    public static final String INDICATOR_COLUMN_KEY="indicator_column";
+    public static final String SEPARATOR_KEY="separator";
+
+    private int indicatorColumn;
+    private Map<String, Integer> columnMap = new HashMap<>();
+    private CSVParser parser;
+
+    @Override
+    public Iterable<ThreatIntelResults> extract(String line) throws IOException {
+        if(line.trim().startsWith("#")) {
+            //comment
+            return Collections.emptyList();
+        }
+        ThreatIntelResults ret = new ThreatIntelResults();
+        String[] tokens = parser.parseLine(line);
+        ret.getKey().indicator = tokens[indicatorColumn];
+        for(Map.Entry<String, Integer> kv : columnMap.entrySet()) {
+            ret.getValue().put(kv.getKey(), tokens[kv.getValue()]);
+        }
+        return Arrays.asList(ret);
+    }
+
+    private static Map.Entry<String, Integer> getColumnMapEntry(String column, int i) {
+        if(column.contains(":")) {
+            Iterable<String> tokens = Splitter.on(':').split(column);
+            String col = Iterables.getFirst(tokens, null);
+            Integer pos = Integer.parseInt(Iterables.getLast(tokens));
+            return new AbstractMap.SimpleEntry<>(col, pos);
+        }
+        else {
+            return new AbstractMap.SimpleEntry<>(column, i);
+        }
+
+    }
+    private static Map<String, Integer> getColumnMap(Map<String, Object> config) {
+        Map<String, Integer> columnMap = new HashMap<>();
+        if(config.containsKey(COLUMNS_KEY)) {
+            Object columnsObj = config.get(COLUMNS_KEY);
+            if(columnsObj instanceof String) {
+                String columns = (String)columnsObj;
+                int i = 0;
+                for (String column : Splitter.on(',').split(columns)) {
+                    Map.Entry<String, Integer> e = getColumnMapEntry(column, i++);
+                    columnMap.put(e.getKey(), e.getValue());
+                }
+            }
+            else if(columnsObj instanceof List) {
+                List columns = (List)columnsObj;
+                int i = 0;
+                for(Object column : columns) {
+                    Map.Entry<String, Integer> e = getColumnMapEntry(column.toString(), i++);
+                    columnMap.put(e.getKey(), e.getValue());
+                }
+            }
+            else if(columnsObj instanceof Map) {
+                Map<Object, Object> map = (Map<Object, Object>)columnsObj;
+                for(Map.Entry<Object, Object> e : map.entrySet()) {
+                    columnMap.put(e.getKey().toString(), Integer.parseInt(e.getValue().toString()));
+                }
+            }
+        }
+        return columnMap;
+    }
+
+    @Override
+    public void initialize(Map<String, Object> config) {
+        if(config.containsKey(COLUMNS_KEY)) {
+            columnMap = getColumnMap(config);
+        }
+        else {
+            throw new IllegalStateException("CSVExtractor requires " + COLUMNS_KEY + " configuration");
+        }
+        if(config.containsKey(INDICATOR_COLUMN_KEY)) {
+            indicatorColumn = columnMap.get(config.get(INDICATOR_COLUMN_KEY).toString());
+        }
+        if(config.containsKey(SEPARATOR_KEY)) {
+            char separator = config.get(SEPARATOR_KEY).toString().charAt(0);
+            parser = new CSVParserBuilder().withSeparator(separator)
+                                           .build();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
new file mode 100644
index 0000000..9fb2231
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
@@ -0,0 +1,41 @@
+package org.apache.metron.dataloads.extractor.inputformat;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/8/16.
+ */
+public enum Formats implements InputFormatHandler{
+    BY_LINE(new InputFormatHandler() {
+        @Override
+        public void set(Job job, Path input, Map<String, Object> config) throws IOException {
+
+            FileInputFormat.addInputPath(job, input);
+        }
+    })
+    ;
+    InputFormatHandler _handler = null;
+    Formats(InputFormatHandler handler) {
+        this._handler = handler;
+    }
+    @Override
+    public void set(Job job, Path path, Map<String, Object> config) throws IOException {
+        _handler.set(job, path, config);
+    }
+
+    public static InputFormatHandler create(String handlerName) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+        try {
+            InputFormatHandler ec = Formats.valueOf(handlerName);
+            return ec;
+        }
+        catch(IllegalArgumentException iae) {
+            InputFormatHandler ex = (InputFormatHandler) Class.forName(handlerName).newInstance();
+            return ex;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
new file mode 100644
index 0000000..28cd241
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
@@ -0,0 +1,14 @@
+package org.apache.metron.dataloads.extractor.inputformat;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/8/16.
+ */
+public interface InputFormatHandler {
+    void set(Job job, Path input, Map<String, Object> config) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
new file mode 100644
index 0000000..6c694f7
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
@@ -0,0 +1,95 @@
+package org.apache.metron.dataloads.extractor.inputformat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/8/16.
+ */
+public class WholeFileFormat implements InputFormatHandler {
+
+    public static class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
+        private FileSplit fileSplit;
+        private Configuration conf;
+        private Text value = new Text();
+        private boolean processed = false;
+
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+            this.fileSplit = (FileSplit) split;
+            this.conf = context.getConfiguration();
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            if (!processed) {
+                byte[] contents = new byte[(int) fileSplit.getLength()];
+                Path file = fileSplit.getPath();
+                FileSystem fs = file.getFileSystem(conf);
+                FSDataInputStream in = null;
+                try {
+                    in = fs.open(file);
+                    IOUtils.readFully(in, contents, 0, contents.length);
+                    value.set(contents, 0, contents.length);
+                } finally {
+                    IOUtils.closeStream(in);
+                }
+                processed = true;
+                return true;
+            }
+            return false;
+        }
+
+        @Override
+        public NullWritable getCurrentKey() throws IOException, InterruptedException {
+            return NullWritable.get();
+        }
+        @Override
+        public Text getCurrentValue() throws IOException, InterruptedException{
+            return value;
+        }
+
+        @Override
+        public float getProgress() throws IOException {
+            return processed ? 1.0f : 0.0f;
+        }
+
+        @Override
+        public void close() throws IOException{
+            //do nothing :)
+        }
+    }
+
+    public static class WholeFileInputFormat extends FileInputFormat<NullWritable, Text> {
+
+        @Override
+        protected boolean isSplitable(JobContext context, Path file) {
+            return false;
+        }
+
+        @Override
+        public RecordReader<NullWritable, Text> createRecordReader(
+                InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+            WholeFileRecordReader reader = new WholeFileRecordReader();
+            reader.initialize(split, context);
+            return reader;
+        }
+    }
+    @Override
+    public void set(Job job, Path input, Map<String, Object> config) throws IOException {
+        WholeFileInputFormat.setInputPaths(job, input);
+        job.setInputFormatClass(WholeFileInputFormat.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
new file mode 100644
index 0000000..493adb6
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
@@ -0,0 +1,96 @@
+package org.apache.metron.dataloads.extractor.stix;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.commons.io.FileUtils;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.dataloads.extractor.stix.types.ObjectTypeHandler;
+import org.apache.metron.dataloads.extractor.stix.types.ObjectTypeHandlers;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.mitre.cybox.common_2.*;
+import org.mitre.cybox.cybox_2.ObjectType;
+import org.mitre.stix.common_1.IndicatorBaseType;
+import org.mitre.stix.indicator_2.Indicator;
+import org.mitre.stix.stix_1.STIXPackage;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/8/16.
+ */
+public class StixExtractor implements Extractor {
+    Map<String, Object> config;
+    @Override
+    public Iterable<ThreatIntelResults> extract(String line) throws IOException {
+        STIXPackage stixPackage = STIXPackage.fromXMLString(line);
+        List<ThreatIntelResults> ret = new ArrayList<>();
+        if (stixPackage.getIndicators() != null) {
+            if (stixPackage.getIndicators().getIndicators() != null) {
+                List<IndicatorBaseType> indicators = stixPackage.getIndicators().getIndicators();
+                int indicatorCount = indicators.size();
+                for (int i = 0; i < indicatorCount; i++) {
+                    Indicator indicator = (Indicator) indicators.get(i);
+                    if (indicator.getObservable() != null) {
+                        ObjectType obj = indicator.getObservable().getObject();
+                        ObjectPropertiesType props = obj.getProperties();
+                        ObjectTypeHandler handler = ObjectTypeHandlers.getHandlerByInstance(props);
+                        if(handler != null) {
+                            Iterables.addAll(ret, handler.extract(props, config));
+                        }
+                    }
+                }
+            }
+        }
+        return ret;
+    }
+
+    @Override
+    public void initialize(Map<String, Object> config) {
+        this.config = config;
+    }
+
+    public static Iterable<String> split(StringObjectPropertyType value) {
+        final ConditionTypeEnum condition = value.getCondition();
+        final ConditionApplicationEnum applyCondition = value.getApplyCondition();
+        List<String> tokens = new ArrayList<>();
+        if(condition == ConditionTypeEnum.EQUALS && applyCondition == ConditionApplicationEnum.ANY) {
+            String delim = value.getDelimiter();
+            String line = value.getValue().toString();
+            if (delim != null) {
+                for (String token : Splitter.on(delim).split(line)) {
+                    tokens.add(token);
+                }
+            } else {
+                tokens.add(line);
+            }
+        }
+        return tokens;
+    }
+    public static void main(String[] args) throws IOException {
+
+        File file = new File("/tmp/sample.xml");
+
+        /*if (args.length > 0) {
+            file = new File(args[0]);
+        } else {
+            try {
+                URL url = XML2Object.class.getClass().getResource(
+                        "/org/mitre/stix/examples/sample.xml");
+                file = new File(url.toURI());
+            } catch (URISyntaxException e) {
+                throw new RuntimeException(e);
+            }
+        }*/
+
+        String line = FileUtils.readFileToString(file);
+        StixExtractor extractor = new StixExtractor();
+        for(ThreatIntelResults results : extractor.extract(line)) {
+            System.out.println(results);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java
new file mode 100644
index 0000000..9611150
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java
@@ -0,0 +1,20 @@
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import org.mitre.cybox.common_2.ObjectPropertiesType;
+import org.mitre.cybox.common_2.StringObjectPropertyType;
+
+/**
+ * Created by cstella on 2/9/16.
+ */
+public abstract class AbstractObjectTypeHandler<T extends ObjectPropertiesType> implements ObjectTypeHandler<T> {
+    protected Class<T> objectPropertiesType;
+    public AbstractObjectTypeHandler(Class<T> clazz) {
+        objectPropertiesType = clazz;
+    }
+    @Override
+    public Class<T> getTypeClass() {
+        return objectPropertiesType;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
new file mode 100644
index 0000000..4a280e1
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
@@ -0,0 +1,61 @@
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import com.google.common.base.Splitter;
+import org.apache.metron.dataloads.extractor.stix.StixExtractor;
+import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.mitre.cybox.common_2.ConditionApplicationEnum;
+import org.mitre.cybox.common_2.ConditionTypeEnum;
+import org.mitre.cybox.common_2.StringObjectPropertyType;
+import org.mitre.cybox.objects.Address;
+import org.mitre.cybox.objects.CategoryTypeEnum;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Created by cstella on 2/9/16.
+ */
+public class AddressHandler extends AbstractObjectTypeHandler<Address> {
+    public static final String SPECIFIC_CATEGORY_CONFIG = "stix_address_categories";
+    public static final EnumSet<CategoryTypeEnum> SUPPORTED_CATEGORIES = EnumSet.of(CategoryTypeEnum.E_MAIL
+                                                                                   ,CategoryTypeEnum.IPV_4_ADDR
+                                                                                   ,CategoryTypeEnum.IPV_6_ADDR
+                                                                                   ,CategoryTypeEnum.MAC
+                                                               ) ;
+    public AddressHandler() {
+        super(Address.class);
+    }
+
+    @Override
+    public Iterable<ThreatIntelResults> extract(final Address type, Map<String, Object> config) throws IOException {
+        List<ThreatIntelResults> ret = new ArrayList<>();
+        final CategoryTypeEnum category= type.getCategory();
+        if(!SUPPORTED_CATEGORIES.contains(category)) {
+           return ret;
+        }
+        if(config != null && config.containsKey(SPECIFIC_CATEGORY_CONFIG)) {
+            List<CategoryTypeEnum> categories = new ArrayList<>();
+            for(String c : Splitter.on(",").split(config.get(SPECIFIC_CATEGORY_CONFIG).toString())) {
+                categories.add(CategoryTypeEnum.valueOf(c));
+            }
+            EnumSet<CategoryTypeEnum> specificCategories = EnumSet.copyOf(categories);
+            if(!specificCategories.contains(category)) {
+                return ret;
+            }
+
+        }
+        StringObjectPropertyType value = type.getAddressValue();
+        for(String token : StixExtractor.split(value)) {
+            ThreatIntelResults results = new ThreatIntelResults(new ThreatIntelKey(token),
+                                                                    new HashMap<String, String>() {{
+                                                                        put("source-type", "STIX");
+                                                                        put("indicator-type", "Address");
+                                                                        put("source", type.toXMLString());
+                                                                    }}
+                                                                   );
+                ret.add(results);
+        }
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
new file mode 100644
index 0000000..51769ff
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
@@ -0,0 +1,41 @@
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import org.apache.metron.dataloads.extractor.stix.StixExtractor;
+import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.mitre.cybox.common_2.StringObjectPropertyType;
+import org.mitre.cybox.objects.DomainName;
+import org.mitre.cybox.objects.DomainNameTypeEnum;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Created by cstella on 2/9/16.
+ */
+public class DomainHandler extends AbstractObjectTypeHandler<DomainName> {
+    EnumSet<DomainNameTypeEnum> SUPPORTED_TYPES = EnumSet.of(DomainNameTypeEnum.FQDN);
+    public DomainHandler() {
+        super(DomainName.class);
+    }
+
+    @Override
+    public Iterable<ThreatIntelResults> extract(final DomainName type, Map<String, Object> config) throws IOException {
+        List<ThreatIntelResults> ret = new ArrayList<>();
+        final DomainNameTypeEnum domainType = type.getType();
+        if(SUPPORTED_TYPES.contains(domainType)) {
+            StringObjectPropertyType value = type.getValue();
+            for (String token : StixExtractor.split(value)) {
+                ThreatIntelResults results = new ThreatIntelResults(new ThreatIntelKey(token),
+                        new HashMap<String, String>() {{
+                            put("source-type", "STIX");
+                            put("indicator-type", "DomainName");
+                            put("source", type.toXMLString());
+                        }}
+                );
+                ret.add(results);
+            }
+        }
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
new file mode 100644
index 0000000..d6bfd05
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
@@ -0,0 +1,39 @@
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import org.apache.metron.dataloads.extractor.stix.StixExtractor;
+import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.mitre.cybox.common_2.StringObjectPropertyType;
+import org.mitre.cybox.objects.Hostname;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/9/16.
+ */
+public class HostnameHandler  extends AbstractObjectTypeHandler<Hostname>{
+    public HostnameHandler() {
+        super(Hostname.class);
+    }
+
+    @Override
+    public Iterable<ThreatIntelResults> extract(final Hostname type, Map<String, Object> config) throws IOException {
+        StringObjectPropertyType value = type.getHostnameValue();
+        List<ThreatIntelResults> ret = new ArrayList<>();
+        for(String token : StixExtractor.split(value)) {
+            ThreatIntelResults results = new ThreatIntelResults(new ThreatIntelKey(token),
+                                                                    new HashMap<String, String>() {{
+                                                                        put("source-type", "STIX");
+                                                                        put("indicator-type", "Hostname");
+                                                                        put("source", type.toXMLString());
+                                                                    }}
+                                                                   );
+                ret.add(results);
+        }
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
new file mode 100644
index 0000000..8de5dd2
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
@@ -0,0 +1,16 @@
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.mitre.cybox.common_2.ObjectPropertiesType;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/9/16.
+ */
+public interface ObjectTypeHandler<T extends ObjectPropertiesType> {
+    Iterable<ThreatIntelResults> extract(T type, Map<String, Object> config) throws IOException;
+    Class<T> getTypeClass();
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandlers.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandlers.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandlers.java
new file mode 100644
index 0000000..85d20b2
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandlers.java
@@ -0,0 +1,28 @@
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import org.mitre.cybox.common_2.ObjectPropertiesType;
+
+/**
+ * Created by cstella on 2/9/16.
+ */
+public enum ObjectTypeHandlers {
+      ADDRESS(new AddressHandler())
+    ,HOSTNAME(new HostnameHandler())
+    ,DOMAINNAME(new DomainHandler())
+    ,;
+   ObjectTypeHandler _handler;
+   ObjectTypeHandlers(ObjectTypeHandler handler) {
+      _handler = handler;
+   }
+   ObjectTypeHandler getHandler() {
+      return _handler;
+   }
+   public static ObjectTypeHandler getHandlerByInstance(ObjectPropertiesType inst) {
+      for(ObjectTypeHandlers h : values()) {
+         if(inst.getClass().equals(h.getHandler().getTypeClass())) {
+            return h.getHandler();
+         }
+      }
+      return null;
+   }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
new file mode 100644
index 0000000..84931df
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
@@ -0,0 +1,52 @@
+package org.apache.metron.dataloads.hbase.mr;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.apache.metron.threatintel.hbase.Converter;
+
+import java.io.IOException;
+
+/**
+ * Created by cstella on 2/3/16.
+ */
+public class BulkLoadMapper extends Mapper<Object, Text, ImmutableBytesWritable, Put>
+{
+    public static final String CONFIG_KEY="bl_extractor_config";
+    public static final String COLUMN_FAMILY_KEY = "bl_column_family";
+    public static final String LAST_SEEN_KEY = "bl_last_seen";
+    Extractor extractor = null;
+    String columnFamily = null;
+    Long lastSeen = null;
+    @Override
+    public void setup(Context context) throws IOException,
+            InterruptedException {
+        initialize(context.getConfiguration());
+    }
+
+    @Override
+    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
+        for(ThreatIntelResults results : extractor.extract(value.toString())) {
+            if (results != null) {
+                Put put = Converter.INSTANCE.toPut(columnFamily, results.getKey(), results.getValue(), lastSeen);
+                write(new ImmutableBytesWritable(results.getKey().toBytes()), put, context);
+            }
+        }
+    }
+
+    protected void initialize(Configuration configuration) throws IOException {
+        String configStr = configuration.get(CONFIG_KEY);
+        extractor = ExtractorHandler.load(configStr).getExtractor();
+        columnFamily = configuration.get(COLUMN_FAMILY_KEY);
+        lastSeen = Long.parseLong(configuration.get(LAST_SEEN_KEY));
+    }
+
+    protected void write(ImmutableBytesWritable key, Put value, Context context) throws IOException, InterruptedException {
+        context.write(key, value);
+    }
+}