You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2016/04/26 16:46:21 UTC

[33/51] [partial] incubator-metron git commit: METRON-113 Project Reorganization (merrimanr) closes apache/incubator-metron#88

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
new file mode 100644
index 0000000..1cc591b
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.io.Files;
+import org.apache.commons.cli.*;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.hbase.mr.BulkLoadMapper;
+import org.apache.metron.common.configuration.EnrichmentConfig;
+import org.apache.metron.enrichment.converter.HbaseConverter;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+import org.apache.metron.common.utils.JSONUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.text.*;
+import java.util.Date;
+
+public class ThreatIntelBulkLoader  {
+  private static abstract class OptionHandler implements Function<String, Option> {} // maps an option's short code to its commons-cli Option
+  private enum BulkLoadOptions {
+    HELP("h", new OptionHandler() {
+
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        return new Option(s, "help", false, "Generate Help screen");
+      }
+    })
+    ,TABLE("t", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "table", true, "HBase table to import data into");
+        o.setRequired(true);
+        o.setArgName("HBASE_TABLE");
+        return o;
+      }
+    })
+    ,COLUMN_FAMILY("f", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "column_family", true, "Column family of the HBase table to import into");
+        o.setRequired(true);
+        o.setArgName("CF_NAME");
+        return o;
+      }
+    })
+    ,EXTRACTOR_CONFIG("e", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
+        o.setArgName("JSON_FILE");
+        o.setRequired(true);
+        return o;
+      }
+    })
+    ,INPUT_DATA("i", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "input", true, "Input directory in HDFS for the data to import into HBase");
+        o.setArgName("DIR");
+        o.setRequired(true);
+        return o;
+      }
+    })
+    ,AS_OF_TIME("a", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "as_of", true, "The last read timestamp to mark the records with (omit for time of execution)");
+        o.setArgName("datetime");
+        o.setRequired(false);
+        return o;
+      }
+    })
+    ,AS_OF_TIME_FORMAT("z", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option)");
+        o.setArgName("format");
+        o.setRequired(false);
+        return o;
+      }
+    })
+    ,CONVERTER("c", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "converter", true, "The HBase converter class to use (Default is threat intel)");
+        o.setArgName("class");
+        o.setRequired(false);
+        return o;
+      }
+    })
+    ,ENRICHMENT_CONFIG("n", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "enrichment_config", true
+                , "JSON Document describing the enrichment configuration details." +
+                "  This is used to associate an enrichment type with a field type in zookeeper."
+        );
+        o.setArgName("JSON_FILE");
+        o.setRequired(false);
+        return o;
+      }
+    })
+    ;
+    Option option;
+    String shortCode;
+    BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
+      this.shortCode = shortCode;
+      this.option = optionHandler.apply(shortCode);
+    }
+
+    public boolean has(CommandLine cli) { // true when the parsed command line contains this option's short code
+      return cli.hasOption(shortCode);
+    }
+
+    public String get(CommandLine cli) { // the value the parsed command line holds for this option's short code
+      return cli.getOptionValue(shortCode);
+    }
+
+    public static CommandLine parse(CommandLineParser parser, String[] args) { // parses args against getOptions(); exits the JVM on help or on a parse failure
+      try { // NOTE(review): -h on its own presumably fails the required-option check first and ends up in the catch below — confirm
+        CommandLine cli = parser.parse(getOptions(), args);
+        if(ThreatIntelBulkLoader.BulkLoadOptions.HELP.has(cli)) {
+          printHelp();
+          System.exit(0); // help was asked for explicitly, so exit with success
+        }
+        return cli;
+      } catch (ParseException e) {
+        System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+        e.printStackTrace(System.err);
+        printHelp();
+        System.exit(-1);
+        return null; // not reached once System.exit ends the JVM; keeps the compiler satisfied
+      }
+    }
+
+    public static void printHelp() { // prints the usage screen for all options returned by getOptions()
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp( "ThreatIntelBulkLoader", getOptions());
+    }
+
+    public static Options getOptions() { // builds a new Options holding the Option of every enum constant
+      Options ret = new Options();
+      for(BulkLoadOptions o : BulkLoadOptions.values()) {
+        ret.addOption(o.option);
+      }
+      return ret;
+    }
+  }
+  private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
+    // Without as_of the records are stamped with the time of execution.
+    if(!BulkLoadOptions.AS_OF_TIME.has(cli)) {
+      return System.currentTimeMillis();
+    }
+    // as_of is free-form text, so it cannot be read without as_of_format.
+    if(!BulkLoadOptions.AS_OF_TIME_FORMAT.has(cli)) {
+      throw new IllegalStateException("Unable to proceed: Specified as_of_time without an associated format.");
+    }
+    String pattern = BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli);
+    String asOf = BulkLoadOptions.AS_OF_TIME.get(cli);
+    // Result is the parsed as_of value in epoch milliseconds.
+    Date parsed = new SimpleDateFormat(pattern).parse(asOf);
+    return parsed.getTime();
+  }
+  private static String readExtractorConfig(File configFile) throws IOException { // returns the file's lines joined back together with "\n"
+    return Joiner.on("\n").join(Files.readLines(configFile, Charset.defaultCharset())); // NOTE(review): decoded with the platform charset — confirm the JSON config is always saved in that encoding
+  }
+
+  public static Job createJob(Configuration conf, String input, String table, String cf, String extractorConfigContents, long ts, HbaseConverter converter) throws IOException { // configures and returns the load job; it is not submitted here
+    Job job = new Job(conf);
+    job.setJobName("ThreatIntelBulkLoader: " + input + " => " +  table + ":" + cf);
+    System.out.println("Configuring " + job.getJobName());
+    job.setJarByClass(ThreatIntelBulkLoader.class);
+    job.setMapperClass(org.apache.metron.dataloads.hbase.mr.BulkLoadMapper.class);
+    job.setOutputFormatClass(TableOutputFormat.class); // output is written to the HBase table named on the next line
+    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
+    job.getConfiguration().set(BulkLoadMapper.COLUMN_FAMILY_KEY, cf); // this and the next three values are stored under keys defined by BulkLoadMapper
+    job.getConfiguration().set(BulkLoadMapper.CONFIG_KEY, extractorConfigContents);
+    job.getConfiguration().set(BulkLoadMapper.LAST_SEEN_KEY, "" + ts); // the timestamp is stored as its decimal string
+    job.getConfiguration().set(BulkLoadMapper.CONVERTER_KEY, converter.getClass().getName()); // only the converter's class name is stored
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(Put.class);
+    job.setNumReduceTasks(0); // map-only job
+    ExtractorHandler handler = ExtractorHandler.load(extractorConfigContents); // parsed here so its input format handler can set up the job's input
+    handler.getInputFormatHandler().set(job, new Path(input), handler.getConfig());
+    return job;
+  }
+
+  public static void main(String... argv) throws Exception { // CLI entry point: parse options, run the load job, exit with its status
+    Configuration conf = HBaseConfiguration.create();
+    String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs(); // only the arguments GenericOptionsParser leaves over are parsed below
+
+    CommandLine cli = BulkLoadOptions.parse(new PosixParser(), otherArgs);
+    Long ts = getTimestamp(cli);
+    String input = BulkLoadOptions.INPUT_DATA.get(cli);
+    String table = BulkLoadOptions.TABLE.get(cli);
+    String cf = BulkLoadOptions.COLUMN_FAMILY.get(cli);
+    String extractorConfigContents = readExtractorConfig(new File(BulkLoadOptions.EXTRACTOR_CONFIG.get(cli)));
+    String converterClass = EnrichmentConverter.class.getName(); // used when -c is absent; NOTE(review): the -c help text says the default is threat intel — confirm which is intended
+    if(BulkLoadOptions.CONVERTER.has(cli)) {
+      converterClass = BulkLoadOptions.CONVERTER.get(cli);
+    }
+    EnrichmentConfig enrichmentConfig = null; // stays null unless -n names a config file
+    if(BulkLoadOptions.ENRICHMENT_CONFIG.has(cli)) {
+      enrichmentConfig = JSONUtils.INSTANCE.load( new File(BulkLoadOptions.ENRICHMENT_CONFIG.get(cli))
+              , EnrichmentConfig.class
+      );
+    }
+
+    HbaseConverter converter = (HbaseConverter) Class.forName(converterClass).newInstance(); // the class must offer an accessible no-arg constructor
+    Job job = createJob(conf, input, table, cf, extractorConfigContents, ts, converter);
+    System.out.println(conf);
+    boolean jobRet = job.waitForCompletion(true);
+    if(!jobRet) {
+      System.exit(1); // job failed: the enrichment config update below is skipped
+    }
+    if(enrichmentConfig != null) {
+        enrichmentConfig.updateSensorConfigs();
+    }
+    System.exit(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/cif/HBaseTableLoad.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/cif/HBaseTableLoad.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/cif/HBaseTableLoad.java
new file mode 100644
index 0000000..0cff227
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/cif/HBaseTableLoad.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.cif;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.ZipInputStream;
+
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+
+import java.io.BufferedInputStream;
+
+public class HBaseTableLoad {
+
+	private static final Logger LOG = Logger.getLogger(HBaseTableLoad.class);
+	private static Configuration conf = null;
+	private String hbaseTable = "cif_table";
+	private String dirName = "./";
+	private boolean usefileList = false;
+	private Set<String> files;
+
+	/**
+	 * Initialization
+	 */
+	static {
+		conf = HBaseConfiguration.create();
+	}
+
+	public static void main(String[] args) {
+		// Entry point: builds a loader and applies the command-line arguments to it.
+		HBaseTableLoad ht = new HBaseTableLoad();
+
+		ht.parse(args);
+		//ht.LoadDirHBase(); // NOTE(review): the load itself is disabled, so running this class only validates arguments — confirm that is intended
+
+	}
+
+	/** Loads every .gz, .zip and .json file under dirName into HBase, recursing into sub-directories. */
+	private void LoadDirHBase() {
+		LOG.info("Working on:" + dirName);
+		File folder = new File(dirName);
+		File[] listOfFiles = folder.listFiles();
+		if (listOfFiles == null) { // dirName is missing or is not a directory
+			LOG.error("Unable to list directory:" + dirName);
+			return;
+		}
+		// e.g. folder name is infrastructure_botnet. Col Qualifier is
+		// botnet and col_family is infrastructure
+		String[] nameParts = folder.getName().split("_");
+		for (File file : listOfFiles) {
+			if (file.isDirectory()) { // recurse into the sub-directory
+				this.LoadDirHBase(file.getAbsolutePath());
+				continue;
+			}
+			String name = file.getName();
+			boolean gz = name.endsWith(".gz");
+			boolean zip = !gz && name.endsWith(".zip");
+			if (!file.isFile() || !(gz || zip || name.endsWith(".json")))
+				continue;
+			// Check if filename is present in FileList (null if the list failed to load)
+			if (usefileList && (files == null || !files.contains(file.getAbsolutePath())))
+				continue;
+			if (nameParts.length < 2) {
+				LOG.error("Skipping " + file.getAbsolutePath() + ": folder name lacks '_'");
+				continue;
+			}
+			try {
+				InputStream input = new FileInputStream(file);
+				try {
+					if (gz) {
+						input = new GZIPInputStream(input);
+					} else if (zip) {
+						// Reads hit EOF until positioned on an entry; only the first is loaded.
+						ZipInputStream zipInput = new ZipInputStream(input);
+						zipInput.getNextEntry();
+						input = zipInput;
+					}
+					LOG.info("Begin Loading File:" + file.getAbsolutePath());
+					HBaseBulkPut(new BufferedInputStream(input), nameParts[0], nameParts[1]);
+					LOG.info("Completed Loading File:" + file.getAbsolutePath());
+				} finally {
+					input.close(); // closes the wrapper and the underlying file stream
+				}
+			} catch (IOException e) {
+				LOG.error("Failed loading file:" + file.getAbsolutePath(), e);
+			} catch (ParseException e) {
+				LOG.error("Failed parsing file:" + file.getAbsolutePath(), e);
+			}
+		}
+	}
+
+	private void LoadDirHBase(String dirname) {
+		// Points this loader at the given directory, then loads it.
+		this.dirName = dirname;
+		this.LoadDirHBase();
+		// NOTE(review): dirName is not restored afterwards — confirm nothing relies on its previous value
+	}
+
+	/**
+	 * Inserts all json records picked up from the inputStream: one row per
+	 * record, keyed by its "address", with a "Y" flag in the given column.
+	 *
+	 * @param input newline-delimited JSON records; left open for the caller
+	 * @param col_family column family the flag is written to
+	 * @param col_qualifier column qualifier the flag is written to
+	 * @throws IOException if reading the input or writing to HBase fails
+	 * @throws ParseException never in practice: unparseable lines are skipped
+	 */
+	private void HBaseBulkPut(InputStream input, String col_family,
+			String col_qualifier) throws IOException, ParseException {
+		JSONParser parser = new JSONParser();
+		List<Put> allputs = new ArrayList<Put>();
+		BufferedReader br = new BufferedReader(new InputStreamReader(input));
+		String jsonString;
+		while ((jsonString = br.readLine()) != null) {
+			Object parsed;
+			try {
+				parsed = parser.parse(jsonString);
+			} catch (ParseException e) {
+				continue; // not valid JSON
+			}
+			// Get Address - either IP/domain or email and make that the Key.
+			// Lines that are not JSON objects, or that carry no string address,
+			// have no usable row key and are skipped like unparseable ones.
+			Object address = parsed instanceof Map ? ((Map) parsed).get("address") : null;
+			if (!(address instanceof String)) {
+				continue;
+			}
+			Put put = new Put(Bytes.toBytes((String) address));
+			// We are just adding a "Y" flag to mark this address
+			put.add(Bytes.toBytes(col_family), Bytes.toBytes(col_qualifier),
+					Bytes.toBytes("Y"));
+			allputs.add(put);
+		}
+		// The table is opened only once the batch is complete and is always closed.
+		HTable table = new HTable(conf, hbaseTable);
+		try {
+			table.put(allputs);
+		} finally {
+			table.close();
+		}
+	}
+
+	private void printUsage() { // prints the expected command line to standard output
+		System.out
+				.println("Usage: java -cp JarFile org.apache.metron.dataloads.cif.HBaseTableLoad -d <directory> -t <tablename> -f <optional file-list>");
+	}
+
+	/** Reads -d, -t and the optional -f from args; exits the JVM when -d or -t is missing or args cannot be parsed. */
+	private void parse(String[] args) {
+		CommandLineParser parser = new BasicParser();
+		Options options = new Options();
+		options.addOption("d", true, "description");
+		options.addOption("t", true, "description");
+		// -f carries the path of the file list, so it must accept an argument;
+		// declared without one, getOptionValue("f") below was always null.
+		options.addOption("f", true, "description");
+
+		CommandLine cmd = null;
+		try {
+			cmd = parser.parse(options, args);
+
+			if (cmd.hasOption("d")) {
+				this.dirName = cmd.getOptionValue("d");
+				LOG.info("Directory Name:" + cmd.getOptionValue("d"));
+			}
+			else {
+				LOG.info("Missing Directory Name");
+				printUsage();
+				System.exit(-1);
+			}
+
+			if (cmd.hasOption("t")) {
+				this.hbaseTable = cmd.getOptionValue("t");
+				LOG.info("HBase Table Name:" + cmd.getOptionValue("t"));
+			}
+			else {
+				LOG.info("Missing Table Name");
+				printUsage();
+				System.exit(-1);
+			}
+
+			if (cmd.hasOption("f")) {
+				this.usefileList = true;
+				files = LoadFileList(cmd.getOptionValue("f"));
+				LOG.info("FileList:" + cmd.getOptionValue("f"));
+			}
+
+		} catch (org.apache.commons.cli.ParseException e) {
+			LOG.error("Failed to parse comand line properties", e);
+			e.printStackTrace();
+			System.exit(-1);
+		}
+	}
+
+	/**
+	 * Reads the file list, one path per line.
+	 *
+	 * @param filename path of the list file
+	 * @return the lines read so far; empty, never null, if the file cannot be opened
+	 */
+	private Set<String> LoadFileList(String filename) {
+		Set<String> output = new HashSet<String>();
+		try {
+			BufferedReader reader = new BufferedReader(new InputStreamReader(
+					new FileInputStream(filename)));
+			try {
+				String in;
+				while ((in = reader.readLine()) != null)
+					output.add(in);
+			} finally {
+				reader.close(); // also runs when a read fails part-way through
+			}
+		} catch (IOException e) {
+			LOG.error("Unable to read file list:" + filename, e);
+		}
+		return output;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
new file mode 100644
index 0000000..bd490c8
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor;
+
+import org.apache.metron.enrichment.lookup.LookupKV;
+
+import java.io.IOException;
+import java.util.Map;
+
+public interface Extractor { // turns raw input lines into lookup key/value entries
+    Iterable<LookupKV> extract(String line) throws IOException; // the entries found in one input line
+    void initialize(Map<String, Object> config); // receives the extractor's configuration map before use
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCreator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCreator.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCreator.java
new file mode 100644
index 0000000..6e081aa
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCreator.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor;
+
+import java.util.Map;
+
+public interface ExtractorCreator { // factory for Extractor instances
+    Extractor create(); // returns an Extractor; initialization is left to the caller
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
new file mode 100644
index 0000000..5d17cbe
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor;
+
+import org.apache.metron.dataloads.extractor.inputformat.Formats;
+import org.apache.metron.dataloads.extractor.inputformat.InputFormatHandler;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+public class ExtractorHandler { // bean populated by Jackson from the extractor JSON: config map, extractor and input format handler
+    final static ObjectMapper _mapper = new ObjectMapper();
+    private Map<String, Object> config;
+    private Extractor extractor;
+    private InputFormatHandler inputFormatHandler = Formats.BY_LINE; // used when the JSON names no input format handler
+
+    public Map<String, Object> getConfig() {
+        return config;
+    }
+
+    public void setConfig(Map<String, Object> config) {
+        this.config = config;
+    }
+
+    public InputFormatHandler getInputFormatHandler() {
+        return inputFormatHandler;
+    }
+
+    public void setInputFormatHandler(String handler) { // resolves the name through Formats.create
+        try {
+            this.inputFormatHandler= Formats.create(handler);
+        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+            throw new IllegalStateException("Unable to create an inputformathandler", e);
+        }
+    }
+
+    public Extractor getExtractor() {
+        return extractor;
+    }
+    public void setExtractor(String extractor) { // resolves the name through Extractors.create
+        try {
+            this.extractor = Extractors.create(extractor);
+        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
+            throw new IllegalStateException("Unable to create an extractor", e);
+        }
+    }
+
+    public static synchronized ExtractorHandler load(InputStream is) throws IOException { // deserializes the JSON, then initializes the extractor with the config map
+        ExtractorHandler ret = _mapper.readValue(is, ExtractorHandler.class);
+        ret.getExtractor().initialize(ret.getConfig()); // NOTE(review): getExtractor() is null if setExtractor was never called — confirm every config names an extractor
+        return ret;
+    }
+    public static synchronized ExtractorHandler load(String s, Charset c) throws IOException { // encodes s with charset c and delegates to load(InputStream)
+        return load( new ByteArrayInputStream(s.getBytes(c)));
+    }
+    public static synchronized ExtractorHandler load(String s) throws IOException { // NOTE(review): encodes with the platform charset although StandardCharsets is imported — confirm non-ASCII configs parse on non-UTF-8 platforms
+        return load( s, Charset.defaultCharset());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java
new file mode 100644
index 0000000..bbd4c22
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor;
+
+import org.apache.metron.dataloads.extractor.csv.CSVExtractor;
+import org.apache.metron.dataloads.extractor.stix.StixExtractor;
+
+import java.util.Map;
+
+public enum Extractors implements ExtractorCreator { // built-in extractors, each constant delegating to its own creator
+    CSV(new ExtractorCreator() {
+
+        @Override
+        public Extractor create() {
+            return new CSVExtractor();
+        }
+    })
+    ,STIX(new ExtractorCreator() {
+        @Override
+        public Extractor create() {
+            return new StixExtractor();
+        }
+    })
+    ;
+    ExtractorCreator _creator;
+    Extractors(ExtractorCreator creator) {
+        this._creator = creator;
+    }
+    @Override
+    public Extractor create() { // a new extractor from this constant's creator
+        return _creator.create();
+    }
+    public static Extractor create(String extractorName) throws ClassNotFoundException, IllegalAccessException, InstantiationException { // accepts a constant name or a class name
+        try {
+            ExtractorCreator ec = Extractors.valueOf(extractorName); // exact, case-sensitive match against CSV / STIX
+            return ec.create();
+        }
+        catch(IllegalArgumentException iae) { // not a constant name: treat it as the name of an Extractor class
+            Extractor ex = (Extractor) Class.forName(extractorName).newInstance();
+            return ex;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
new file mode 100644
index 0000000..1fce0fc
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.csv;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.enrichment.lookup.LookupKey;
+
+import java.io.IOException;
+import java.util.*;
+
+public class CSVExtractor implements Extractor {
+  public static final String COLUMNS_KEY="columns";
+  public static final String INDICATOR_COLUMN_KEY="indicator_column";
+  public static final String TYPE_COLUMN_KEY="type_column";
+  public static final String TYPE_KEY="type";
+  public static final String SEPARATOR_KEY="separator";
+  public static final String LOOKUP_CONVERTER = "lookup_converter";
+
+  private int typeColumn;
+  private String type;
+  private int indicatorColumn;
+  private Map<String, Integer> columnMap = new HashMap<>();
+  private CSVParser parser;
+  private LookupConverter converter = LookupConverters.ENRICHMENT.getConverter();
+
+  @Override
+  public Iterable<LookupKV> extract(String line) throws IOException {
+    if(line.trim().startsWith("#")) {
+      //comment
+      return Collections.emptyList();
+    }
+    String[] tokens = parser.parseLine(line);
+
+    LookupKey key = converter.toKey(getType(tokens), tokens[indicatorColumn]);
+    Map<String, String> values = new HashMap<>();
+    for(Map.Entry<String, Integer> kv : columnMap.entrySet()) {
+      values.put(kv.getKey(), tokens[kv.getValue()]);
+    }
+    return Arrays.asList(new LookupKV(key, converter.toValue(values)));
+  }
+
+  private String getType(String[] tokens) {
+    if(type == null) {
+      return tokens[typeColumn];
+    }
+    else {
+      return type;
+    }
+  }
+
+  private static Map.Entry<String, Integer> getColumnMapEntry(String column, int i) {
+    if(column.contains(":")) {
+      Iterable<String> tokens = Splitter.on(':').split(column);
+      String col = Iterables.getFirst(tokens, null);
+      Integer pos = Integer.parseInt(Iterables.getLast(tokens));
+      return new AbstractMap.SimpleEntry<>(col, pos);
+    }
+    else {
+      return new AbstractMap.SimpleEntry<>(column, i);
+    }
+
+  }
+  private static Map<String, Integer> getColumnMap(Map<String, Object> config) {
+    Map<String, Integer> columnMap = new HashMap<>();
+    if(config.containsKey(COLUMNS_KEY)) {
+      Object columnsObj = config.get(COLUMNS_KEY);
+      if(columnsObj instanceof String) {
+        String columns = (String)columnsObj;
+        int i = 0;
+        for (String column : Splitter.on(',').split(columns)) {
+          Map.Entry<String, Integer> e = getColumnMapEntry(column, i++);
+          columnMap.put(e.getKey(), e.getValue());
+        }
+      }
+      else if(columnsObj instanceof List) {
+        List columns = (List)columnsObj;
+        int i = 0;
+        for(Object column : columns) {
+          Map.Entry<String, Integer> e = getColumnMapEntry(column.toString(), i++);
+          columnMap.put(e.getKey(), e.getValue());
+        }
+      }
+      else if(columnsObj instanceof Map) {
+        Map<Object, Object> map = (Map<Object, Object>)columnsObj;
+        for(Map.Entry<Object, Object> e : map.entrySet()) {
+          columnMap.put(e.getKey().toString(), Integer.parseInt(e.getValue().toString()));
+        }
+      }
+    }
+    return columnMap;
+  }
+
+  @Override
+  public void initialize(Map<String, Object> config) {
+    if(config.containsKey(COLUMNS_KEY)) {
+      columnMap = getColumnMap(config);
+    }
+    else {
+      throw new IllegalStateException("CSVExtractor requires " + COLUMNS_KEY + " configuration");
+    }
+    if(config.containsKey(INDICATOR_COLUMN_KEY)) {
+      indicatorColumn = columnMap.get(config.get(INDICATOR_COLUMN_KEY).toString());
+    }
+    if(config.containsKey(TYPE_KEY)) {
+      type = config.get(TYPE_KEY).toString();
+    }
+    else if(config.containsKey(TYPE_COLUMN_KEY)) {
+      typeColumn = columnMap.get(config.get(TYPE_COLUMN_KEY).toString());
+    }
+    if(config.containsKey(SEPARATOR_KEY)) {
+      char separator = config.get(SEPARATOR_KEY).toString().charAt(0);
+      parser = new CSVParserBuilder().withSeparator(separator)
+              .build();
+    }
+    if(config.containsKey(LOOKUP_CONVERTER)) {
+      converter = LookupConverters.getConverter((String) config.get(LOOKUP_CONVERTER));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
new file mode 100644
index 0000000..e0ca4ee
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.extractor.csv;
+
+import org.apache.metron.enrichment.lookup.LookupKey;
+import org.apache.metron.enrichment.lookup.LookupValue;
+
+import java.util.Map;
+
+/**
+ * Converts extracted fields into the key and value halves of a lookup entry.
+ */
+public interface LookupConverter {
+    /**
+     * Builds a lookup key.
+     *
+     * @param type the type of the entry
+     * @param indicator the indicator of the entry
+     * @return the key for the given type and indicator
+     */
+    LookupKey toKey(String type, String indicator);
+
+    /**
+     * Builds a lookup value.
+     *
+     * @param metadata name/value pairs to carry in the value
+     * @return the value wrapping the given metadata
+     */
+    LookupValue toValue(Map<String, String> metadata);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
new file mode 100644
index 0000000..bd58ba7
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.extractor.csv;
+
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.LookupKey;
+import org.apache.metron.enrichment.lookup.LookupValue;
+
+import java.util.Map;
+
+/**
+ * Built-in {@link LookupConverter} implementations, addressable by constant name.
+ */
+public enum LookupConverters {
+
+    /** Produces {@link EnrichmentKey} keys and {@link EnrichmentValue} values. */
+    ENRICHMENT(new LookupConverter() {
+        @Override
+        public LookupKey toKey(String type, String indicator) {
+            return new EnrichmentKey(type, indicator);
+
+        }
+
+        @Override
+        public LookupValue toValue(Map<String, String> metadata) {
+            return new EnrichmentValue(metadata);
+        }
+    })
+    ;
+    LookupConverter converter;
+    LookupConverters(LookupConverter converter) {
+        this.converter = converter;
+    }
+
+    /**
+     * @return the converter this constant was declared with
+     */
+    public LookupConverter getConverter() {
+        return converter;
+    }
+
+    /**
+     * Resolves a converter by name.
+     * The name is first tried as one of this enum's constants; when it is not one,
+     * it is treated as a class name and instantiated reflectively through its no-arg constructor.
+     *
+     * @param name an enum constant name or a class name
+     * @return the matching converter
+     * @throws IllegalStateException if the name is not a constant and the class cannot be
+     *         found, accessed or instantiated
+     */
+    public static LookupConverter getConverter(String name) {
+        try {
+            return LookupConverters.valueOf(name).getConverter();
+        }
+        catch(IllegalArgumentException notAConstant) {
+            // Only "no such constant" triggers the reflective fallback; catching Throwable here
+            // would also swallow Errors and unrelated failures.
+            try {
+                return (LookupConverter) Class.forName(name).newInstance();
+            } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+                throw new IllegalStateException("Unable to parse " + name, e);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
new file mode 100644
index 0000000..7e58455
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.inputformat;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import java.io.IOException;
+import java.util.Map;
+
+public enum Formats implements InputFormatHandler{
+    BY_LINE(new InputFormatHandler() {
+        @Override
+        public void set(Job job, Path input, Map<String, Object> config) throws IOException {
+
+            FileInputFormat.addInputPath(job, input);
+        }
+    })
+    ;
+    InputFormatHandler _handler = null;
+    Formats(InputFormatHandler handler) {
+        this._handler = handler;
+    }
+    @Override
+    public void set(Job job, Path path, Map<String, Object> config) throws IOException {
+        _handler.set(job, path, config);
+    }
+
+    public static InputFormatHandler create(String handlerName) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+        try {
+            InputFormatHandler ec = Formats.valueOf(handlerName);
+            return ec;
+        }
+        catch(IllegalArgumentException iae) {
+            InputFormatHandler ex = (InputFormatHandler) Class.forName(handlerName).newInstance();
+            return ex;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
new file mode 100644
index 0000000..2287969
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.inputformat;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Hook for configuring how a job reads its input.
+ */
+public interface InputFormatHandler {
+    /**
+     * Applies this handler's input settings to the job.
+     *
+     * @param job the job to configure
+     * @param input the input path
+     * @param config handler configuration
+     * @throws IOException if the job input cannot be configured
+     */
+    void set(Job job, Path input, Map<String, Object> config) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
new file mode 100644
index 0000000..e0a58ef
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.inputformat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * {@link InputFormatHandler} that makes a job read each input file as one record.
+ */
+public class WholeFileFormat implements InputFormatHandler {
+
+    /**
+     * Record reader that emits a single record per split: a {@link NullWritable} key and the
+     * full bytes of the split's file as a {@link Text} value.
+     */
+    public static class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
+        private FileSplit fileSplit;
+        private Configuration conf;
+        private Text value = new Text();
+        private boolean processed = false;
+
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+            this.fileSplit = (FileSplit) split;
+            this.conf = context.getConfiguration();
+        }
+
+        /**
+         * Reads the whole file on the first call and reports no further records afterwards.
+         *
+         * @return {@code true} on the first call, {@code false} on every later call
+         * @throws IOException if the file cannot be read or is too large for a single byte array
+         */
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            if (!processed) {
+                long length = fileSplit.getLength();
+                if (length > Integer.MAX_VALUE) {
+                    // An (int) cast would wrap around here, giving either a negative array size
+                    // or a silently truncated read.
+                    throw new IOException("File " + fileSplit.getPath() + " is too large to read as a single record: "
+                            + length + " bytes");
+                }
+                byte[] contents = new byte[(int) length];
+                Path file = fileSplit.getPath();
+                FileSystem fs = file.getFileSystem(conf);
+                FSDataInputStream in = null;
+                try {
+                    in = fs.open(file);
+                    IOUtils.readFully(in, contents, 0, contents.length);
+                    value.set(contents, 0, contents.length);
+                } finally {
+                    IOUtils.closeStream(in);
+                }
+                processed = true;
+                return true;
+            }
+            return false;
+        }
+
+        @Override
+        public NullWritable getCurrentKey() throws IOException, InterruptedException {
+            return NullWritable.get();
+        }
+        @Override
+        public Text getCurrentValue() throws IOException, InterruptedException{
+            return value;
+        }
+
+        /** Progress is all-or-nothing because the file is consumed in one step. */
+        @Override
+        public float getProgress() throws IOException {
+            return processed ? 1.0f : 0.0f;
+        }
+
+        @Override
+        public void close() throws IOException{
+            // Nothing to release: the stream is closed inside nextKeyValue().
+        }
+    }
+
+    /**
+     * Input format that never splits a file and reads it with {@link WholeFileRecordReader}.
+     */
+    public static class WholeFileInputFormat extends FileInputFormat<NullWritable, Text> {
+
+        @Override
+        protected boolean isSplitable(JobContext context, Path file) {
+            return false;
+        }
+
+        @Override
+        public RecordReader<NullWritable, Text> createRecordReader(
+                InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+            WholeFileRecordReader reader = new WholeFileRecordReader();
+            reader.initialize(split, context);
+            return reader;
+        }
+    }
+
+    /**
+     * Sets the job's input path and switches its input format to {@link WholeFileInputFormat}.
+     *
+     * @param job the job to configure
+     * @param input the input path
+     * @param config handler configuration; not read by this handler
+     * @throws IOException if the input path cannot be set
+     */
+    @Override
+    public void set(Job job, Path input, Map<String, Object> config) throws IOException {
+        WholeFileInputFormat.setInputPaths(job, input);
+        job.setInputFormatClass(WholeFileInputFormat.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
new file mode 100644
index 0000000..4696639
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.stix;
+
+import com.google.common.base.Splitter;
+import org.apache.commons.io.FileUtils;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.dataloads.extractor.stix.types.ObjectTypeHandler;
+import org.apache.metron.dataloads.extractor.stix.types.ObjectTypeHandlers;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.mitre.cybox.common_2.*;
+import org.mitre.cybox.cybox_2.ObjectType;
+import org.mitre.cybox.cybox_2.Observable;
+import org.mitre.cybox.cybox_2.Observables;
+import org.mitre.stix.common_1.IndicatorBaseType;
+import org.mitre.stix.indicator_2.Indicator;
+import org.mitre.stix.stix_1.STIXPackage;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link Extractor} that parses a STIX XML document and emits the lookup entries produced by the
+ * {@link ObjectTypeHandler} registered for each observable's properties.
+ */
+public class StixExtractor implements Extractor {
+    Map<String, Object> config;
+
+    /**
+     * Extracts lookup entries from one STIX document.
+     *
+     * @param line the complete STIX XML document as a string
+     * @return the entries produced by all matching handlers; empty when nothing matched
+     * @throws IOException if a handler fails while extracting
+     */
+    @Override
+    public Iterable<LookupKV> extract(String line) throws IOException {
+        // Literal replacement of the quoted token "Equal" with "Equals" before parsing.
+        STIXPackage stixPackage = STIXPackage.fromXMLString(line.replace("\"Equal\"", "\"Equals\""));
+        List<LookupKV> ret = new ArrayList<>();
+        for(Observable o : getObservables(stixPackage)) {
+            ObjectType obj = o.getObject();
+            if(obj != null) {
+                ObjectPropertiesType props = obj.getProperties();
+                if(props != null) {
+                    ObjectTypeHandler handler = ObjectTypeHandlers.getHandlerByInstance(props);
+                    if (handler != null) {
+                        Iterable<LookupKV> extractions = handler.extract(props, config);
+                        for(LookupKV extraction : extractions) {
+                            ret.add(extraction);
+                        }
+                    }
+                }
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Collects the package's top-level observables followed by the observable of each indicator.
+     *
+     * @param stixPackage the parsed package
+     * @return the observables found; empty when the package has none
+     */
+    public List<Observable> getObservables(STIXPackage stixPackage) {
+        List<Observable> ret = new ArrayList<>();
+        Observables observables = stixPackage.getObservables();
+        if(observables != null) {
+            ret.addAll(observables.getObservables());
+        }
+        if (stixPackage.getIndicators() != null) {
+            List<IndicatorBaseType> indicators = stixPackage.getIndicators().getIndicators();
+            if (indicators != null) {
+                for (IndicatorBaseType base : indicators) {
+                    // Skip entries that are not full Indicator objects instead of failing on the cast.
+                    if (base instanceof Indicator) {
+                        Observable observable = ((Indicator) base).getObservable();
+                        if (observable != null) {
+                            ret.add(observable);
+                        }
+                    }
+                }
+            }
+        }
+        return ret;
+    }
+
+    @Override
+    public void initialize(Map<String, Object> config) {
+        this.config = config;
+    }
+
+    /**
+     * Splits a string property into its individual values.
+     * Values are produced only when the condition is {@code EQUALS} and the apply-condition is
+     * {@code ANY}; the property's delimiter, when present, separates the values.
+     *
+     * @param value the property to split; may be {@code null}
+     * @return the values, or an empty list when the property is absent, has no value, or uses
+     *         another condition
+     */
+    public static Iterable<String> split(StringObjectPropertyType value) {
+        List<String> tokens = new ArrayList<>();
+        if(value == null || value.getValue() == null) {
+            // Nothing to split; avoids a NullPointerException on objects without a value.
+            return tokens;
+        }
+        final ConditionTypeEnum condition = value.getCondition();
+        final ConditionApplicationEnum applyCondition = value.getApplyCondition();
+        if(condition == ConditionTypeEnum.EQUALS && applyCondition == ConditionApplicationEnum.ANY) {
+            String delim = value.getDelimiter();
+            String line = value.getValue().toString();
+            if (delim != null) {
+                for (String token : Splitter.on(delim).split(line)) {
+                    tokens.add(token);
+                }
+            } else {
+                tokens.add(line);
+            }
+        }
+        return tokens;
+    }
+
+    /**
+     * Ad-hoc driver: parses a STIX file and prints every extracted entry.
+     *
+     * @param args optional first argument is the file to read; defaults to {@code /tmp/sample.xml}
+     * @throws IOException if the file cannot be read
+     */
+    public static void main(String[] args) throws IOException {
+        File file = new File(args.length > 0 ? args[0] : "/tmp/sample.xml");
+
+        String line = FileUtils.readFileToString(file);
+        StixExtractor extractor = new StixExtractor();
+        for(LookupKV results : extractor.extract(line)) {
+            System.out.println(results);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java
new file mode 100644
index 0000000..b637c6e
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import org.mitre.cybox.common_2.ObjectPropertiesType;
+import org.mitre.cybox.common_2.StringObjectPropertyType;
+
+/**
+ * Base class for {@link ObjectTypeHandler}s that remembers the properties class it handles and
+ * derives a type name from it.
+ *
+ * @param <T> the properties type this handler accepts
+ */
+public abstract class AbstractObjectTypeHandler<T extends ObjectPropertiesType> implements ObjectTypeHandler<T> {
+    protected Class<T> objectPropertiesType;
+
+    /**
+     * @param clazz the properties class this handler is responsible for
+     */
+    public AbstractObjectTypeHandler(Class<T> clazz) {
+        objectPropertiesType = clazz;
+    }
+
+    @Override
+    public Class<T> getTypeClass() {
+        return objectPropertiesType;
+    }
+
+    /**
+     * @return the simple name of the handled class in lower case
+     */
+    public String getType() {
+        // Locale.ROOT keeps the derived name identical on every JVM; the default-locale overload
+        // lower-cases some letters differently (e.g. 'I' under a Turkish locale).
+        return getTypeClass().getSimpleName().toLowerCase(java.util.Locale.ROOT);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
new file mode 100644
index 0000000..ffcff43
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import com.google.common.base.Splitter;
+import org.apache.metron.dataloads.extractor.stix.StixExtractor;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.mitre.cybox.common_2.StringObjectPropertyType;
+import org.mitre.cybox.objects.Address;
+import org.mitre.cybox.objects.CategoryTypeEnum;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Handler for {@link Address} objects. Emits one lookup entry per address value, typed as
+ * {@code <type>:<category>}.
+ *
+ * <p>Configuration keys:
+ * <ul>
+ *   <li>{@link #SPECIFIC_CATEGORY_CONFIG}: comma separated {@link CategoryTypeEnum} constant names;
+ *       addresses in other categories are ignored.</li>
+ *   <li>{@link #TYPE_CONFIG}: replaces the default type prefix returned by {@code getType()}.</li>
+ * </ul>
+ */
+public class AddressHandler extends AbstractObjectTypeHandler<Address> {
+  public static final String SPECIFIC_CATEGORY_CONFIG = "stix_address_categories";
+  public static final String TYPE_CONFIG = "stix_address_type";
+  public static final EnumSet<CategoryTypeEnum> SUPPORTED_CATEGORIES = EnumSet.of(CategoryTypeEnum.E_MAIL
+          ,CategoryTypeEnum.IPV_4_ADDR
+          ,CategoryTypeEnum.IPV_6_ADDR
+          ,CategoryTypeEnum.MAC
+  ) ;
+  public AddressHandler() {
+    super(Address.class);
+  }
+
+  /**
+   * Extracts lookup entries from an address object.
+   *
+   * @param type the address to extract from
+   * @param config handler configuration; may be {@code null}
+   * @return one entry per address value; empty when the category is unsupported, filtered out
+   *         by configuration, or the address carries no value
+   * @throws IOException declared by the handler contract
+   * @throws IllegalArgumentException if a configured category is not a {@link CategoryTypeEnum} constant name
+   */
+  @Override
+  public Iterable<LookupKV> extract(final Address type, Map<String, Object> config) throws IOException {
+    List<LookupKV> ret = new ArrayList<>();
+    final CategoryTypeEnum category= type.getCategory();
+    if(!SUPPORTED_CATEGORIES.contains(category)) {
+      return ret;
+    }
+    String typeStr = getType();
+    if(config != null) {
+      if(config.containsKey(SPECIFIC_CATEGORY_CONFIG)) {
+        EnumSet<CategoryTypeEnum> specificCategories = EnumSet.noneOf(CategoryTypeEnum.class);
+        for (String c : Splitter.on(",").split(config.get(SPECIFIC_CATEGORY_CONFIG).toString())) {
+          // Trim so that "A, B" is accepted as well as "A,B".
+          specificCategories.add(CategoryTypeEnum.valueOf(c.trim()));
+        }
+        if (!specificCategories.contains(category)) {
+          return ret;
+        }
+      }
+      if(config.containsKey(TYPE_CONFIG)) {
+        typeStr = config.get(TYPE_CONFIG).toString();
+      }
+    }
+    StringObjectPropertyType value = type.getAddressValue();
+    if(value == null) {
+      // An address without a value has nothing to index.
+      return ret;
+    }
+    final String indicatorType = typeStr + ":" + category;
+    for(String token : StixExtractor.split(value)) {
+      // Plain HashMap rather than double-brace initialization, which would create an anonymous
+      // subclass holding a reference to this handler.
+      Map<String, String> metadata = new HashMap<>();
+      metadata.put("source-type", "STIX");
+      metadata.put("indicator-type", indicatorType);
+      metadata.put("source", type.toXMLString());
+      ret.add(new LookupKV(new EnrichmentKey(indicatorType, token), new EnrichmentValue(metadata)));
+    }
+    return ret;
+  }
+
+  /**
+   * @return the type string for every supported category, in the form {@code <type>:<category>}
+   */
+  @Override
+  public List<String> getPossibleTypes() {
+    String typeStr = getType();
+    List<String> ret = new ArrayList<>();
+    for(CategoryTypeEnum e : SUPPORTED_CATEGORIES)
+    {
+       ret.add(typeStr + ":" + e);
+    }
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
new file mode 100644
index 0000000..755cddd
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import org.apache.metron.dataloads.extractor.stix.StixExtractor;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.mitre.cybox.common_2.StringObjectPropertyType;
+import org.mitre.cybox.objects.DomainName;
+import org.mitre.cybox.objects.DomainNameTypeEnum;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Turns STIX/CybOX {@link DomainName} objects into threat intel lookup entries.
+ *
+ * <p>A domain name is extracted when its type is unset or is one of
+ * {@link #SUPPORTED_TYPES}. Every token of its value becomes one {@link LookupKV}
+ * whose indicator type is {@code <type>:FQDN}.
+ */
+public class DomainHandler extends AbstractObjectTypeHandler<DomainName> {
+  /** Config key whose non-null value replaces {@code getType()} as the indicator type prefix. */
+  public static final String TYPE_CONFIG = "stix_domain_type";
+  /** Domain name types this handler extracts. A shared constant: it must not be modified. */
+  static final EnumSet<DomainNameTypeEnum> SUPPORTED_TYPES = EnumSet.of(DomainNameTypeEnum.FQDN);
+
+  public DomainHandler() {
+    super(DomainName.class);
+  }
+
+  /**
+   * Builds one lookup entry per token found in the domain name's value.
+   *
+   * @param type   the domain name object to extract from
+   * @param config optional settings, may be {@code null}; when it maps
+   *               {@link #TYPE_CONFIG} to a non-null value, that value's
+   *               {@code toString()} is used instead of {@code getType()}
+   * @return a new list of entries; empty when the domain type is set but unsupported
+   * @throws IOException declared by {@link ObjectTypeHandler#extract}
+   */
+  @Override
+  public Iterable<LookupKV> extract(final DomainName type, Map<String, Object> config) throws IOException {
+    List<LookupKV> ret = new ArrayList<>();
+    String typeStr = getType();
+    if (config != null) {
+      Object o = config.get(TYPE_CONFIG);
+      if (o != null) {
+        typeStr = o.toString();
+      }
+    }
+    final DomainNameTypeEnum domainType = type.getType();
+    if (domainType != null && !SUPPORTED_TYPES.contains(domainType)) {
+      return ret;
+    }
+    // An unset type is treated as FQDN, so the suffix is always FQDN.
+    final String indicatorType = typeStr + ":" + DomainNameTypeEnum.FQDN;
+    StringObjectPropertyType value = type.getValue();
+    // The XML form is the same for every token: render it once, and only if
+    // there is at least one token to attach it to.
+    String source = null;
+    for (String token : StixExtractor.split(value)) {
+      if (source == null) {
+        source = type.toXMLString();
+      }
+      // A plain HashMap instead of double-brace initialisation: no anonymous
+      // subclass and no hidden reference back to this handler or to 'type'.
+      Map<String, String> metadata = new HashMap<>();
+      metadata.put("source-type", "STIX");
+      metadata.put("indicator-type", indicatorType);
+      metadata.put("source", source);
+      ret.add(new LookupKV(new EnrichmentKey(indicatorType, token), new EnrichmentValue(metadata)));
+    }
+    return ret;
+  }
+
+  /**
+   * Lists the indicator types this handler can report when no type override is
+   * configured: {@code getType()} joined by {@code ':'} to each supported domain type.
+   *
+   * @return a new list with one entry per element of {@link #SUPPORTED_TYPES}
+   */
+  @Override
+  public List<String> getPossibleTypes() {
+    String typeStr = getType();
+    List<String> ret = new ArrayList<>();
+    for (DomainNameTypeEnum e : SUPPORTED_TYPES) {
+      ret.add(typeStr + ":" + e);
+    }
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
new file mode 100644
index 0000000..c7b05eb
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.metron.dataloads.extractor.stix.StixExtractor;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.mitre.cybox.common_2.StringObjectPropertyType;
+import org.mitre.cybox.objects.Hostname;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Turns STIX/CybOX {@link Hostname} objects into threat intel lookup entries,
+ * one {@link LookupKV} per token of the hostname value.
+ */
+public class HostnameHandler  extends AbstractObjectTypeHandler<Hostname>{
+  /** Config key whose non-null value replaces {@code getType()} as the indicator type. */
+  public static final String TYPE_CONFIG = "stix_hostname_type";
+
+  public HostnameHandler() {
+    super(Hostname.class);
+  }
+
+  /**
+   * Builds one lookup entry per token found in the hostname value.
+   *
+   * @param type   the hostname object to extract from
+   * @param config optional settings, may be {@code null}; when it maps
+   *               {@link #TYPE_CONFIG} to a non-null value, that value's
+   *               {@code toString()} is used instead of {@code getType()}
+   * @return a new list of entries, one per token
+   * @throws IOException declared by {@link ObjectTypeHandler#extract}
+   */
+  @Override
+  public Iterable<LookupKV> extract(final Hostname type, Map<String, Object> config) throws IOException {
+    StringObjectPropertyType value = type.getHostnameValue();
+    String typeStr = getType();
+    if (config != null) {
+      Object o = config.get(TYPE_CONFIG);
+      if (o != null) {
+        typeStr = o.toString();
+      }
+    }
+    final String indicatorType = typeStr;
+    List<LookupKV> ret = new ArrayList<>();
+    // The XML form is the same for every token: render it once, and only if
+    // there is at least one token to attach it to.
+    String source = null;
+    for (String token : StixExtractor.split(value)) {
+      if (source == null) {
+        source = type.toXMLString();
+      }
+      // A plain HashMap instead of double-brace initialisation: no anonymous
+      // subclass and no hidden reference back to this handler or to 'type'.
+      Map<String, String> metadata = new HashMap<>();
+      metadata.put("source-type", "STIX");
+      metadata.put("indicator-type", indicatorType);
+      metadata.put("source", source);
+      ret.add(new LookupKV(new EnrichmentKey(indicatorType, token), new EnrichmentValue(metadata)));
+    }
+    return ret;
+  }
+
+  /**
+   * Lists the single indicator type this handler reports when no override is configured.
+   *
+   * @return an immutable one-element list containing {@code getType()}
+   */
+  @Override
+  public List<String> getPossibleTypes() {
+    return ImmutableList.of(getType());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
new file mode 100644
index 0000000..c7692be
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.mitre.cybox.common_2.ObjectPropertiesType;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Contract for converting one kind of CybOX object into lookup entries.
+ *
+ * @param <T> the {@link ObjectPropertiesType} subtype this handler accepts
+ */
+public interface ObjectTypeHandler<T extends ObjectPropertiesType> {
+  /**
+   * Produces the lookup key/value pairs found in the given object.
+   *
+   * @param type   the object to extract from
+   * @param config handler settings; the domain and hostname handlers in this
+   *               package accept {@code null} and then use their default type
+   * @return the extracted entries
+   * @throws IOException if an implementation fails while extracting
+   */
+  Iterable<LookupKV> extract(T type, Map<String, Object> config) throws IOException;
+  /**
+   * Returns the class of objects this handler accepts. {@code ObjectTypeHandlers}
+   * compares it with an instance's runtime class to pick a handler.
+   */
+  Class<T> getTypeClass();
+  /**
+   * Returns the type names this handler can report. The handlers in this package
+   * build them from {@code getType()}, optionally followed by {@code ':'} and a subtype.
+   */
+  List<String> getPossibleTypes();
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandlers.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandlers.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandlers.java
new file mode 100644
index 0000000..06d8cd8
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandlers.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import org.mitre.cybox.common_2.ObjectPropertiesType;
+
+/**
+ * Registry of the available {@link ObjectTypeHandler} implementations, with a
+ * lookup from a CybOX object instance to the handler that accepts its class.
+ */
+public enum ObjectTypeHandlers {
+      ADDRESS(new AddressHandler())
+    ,HOSTNAME(new HostnameHandler())
+    ,DOMAINNAME(new DomainHandler())
+    ;
+   // Raw type kept on purpose: callers receive the handler without knowing its
+   // type argument and still need to be able to call extract() on it.
+   // Final because enum constants are shared singletons.
+   final ObjectTypeHandler _handler;
+   ObjectTypeHandlers(ObjectTypeHandler handler) {
+      _handler = handler;
+   }
+   /** Returns the handler instance registered for this constant. */
+   ObjectTypeHandler getHandler() {
+      return _handler;
+   }
+   /**
+    * Finds the handler whose {@code getTypeClass()} equals the runtime class of
+    * the given object. The match is exact: subclasses of a handled class do not match.
+    *
+    * @param inst the object to find a handler for; may be {@code null}
+    * @return the matching handler, or {@code null} when {@code inst} is
+    *         {@code null} or no registered handler accepts its class
+    */
+   public static ObjectTypeHandler getHandlerByInstance(ObjectPropertiesType inst) {
+      if(inst == null) {
+         // Nothing can handle a missing object; report it the same way as "no handler".
+         return null;
+      }
+      final Class<?> instClass = inst.getClass();
+      for(ObjectTypeHandlers h : values()) {
+         if(instClass.equals(h.getHandler().getTypeClass())) {
+            return h.getHandler();
+         }
+      }
+      return null;
+   }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
new file mode 100644
index 0000000..558ac16
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.hbase.mr;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.enrichment.converter.HbaseConverter;
+import org.apache.metron.enrichment.lookup.LookupKV;
+
+import java.io.IOException;
+
+/**
+ * Mapper that runs each input line through the configured {@link Extractor} and
+ * emits one HBase {@link Put} per extracted {@link LookupKV}, keyed by the
+ * bytes of the lookup key.
+ */
+public class BulkLoadMapper extends Mapper<Object, Text, ImmutableBytesWritable, Put>
+{
+    /** Configuration key holding the extractor config string passed to {@code ExtractorHandler.load}. */
+    public static final String CONFIG_KEY="bl_extractor_config";
+    /** Configuration key holding the column family the puts are written to. */
+    public static final String COLUMN_FAMILY_KEY = "bl_column_family";
+    /** Configuration key constant; not read by this class. */
+    public static final String LAST_SEEN_KEY = "bl_last_seen";
+    /** Configuration key holding the class name of the {@link HbaseConverter} to instantiate. */
+    public static final String CONVERTER_KEY = "bl_converter";
+    Extractor extractor = null;
+    String columnFamily = null;
+    HbaseConverter converter;
+
+    /** Initializes the extractor, column family and converter from the job configuration. */
+    @Override
+    public void setup(Context context) throws IOException,
+            InterruptedException {
+        initialize(context.getConfiguration());
+    }
+
+    /**
+     * Extracts lookup entries from one input line and writes a put for each
+     * non-null entry.
+     *
+     * @param key     the input key; unused
+     * @param value   the input line handed to the extractor
+     * @param context the task context the puts are written to
+     */
+    @Override
+    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
+        for(LookupKV results : extractor.extract(value.toString())) {
+            if (results != null) {
+                Put put = converter.toPut(columnFamily, results.getKey(), results.getValue());
+                write(new ImmutableBytesWritable(results.getKey().toBytes()), put, context);
+            }
+        }
+    }
+
+    /**
+     * Loads the extractor, column family and converter named in the configuration.
+     *
+     * @param configuration the job configuration to read from
+     * @throws IOException if the extractor cannot be loaded
+     * @throws IllegalStateException if no converter class is configured, or the
+     *         configured class cannot be loaded, instantiated, or is not an
+     *         {@link HbaseConverter}
+     */
+    protected void initialize(Configuration configuration) throws IOException{
+        String configStr = configuration.get(CONFIG_KEY);
+        extractor = ExtractorHandler.load(configStr).getExtractor();
+        columnFamily = configuration.get(COLUMN_FAMILY_KEY);
+        String converterClass = configuration.get(CONVERTER_KEY);
+        if (converterClass == null) {
+            // Fail with the name of the missing setting rather than a bare
+            // NullPointerException from Class.forName(null).
+            throw new IllegalStateException("Unable to create converter object: no class name configured under " + CONVERTER_KEY);
+        }
+        try {
+            // getDeclaredConstructor().newInstance() wraps anything the constructor
+            // throws; Class.newInstance() would let even checked exceptions escape.
+            converter = Class.forName(converterClass)
+                             .asSubclass(HbaseConverter.class)
+                             .getDeclaredConstructor()
+                             .newInstance();
+        } catch (ReflectiveOperationException | ClassCastException e) {
+            throw new IllegalStateException("Unable to create converter object: " + converterClass, e);
+        }
+    }
+
+    /** Writes one key/put pair to the context; a separate method so subclasses can override the output. */
+    protected void write(ImmutableBytesWritable key, Put value, Context context) throws IOException, InterruptedException {
+        context.write(key, value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
new file mode 100644
index 0000000..d0f1e46
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.hbase.mr;
+
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.metron.enrichment.lookup.LookupKey;
+import org.apache.metron.enrichment.lookup.accesstracker.AccessTracker;
+import org.apache.metron.enrichment.lookup.accesstracker.AccessTrackerUtil;
+
+import java.io.IOException;
+
+/**
+ * Table mapper that emits a {@link Delete} for every row key the loaded
+ * {@link AccessTracker} has not seen.
+ */
+public class PrunerMapper extends TableMapper<ImmutableBytesWritable, Delete> {
+    /** Configuration key holding the name of the access tracker table. */
+    public static final String ACCESS_TRACKER_TABLE_CONF = "access_tracker_table";
+    /** Configuration key holding the access tracker column family. */
+    public static final String ACCESS_TRACKER_CF_CONF = "access_tracker_cf";
+    /** Configuration key holding the timestamp passed to the tracker load; must not be negative. */
+    public static final String TIMESTAMP_CONF = "access_tracker_timestamp";
+    /** Configuration key holding the access tracker name. */
+    public static final String ACCESS_TRACKER_NAME_CONF = "access_tracker_name";
+    AccessTracker tracker;
+
+    /**
+     * Loads the access tracker from the configured table.
+     *
+     * @throws IOException if the tracker table cannot be opened or closed
+     * @throws IllegalStateException if the timestamp is missing or negative, or
+     *         the trackers cannot be loaded
+     */
+    @Override
+    public void setup(Context context) throws IOException
+    {
+        // Validate before opening the table so a bad job configuration does
+        // not open a connection at all.
+        long timestamp = context.getConfiguration().getLong(TIMESTAMP_CONF, -1);
+        if(timestamp < 0) {
+            throw new IllegalStateException("Must specify a timestamp that is positive.");
+        }
+        String atTable = context.getConfiguration().get(ACCESS_TRACKER_TABLE_CONF);
+        String atCF = context.getConfiguration().get(ACCESS_TRACKER_CF_CONF);
+        String atName = context.getConfiguration().get(ACCESS_TRACKER_NAME_CONF);
+        // The table is only needed while loading, so release it right after.
+        // NOTE(review): assumes the AccessTracker returned by loadAll no longer
+        // reads from this table once it has been built - confirm.
+        try (HTable table = new HTable(context.getConfiguration(), atTable)) {
+            tracker = loadTracker(table, atCF, atName, timestamp);
+        }
+    }
+
+    /** Loads the trackers from the table and combines them, wrapping any failure. */
+    private static AccessTracker loadTracker(HTable table, String columnFamily, String name, long timestamp) {
+        try {
+            return AccessTrackerUtil.INSTANCE.loadAll(AccessTrackerUtil.INSTANCE.loadAll(table, columnFamily, name, timestamp));
+        } catch (Exception e) {
+            // Exception rather than Throwable: JVM errors should propagate unchanged.
+            throw new IllegalStateException("Unable to load the accesstrackers from the directory", e);
+        }
+    }
+
+    /**
+     * Emits a delete for the row when the tracker has not seen its key.
+     *
+     * @param key     the row key
+     * @param value   the row contents; unused
+     * @param context the task context the delete is written to
+     * @throws RuntimeException if no tracker was loaded or the key is {@code null}
+     */
+    @Override
+    public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
+        if(tracker == null || key == null) {
+            throw new RuntimeException("Tracker = " + tracker + " key = " + key);
+        }
+        if(!tracker.hasSeen(toLookupKey(key.get()))) {
+            Delete d = new Delete(key.get());
+            context.write(key, d);
+        }
+    }
+
+    /**
+     * Wraps raw row key bytes in a {@link LookupKey} so they can be passed to the tracker.
+     *
+     * @param bytes the row key bytes, returned as-is by {@code toBytes()}
+     * @return a key view over {@code bytes}; its {@code fromBytes} does nothing
+     */
+    protected LookupKey toLookupKey(final byte[] bytes) {
+        return new LookupKey() {
+            @Override
+            public byte[] toBytes() {
+                return bytes;
+            }
+
+            @Override
+            public void fromBytes(byte[] in) {
+                // Intentionally empty: this wrapper only exposes existing bytes.
+            }
+        };
+    }
+
+}