Posted to commits@metron.apache.org by ce...@apache.org on 2016/04/07 14:56:38 UTC

[5/6] incubator-metron git commit: METRON-93: Generalize the HBase threat intel infrastructure to support enrichments closes apache/incubator-metron#64

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java
index 20026b2..45e93a1 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java
@@ -31,6 +31,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.metron.Constants;
 import org.apache.metron.domain.Configurations;
+import org.apache.metron.domain.SensorEnrichmentConfig;
 import org.apache.zookeeper.KeeperException;
 
 import java.io.File;
@@ -52,23 +53,40 @@ public class ConfigurationsUtils {
     writeToZookeeper(path, Files.readAllBytes(Paths.get(filePath)), zookeeperUrl);
   }
 
+  public static void writeToZookeeperFromFile(String path, String filePath, CuratorFramework client) throws Exception {
+    writeToZookeeper(path, Files.readAllBytes(Paths.get(filePath)), client);
+  }
   public static void writerGlobalConfigToZookeeper(byte[] configData, String zookeeperUrl) throws Exception {
     writeToZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, configData, zookeeperUrl);
   }
 
+  public static void writerGlobalConfigToZookeeper(byte[] configData, CuratorFramework client) throws Exception {
+    writeToZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, configData, client);
+  }
   public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, String zookeeperUrl) throws Exception {
     writeToZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, configData, zookeeperUrl);
   }
 
+  public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, CuratorFramework client) throws Exception {
+    writeToZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, configData, client);
+  }
   public static void writeToZookeeper(String path, byte[] configData, String zookeeperUrl) throws Exception {
     CuratorFramework client = getClient(zookeeperUrl);
     client.start();
     try {
+      writeToZookeeper(path, configData, client);
+    }
+    finally {
+      client.close();
+    }
+
+  }
+  public static void writeToZookeeper(String path, byte[] configData, CuratorFramework client) throws Exception {
+    try {
       client.setData().forPath(path, configData);
     } catch (KeeperException.NoNodeException e) {
       client.create().creatingParentsIfNeeded().forPath(path, configData);
     }
-    client.close();
   }
 
   public static void updateConfigsFromZookeeper(Configurations configurations, CuratorFramework client) throws Exception {
@@ -114,12 +132,21 @@ public class ConfigurationsUtils {
   public static void dumpConfigs(String zookeeperUrl) throws Exception {
     CuratorFramework client = getClient(zookeeperUrl);
     client.start();
-    List<String> children = client.getChildren().forPath(Constants.ZOOKEEPER_TOPOLOGY_ROOT);
-    for (String child : children) {
-      byte[] data = client.getData().forPath(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + child);
-      System.out.println("Config for source " + child);
-      System.out.println(new String(data));
-      System.out.println();
+    //Output global configs
+    {
+      System.out.println("Global config");
+      byte[] globalConfigData = client.getData().forPath(Constants.ZOOKEEPER_GLOBAL_ROOT);
+      System.out.println(new String(globalConfigData));
+    }
+    //Output sensor specific configs
+    {
+      List<String> children = client.getChildren().forPath(Constants.ZOOKEEPER_SENSOR_ROOT);
+      for (String child : children) {
+        byte[] data = client.getData().forPath(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + child);
+        System.out.println("Config for source " + child);
+        System.out.println(new String(data));
+        System.out.println();
+      }
     }
     client.close();
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
index cffcd68..34f98bb 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
@@ -22,8 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.*;
 
 public enum JSONUtils {
   INSTANCE;
@@ -58,11 +57,16 @@ public enum JSONUtils {
   public <T> T load(String is, TypeReference<T> ref) throws IOException {
     return _mapper.get().readValue(is, ref);
   }
-
+  public <T> T load(File f, TypeReference<T> ref) throws IOException {
+    return _mapper.get().readValue(new BufferedInputStream(new FileInputStream(f)), ref);
+  }
   public <T> T load(InputStream is, Class<T> clazz) throws IOException {
     return _mapper.get().readValue(is, clazz);
   }
 
+  public <T> T load(File f, Class<T> clazz) throws IOException {
+    return _mapper.get().readValue(new BufferedInputStream(new FileInputStream(f)), clazz);
+  }
   public <T> T load(String is, Class<T> clazz) throws IOException {
     return _mapper.get().readValue(is, clazz);
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-Common/src/test/java/org/apache/metron/enrichment/EnrichmentConfigTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/enrichment/EnrichmentConfigTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/enrichment/EnrichmentConfigTest.java
new file mode 100644
index 0000000..1f29403
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/enrichment/EnrichmentConfigTest.java
@@ -0,0 +1,197 @@
+package org.apache.metron.enrichment;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.Constants;
+import org.apache.metron.domain.SensorEnrichmentConfig;
+import org.apache.metron.utils.JSONUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class EnrichmentConfigTest {
+  /**
+   {
+      "index": "bro",
+      "batchSize": 5,
+      "enrichmentFieldMap": {
+        "geo": ["ip_dst_addr", "ip_src_addr"],
+        "host": ["host"]
+                            },
+      "threatIntelFieldMap": {
+        "hbaseThreatIntel": ["ip_dst_addr", "ip_src_addr"]
+                             },
+      "fieldToThreatIntelTypeMap": {
+        "ip_dst_addr" : [ "malicious_ip" ]
+       ,"ip_src_addr" : [ "malicious_ip" ]
+                                   }
+    }
+   */
+  @Multiline
+  public static String sourceConfigStr;
+
+  /**
+{
+  "zkQuorum" : "localhost:2181"
+ ,"sensorToFieldList" : {
+  "bro" : {
+           "type" : "THREAT_INTEL"
+          ,"fieldToEnrichmentTypes" : {
+            "ip_src_addr" : [ "playful" ]
+           ,"ip_dst_addr" : [ "playful" ]
+                                      }
+          }
+                        }
+}
+     */
+  @Multiline
+  public static String threatIntelConfigStr;
+
+  @Test
+  public void testThreatIntel() throws Exception {
+
+    SensorEnrichmentConfig broSc = JSONUtils.INSTANCE.load(sourceConfigStr, SensorEnrichmentConfig.class);
+
+
+    EnrichmentConfig config = JSONUtils.INSTANCE.load(threatIntelConfigStr, EnrichmentConfig.class);
+    final Map<String, SensorEnrichmentConfig> outputScs = new HashMap<>();
+    EnrichmentConfig.SourceConfigHandler scHandler = new EnrichmentConfig.SourceConfigHandler() {
+      @Override
+      public SensorEnrichmentConfig readConfig(String sensor) throws Exception {
+        if(sensor.equals("bro")) {
+          return JSONUtils.INSTANCE.load(sourceConfigStr, SensorEnrichmentConfig.class);
+        }
+        else {
+          throw new IllegalStateException("Tried to retrieve an unexpected sensor: " + sensor);
+        }
+      }
+
+      @Override
+      public void persistConfig(String sensor, SensorEnrichmentConfig config) throws Exception {
+        outputScs.put(sensor, config);
+      }
+    };
+    EnrichmentConfig.updateSensorConfigs(scHandler, config.getSensorToFieldList());
+    Assert.assertNotNull(outputScs.get("bro"));
+    Assert.assertNotSame(outputScs.get("bro"), broSc);
+    Assert.assertEquals( outputScs.get("bro").toJSON()
+                       , outputScs.get("bro").getThreatIntelFieldMap().get(Constants.SIMPLE_HBASE_THREAT_INTEL).size()
+                       , 2
+                       );
+    Assert.assertTrue( outputScs.get("bro").toJSON()
+                       , outputScs.get("bro").getThreatIntelFieldMap()
+                                  .get(Constants.SIMPLE_HBASE_THREAT_INTEL)
+                                  .contains("ip_src_addr")
+                       );
+    Assert.assertTrue( outputScs.get("bro").toJSON()
+                       , outputScs.get("bro").getThreatIntelFieldMap()
+                                  .get(Constants.SIMPLE_HBASE_THREAT_INTEL)
+                                  .contains("ip_dst_addr")
+                       );
+    Assert.assertEquals( outputScs.get("bro").toJSON()
+                       , outputScs.get("bro").getFieldToThreatIntelTypeMap().keySet().size()
+                       , 2
+                       );
+    Assert.assertEquals( outputScs.get("bro").toJSON()
+                       , outputScs.get("bro").getFieldToThreatIntelTypeMap().get("ip_src_addr").size()
+                       , 2
+                       );
+    Assert.assertTrue( outputScs.get("bro").toJSON()
+                       , outputScs.get("bro").getFieldToThreatIntelTypeMap().get("ip_src_addr").contains("playful")
+                       );
+    Assert.assertTrue( outputScs.get("bro").toJSON()
+                       , outputScs.get("bro").getFieldToThreatIntelTypeMap().get("ip_src_addr").contains("malicious_ip")
+                       );
+    Assert.assertEquals( outputScs.get("bro").toJSON()
+                       , outputScs.get("bro").getFieldToThreatIntelTypeMap().get("ip_dst_addr").size()
+                       , 2
+                       );
+    Assert.assertTrue( outputScs.get("bro").toJSON()
+                       , outputScs.get("bro").getFieldToThreatIntelTypeMap().get("ip_dst_addr").contains("playful")
+                       );
+    Assert.assertTrue( outputScs.get("bro").toJSON()
+                       , outputScs.get("bro").getFieldToThreatIntelTypeMap().get("ip_dst_addr").contains("malicious_ip")
+                       );
+  }
+
+  /**
+   {
+  "zkQuorum" : "localhost:2181"
+ ,"sensorToFieldList" : {
+  "bro" : {
+           "type" : "ENRICHMENT"
+          ,"fieldToEnrichmentTypes" : {
+            "ip_src_addr" : [ "playful" ]
+           ,"ip_dst_addr" : [ "playful" ]
+                                      }
+          }
+                        }
+   }
+   */
+  @Multiline
+  public static String enrichmentConfigStr;
+  @Test
+  public void testEnrichment() throws Exception {
+
+    SensorEnrichmentConfig broSc = JSONUtils.INSTANCE.load(sourceConfigStr, SensorEnrichmentConfig.class);
+
+    EnrichmentConfig config = JSONUtils.INSTANCE.load(enrichmentConfigStr, EnrichmentConfig.class);
+    final Map<String, SensorEnrichmentConfig> outputScs = new HashMap<>();
+    EnrichmentConfig.SourceConfigHandler scHandler = new EnrichmentConfig.SourceConfigHandler() {
+      @Override
+      public SensorEnrichmentConfig readConfig(String sensor) throws Exception {
+        if(sensor.equals("bro")) {
+          return JSONUtils.INSTANCE.load(sourceConfigStr, SensorEnrichmentConfig.class);
+        }
+        else {
+          throw new IllegalStateException("Tried to retrieve an unexpected sensor: " + sensor);
+        }
+      }
+
+      @Override
+      public void persistConfig(String sensor, SensorEnrichmentConfig config) throws Exception {
+        outputScs.put(sensor, config);
+      }
+    };
+    EnrichmentConfig.updateSensorConfigs(scHandler, config.getSensorToFieldList());
+    Assert.assertNotNull(outputScs.get("bro"));
+    Assert.assertNotSame(outputScs.get("bro"), broSc);
+    Assert.assertEquals( outputScs.get("bro").toJSON()
+                       , outputScs.get("bro").getEnrichmentFieldMap().get(Constants.SIMPLE_HBASE_ENRICHMENT).size()
+                       , 2
+                       );
+    Assert.assertTrue( outputScs.get("bro").toJSON()
+                       , outputScs.get("bro").getEnrichmentFieldMap()
+                                  .get(Constants.SIMPLE_HBASE_ENRICHMENT)
+                                  .contains("ip_src_addr")
+                       );
+    Assert.assertTrue( outputScs.get("bro").toJSON()
+                       , outputScs.get("bro").getEnrichmentFieldMap()
+                                  .get(Constants.SIMPLE_HBASE_ENRICHMENT)
+                                  .contains("ip_dst_addr")
+                       );
+    Assert.assertEquals( outputScs.get("bro").toJSON()
+                       , outputScs.get("bro").getFieldToEnrichmentTypeMap().keySet().size()
+                       , 2
+                       );
+    Assert.assertEquals( outputScs.get("bro").toJSON()
+                       , outputScs.get("bro").getFieldToEnrichmentTypeMap().get("ip_src_addr").size()
+                       , 1
+                       );
+    Assert.assertEquals( outputScs.get("bro").toJSON()
+                       , outputScs.get("bro").getFieldToEnrichmentTypeMap().get("ip_src_addr").get(0)
+                       , "playful"
+                       );
+    Assert.assertEquals( outputScs.get("bro").toJSON()
+                       , outputScs.get("bro").getFieldToEnrichmentTypeMap().get("ip_dst_addr").size()
+                       , 1
+                       );
+    Assert.assertEquals( outputScs.get("bro").toJSON()
+                       , outputScs.get("bro").getFieldToEnrichmentTypeMap().get("ip_dst_addr").get(0)
+                       , "playful"
+                       );
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-Common/src/test/java/org/apache/metron/hbase/converters/enrichment/EnrichmentConverterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/hbase/converters/enrichment/EnrichmentConverterTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/hbase/converters/enrichment/EnrichmentConverterTest.java
new file mode 100644
index 0000000..9327b68
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/hbase/converters/enrichment/EnrichmentConverterTest.java
@@ -0,0 +1,34 @@
+package org.apache.metron.hbase.converters.enrichment;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.metron.reference.lookup.LookupKV;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+public class EnrichmentConverterTest {
+  @Test
+  public void testKeyConversion() {
+    EnrichmentKey k1 = new EnrichmentKey("type", "indicator1");
+    byte[] serialized = k1.toBytes();
+    EnrichmentKey k2 = new EnrichmentKey();
+    k2.fromBytes(serialized);
+    Assert.assertEquals(k1, k2);
+  }
+
+  @Test
+  public void testValueConversion() throws IOException {
+    EnrichmentConverter converter = new EnrichmentConverter();
+    EnrichmentKey k1 = new EnrichmentKey("type", "indicator");
+    EnrichmentValue v1 = new EnrichmentValue(new HashMap<String, String>() {{
+      put("k1", "v1");
+      put("k2", "v2");
+    }});
+    Put serialized = converter.toPut("cf", k1, v1);
+    LookupKV<EnrichmentKey, EnrichmentValue> kv = converter.fromPut(serialized,"cf");
+    Assert.assertEquals(k1, kv.getKey());
+    Assert.assertEquals(v1, kv.getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/README.md
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/README.md b/metron-streaming/Metron-DataLoads/README.md
index b6fe229..4ad5d5f 100644
--- a/metron-streaming/Metron-DataLoads/README.md
+++ b/metron-streaming/Metron-DataLoads/README.md
@@ -1,50 +1,252 @@
 # Metron-DataLoads
 
-This project is a collection of classes to assist with loading of various enrichment sources into Metron.
+This project is a collection of classes to assist with loading of
+various enrichment and threat intelligence sources into Metron.
+
+## Simple HBase Enrichments/Threat Intelligence
+
+The vast majority of enrichments and threat intelligence processing tend
+toward the following pattern:
+* Take a field
+* Look up the field in a key/value store
+* If the key exists, then either it's a threat to be alerted on or the message should be enriched with the value associated with the key.
+
+As such, we have created this capability as a default threat intel and enrichment adapter.  The basic primitive for simple enrichments and threat intelligence sources
+is a complex key and an associated value, consisting of the following:
+* Type : The type of threat intel or enrichment (e.g. malicious_ip)
+* Indicator : The indicator in question
+* Value : The value to associate with the type, indicator pair.  This is a JSON map.
+
+At present, all of the dataloads utilities function by converting raw data
+sources to this primitive key (type, indicator) and value to be placed in HBase.
+
+In the case of threat intel, a hit on the threat intel table will result
+in:
+* The `is_alert` field being set to `true` in the index
+* A field named `threatintels.hbaseThreatIntel.$field.$threatintel_type` is set to `alert` 
+   * `$field` is the field in the original document that was a match (e.g. `src_ip_addr`) 
+   * `$threatintel_type` is the type of threat intel imported (defined in the Extractor configuration below).
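+
+For example, if the threat intel table contained the indicator `127.0.0.1` under the type `malicious_ip`, then a document whose `ip_src_addr` was `127.0.0.1` would be indexed with:
+* `is_alert` : `true`
+* `threatintels.hbaseThreatIntel.ip_src_addr.malicious_ip` : `alert`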
+
+In the case of simple hbase enrichment, a hit on the enrichments table
+will result in a new field for each key in the value: `enrichments.hbaseEnrichment.$field.$enrichment_type.$key`
+* `$field` is the field in the original document that was a match (e.g.  `src_ip_addr`)
+* `$enrichment_type` is the type of enrichment imported (defined in the Extractor configuration below).
+* `$key` is a key in the JSON map associated with the row in HBase.
+
+For instance, suppose we had the following very silly key/value in
+the HBase enrichment table:
+* indicator: `127.0.0.1`
+* type : `important_addresses`
+* value: `{ "name" : "localhost", "location" : "home" }`
+
+If we had a document whose `ip_src_addr` came through with a value of
+`127.0.0.1`, we would have the following fields added to the indexed
+document:
+* `enrichments.hbaseEnrichment.ip_src_addr.important_addresses.name` : `localhost`
+* `enrichments.hbaseEnrichment.ip_src_addr.important_addresses.location` : `home`
+
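+The sketch below is a minimal illustration (the class name `ImportantAddressesExample` is hypothetical) of how this
+key/value primitive is modeled in Java and converted into an HBase `Put`, following the `EnrichmentConverterTest`
+added in this commit; the row mirrors the `important_addresses` example above:
+
+````
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentConverter;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentKey;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentValue;
+
+public class ImportantAddressesExample {
+  public static Put asPut() throws IOException {
+    // The (type, indicator) pair forms the complex key...
+    EnrichmentKey key = new EnrichmentKey("important_addresses", "127.0.0.1");
+    // ...and the JSON map forms the value.
+    EnrichmentValue value = new EnrichmentValue(new HashMap<String, String>() {{
+      put("name", "localhost");
+      put("location", "home");
+    }});
+    // Serialize the pair into a Put against column family "cf".
+    return new EnrichmentConverter().toPut("cf", key, value);
+  }
+}
+````
+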
+## Extractor Framework
+
+For the purpose of ingesting data of a variety of formats, we have
+created an Extractor framework which allows for common data formats to
+be interpreted as enrichment or threat intelligence sources.  The
+formats supported at present are:
+* CSV (both threat intel and enrichment)
+* STIX (threat intel only)
+* Custom (pass your own class)
+
+All of the current utilities take a JSON file to configure how to
+interpret input data.  This JSON describes the type of the data and,
+for formats which are not fixed (unlike STIX, which is standardized),
+the schema of the data.
+
+### CSV Extractor
+
+Consider the following example configuration file which
+describes how to process a CSV file.
 
-## Threat Intel Enrichment
+````
+{
+  "config" : {
+    "columns" : {
+         "ip" : 0
+        ,"source" : 2
+    }
+    ,"indicator_column" : "ip"
+    ,"type" : "malicious_ip"
+    ,"separator" : ","
+  }
+  ,"extractor" : "CSV"
+}
+````
 
-Threat Intel enrichment data sources can be loaded into Metron using the ThreatIntelLoader class and an implementation of a ThreatIntelSource interface. Both are described below.
+In this example, we have described the schema to the extractor via the `columns` field:
+two columns, `ip` at the first position and `source` at the third.  We have indicated that the `ip` column holds the indicator
+and that the enrichment type is named `malicious_ip`.  We have also indicated that the extractor to use is the CSV extractor;
+the other options are the STIX extractor or a fully qualified classname for your own extractor.
 
-### ThreatIntelSource Interface
+Columns other than the indicator column (here, `source`) are treated as metadata and show up in the HBase value as a
+JSON map keyed by column name.  For instance, given an input line of `123.45.123.12,something,the grapevine`, the following key and value
+would be extracted:
+* Indicator : `123.45.123.12`
+* Type : `malicious_ip`
+* Value : `{ "source" : "the grapevine" }`
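+
+As an illustration only (this is a conceptual sketch, not the project's actual CSV extractor), the mapping that the
+config above describes amounts to the following:
+
+````
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Map;
+
+// Conceptual sketch of the CSV mapping above; NOT the actual CSV extractor.
+public class CsvMappingSketch {
+  // Given "123.45.123.12,something,the grapevine", returns the indicator
+  // "123.45.123.12" paired with the value map { "source" : "the grapevine" }.
+  // The type for the resulting key is "malicious_ip" (from the "type" field).
+  public static Map.Entry<String, Map<String, String>> extract(String line) {
+    String[] tokens = line.split(",");          // "separator" : ","
+    Map<String, String> value = new HashMap<>();
+    value.put("source", tokens[2]);             // "source" is column 2, a non-indicator column
+    return new AbstractMap.SimpleEntry<>(tokens[0], value);  // "ip" is column 0, the indicator
+  }
+}
+````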
 
-This inteface extends the Iterator interface and must implement the following methods:
+### STIX Extractor
 
-`void initializeSource(Configuration config);`
+STIX is a threat intelligence interchange format, so it is particularly relevant and attractive data
+to import for our purposes.  Because STIX is a standard format, there is no need to specify the schema
+or how to interpret the documents.
 
-Put any setup that needs to be done here. This will be called by ThreatIntelLoader before attempting to fetch any data from the source. The paramter config is a Configuration object created from the configuration file passed to ThreatIntelLoader. See the ThreatIntelLoader section below for more details
+We support a subset of STIX messages for importation:
 
-`void cleanupSource();`
+| STIX Type | Specific Type | Enrichment Type Name |
+|-----------|---------------|----------------------|
+| Address   | IPV_4_ADDR    | address:IPV_4_ADDR   |
+| Address   | IPV_6_ADDR    | address:IPV_6_ADDR   |
+| Address   | E_MAIL        | address:E_MAIL       |
+| Address   | MAC           | address:MAC          |
+| Domain    | FQDN          | domain:FQDN          |
+| Hostname  |               | hostname             |
 
-This is called after all data is retrieved, just before ThreatIntelLoader exists. Perform any clean up here if needed.
 
-`JSONObject next()`
+NOTE: The enrichment type name above is used as the `type` in the (type, indicator) key described earlier.
 
-This method should return the next piece of intel to be stored in Metron. The returned JSONObject must have the following fields:
+Consider the following configuration for an Extractor
 
-* indicator - The indicator that will be checked against during enrichment. For example, and IP Address or a Hostname.
-* source - The source of the data, which can be any unique string to identify the origin of the intel. This will be the column qualifer in HBase and be used to group matches on in Storm
-* data - A JSONArray of JSONObjects that detail the intel for the indicator. The JSONObjects have no required format
+````
+{
+  "config" : {
+    "stix_address_categories" : "IPV_4_ADDR"
+  }
+  ,"extractor" : "STIX"
+}
+````
 
+Here, we're configuring the STIX extractor to load from a series of STIX files, but we only want to bring in IPv4
+addresses from the set of all possible addresses.  Note that if no categories are specified for import, all are assumed.
+Also, only address and domain types allow filtering, via the `stix_address_categories` and `stix_domain_categories` config
+parameters.
 
-`boolean hasNext()`
+## Enrichment Config
 
-Returns true if there are more sources to read. Otherwise, false.
+In order to automatically add new enrichment and threat intel types to existing, running enrichment topologies, you will
+need to add new fields and new types to the zookeeper configuration.  A convenience parameter has been added to assist with this
+during an import.  Namely, you can specify the enrichment configs and how they associate with the fields of the
+documents flowing through the enrichment topology.
 
+Consider the following Enrichment Configuration JSON.  This one is for a threat intelligence type:
+
+````
+{
+  "zkQuorum" : "localhost:2181"
+ ,"sensorToFieldList" : {
+    "bro" : {
+           "type" : "THREAT_INTEL"
+          ,"fieldToEnrichmentTypes" : {
+             "ip_src_addr" : [ "malicious_ip" ]
+            ,"ip_dst_addr" : [ "malicious_ip" ]
+                                      }
+           }
+                        }
+}
+````
 
-### ThreatIntelLoader
+We have to specify the following:
+* The zookeeper quorum which holds the cluster configuration
+* The mapping between the fields in the enriched documents and the enrichment types.
 
-This class is intenteded to be called from the commandline on the Metron cluster and is responsible for taking intel from a ThreatIntelSource implementation and putting them into HBase.
+This configuration allows the ingestion tools to update zookeeper post-ingestion so that the enrichment topology can take
+immediate advantage of the new type.
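+
+Below is a sketch of how a loader applies this configuration, following the `EnrichmentConfigTest` added in this commit;
+the class name `EnrichmentConfigSketch` and the in-memory `existingConfigs` map are illustrative stand-ins (the real
+tools read and persist the sensor configs via zookeeper):
+
+````
+import java.util.Map;
+
+import org.apache.metron.domain.SensorEnrichmentConfig;
+import org.apache.metron.enrichment.EnrichmentConfig;
+import org.apache.metron.utils.JSONUtils;
+
+public class EnrichmentConfigSketch {
+  // "configJson" holds an Enrichment Configuration document like the one above.
+  public static void apply(String configJson, final Map<String, SensorEnrichmentConfig> existingConfigs) throws Exception {
+    EnrichmentConfig config = JSONUtils.INSTANCE.load(configJson, EnrichmentConfig.class);
+    EnrichmentConfig.SourceConfigHandler handler = new EnrichmentConfig.SourceConfigHandler() {
+      @Override
+      public SensorEnrichmentConfig readConfig(String sensor) throws Exception {
+        return existingConfigs.get(sensor);   // the real tools read the config from zookeeper
+      }
+      @Override
+      public void persistConfig(String sensor, SensorEnrichmentConfig sc) throws Exception {
+        existingConfigs.put(sensor, sc);      // the real tools write the config back to zookeeper
+      }
+    };
+    // Merge the new enrichment/threat intel types into each sensor's config.
+    EnrichmentConfig.updateSensorConfigs(handler, config.getSensorToFieldList());
+  }
+}
+````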
 
-#### Usage
+
+## Loading Utilities
+
+The two configurations above are used in the three separate ingestion tools:
+* Taxii Loader
+* Bulk load from HDFS via MapReduce
+* Flat File ingestion
+
+### Taxii Loader
+
+The shell script `$METRON_HOME/bin/threatintel_taxii_load.sh` can be used to poll a Taxii server for STIX documents and ingest them into HBase.  
+It is quite common for this Taxii server to be an aggregation server such as Soltra Edge.
+
+In addition to the Enrichment and Extractor configs described above, this loader requires a configuration file describing the connection information
+to the Taxii server.  An illustrative example of such a configuration file is:
 
 ````
-usage: ThreatIntelLoader [--configFile <c>] --source <s> --table <t>
-    --configFile <c>   Configuration file for source class
-    --source <s>       Source class to use
-    --table <t>        HBase table to load into
+{
+   "endpoint" : "http://localhost:8282/taxii-discovery-service"
+  ,"type" : "DISCOVER"
+  ,"collection" : "guest.Abuse_ch"
+  ,"table" : "threat_intel"
+  ,"columnFamily" : "cf"
+  ,"allowedIndicatorTypes" : [ "domainname:FQDN", "address:IPV_4_ADDR" ]
+}
 ````
 
-* configFile - the file passed in by this class is used to provide configuration options to the ThreatIntelSource implementation being used.
-* source - the implementation of ThreatIntelSource to use
-* table - the hbase table to store the threat intel in for enrichment later. This should match what the corresponding enrichment bolt is using in Storm
+As you can see, we are specifying the following information:
+* endpoint : The URL of the endpoint
+* type : `POLL` or `DISCOVER` depending on the endpoint.
+* collection : The Taxii collection to ingest
+* table : The HBase table to import into
+* columnFamily : The column family to import into
+* allowedIndicatorTypes : an array of acceptable threat intel types (see the "Enrichment Type Name" column of the Stix table above for the possibilities).
+
+The parameters for the utility are as follows:
+
+| Short Code | Long Code                 | Is Required? | Description                                                                                                                                        |
+|------------|---------------------------|--------------|----------------------------------------------------------------------------------------------------------------------------------------------------|
+| -h         |                           | No           | Generate the help screen/set of options                                                                                                            |
+| -e         | --extractor_config        | Yes          | JSON Document describing the extractor for this input data source                                                                                  |
+| -c         | --taxii_connection_config | Yes          | The JSON config file to configure the connection                                                                                                   |
+| -p         | --time_between_polls      | No           | The time between polling the Taxii server in milliseconds. (default: 1 hour)                                                                       |
+| -b         | --begin_time              | No           | Start time to poll the Taxii server (all data from that point will be gathered in the first pull).  The format for the date is yyyy-MM-dd HH:mm:ss |
+| -l         | --log4j                   | No           | The Log4j Properties to load                                                                                                                       |
+| -n         | --enrichment_config       | No           | The JSON document describing the enrichments to configure.  Unlike other loaders, this is run first if specified.                                  |
+
+
+### Bulk Load from HDFS
+
+The shell script `$METRON_HOME/bin/threatintel_bulk_load.sh` will kick off a MR job to load data staged in HDFS into an HBase table.  Note: despite what
+the naming may suggest, this utility works for enrichment as well as threat intel due to the underlying infrastructure being the same.
+
+The parameters for the utility are as follows:
+
+| Short Code | Long Code           | Is Required? | Description                                                                                                       |
+|------------|---------------------|--------------|-------------------------------------------------------------------------------------------------------------------|
+| -h         |                     | No           | Generate the help screen/set of options                                                                           |
+| -e         | --extractor_config  | Yes          | JSON Document describing the extractor for this input data source                                                 |
+| -t         | --table             | Yes          | The HBase table to import into                                                                                    |
+| -f         | --column_family     | Yes          | The HBase table column family to import into                                                                      |
+| -i         | --input             | Yes          | The input data location on HDFS                                                                                   |
+| -n         | --enrichment_config | No           | The JSON document describing the enrichments to configure.  Unlike other loaders, this is run first if specified. |
+
+### Flatfile Loader
+
+The shell script `$METRON_HOME/bin/flatfile_loader.sh` will read data from local disk and load the enrichment or threat intel data into an HBase table.  
+Note: This utility works for enrichment as well as threat intel due to the underlying infrastructure being the same.
+
+One special thing to note here is that there is a configuration
+parameter in the Extractor config that is only considered by this
+loader:
+* inputFormatHandler : This specifies how to consider the data.  The two implementations are `BY_LINE` and `org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat`.
+
+The default is `BY_LINE`, which makes sense for a CSV file where
+each line indicates a unit of information which can be imported.
+However, if you are importing a set of STIX documents, then you want
+each whole document to be considered as input to the Extractor.
+
+The parameters for the utility are as follows:
+
+| Short Code | Long Code           | Is Required? | Description                                                                                                                                                                          |
+|------------|---------------------|--------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| -h         |                     | No           | Generate the help screen/set of options                                                                                                                                              |
+| -e         | --extractor_config  | Yes          | JSON Document describing the extractor for this input data source                                                                                                                    |
+| -t         | --hbase_table       | Yes          | The HBase table to import into                                                                                                                                                       |
+| -c         | --hbase_cf          | Yes          | The HBase table column family to import into                                                                                                                                         |
+| -i         | --input             | Yes          | The input data location on local disk.  If this is a file, then that file will be loaded.  If this is a directory, then the files will be loaded recursively under that directory. |
+| -l         | --log4j             | No           | The log4j properties file to load                                                                                                                                                    |
+| -n         | --enrichment_config | No           | The JSON document describing the enrichments to configure.  Unlike other loaders, this is run first if specified.                                                                    |
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/bash/flatfile_loader.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/flatfile_loader.sh b/metron-streaming/Metron-DataLoads/src/main/bash/flatfile_loader.sh
new file mode 100755
index 0000000..a9db639
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/flatfile_loader.sh
@@ -0,0 +1,39 @@
+#!/bin/bash
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# 
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
+CP=/usr/metron/0.1BETA/lib/Metron-DataLoads-0.1BETA.jar:/usr/metron/0.1BETA/lib/taxii-1.1.0.1.jar:`${HBASE_HOME}/bin/hbase classpath`
+HADOOP_CLASSPATH=$(echo $CP )
+for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
+  if [ -f $jar ];then
+    LIBJARS="$jar,$LIBJARS"
+  fi
+done
+export HADOOP_CLASSPATH
+hadoop jar /usr/metron/0.1BETA/lib/Metron-DataLoads-0.1BETA.jar org.apache.metron.dataloads.nonbulk.flatfile.SimpleEnrichmentFlatFileLoader "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_taxii_load.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_taxii_load.sh b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_taxii_load.sh
index 77011fb..59e70d8 100755
--- a/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_taxii_load.sh
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_taxii_load.sh
@@ -36,4 +36,4 @@ for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
   fi
 done
 export HADOOP_CLASSPATH
-hadoop jar /usr/metron/0.1BETA/lib/Metron-DataLoads-0.1BETA.jar org.apache.metron.dataloads.taxii.TaxiiLoader "$@"
+hadoop jar /usr/metron/0.1BETA/lib/Metron-DataLoads-0.1BETA.jar org.apache.metron.dataloads.nonbulk.taxii.TaxiiLoader "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelLoader.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelLoader.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelLoader.java
deleted file mode 100644
index 8ed2e23..0000000
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelLoader.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.dataloads;
-
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.log4j.Logger;
-import org.json.simple.JSONObject;
-import org.json.simple.JSONArray;
-
-import org.apache.metron.dataloads.interfaces.ThreatIntelSource;
-
-public class ThreatIntelLoader {
-
-	
-	private static final Logger LOG = Logger.getLogger(ThreatIntelLoader.class);
-	
-	private static int BULK_SIZE = 50; 
-	
-	public static void main(String[] args) {
-		
-		PropertiesConfiguration sourceConfig = null;
-		ThreatIntelSource threatIntelSource = null;
-		ArrayList<Put> putList = null;
-		HTable table = null;
-		Configuration hConf = null;
-		
-		CommandLine commandLine = parseCommandLine(args);
-		File configFile = new File(commandLine.getOptionValue("configFile"));
-		
-		try {
-			sourceConfig = new PropertiesConfiguration(configFile);
-		} catch (org.apache.commons.configuration.ConfigurationException e) {
-			LOG.error("Error in configuration file " + configFile);
-			LOG.error(e);
-			System.exit(-1);
-		}
-		
-		try {
-			threatIntelSource = (ThreatIntelSource) Class.forName(commandLine.getOptionValue("source")).newInstance();
-			threatIntelSource.initializeSource(sourceConfig);
-		} catch (ClassNotFoundException|InstantiationException|IllegalAccessException e) {
-			LOG.error("Error while trying to load class " + commandLine.getOptionValue("source"));
-			LOG.error(e);
-			System.exit(-1);
-		}
-		
-		hConf = HBaseConfiguration.create();
-		try {
-			table = new HTable(hConf, commandLine.getOptionValue("table"));
-		} catch (IOException e) {
-			LOG.error("Exception when processing HBase config");
-			LOG.error(e);
-			System.exit(-1);
-		}
-		
-		
-		putList = new ArrayList<Put>();
-		
-		while (threatIntelSource.hasNext()) {
-			
-			JSONObject intel = threatIntelSource.next();
-			
-			/*
-			 * If any of the required fields from threatIntelSource are
-			 * missing, or contain invalid data, don't put it in HBase.
-			 */
-			try {				
-
-				putList.add(putRequestFromIntel(intel));		
-				
-				if (putList.size() == BULK_SIZE) {
-					table.put(putList);
-					putList.clear();
-				}
-				
-			} catch (NullPointerException|ClassCastException e) {
-				LOG.error("Exception while processing intel object");
-				LOG.error(intel.toString());
-				LOG.error(e);
-			} catch (InterruptedIOException|org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException e) {
-				LOG.error("Problem communicationg with HBase");
-				LOG.error(e);
-				System.exit(-1);
-			} catch (IOException e) {
-				LOG.error("Problem communicationg with HBase");
-				LOG.error(e);
-				System.exit(-1);
-			}
-		}
-		
-	}
-	/*
-	 * Takes a JSONObject from a ThreatIntelSource implementation, ensures
-	 * that the format of the returned JSONObect is correct, and returns
-	 * a Put request for HBase.
-	 * 
-	 * @param	intel	The JSONObject from a ThreatIntelSource
-	 * @return			A put request for the intel data 
-	 * @throws	NullPointerException If a required field is missing
-	 * @throws	ClassCastException If a field has an invalid type
-	 * 
-	 */
-	private static Put putRequestFromIntel(JSONObject intel) {
-		
-		Put tempPut = new Put(Bytes.toBytes((String) intel.get("indicator")));
-		
-		JSONArray intelArray = (JSONArray) intel.get("data");
-		
-		tempPut.add(Bytes.toBytes("source"),
-					Bytes.toBytes((String) intel.get("source")),
-					Bytes.toBytes(intelArray.toString()));
-		
-		return tempPut;
-	}
-	/*
-	 * Handles parsing of command line options and validates the options are used
-	 * correctly. This will not validate the value of the options, it will just
-	 * ensure that the required options are used. If the options are used 
-	 * incorrectly, the help is printed, and the program exits.
-	 * 
-	 * @param  args The arguments from the CLI
-	 * @return 		A CommandLine with the CLI arguments
-	 * 
-	 */
-	private static CommandLine parseCommandLine(String[] args) {
-		
-		CommandLineParser parser = new BasicParser();
-		CommandLine cli = null;
-		
-		Options options = new Options();
-		
-		options.addOption(OptionBuilder.withArgName("s").
-				withLongOpt("source").
-				isRequired(true).
-				hasArg(true).
-				withDescription("Source class to use").
-				create()
-				);
-		options.addOption(OptionBuilder.withArgName("t").
-				withLongOpt("table").
-				isRequired(true).
-				hasArg(true).
-				withDescription("HBase table to load into").
-				create()
-				);
-		options.addOption(OptionBuilder.withArgName("c").
-				withLongOpt("configFile").
-				hasArg(true).
-				withDescription("Configuration file for source class").
-				create()
-				);
-		
-		try {
-			cli = parser.parse(options, args);
-		} catch(org.apache.commons.cli.ParseException e) {
-			HelpFormatter formatter = new HelpFormatter();
-			formatter.printHelp("ThreatIntelLoader", options, true);
-			System.exit(-1);
-		}
-		
-		return cli;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java
index 8f1f205..7acc96c 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java
@@ -150,7 +150,7 @@ public class LeastRecentlyUsedPruner {
 
         public static void printHelp() {
             HelpFormatter formatter = new HelpFormatter();
-            formatter.printHelp( "ThreatIntelBulkLoader", getOptions());
+            formatter.printHelp( "LeastRecentlyUsedPruner", getOptions());
         }
 
         public static Options getOptions() {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
index 02f9f9b..8c0a7e4 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
@@ -32,8 +32,10 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.metron.dataloads.extractor.ExtractorHandler;
 import org.apache.metron.dataloads.hbase.mr.BulkLoadMapper;
+import org.apache.metron.enrichment.EnrichmentConfig;
 import org.apache.metron.hbase.converters.HbaseConverter;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelConverter;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentConverter;
+import org.apache.metron.utils.JSONUtils;
 
 import javax.annotation.Nullable;
 import java.io.File;
@@ -43,188 +45,215 @@ import java.text.*;
 import java.util.Date;
 
 public class ThreatIntelBulkLoader  {
-    private static abstract class OptionHandler implements Function<String, Option> {}
-    private enum BulkLoadOptions {
-        HELP("h", new OptionHandler() {
+  private static abstract class OptionHandler implements Function<String, Option> {}
+  private enum BulkLoadOptions {
+    HELP("h", new OptionHandler() {
 
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                return new Option(s, "help", false, "Generate Help screen");
-            }
-        })
-        ,TABLE("t", new OptionHandler() {
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                Option o = new Option(s, "table", true, "HBase table to import data into");
-                o.setRequired(true);
-                o.setArgName("HBASE_TABLE");
-                return o;
-            }
-        })
-        ,COLUMN_FAMILY("f", new OptionHandler() {
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                Option o = new Option(s, "column_family", true, "Column family of the HBase table to import into");
-                o.setRequired(true);
-                o.setArgName("CF_NAME");
-                return o;
-            }
-        })
-        ,EXTRACTOR_CONFIG("e", new OptionHandler() {
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
-                o.setArgName("JSON_FILE");
-                o.setRequired(true);
-                return o;
-            }
-        })
-        ,INPUT_DATA("i", new OptionHandler() {
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                Option o = new Option(s, "input", true, "Input directory in HDFS for the data to import into HBase");
-                o.setArgName("DIR");
-                o.setRequired(true);
-                return o;
-            }
-        })
-        ,AS_OF_TIME("a", new OptionHandler() {
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                Option o = new Option(s, "as_of", true, "The last read timestamp to mark the records with (omit for time of execution)");
-                o.setArgName("datetime");
-                o.setRequired(false);
-                return o;
-            }
-        })
-        ,AS_OF_TIME_FORMAT("z", new OptionHandler() {
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                Option o = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option)");
-                o.setArgName("format");
-                o.setRequired(false);
-                return o;
-            }
-        })
-        ,CONVERTER("c", new OptionHandler() {
-            @Nullable
-            @Override
-            public Option apply(@Nullable String s) {
-                Option o = new Option(s, "converter", true, "The HBase converter class to use (Default is threat intel)");
-                o.setArgName("class");
-                o.setRequired(false);
-                return o;
-            }
-        })
-        ;
-        Option option;
-        String shortCode;
-        BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
-            this.shortCode = shortCode;
-            this.option = optionHandler.apply(shortCode);
-        }
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        return new Option(s, "help", false, "Generate Help screen");
+      }
+    })
+    ,TABLE("t", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "table", true, "HBase table to import data into");
+        o.setRequired(true);
+        o.setArgName("HBASE_TABLE");
+        return o;
+      }
+    })
+    ,COLUMN_FAMILY("f", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "column_family", true, "Column family of the HBase table to import into");
+        o.setRequired(true);
+        o.setArgName("CF_NAME");
+        return o;
+      }
+    })
+    ,EXTRACTOR_CONFIG("e", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
+        o.setArgName("JSON_FILE");
+        o.setRequired(true);
+        return o;
+      }
+    })
+    ,INPUT_DATA("i", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "input", true, "Input directory in HDFS for the data to import into HBase");
+        o.setArgName("DIR");
+        o.setRequired(true);
+        return o;
+      }
+    })
+    ,AS_OF_TIME("a", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "as_of", true, "The last read timestamp to mark the records with (omit for time of execution)");
+        o.setArgName("datetime");
+        o.setRequired(false);
+        return o;
+      }
+    })
+    ,AS_OF_TIME_FORMAT("z", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option)");
+        o.setArgName("format");
+        o.setRequired(false);
+        return o;
+      }
+    })
+    ,CONVERTER("c", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "converter", true, "The HBase converter class to use (Default is threat intel)");
+        o.setArgName("class");
+        o.setRequired(false);
+        return o;
+      }
+    })
+    ,ENRICHMENT_CONFIG("n", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "enrichment_config", true
+                , "JSON Document describing the enrichment configuration details." +
+                "  This is used to associate an enrichment type with a field type in zookeeper."
+        );
+        o.setArgName("JSON_FILE");
+        o.setRequired(false);
+        return o;
+      }
+    })
+    ;
+    Option option;
+    String shortCode;
+    BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
+      this.shortCode = shortCode;
+      this.option = optionHandler.apply(shortCode);
+    }
 
-        public boolean has(CommandLine cli) {
-            return cli.hasOption(shortCode);
-        }
+    public boolean has(CommandLine cli) {
+      return cli.hasOption(shortCode);
+    }
 
-        public String get(CommandLine cli) {
-            return cli.getOptionValue(shortCode);
-        }
+    public String get(CommandLine cli) {
+      return cli.getOptionValue(shortCode);
+    }
 
-        public static CommandLine parse(CommandLineParser parser, String[] args) {
-            try {
-                CommandLine cli = parser.parse(getOptions(), args);
-                if(ThreatIntelBulkLoader.BulkLoadOptions.HELP.has(cli)) {
-                    printHelp();
-                    System.exit(0);
-                }
-                return cli;
-            } catch (ParseException e) {
-                System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
-                e.printStackTrace(System.err);
-                printHelp();
-                System.exit(-1);
-                return null;
-            }
+    public static CommandLine parse(CommandLineParser parser, String[] args) {
+      try {
+        CommandLine cli = parser.parse(getOptions(), args);
+        if(ThreatIntelBulkLoader.BulkLoadOptions.HELP.has(cli)) {
+          printHelp();
+          System.exit(0);
         }
+        return cli;
+      } catch (ParseException e) {
+        System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+        e.printStackTrace(System.err);
+        printHelp();
+        System.exit(-1);
+        return null;
+      }
+    }
 
-        public static void printHelp() {
-            HelpFormatter formatter = new HelpFormatter();
-            formatter.printHelp( "ThreatIntelBulkLoader", getOptions());
-        }
+    public static void printHelp() {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp( "ThreatIntelBulkLoader", getOptions());
+    }
 
-        public static Options getOptions() {
-            Options ret = new Options();
-            for(BulkLoadOptions o : BulkLoadOptions.values()) {
-               ret.addOption(o.option);
-            }
-            return ret;
-        }
+    public static Options getOptions() {
+      Options ret = new Options();
+      for(BulkLoadOptions o : BulkLoadOptions.values()) {
+        ret.addOption(o.option);
+      }
+      return ret;
     }
-    private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
-        if(BulkLoadOptions.AS_OF_TIME.has(cli)) {
-            if(!BulkLoadOptions.AS_OF_TIME_FORMAT.has(cli)) {
-                throw new IllegalStateException("Unable to proceed: Specified as_of_time without an associated format.");
-            }
-            else {
-                DateFormat format = new SimpleDateFormat(BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli));
-                Date d = format.parse(BulkLoadOptions.AS_OF_TIME.get(cli));
-                return d.getTime();
-            }
-        }
-        else {
-            return System.currentTimeMillis();
-        }
+  }
+  private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
+    if(BulkLoadOptions.AS_OF_TIME.has(cli)) {
+      if(!BulkLoadOptions.AS_OF_TIME_FORMAT.has(cli)) {
+        throw new IllegalStateException("Unable to proceed: Specified as_of_time without an associated format.");
+      }
+      else {
+        DateFormat format = new SimpleDateFormat(BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli));
+        Date d = format.parse(BulkLoadOptions.AS_OF_TIME.get(cli));
+        return d.getTime();
+      }
     }
-    private static String readExtractorConfig(File configFile) throws IOException {
-        return Joiner.on("\n").join(Files.readLines(configFile, Charset.defaultCharset()));
+    else {
+      return System.currentTimeMillis();
     }
+  }
+  private static String readExtractorConfig(File configFile) throws IOException {
+    return Joiner.on("\n").join(Files.readLines(configFile, Charset.defaultCharset()));
+  }
 
-    public static Job createJob(Configuration conf, String input, String table, String cf, String extractorConfigContents, long ts, HbaseConverter converter) throws IOException {
-        Job job = new Job(conf);
-        job.setJobName("ThreatIntelBulkLoader: " + input + " => " +  table + ":" + cf);
-        System.out.println("Configuring " + job.getJobName());
-        job.setJarByClass(ThreatIntelBulkLoader.class);
-        job.setMapperClass(org.apache.metron.dataloads.hbase.mr.BulkLoadMapper.class);
-        job.setOutputFormatClass(TableOutputFormat.class);
-        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
-        job.getConfiguration().set(BulkLoadMapper.COLUMN_FAMILY_KEY, cf);
-        job.getConfiguration().set(BulkLoadMapper.CONFIG_KEY, extractorConfigContents);
-        job.getConfiguration().set(BulkLoadMapper.LAST_SEEN_KEY, "" + ts);
-        job.getConfiguration().set(BulkLoadMapper.CONVERTER_KEY, converter.getClass().getName());
-        job.setOutputKeyClass(ImmutableBytesWritable.class);
-        job.setOutputValueClass(Put.class);
-        job.setNumReduceTasks(0);
-        ExtractorHandler handler = ExtractorHandler.load(extractorConfigContents);
-        handler.getInputFormatHandler().set(job, new Path(input), handler.getConfig());
-        return job;
-    }
+  public static Job createJob(Configuration conf, String input, String table, String cf, String extractorConfigContents, long ts, HbaseConverter converter) throws IOException {
+    Job job = new Job(conf);
+    job.setJobName("ThreatIntelBulkLoader: " + input + " => " +  table + ":" + cf);
+    System.out.println("Configuring " + job.getJobName());
+    job.setJarByClass(ThreatIntelBulkLoader.class);
+    job.setMapperClass(org.apache.metron.dataloads.hbase.mr.BulkLoadMapper.class);
+    job.setOutputFormatClass(TableOutputFormat.class);
+    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
+    job.getConfiguration().set(BulkLoadMapper.COLUMN_FAMILY_KEY, cf);
+    job.getConfiguration().set(BulkLoadMapper.CONFIG_KEY, extractorConfigContents);
+    job.getConfiguration().set(BulkLoadMapper.LAST_SEEN_KEY, "" + ts);
+    job.getConfiguration().set(BulkLoadMapper.CONVERTER_KEY, converter.getClass().getName());
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(Put.class);
+    job.setNumReduceTasks(0);
+    ExtractorHandler handler = ExtractorHandler.load(extractorConfigContents);
+    handler.getInputFormatHandler().set(job, new Path(input), handler.getConfig());
+    return job;
+  }
 
-    public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException, IllegalAccessException, InstantiationException {
-        Configuration conf = HBaseConfiguration.create();
-        String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+  public static void main(String... argv) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
 
-        CommandLine cli = BulkLoadOptions.parse(new PosixParser(), otherArgs);
-        Long ts = getTimestamp(cli);
-        String input = BulkLoadOptions.INPUT_DATA.get(cli);
-        String table = BulkLoadOptions.TABLE.get(cli);
-        String cf = BulkLoadOptions.COLUMN_FAMILY.get(cli);
-        String extractorConfigContents = readExtractorConfig(new File(BulkLoadOptions.EXTRACTOR_CONFIG.get(cli)));
-        String converterClass = ThreatIntelConverter.class.getName();
-        if(BulkLoadOptions.CONVERTER.has(cli)) {
-            converterClass = BulkLoadOptions.CONVERTER.get(cli);
-        }
-        HbaseConverter converter = (HbaseConverter) Class.forName(converterClass).newInstance();
-        Job job = createJob(conf, input, table, cf, extractorConfigContents, ts, converter);
-        System.out.println(conf);
-        System.exit(job.waitForCompletion(true) ? 0 : 1);
+    CommandLine cli = BulkLoadOptions.parse(new PosixParser(), otherArgs);
+    Long ts = getTimestamp(cli);
+    String input = BulkLoadOptions.INPUT_DATA.get(cli);
+    String table = BulkLoadOptions.TABLE.get(cli);
+    String cf = BulkLoadOptions.COLUMN_FAMILY.get(cli);
+    String extractorConfigContents = readExtractorConfig(new File(BulkLoadOptions.EXTRACTOR_CONFIG.get(cli)));
+    String converterClass = EnrichmentConverter.class.getName();
+    if(BulkLoadOptions.CONVERTER.has(cli)) {
+      converterClass = BulkLoadOptions.CONVERTER.get(cli);
+    }
+    EnrichmentConfig enrichmentConfig = null;
+    if(BulkLoadOptions.ENRICHMENT_CONFIG.has(cli)) {
+      enrichmentConfig = JSONUtils.INSTANCE.load( new File(BulkLoadOptions.ENRICHMENT_CONFIG.get(cli))
+              , EnrichmentConfig.class
+      );
+    }
+
+    HbaseConverter converter = (HbaseConverter) Class.forName(converterClass).newInstance();
+    Job job = createJob(conf, input, table, cf, extractorConfigContents, ts, converter);
+    System.out.println(conf);
+    boolean jobRet = job.waitForCompletion(true);
+    if(!jobRet) {
+      System.exit(1);
+    }
+    if(enrichmentConfig != null) {
+      enrichmentConfig.updateSensorConfigs();
     }
+    System.exit(0);
+  }
 }
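
A note on the as_of/as_of_format pair consumed by getTimestamp above: the format
string is handed straight to java.text.SimpleDateFormat, so any pattern that class
accepts will work, and omitting both options falls back to System.currentTimeMillis().
A minimal standalone sketch (the pattern and date below are examples, not required
values):

    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class AsOfExample {
      public static void main(String[] args) throws Exception {
        // equivalent of passing --as_of "2016-04-01" --as_of_format "yyyy-MM-dd"
        DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        Date d = format.parse("2016-04-01");
        long ts = d.getTime(); // written as BulkLoadMapper.LAST_SEEN_KEY for every loaded row
        System.out.println(ts);
      }
    }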

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
index 30e56d8..119a68f 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
@@ -17,9 +17,9 @@
  */
 package org.apache.metron.dataloads.extractor;
 
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.metron.reference.lookup.LookupKV;
-import org.apache.metron.threatintel.ThreatIntelResults;
+import org.apache.metron.reference.lookup.LookupKey;
+import org.apache.metron.reference.lookup.LookupValue;
 
 import java.io.IOException;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
index b9d270a..5d17cbe 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
@@ -49,11 +49,7 @@ public class ExtractorHandler {
     public void setInputFormatHandler(String handler) {
         try {
             this.inputFormatHandler= Formats.create(handler);
-        } catch (ClassNotFoundException e) {
-            throw new IllegalStateException("Unable to create an inputformathandler", e);
-        } catch (IllegalAccessException e) {
-            throw new IllegalStateException("Unable to create an inputformathandler", e);
-        } catch (InstantiationException e) {
+        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
             throw new IllegalStateException("Unable to create an inputformathandler", e);
         }
     }
@@ -64,11 +60,7 @@ public class ExtractorHandler {
     public void setExtractor(String extractor) {
         try {
             this.extractor = Extractors.create(extractor);
-        } catch (ClassNotFoundException e) {
-            throw new IllegalStateException("Unable to create an extractor", e);
-        } catch (IllegalAccessException e) {
-            throw new IllegalStateException("Unable to create an extractor", e);
-        } catch (InstantiationException e) {
+        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
             throw new IllegalStateException("Unable to create an extractor", e);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
index 94e9ccb..92b7242 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
@@ -21,103 +21,119 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import com.opencsv.CSVParser;
 import com.opencsv.CSVParserBuilder;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.metron.dataloads.extractor.Extractor;
-import org.apache.metron.hbase.converters.HbaseConverter;
 import org.apache.metron.reference.lookup.LookupKV;
 import org.apache.metron.reference.lookup.LookupKey;
-import org.apache.metron.reference.lookup.LookupValue;
-import org.apache.metron.threatintel.ThreatIntelResults;
 
 import java.io.IOException;
 import java.util.*;
 
 public class CSVExtractor implements Extractor {
-    public static final String COLUMNS_KEY="columns";
-    public static final String INDICATOR_COLUMN_KEY="indicator_column";
-    public static final String SEPARATOR_KEY="separator";
-    public static final String LOOKUP_CONVERTER = "lookupConverter";
+  public static final String COLUMNS_KEY="columns";
+  public static final String INDICATOR_COLUMN_KEY="indicator_column";
+  public static final String TYPE_COLUMN_KEY="type_column";
+  public static final String TYPE_KEY="type";
+  public static final String SEPARATOR_KEY="separator";
+  public static final String LOOKUP_CONVERTER = "lookup_converter";
 
-    private int indicatorColumn;
-    private Map<String, Integer> columnMap = new HashMap<>();
-    private CSVParser parser;
-    private LookupConverter converter = LookupConverters.THREAT_INTEL.getConverter();
+  private int typeColumn;
+  private String type;
+  private int indicatorColumn;
+  private Map<String, Integer> columnMap = new HashMap<>();
+  private CSVParser parser = new CSVParserBuilder().build(); // comma by default, so extract() never sees a null parser
+  private LookupConverter converter = LookupConverters.ENRICHMENT.getConverter();
 
-    @Override
-    public Iterable<LookupKV> extract(String line) throws IOException {
-        if(line.trim().startsWith("#")) {
-            //comment
-            return Collections.emptyList();
-        }
-        String[] tokens = parser.parseLine(line);
-        LookupKey key = converter.toKey(tokens[indicatorColumn]);
-        Map<String, String> values = new HashMap<>();
-        for(Map.Entry<String, Integer> kv : columnMap.entrySet()) {
-            values.put(kv.getKey(), tokens[kv.getValue()]);
-        }
-        return Arrays.asList(new LookupKV(key, converter.toValue(values)));
+  @Override
+  public Iterable<LookupKV> extract(String line) throws IOException {
+    if(line.trim().startsWith("#")) {
+      //comment
+      return Collections.emptyList();
     }
+    String[] tokens = parser.parseLine(line);
 
-    private static Map.Entry<String, Integer> getColumnMapEntry(String column, int i) {
-        if(column.contains(":")) {
-            Iterable<String> tokens = Splitter.on(':').split(column);
-            String col = Iterables.getFirst(tokens, null);
-            Integer pos = Integer.parseInt(Iterables.getLast(tokens));
-            return new AbstractMap.SimpleEntry<>(col, pos);
-        }
-        else {
-            return new AbstractMap.SimpleEntry<>(column, i);
-        }
+    LookupKey key = converter.toKey(getType(tokens), tokens[indicatorColumn]);
+    Map<String, String> values = new HashMap<>();
+    for(Map.Entry<String, Integer> kv : columnMap.entrySet()) {
+      values.put(kv.getKey(), tokens[kv.getValue()]);
+    }
+    return Arrays.asList(new LookupKV(key, converter.toValue(values)));
+  }
 
+  private String getType(String[] tokens) {
+    if(type == null) {
+      return tokens[typeColumn];
     }
-    private static Map<String, Integer> getColumnMap(Map<String, Object> config) {
-        Map<String, Integer> columnMap = new HashMap<>();
-        if(config.containsKey(COLUMNS_KEY)) {
-            Object columnsObj = config.get(COLUMNS_KEY);
-            if(columnsObj instanceof String) {
-                String columns = (String)columnsObj;
-                int i = 0;
-                for (String column : Splitter.on(',').split(columns)) {
-                    Map.Entry<String, Integer> e = getColumnMapEntry(column, i++);
-                    columnMap.put(e.getKey(), e.getValue());
-                }
-            }
-            else if(columnsObj instanceof List) {
-                List columns = (List)columnsObj;
-                int i = 0;
-                for(Object column : columns) {
-                    Map.Entry<String, Integer> e = getColumnMapEntry(column.toString(), i++);
-                    columnMap.put(e.getKey(), e.getValue());
-                }
-            }
-            else if(columnsObj instanceof Map) {
-                Map<Object, Object> map = (Map<Object, Object>)columnsObj;
-                for(Map.Entry<Object, Object> e : map.entrySet()) {
-                    columnMap.put(e.getKey().toString(), Integer.parseInt(e.getValue().toString()));
-                }
-            }
-        }
-        return columnMap;
+    else {
+      return type;
     }
+  }
 
-    @Override
-    public void initialize(Map<String, Object> config) {
-        if(config.containsKey(COLUMNS_KEY)) {
-            columnMap = getColumnMap(config);
-        }
-        else {
-            throw new IllegalStateException("CSVExtractor requires " + COLUMNS_KEY + " configuration");
-        }
-        if(config.containsKey(INDICATOR_COLUMN_KEY)) {
-            indicatorColumn = columnMap.get(config.get(INDICATOR_COLUMN_KEY).toString());
+  private static Map.Entry<String, Integer> getColumnMapEntry(String column, int i) {
+    if(column.contains(":")) {
+      Iterable<String> tokens = Splitter.on(':').split(column);
+      String col = Iterables.getFirst(tokens, null);
+      Integer pos = Integer.parseInt(Iterables.getLast(tokens));
+      return new AbstractMap.SimpleEntry<>(col, pos);
+    }
+    else {
+      return new AbstractMap.SimpleEntry<>(column, i);
+    }
+
+  }
+  private static Map<String, Integer> getColumnMap(Map<String, Object> config) {
+    Map<String, Integer> columnMap = new HashMap<>();
+    if(config.containsKey(COLUMNS_KEY)) {
+      Object columnsObj = config.get(COLUMNS_KEY);
+      if(columnsObj instanceof String) {
+        String columns = (String)columnsObj;
+        int i = 0;
+        for (String column : Splitter.on(',').split(columns)) {
+          Map.Entry<String, Integer> e = getColumnMapEntry(column, i++);
+          columnMap.put(e.getKey(), e.getValue());
         }
-        if(config.containsKey(SEPARATOR_KEY)) {
-            char separator = config.get(SEPARATOR_KEY).toString().charAt(0);
-            parser = new CSVParserBuilder().withSeparator(separator)
-                                           .build();
+      }
+      else if(columnsObj instanceof List) {
+        List columns = (List)columnsObj;
+        int i = 0;
+        for(Object column : columns) {
+          Map.Entry<String, Integer> e = getColumnMapEntry(column.toString(), i++);
+          columnMap.put(e.getKey(), e.getValue());
         }
-        if(config.containsKey(LOOKUP_CONVERTER)) {
-           converter = LookupConverters.getConverter((String) config.get(LOOKUP_CONVERTER));
+      }
+      else if(columnsObj instanceof Map) {
+        Map<Object, Object> map = (Map<Object, Object>)columnsObj;
+        for(Map.Entry<Object, Object> e : map.entrySet()) {
+          columnMap.put(e.getKey().toString(), Integer.parseInt(e.getValue().toString()));
         }
+      }
+    }
+    return columnMap;
+  }
+
+  @Override
+  public void initialize(Map<String, Object> config) {
+    if(config.containsKey(COLUMNS_KEY)) {
+      columnMap = getColumnMap(config);
+    }
+    else {
+      throw new IllegalStateException("CSVExtractor requires " + COLUMNS_KEY + " configuration");
+    }
+    if(config.containsKey(INDICATOR_COLUMN_KEY)) {
+      indicatorColumn = columnMap.get(config.get(INDICATOR_COLUMN_KEY).toString());
+    }
+    if(config.containsKey(TYPE_KEY)) {
+      type = config.get(TYPE_KEY).toString();
+    }
+    else if(config.containsKey(TYPE_COLUMN_KEY)) {
+      typeColumn = columnMap.get(config.get(TYPE_COLUMN_KEY).toString());
+    }
+    if(config.containsKey(SEPARATOR_KEY)) {
+      char separator = config.get(SEPARATOR_KEY).toString().charAt(0);
+      parser = new CSVParserBuilder().withSeparator(separator)
+              .build();
+    }
+    if(config.containsKey(LOOKUP_CONVERTER)) {
+      converter = LookupConverters.getConverter((String) config.get(LOOKUP_CONVERTER));
     }
+  }
 }
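
To make the new CSVExtractor keys concrete, here is a sketch of an extractor config
it could consume; the key names come from the constants above, while the values and
the surrounding JSON shape (the "config"/"extractor" wrapper is assumed to mirror
ExtractorHandler's setters) are illustrative:

    {
      "config" : {
        "columns" : "ip:0,source:1",
        "indicator_column" : "ip",
        "type" : "malicious_ip",
        "separator" : ","
      },
      "extractor" : "CSV"
    }

To take the type from the data instead of fixing it, replace "type" with
"type_column"; initialize() checks "type" first, so it wins if both are present.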

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
index 9e9b79f..1fa9378 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
@@ -25,6 +25,6 @@ import org.apache.metron.reference.lookup.LookupValue;
 import java.util.Map;
 
 public interface LookupConverter {
-    LookupKey toKey(String indicator);
+    LookupKey toKey(String type, String indicator);
     LookupValue toValue(Map<String, String> metadata);
 }
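
Since toKey now takes the type alongside the indicator, any custom converter wired
in through the CSV extractor's lookup_converter key must move to the two-argument
signature. A minimal sketch of a conforming implementation (the class name and the
lower-casing behavior are illustrative, not part of the interface's contract):

    import org.apache.metron.hbase.converters.enrichment.EnrichmentKey;
    import org.apache.metron.hbase.converters.enrichment.EnrichmentValue;
    import org.apache.metron.reference.lookup.LookupKey;
    import org.apache.metron.reference.lookup.LookupValue;

    import java.util.Map;

    public class LowercasingConverter implements LookupConverter {
      @Override
      public LookupKey toKey(String type, String indicator) {
        // normalize the indicator before it becomes part of the HBase row key
        return new EnrichmentKey(type, indicator.toLowerCase());
      }

      @Override
      public LookupValue toValue(Map<String, String> metadata) {
        return new EnrichmentValue(metadata);
      }
    }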

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
index 7f9218a..aa23851 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
@@ -18,8 +18,8 @@
 
 package org.apache.metron.dataloads.extractor.csv;
 
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentKey;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentValue;
 import org.apache.metron.reference.lookup.LookupKey;
 import org.apache.metron.reference.lookup.LookupValue;
 
@@ -27,15 +27,16 @@ import java.util.Map;
 
 public enum LookupConverters {
 
-    THREAT_INTEL(new LookupConverter() {
+    ENRICHMENT(new LookupConverter() {
         @Override
-        public LookupKey toKey(String indicator) {
-            return new ThreatIntelKey(indicator);
+        public LookupKey toKey(String type, String indicator) {
+            return new EnrichmentKey(type, indicator);
+
         }
 
         @Override
         public LookupValue toValue(Map<String, String> metadata) {
-            return new ThreatIntelValue(metadata);
+            return new EnrichmentValue(metadata);
         }
     })
     ;
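
For callers the renamed constant is otherwise a drop-in replacement; the only
visible change is the extra type argument. A usage sketch (the type and indicator
values are illustrative):

    import org.apache.metron.dataloads.extractor.csv.LookupConverter;
    import org.apache.metron.dataloads.extractor.csv.LookupConverters;
    import org.apache.metron.reference.lookup.LookupKey;

    public class ConverterUsage {
      public static void main(String[] args) {
        LookupConverter converter = LookupConverters.ENRICHMENT.getConverter();
        // the enrichment type and the indicator now travel together into the key
        LookupKey key = converter.toKey("malicious_ip", "10.0.2.3");
        System.out.println(key);
      }
    }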

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
index b3829b4..9d7e4f5 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
@@ -18,13 +18,11 @@
 package org.apache.metron.dataloads.extractor.stix;
 
 import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
 import org.apache.commons.io.FileUtils;
 import org.apache.metron.dataloads.extractor.Extractor;
 import org.apache.metron.dataloads.extractor.stix.types.ObjectTypeHandler;
 import org.apache.metron.dataloads.extractor.stix.types.ObjectTypeHandlers;
 import org.apache.metron.reference.lookup.LookupKV;
-import org.apache.metron.threatintel.ThreatIntelResults;
 import org.mitre.cybox.common_2.*;
 import org.mitre.cybox.cybox_2.ObjectType;
 import org.mitre.cybox.cybox_2.Observable;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java
index c84dba9..b637c6e 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java
@@ -29,6 +29,8 @@ public abstract class AbstractObjectTypeHandler<T extends ObjectPropertiesType>
     public Class<T> getTypeClass() {
         return objectPropertiesType;
     }
-
+    public String getType() {
+        return getTypeClass().getSimpleName().toLowerCase();
+    }
 
 }
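
The new getType() default is simply the lower-cased simple name of the handler's
CybOX class, so subclasses get a sensible enrichment type for free:

    // AddressHandler extends AbstractObjectTypeHandler<Address>, so by default:
    String type = Address.class.getSimpleName().toLowerCase(); // yields "address"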

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
index 638a9ce..81ea957 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
@@ -19,10 +19,9 @@ package org.apache.metron.dataloads.extractor.stix.types;
 
 import com.google.common.base.Splitter;
 import org.apache.metron.dataloads.extractor.stix.StixExtractor;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentKey;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentValue;
 import org.apache.metron.reference.lookup.LookupKV;
-import org.apache.metron.threatintel.ThreatIntelResults;
 import org.mitre.cybox.common_2.StringObjectPropertyType;
 import org.mitre.cybox.objects.Address;
 import org.mitre.cybox.objects.CategoryTypeEnum;
@@ -31,47 +30,65 @@ import java.io.IOException;
 import java.util.*;
 
 public class AddressHandler extends AbstractObjectTypeHandler<Address> {
-    public static final String SPECIFIC_CATEGORY_CONFIG = "stix_address_categories";
-    public static final EnumSet<CategoryTypeEnum> SUPPORTED_CATEGORIES = EnumSet.of(CategoryTypeEnum.E_MAIL
-                                                                                   ,CategoryTypeEnum.IPV_4_ADDR
-                                                                                   ,CategoryTypeEnum.IPV_6_ADDR
-                                                                                   ,CategoryTypeEnum.MAC
-                                                               ) ;
-    public AddressHandler() {
-        super(Address.class);
-    }
-
-    @Override
-    public Iterable<LookupKV> extract(final Address type, Map<String, Object> config) throws IOException {
-        List<LookupKV> ret = new ArrayList<>();
-        final CategoryTypeEnum category= type.getCategory();
-        if(!SUPPORTED_CATEGORIES.contains(category)) {
-           return ret;
-        }
-        if(config != null && config.containsKey(SPECIFIC_CATEGORY_CONFIG)) {
-            List<CategoryTypeEnum> categories = new ArrayList<>();
-            for(String c : Splitter.on(",").split(config.get(SPECIFIC_CATEGORY_CONFIG).toString())) {
-                categories.add(CategoryTypeEnum.valueOf(c));
-            }
-            EnumSet<CategoryTypeEnum> specificCategories = EnumSet.copyOf(categories);
-            if(!specificCategories.contains(category)) {
-                return ret;
-            }
+  public static final String SPECIFIC_CATEGORY_CONFIG = "stix_address_categories";
+  public static final String TYPE_CONFIG = "stix_address_type";
+  public static final EnumSet<CategoryTypeEnum> SUPPORTED_CATEGORIES = EnumSet.of(CategoryTypeEnum.E_MAIL
+          ,CategoryTypeEnum.IPV_4_ADDR
+          ,CategoryTypeEnum.IPV_6_ADDR
+          ,CategoryTypeEnum.MAC
+  );
+  public AddressHandler() {
+    super(Address.class);
+  }
 
+  @Override
+  public Iterable<LookupKV> extract(final Address type, Map<String, Object> config) throws IOException {
+    List<LookupKV> ret = new ArrayList<>();
+    final CategoryTypeEnum category = type.getCategory();
+    if(!SUPPORTED_CATEGORIES.contains(category)) {
+      return ret;
+    }
+    String typeStr = getType();
+    if(config != null) {
+      if(config.containsKey(SPECIFIC_CATEGORY_CONFIG)) {
+        List<CategoryTypeEnum> categories = new ArrayList<>();
+        for (String c : Splitter.on(",").split(config.get(SPECIFIC_CATEGORY_CONFIG).toString())) {
+          categories.add(CategoryTypeEnum.valueOf(c));
         }
-        StringObjectPropertyType value = type.getAddressValue();
-        for(String token : StixExtractor.split(value)) {
-            LookupKV results = new LookupKV(new ThreatIntelKey(token)
-                                           , new ThreatIntelValue(
-                                                                    new HashMap<String, String>() {{
-                                                                        put("source-type", "STIX");
-                                                                        put("indicator-type", type.getClass().getSimpleName() + ":" + category);
-                                                                        put("source", type.toXMLString());
-                                                                    }}
-                                                                 )
-                                           );
-                ret.add(results);
+        EnumSet<CategoryTypeEnum> specificCategories = EnumSet.copyOf(categories);
+        if (!specificCategories.contains(category)) {
+          return ret;
         }
-        return ret;
+      }
+      if(config.containsKey(TYPE_CONFIG)) {
+        typeStr = config.get(TYPE_CONFIG).toString();
+      }
+    }
+    StringObjectPropertyType value = type.getAddressValue();
+    for(String token : StixExtractor.split(value)) {
+      final String indicatorType = typeStr + ":" + category;
+      LookupKV results = new LookupKV(new EnrichmentKey(indicatorType, token)
+              , new EnrichmentValue(
+              new HashMap<String, String>() {{
+                put("source-type", "STIX");
+                put("indicator-type", indicatorType);
+                put("source", type.toXMLString());
+              }}
+      )
+      );
+      ret.add(results);
+    }
+    return ret;
+  }
+
+  @Override
+  public List<String> getPossibleTypes() {
+    String typeStr = getType();
+    List<String> ret = new ArrayList<>();
+    for(CategoryTypeEnum e : SUPPORTED_CATEGORIES)
+    {
+      ret.add(typeStr + ":" + e);
     }
+    return ret;
+  }
 }
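
Tying the two new AddressHandler knobs together, an extractor config can both
restrict the STIX categories and rename the resulting enrichment type. A sketch
(the values are illustrative; category names must match the CategoryTypeEnum
constants, and the "ip:IPV_4_ADDR" form assumes the enum's string form is its
constant name):

    {
      "stix_address_categories" : "IPV_4_ADDR,IPV_6_ADDR",
      "stix_address_type" : "ip"
    }

With this config an IPv4 observable is keyed under the type "ip:IPV_4_ADDR" instead
of the "address:IPV_4_ADDR" that the getType() default would produce.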