You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2017/02/06 20:17:41 UTC

[13/17] incubator-metron git commit: METRON-682: Unify and Improve the Flat File Loader closes apache/incubator-metron#432

METRON-682: Unify and Improve the Flat File Loader closes apache/incubator-metron#432


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/1be4fcb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/1be4fcb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/1be4fcb0

Branch: refs/heads/Metron_0.3.1
Commit: 1be4fcb0243453863b6aefe0213fe9f0afed5718
Parents: a11e85c
Author: cstella <ce...@gmail.com>
Authored: Mon Feb 6 11:04:32 2017 -0500
Committer: cstella <ce...@gmail.com>
Committed: Mon Feb 6 11:04:32 2017 -0500

----------------------------------------------------------------------
 metron-analytics/metron-statistics/README.md    |   2 +-
 .../docker/rpm-docker/SPECS/metron.spec         |   1 -
 .../metron/common/utils/cli/OptionHandler.java  |  31 ++
 .../metron-data-management/README.md            |  22 +-
 .../dataloads/bulk/ThreatIntelBulkLoader.java   | 260 --------------
 .../dataloads/extractor/ExtractorHandler.java   |  10 +-
 .../extractor/inputformat/Formats.java          |  50 +--
 .../inputformat/InputFormatHandler.java         |   7 +-
 .../extractor/inputformat/WholeFileFormat.java  | 123 +++----
 .../nonbulk/flatfile/ExtractorState.java        |  16 +-
 .../dataloads/nonbulk/flatfile/LoadOptions.java | 261 ++++++++++++++
 .../SimpleEnrichmentFlatFileLoader.java         | 290 +--------------
 .../flatfile/importer/ImportStrategy.java       |  47 +++
 .../nonbulk/flatfile/importer/Importer.java     |  34 ++
 .../flatfile/importer/LocalImporter.java        | 177 ++++++++++
 .../flatfile/importer/MapReduceImporter.java    |  75 ++++
 .../nonbulk/flatfile/location/FileLocation.java |  57 +++
 .../nonbulk/flatfile/location/HDFSLocation.java |  75 ++++
 .../nonbulk/flatfile/location/Location.java     |  99 ++++++
 .../flatfile/location/LocationStrategy.java     |  67 ++++
 .../nonbulk/flatfile/location/RawLocation.java  |  57 +++
 .../nonbulk/flatfile/location/URLLocation.java  |  63 ++++
 .../src/main/scripts/flatfile_loader.sh         |  22 +-
 .../src/main/scripts/threatintel_bulk_load.sh   |  41 ---
 .../hbase/mr/BulkLoadMapperIntegrationTest.java | 140 --------
 .../LeastRecentlyUsedPrunerIntegrationTest.java |  35 +-
 ...EnrichmentFlatFileLoaderIntegrationTest.java | 349 +++++++++++++++++++
 .../SimpleEnrichmentFlatFileLoaderTest.java     | 164 ---------
 .../nonbulk/taxii/TaxiiIntegrationTest.java     |  13 +-
 .../integration/IndexingIntegrationTest.java    |   4 +-
 30 files changed, 1567 insertions(+), 1025 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-analytics/metron-statistics/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-statistics/README.md b/metron-analytics/metron-statistics/README.md
index 4d78839..cfd44f2 100644
--- a/metron-analytics/metron-statistics/README.md
+++ b/metron-analytics/metron-statistics/README.md
@@ -45,7 +45,7 @@ functions can be used from everywhere where Stellar is used.
 * Input:
   * number - The number to take the absolute value of
 * Returns: The absolute value of the number passed in.
-*
+
 #### `BIN`
 * Description: Computes the bin that the value is in given a set of bounds.
 * Input:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
index 5c5881c..9466b68 100644
--- a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
+++ b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
@@ -181,7 +181,6 @@ This package installs the Metron Parser files
 %{metron_home}/bin/flatfile_loader.sh
 %{metron_home}/bin/prune_elasticsearch_indices.sh
 %{metron_home}/bin/prune_hdfs_files.sh
-%{metron_home}/bin/threatintel_bulk_load.sh
 %{metron_home}/bin/threatintel_bulk_prune.sh
 %{metron_home}/bin/threatintel_taxii_load.sh
 %attr(0644,root,root) %{metron_home}/lib/metron-data-management-%{full_version}.jar

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/OptionHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/OptionHandler.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/OptionHandler.java
new file mode 100644
index 0000000..85e7520
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/OptionHandler.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils.cli;
+
+import com.google.common.base.Function;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+
+import java.util.Optional;
+
+public abstract class OptionHandler<OPT_T extends Enum<OPT_T>> implements Function<String, Option>
+{
+  public Optional<Object> getValue(OPT_T option, CommandLine cli) {
+    return Optional.empty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/README.md b/metron-platform/metron-data-management/README.md
index 26dd472..eaafda4 100644
--- a/metron-platform/metron-data-management/README.md
+++ b/metron-platform/metron-data-management/README.md
@@ -206,32 +206,16 @@ The parameters for the utility are as follows:
 | -n         | --enrichment_config       | No           | The JSON document describing the enrichments to configure.  Unlike other loaders, this is run first if specified.                                  |
 
 
-### Bulk Load from HDFS
-
-The shell script `$METRON_HOME/bin/threatintel_bulk_load.sh` will kick off a MR job to load data staged in HDFS into an HBase table.  Note: despite what
-the naming may suggest, this utility works for enrichment as well as threat intel due to the underlying infrastructure being the same.
-
-The parameters for the utility are as follows:
-
-| Short Code | Long Code           | Is Required? | Description                                                                                                       |
-|------------|---------------------|--------------|-------------------------------------------------------------------------------------------------------------------|
-| -h         |                     | No           | Generate the help screen/set of options                                                                           |
-| -e         | --extractor_config  | Yes          | JSON Document describing the extractor for this input data source                                                 |
-| -t         | --table             | Yes          | The HBase table to import into                                                                                    |
-| -f         | --column_family     | Yes          | The HBase table column family to import into                                                                      |
-| -i         | --input             | Yes          | The input data location on HDFS                                                                                   |
-| -n         | --enrichment_config | No           | The JSON document describing the enrichments to configure.  Unlike other loaders, this is run first if specified. |
-or threat intel.
 
 ### Flatfile Loader
 
-The shell script `$METRON_HOME/bin/flatfile_loader.sh` will read data from local disk and load the enrichment or threat intel data into an HBase table.  
+The shell script `$METRON_HOME/bin/flatfile_loader.sh` will read data from local disk, HDFS, or URLs and load the enrichment or threat intel data into an HBase table.
 Note: This utility works for enrichment as well as threat intel due to the underlying infrastructure being the same.
 
 One special thing to note here is that there is a special configuration
 parameter to the Extractor config that is only considered during this
 loader:
-* inputFormatHandler : This specifies how to consider the data.  The two implementations are `BY_LINE` and `org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat`.
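+* inputFormat : This specifies how the input data is split into units to be extracted.  The two built-in implementations are `BY_LINE` and `WHOLE_FILE`; the fully qualified class name of a custom `InputFormatHandler` may also be given.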
 
 The default is `BY_LINE`, which makes sense for a list of CSVs where
 each line indicates a unit of information which can be imported.
@@ -243,7 +227,9 @@ The parameters for the utility are as follows:
 | Short Code | Long Code           | Is Required? | Description                                                                                                                                                                         |   |
 |------------|---------------------|--------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---|
 | -h         |                     | No           | Generate the help screen/set of options                                                                                                                                             |   |
+| -q         | --quiet             | No           | Do not print progress updates                                                                                                                                                       |   |
 | -e         | --extractor_config  | Yes          | JSON Document describing the extractor for this input data source                                                                                                                   |   |
+| -m         | --import_mode       | No           | The import mode to use: LOCAL or MR.  Default: LOCAL                                                                                                                                |   |
 | -t         | --hbase_table       | Yes          | The HBase table to import into                                                                                                                                                      |   |
 | -c         | --hbase_cf          | Yes          | The HBase table column family to import into                                                                                                                                        |   |
 | -i         | --input             | Yes          | The input data location on local disk.  If this is a file, then that file will be loaded.  If this is a directory, then the files will be loaded recursively under that directory. |   |

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
deleted file mode 100644
index 5ba0a91..0000000
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.dataloads.bulk;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.io.Files;
-import org.apache.commons.cli.*;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.metron.dataloads.extractor.ExtractorHandler;
-import org.apache.metron.dataloads.hbase.mr.BulkLoadMapper;
-import org.apache.metron.common.configuration.enrichment.SensorEnrichmentUpdateConfig;
-import org.apache.metron.enrichment.converter.HbaseConverter;
-import org.apache.metron.enrichment.converter.EnrichmentConverter;
-import org.apache.metron.common.utils.JSONUtils;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.text.*;
-import java.util.Date;
-
-public class ThreatIntelBulkLoader  {
-  private static abstract class OptionHandler implements Function<String, Option> {}
-  public enum BulkLoadOptions {
-    HELP("h", new OptionHandler() {
-
-      @Nullable
-      @Override
-      public Option apply(@Nullable String s) {
-        return new Option(s, "help", false, "Generate Help screen");
-      }
-    })
-    ,TABLE("t", new OptionHandler() {
-      @Nullable
-      @Override
-      public Option apply(@Nullable String s) {
-        Option o = new Option(s, "table", true, "HBase table to import data into");
-        o.setRequired(true);
-        o.setArgName("HBASE_TABLE");
-        return o;
-      }
-    })
-    ,COLUMN_FAMILY("f", new OptionHandler() {
-      @Nullable
-      @Override
-      public Option apply(@Nullable String s) {
-        Option o = new Option(s, "column_family", true, "Column family of the HBase table to import into");
-        o.setRequired(true);
-        o.setArgName("CF_NAME");
-        return o;
-      }
-    })
-    ,EXTRACTOR_CONFIG("e", new OptionHandler() {
-      @Nullable
-      @Override
-      public Option apply(@Nullable String s) {
-        Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
-        o.setArgName("JSON_FILE");
-        o.setRequired(true);
-        return o;
-      }
-    })
-    ,INPUT_DATA("i", new OptionHandler() {
-      @Nullable
-      @Override
-      public Option apply(@Nullable String s) {
-        Option o = new Option(s, "input", true, "Input directory in HDFS for the data to import into HBase");
-        o.setArgName("DIR");
-        o.setRequired(true);
-        return o;
-      }
-    })
-    ,AS_OF_TIME("a", new OptionHandler() {
-      @Nullable
-      @Override
-      public Option apply(@Nullable String s) {
-        Option o = new Option(s, "as_of", true, "The last read timestamp to mark the records with (omit for time of execution)");
-        o.setArgName("datetime");
-        o.setRequired(false);
-        return o;
-      }
-    })
-    ,AS_OF_TIME_FORMAT("z", new OptionHandler() {
-      @Nullable
-      @Override
-      public Option apply(@Nullable String s) {
-        Option o = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option)");
-        o.setArgName("format");
-        o.setRequired(false);
-        return o;
-      }
-    })
-    ,CONVERTER("c", new OptionHandler() {
-      @Nullable
-      @Override
-      public Option apply(@Nullable String s) {
-        Option o = new Option(s, "converter", true, "The HBase converter class to use (Default is threat intel)");
-        o.setArgName("class");
-        o.setRequired(false);
-        return o;
-      }
-    })
-    ,ENRICHMENT_CONFIG("n", new OptionHandler() {
-      @Nullable
-      @Override
-      public Option apply(@Nullable String s) {
-        Option o = new Option(s, "enrichment_config", true
-                , "JSON Document describing the enrichment configuration details." +
-                "  This is used to associate an enrichment type with a field type in zookeeper."
-        );
-        o.setArgName("JSON_FILE");
-        o.setRequired(false);
-        return o;
-      }
-    })
-    ;
-    Option option;
-    String shortCode;
-    BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
-      this.shortCode = shortCode;
-      this.option = optionHandler.apply(shortCode);
-    }
-
-    public boolean has(CommandLine cli) {
-      return cli.hasOption(shortCode);
-    }
-
-    public String get(CommandLine cli) {
-      return cli.getOptionValue(shortCode);
-    }
-
-    public static CommandLine parse(CommandLineParser parser, String[] args) {
-      try {
-        CommandLine cli = parser.parse(getOptions(), args);
-        if(ThreatIntelBulkLoader.BulkLoadOptions.HELP.has(cli)) {
-          printHelp();
-          System.exit(0);
-        }
-        return cli;
-      } catch (ParseException e) {
-        System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
-        e.printStackTrace(System.err);
-        printHelp();
-        System.exit(-1);
-        return null;
-      }
-    }
-
-    public static void printHelp() {
-      HelpFormatter formatter = new HelpFormatter();
-      formatter.printHelp( "ThreatIntelBulkLoader", getOptions());
-    }
-
-    public static Options getOptions() {
-      Options ret = new Options();
-      for(BulkLoadOptions o : BulkLoadOptions.values()) {
-        ret.addOption(o.option);
-      }
-      return ret;
-    }
-  }
-
-  private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
-    if(BulkLoadOptions.AS_OF_TIME.has(cli)) {
-      if(!BulkLoadOptions.AS_OF_TIME_FORMAT.has(cli)) {
-        throw new IllegalStateException("Unable to proceed: Specified as_of_time without an associated format.");
-      }
-      else {
-        DateFormat format = new SimpleDateFormat(BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli));
-        Date d = format.parse(BulkLoadOptions.AS_OF_TIME.get(cli));
-        return d.getTime();
-      }
-    }
-    else {
-      return System.currentTimeMillis();
-    }
-  }
-  private static String readExtractorConfig(File configFile) throws IOException {
-    return Joiner.on("\n").join(Files.readLines(configFile, Charset.defaultCharset()));
-  }
-
-  public static Job createJob(Configuration conf, String input, String table, String cf, String extractorConfigContents, long ts, HbaseConverter converter) throws IOException {
-    Job job = new Job(conf);
-    job.setJobName("ThreatIntelBulkLoader: " + input + " => " +  table + ":" + cf);
-    System.out.println("Configuring " + job.getJobName());
-    job.setJarByClass(ThreatIntelBulkLoader.class);
-    job.setMapperClass(org.apache.metron.dataloads.hbase.mr.BulkLoadMapper.class);
-    job.setOutputFormatClass(TableOutputFormat.class);
-    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
-    job.getConfiguration().set(BulkLoadMapper.COLUMN_FAMILY_KEY, cf);
-    job.getConfiguration().set(BulkLoadMapper.CONFIG_KEY, extractorConfigContents);
-    job.getConfiguration().set(BulkLoadMapper.LAST_SEEN_KEY, "" + ts);
-    job.getConfiguration().set(BulkLoadMapper.CONVERTER_KEY, converter.getClass().getName());
-    job.setOutputKeyClass(ImmutableBytesWritable.class);
-    job.setOutputValueClass(Put.class);
-    job.setNumReduceTasks(0);
-    ExtractorHandler handler = ExtractorHandler.load(extractorConfigContents);
-    handler.getInputFormatHandler().set(job, new Path(input), handler.getConfig());
-    return job;
-  }
-
-  public static void main(String... argv) throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-    String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
-
-    CommandLine cli = BulkLoadOptions.parse(new PosixParser(), otherArgs);
-    Long ts = getTimestamp(cli);
-    String input = BulkLoadOptions.INPUT_DATA.get(cli);
-    String table = BulkLoadOptions.TABLE.get(cli);
-    String cf = BulkLoadOptions.COLUMN_FAMILY.get(cli);
-    String extractorConfigContents = readExtractorConfig(new File(BulkLoadOptions.EXTRACTOR_CONFIG.get(cli)));
-    String converterClass = EnrichmentConverter.class.getName();
-    if(BulkLoadOptions.CONVERTER.has(cli)) {
-      converterClass = BulkLoadOptions.CONVERTER.get(cli);
-    }
-    SensorEnrichmentUpdateConfig sensorEnrichmentUpdateConfig = null;
-    if(BulkLoadOptions.ENRICHMENT_CONFIG.has(cli)) {
-      sensorEnrichmentUpdateConfig = JSONUtils.INSTANCE.load( new File(BulkLoadOptions.ENRICHMENT_CONFIG.get(cli))
-              , SensorEnrichmentUpdateConfig.class
-      );
-    }
-
-    HbaseConverter converter = (HbaseConverter) Class.forName(converterClass).getConstructor().newInstance();
-    Job job = createJob(conf, input, table, cf, extractorConfigContents, ts, converter);
-    System.out.println(conf);
-    boolean jobRet = job.waitForCompletion(true);
-    if(!jobRet) {
-      System.exit(1);
-    }
-    if(sensorEnrichmentUpdateConfig != null) {
-        sensorEnrichmentUpdateConfig.updateSensorConfigs();
-    }
-    System.exit(0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
index 89477d8..2e2f799 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
@@ -33,7 +33,7 @@ public class ExtractorHandler {
     final static ObjectMapper _mapper = new ObjectMapper();
     private Map<String, Object> config;
     private Extractor extractor;
-    private InputFormatHandler inputFormatHandler = Formats.BY_LINE;
+    private InputFormatHandler inputFormat = Formats.BY_LINE;
 
     public Map<String, Object> getConfig() {
         return config;
@@ -43,13 +43,13 @@ public class ExtractorHandler {
         this.config = config;
     }
 
-    public InputFormatHandler getInputFormatHandler() {
-        return inputFormatHandler;
+    public InputFormatHandler getInputFormat() {
+        return inputFormat;
     }
 
-    public void setInputFormatHandler(String handler) {
+    public void setInputFormat(String handler) {
         try {
-            this.inputFormatHandler= Formats.create(handler);
+            this.inputFormat= Formats.create(handler);
         } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
             throw new IllegalStateException("Unable to create an inputformathandler", e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
index b8be233..961e7d3 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
@@ -23,34 +23,34 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
-public enum Formats implements InputFormatHandler{
-    BY_LINE(new InputFormatHandler() {
-        @Override
-        public void set(Job job, Path input, Map<String, Object> config) throws IOException {
+public enum Formats implements InputFormatHandler {
+  BY_LINE( (job, inputs, config) -> {
+      for(Path input : inputs) {
+        FileInputFormat.addInputPath(job, input);
+      }
+  }),
+  WHOLE_FILE( new WholeFileFormat());
+  InputFormatHandler _handler = null;
+  Formats(InputFormatHandler handler) {
+    this._handler = handler;
+  }
+  @Override
+  public void set(Job job, List<Path> path, Map<String, Object> config) throws IOException {
+    _handler.set(job, path, config);
+  }
 
-            FileInputFormat.addInputPath(job, input);
-        }
-    })
-    ;
-    InputFormatHandler _handler = null;
-    Formats(InputFormatHandler handler) {
-        this._handler = handler;
+  public static InputFormatHandler create(String handlerName) throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
+    try {
+      InputFormatHandler ec = Formats.valueOf(handlerName)._handler;
+      return ec;
     }
-    @Override
-    public void set(Job job, Path path, Map<String, Object> config) throws IOException {
-        _handler.set(job, path, config);
-    }
-
-    public static InputFormatHandler create(String handlerName) throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
-        try {
-            InputFormatHandler ec = Formats.valueOf(handlerName);
-            return ec;
-        }
-        catch(IllegalArgumentException iae) {
-            InputFormatHandler ex = (InputFormatHandler) Class.forName(handlerName).getConstructor().newInstance();
-            return ex;
-        }
+    catch(IllegalArgumentException iae) {
+      InputFormatHandler ex = (InputFormatHandler) Class.forName(handlerName).getConstructor().newInstance();
+      return ex;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
index 2287969..00e89c0 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
@@ -17,12 +17,17 @@
  */
 package org.apache.metron.dataloads.extractor.inputformat;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
 public interface InputFormatHandler {
-    void set(Job job, Path input, Map<String, Object> config) throws IOException;
+  void set(Job job, List<Path> input, Map<String, Object> config) throws IOException;
+  default void set(Job job, Path input, Map<String, Object> config) throws IOException {
+    set(job, ImmutableList.of(input), config);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
index e0a58ef..5dc8b53 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
@@ -30,80 +30,83 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
 public class WholeFileFormat implements InputFormatHandler {
 
-    public static class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
-        private FileSplit fileSplit;
-        private Configuration conf;
-        private Text value = new Text();
-        private boolean processed = false;
+  public static class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
+    private FileSplit fileSplit;
+    private Configuration conf;
+    private Text value = new Text();
+    private boolean processed = false;
 
-        @Override
-        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-            this.fileSplit = (FileSplit) split;
-            this.conf = context.getConfiguration();
-        }
-
-        @Override
-        public boolean nextKeyValue() throws IOException, InterruptedException {
-            if (!processed) {
-                byte[] contents = new byte[(int) fileSplit.getLength()];
-                Path file = fileSplit.getPath();
-                FileSystem fs = file.getFileSystem(conf);
-                FSDataInputStream in = null;
-                try {
-                    in = fs.open(file);
-                    IOUtils.readFully(in, contents, 0, contents.length);
-                    value.set(contents, 0, contents.length);
-                } finally {
-                    IOUtils.closeStream(in);
-                }
-                processed = true;
-                return true;
-            }
-            return false;
-        }
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+      this.fileSplit = (FileSplit) split;
+      this.conf = context.getConfiguration();
+    }
 
-        @Override
-        public NullWritable getCurrentKey() throws IOException, InterruptedException {
-            return NullWritable.get();
-        }
-        @Override
-        public Text getCurrentValue() throws IOException, InterruptedException{
-            return value;
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (!processed) {
+        byte[] contents = new byte[(int) fileSplit.getLength()];
+        Path file = fileSplit.getPath();
+        FileSystem fs = file.getFileSystem(conf);
+        FSDataInputStream in = null;
+        try {
+          in = fs.open(file);
+          IOUtils.readFully(in, contents, 0, contents.length);
+          value.set(contents, 0, contents.length);
+        } finally {
+          IOUtils.closeStream(in);
         }
+        processed = true;
+        return true;
+      }
+      return false;
+    }
 
-        @Override
-        public float getProgress() throws IOException {
-            return processed ? 1.0f : 0.0f;
-        }
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException {
+      return NullWritable.get();
+    }
+    @Override
+    public Text getCurrentValue() throws IOException, InterruptedException{
+      return value;
+    }
 
-        @Override
-        public void close() throws IOException{
-            //do nothing :)
-        }
+    @Override
+    public float getProgress() throws IOException {
+      return processed ? 1.0f : 0.0f;
     }
 
-    public static class WholeFileInputFormat extends FileInputFormat<NullWritable, Text> {
+    @Override
+    public void close() throws IOException{
+      //do nothing :)
+    }
+  }
 
-        @Override
-        protected boolean isSplitable(JobContext context, Path file) {
-            return false;
-        }
+  public static class WholeFileInputFormat extends FileInputFormat<NullWritable, Text> {
 
-        @Override
-        public RecordReader<NullWritable, Text> createRecordReader(
-                InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-            WholeFileRecordReader reader = new WholeFileRecordReader();
-            reader.initialize(split, context);
-            return reader;
-        }
+    @Override
+    protected boolean isSplitable(JobContext context, Path file) {
+      return false;
     }
+
     @Override
-    public void set(Job job, Path input, Map<String, Object> config) throws IOException {
-        WholeFileInputFormat.setInputPaths(job, input);
-        job.setInputFormatClass(WholeFileInputFormat.class);
+    public RecordReader<NullWritable, Text> createRecordReader(
+            InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+      WholeFileRecordReader reader = new WholeFileRecordReader();
+      reader.initialize(split, context);
+      return reader;
+    }
+  }
+  @Override
+  public void set(Job job, List<Path> inputs, Map<String, Object> config) throws IOException {
+    for(Path input : inputs) {
+      WholeFileInputFormat.addInputPath(job, input);
     }
+    job.setInputFormatClass(WholeFileInputFormat.class);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java
index e44eb27..168d251 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java
@@ -17,19 +17,29 @@
  */
 package org.apache.metron.dataloads.nonbulk.flatfile;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.metron.dataloads.extractor.Extractor;
 import org.apache.metron.enrichment.converter.HbaseConverter;
 
+import java.io.IOException;
+
 public class ExtractorState {
   private HTableInterface table;
   private Extractor extractor;
   private HbaseConverter converter;
+  private FileSystem fs;
 
-  public ExtractorState(HTableInterface table, Extractor extractor, HbaseConverter converter) {
+  public ExtractorState(HTableInterface table, Extractor extractor, HbaseConverter converter, Configuration config) {
     this.table = table;
     this.extractor = extractor;
     this.converter = converter;
+    try {
+      this.fs = FileSystem.get(config);
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to retrieve hadoop file system: " + e.getMessage(), e);
+    }
   }
 
   public HTableInterface getTable() {
@@ -43,4 +53,8 @@ public class ExtractorState {
   public HbaseConverter getConverter() {
     return converter;
   }
+
+  public FileSystem getFileSystem() {
+    return fs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/LoadOptions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/LoadOptions.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/LoadOptions.java
new file mode 100644
index 0000000..ddaf6a6
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/LoadOptions.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import org.apache.commons.cli.*;
+import org.apache.commons.io.FileUtils;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.cli.OptionHandler;
+import org.apache.metron.dataloads.nonbulk.flatfile.importer.ImportStrategy;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Optional;
+
+public enum LoadOptions {
+  HELP("h", new OptionHandler<LoadOptions>() {
+
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      return new Option(s, "help", false, "Generate Help screen");
+    }
+  })
+  ,QUIET("q", new OptionHandler<LoadOptions>() {
+
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      return new Option(s, "quiet", false, "Do not update progress");
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      return Optional.of(option.has(cli));
+    }
+  })
+  , IMPORT_MODE("m", new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "import_mode", true
+                           , "The Import mode to use: " + Joiner.on(",").join(ImportStrategy.values())
+                           + ".  Default: " + ImportStrategy.LOCAL
+                           );
+      o.setArgName("MODE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      String mode = option.get(cli);
+      return Optional.of(ImportStrategy.getStrategy(mode).orElse(ImportStrategy.LOCAL));
+    }
+  })
+  ,HBASE_TABLE("t", new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "hbase_table", true, "HBase table to ingest the data into.");
+      o.setArgName("TABLE");
+      o.setRequired(true);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      return Optional.ofNullable(option.get(cli).trim());
+    }
+  })
+  ,HBASE_CF("c", new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "hbase_cf", true, "HBase column family to ingest the data into.");
+      o.setArgName("CF");
+      o.setRequired(true);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      return Optional.ofNullable(option.get(cli).trim());
+    }
+  })
+  ,EXTRACTOR_CONFIG("e", new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
+      o.setArgName("JSON_FILE");
+      o.setRequired(true);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      try {
+        return Optional.ofNullable(FileUtils.readFileToString(new File(option.get(cli).trim())));
+      } catch (IOException e) {
+        throw new IllegalStateException("Unable to retrieve extractor config from " + option.get(cli) + ": " + e.getMessage(), e);
+      }
+    }
+  })
+  ,ENRICHMENT_CONFIG("n", new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "enrichment_config", true
+              , "JSON Document describing the enrichment configuration details." +
+              "  This is used to associate an enrichment type with a field type in zookeeper."
+      );
+      o.setArgName("JSON_FILE");
+      o.setRequired(false);
+      return o;
+    }
+  })
+  ,LOG4J_PROPERTIES("l", new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "log4j", true, "The log4j properties file to load");
+      o.setArgName("FILE");
+      o.setRequired(false);
+      return o;
+    }
+  })
+  ,NUM_THREADS("p", new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "threads", true, "The number of threads to use when extracting data.  The default is the number of cores of your machine.");
+      o.setArgName("NUM_THREADS");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      int numThreads = Runtime.getRuntime().availableProcessors();
+      if(option.has(cli)) {
+        numThreads = ConversionUtils.convert(option.get(cli), Integer.class);
+      }
+      return Optional.of(numThreads);
+    }
+  })
+  ,BATCH_SIZE("b", new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "batchSize", true, "The batch size to use for HBase puts");
+      o.setArgName("SIZE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      int batchSize = 128;
+      if(option.has(cli)) {
+        batchSize = ConversionUtils.convert(option.get(cli), Integer.class);
+      }
+      return Optional.of(batchSize);
+    }
+  })
+  ,INPUT("i", new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "input", true, "The CSV File to load");
+      o.setArgName("FILE");
+      o.setRequired(true);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      List<String> inputs = new ArrayList<>();
+      for(String input : Splitter.on(",").split(Optional.ofNullable(option.get(cli)).orElse(""))) {
+        inputs.add(input.trim());
+      }
+      return Optional.of(inputs);
+    }
+  })
+  ;
+  Option option;
+  String shortCode;
+  OptionHandler<LoadOptions> handler;
+  LoadOptions(String shortCode, OptionHandler<LoadOptions> optionHandler) {
+    this.shortCode = shortCode;
+    this.handler = optionHandler;
+    this.option = optionHandler.apply(shortCode);
+  }
+
+  public boolean has(CommandLine cli) {
+    return cli.hasOption(shortCode);
+  }
+
+  public String get(CommandLine cli) {
+    return cli.getOptionValue(shortCode);
+  }
+
+  public static CommandLine parse(CommandLineParser parser, String[] args) {
+    try {
+      CommandLine cli = parser.parse(getOptions(), args);
+      if(HELP.has(cli)) {
+        printHelp();
+        System.exit(0);
+      }
+      return cli;
+    } catch (ParseException e) {
+      System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+      e.printStackTrace(System.err);
+      printHelp();
+      System.exit(-1);
+      return null;
+    }
+  }
+
+  public static EnumMap<LoadOptions, Optional<Object> > createConfig(CommandLine cli) {
+    EnumMap<LoadOptions, Optional<Object> > ret = new EnumMap<>(LoadOptions.class);
+    for(LoadOptions option : values()) {
+      ret.put(option, option.handler.getValue(option, cli));
+    }
+    return ret;
+  }
+
+  public static void printHelp() {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp( "SimpleEnrichmentFlatFileLoader", getOptions());
+  }
+
+  public static Options getOptions() {
+    Options ret = new Options();
+    for(LoadOptions o : LoadOptions.values()) {
+      ret.addOption(o.option);
+    }
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
index 9992422..8ee11aa 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
@@ -17,312 +17,48 @@
  */
 package org.apache.metron.dataloads.nonbulk.flatfile;
 
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import org.apache.commons.cli.*;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.log4j.PropertyConfigurator;
-import org.apache.metron.common.utils.ConversionUtils;
-import org.apache.metron.common.utils.file.ReaderSpliterator;
-import org.apache.metron.dataloads.extractor.Extractor;
 import org.apache.metron.dataloads.extractor.ExtractorHandler;
-import org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentUpdateConfig;
-import org.apache.metron.hbase.HTableProvider;
-import org.apache.metron.enrichment.converter.HbaseConverter;
-import org.apache.metron.enrichment.converter.EnrichmentConverter;
-import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.dataloads.nonbulk.flatfile.importer.ImportStrategy;
 import org.apache.metron.common.utils.JSONUtils;
 
-import javax.annotation.Nullable;
 import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Stack;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
-import java.util.stream.Stream;
+import java.util.*;
 
 public class SimpleEnrichmentFlatFileLoader {
-  private static abstract class OptionHandler implements Function<String, Option> {}
-  public static enum LoadOptions {
-    HELP("h", new OptionHandler() {
 
-      @Nullable
-      @Override
-      public Option apply(@Nullable String s) {
-        return new Option(s, "help", false, "Generate Help screen");
-      }
-    })
-    ,HBASE_TABLE("t", new OptionHandler() {
-      @Nullable
-      @Override
-      public Option apply(@Nullable String s) {
-        Option o = new Option(s, "hbase_table", true, "HBase table to ingest the data into.");
-        o.setArgName("TABLE");
-        o.setRequired(true);
-        return o;
-      }
-    })
-    ,HBASE_CF("c", new OptionHandler() {
-      @Nullable
-      @Override
-      public Option apply(@Nullable String s) {
-        Option o = new Option(s, "hbase_cf", true, "HBase column family to ingest the data into.");
-        o.setArgName("CF");
-        o.setRequired(true);
-        return o;
-      }
-    })
-    ,EXTRACTOR_CONFIG("e", new OptionHandler() {
-      @Nullable
-      @Override
-      public Option apply(@Nullable String s) {
-        Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
-        o.setArgName("JSON_FILE");
-        o.setRequired(true);
-        return o;
-      }
-    })
-    ,ENRICHMENT_CONFIG("n", new OptionHandler() {
-      @Nullable
-      @Override
-      public Option apply(@Nullable String s) {
-        Option o = new Option(s, "enrichment_config", true
-                , "JSON Document describing the enrichment configuration details." +
-                "  This is used to associate an enrichment type with a field type in zookeeper."
-        );
-        o.setArgName("JSON_FILE");
-        o.setRequired(false);
-        return o;
-      }
-    })
-    ,LOG4J_PROPERTIES("l", new OptionHandler() {
-      @Nullable
-      @Override
-      public Option apply(@Nullable String s) {
-        Option o = new Option(s, "log4j", true, "The log4j properties file to load");
-        o.setArgName("FILE");
-        o.setRequired(false);
-        return o;
-      }
-    })
-    ,NUM_THREADS("p", new OptionHandler() {
-      @Nullable
-      @Override
-      public Option apply(@Nullable String s) {
-        Option o = new Option(s, "threads", true, "The number of threads to use when extracting data.  The default is the number of cores of your machine.");
-        o.setArgName("NUM_THREADS");
-        o.setRequired(false);
-        return o;
-      }
-    })
-    ,BATCH_SIZE("b", new OptionHandler() {
-      @Nullable
-      @Override
-      public Option apply(@Nullable String s) {
-        Option o = new Option(s, "batchSize", true, "The batch size to use for HBase puts");
-        o.setArgName("SIZE");
-        o.setRequired(false);
-        return o;
-      }
-    })
-    ,INPUT("i", new OptionHandler() {
-      @Nullable
-      @Override
-      public Option apply(@Nullable String s) {
-        Option o = new Option(s, "input", true, "The CSV File to load");
-        o.setArgName("FILE");
-        o.setRequired(true);
-        return o;
-      }
-    })
-    ;
-    Option option;
-    String shortCode;
-    LoadOptions(String shortCode, OptionHandler optionHandler) {
-      this.shortCode = shortCode;
-      this.option = optionHandler.apply(shortCode);
-    }
-
-    public boolean has(CommandLine cli) {
-      return cli.hasOption(shortCode);
-    }
-
-    public String get(CommandLine cli) {
-      return cli.getOptionValue(shortCode);
-    }
-
-    public static CommandLine parse(CommandLineParser parser, String[] args) {
-      try {
-        CommandLine cli = parser.parse(getOptions(), args);
-        if(HELP.has(cli)) {
-          printHelp();
-          System.exit(0);
-        }
-        return cli;
-      } catch (ParseException e) {
-        System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
-        e.printStackTrace(System.err);
-        printHelp();
-        System.exit(-1);
-        return null;
-      }
-    }
-
-    public static void printHelp() {
-      HelpFormatter formatter = new HelpFormatter();
-      formatter.printHelp( "SimpleEnrichmentFlatFileLoader", getOptions());
-    }
-
-    public static Options getOptions() {
-      Options ret = new Options();
-      for(LoadOptions o : LoadOptions.values()) {
-        ret.addOption(o.option);
-      }
-      return ret;
-    }
-  }
-  public static List<File> getFiles(File root) {
-    if(!root.isDirectory())  {
-      return ImmutableList.of(root);
-    }
-    List<File> ret = new ArrayList<>();
-    Stack<File> stack = new Stack<File>();
-    stack.push(root);
-    while(!stack.isEmpty()) {
-      File f = stack.pop();
-      if(f.isDirectory()) {
-        for(File child : f.listFiles()) {
-          stack.push(child);
-        }
-      }
-      else {
-        ret.add(f);
-      }
-    }
-    return ret;
-  }
 
-  public HTableProvider getProvider() {
-    return new HTableProvider();
-  }
-
-  public List<Put> extract( String line
-                     , Extractor extractor
-                     , String cf
-                     , HbaseConverter converter
-                     ) throws IOException
-  {
-    List<Put> ret = new ArrayList<>();
-    Iterable<LookupKV> kvs = extractor.extract(line);
-    for(LookupKV kv : kvs) {
-      Put put = converter.toPut(cf, kv.getKey(), kv.getValue());
-      ret.add(put);
-    }
-    return ret;
-  }
-
-  public void load( final Iterable<Stream<String>> streams
-                  , final ThreadLocal<ExtractorState> state
-                  , final String cf
-                  , int numThreads
-                  )
-  {
-    for(Stream<String> stream : streams) {
-      try {
-        ForkJoinPool forkJoinPool = new ForkJoinPool(numThreads);
-        forkJoinPool.submit(() ->
-          stream.parallel().forEach(input -> {
-            ExtractorState es = state.get();
-            try {
-              es.getTable().put(extract(input, es.getExtractor(), cf, es.getConverter()));
-            } catch (IOException e) {
-              throw new IllegalStateException("Unable to continue: " + e.getMessage(), e);
-            }
-            }
-                                   )
-        ).get();
-      } catch (InterruptedException e) {
-        throw new IllegalStateException(e.getMessage(), e);
-      } catch (ExecutionException e) {
-        throw new IllegalStateException(e.getMessage(), e);
-      } finally {
-        stream.close();
-      }
-    }
-  }
-
-  private static Iterable<Stream<String>> streamify(List<File> files, int batchSize, boolean lineByLine) throws FileNotFoundException {
-    List<Stream<String>> ret = new ArrayList<>();
-    if(!lineByLine) {
-      ret.add(files.stream().map(f -> {
-        try {
-          return FileUtils.readFileToString(f);
-        } catch (IOException e) {
-          throw new IllegalStateException("File " + f.getName() + " not found.");
-        }
-      }));
-    }
-    else {
-      for(File f : files) {
-        ret.add(ReaderSpliterator.lineStream(new BufferedReader(new FileReader(f)), batchSize));
-      }
-    }
-    return ret;
+  public static void main(String... argv) throws Exception {
+    Configuration hadoopConfig = HBaseConfiguration.create();
+    String[] otherArgs = new GenericOptionsParser(hadoopConfig, argv).getRemainingArgs();
+    main(hadoopConfig, otherArgs);
   }
 
-  public static void main(String... argv) throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-    String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+  public static void main(Configuration hadoopConfig, String[] argv) throws Exception {
 
-    CommandLine cli = LoadOptions.parse(new PosixParser(), otherArgs);
+    CommandLine cli = LoadOptions.parse(new PosixParser(), argv);
+    EnumMap<LoadOptions, Optional<Object>> config = LoadOptions.createConfig(cli);
     if(LoadOptions.LOG4J_PROPERTIES.has(cli)) {
       PropertyConfigurator.configure(LoadOptions.LOG4J_PROPERTIES.get(cli));
     }
     ExtractorHandler handler = ExtractorHandler.load(
-            FileUtils.readFileToString(new File(LoadOptions.EXTRACTOR_CONFIG.get(cli)))
+            FileUtils.readFileToString(new File(LoadOptions.EXTRACTOR_CONFIG.get(cli).trim()))
     );
-    int batchSize = 128;
-    if(LoadOptions.BATCH_SIZE.has(cli)) {
-      batchSize = ConversionUtils.convert(LoadOptions.BATCH_SIZE.get(cli), Integer.class);
-    }
-    int numThreads = Runtime.getRuntime().availableProcessors();
-    if(LoadOptions.NUM_THREADS.has(cli)) {
-      numThreads = ConversionUtils.convert(LoadOptions.NUM_THREADS.get(cli), Integer.class);
-    }
-    boolean lineByLine = !handler.getInputFormatHandler().getClass().equals(WholeFileFormat.class);
+    ImportStrategy strategy = (ImportStrategy) config.get(LoadOptions.IMPORT_MODE).get();
+    strategy.getImporter().importData(config, handler, hadoopConfig);
+
     SensorEnrichmentUpdateConfig sensorEnrichmentUpdateConfig = null;
     if(LoadOptions.ENRICHMENT_CONFIG.has(cli)) {
       sensorEnrichmentUpdateConfig = JSONUtils.INSTANCE.load( new File(LoadOptions.ENRICHMENT_CONFIG.get(cli))
               , SensorEnrichmentUpdateConfig.class
       );
     }
-    List<File> inputFiles = getFiles(new File(LoadOptions.INPUT.get(cli)));
-    SimpleEnrichmentFlatFileLoader loader = new SimpleEnrichmentFlatFileLoader();
-    ThreadLocal<ExtractorState> state = new ThreadLocal<ExtractorState>() {
-      @Override
-      protected ExtractorState initialValue() {
-        try {
-          ExtractorHandler handler = ExtractorHandler.load(
-            FileUtils.readFileToString(new File(LoadOptions.EXTRACTOR_CONFIG.get(cli)))
-          );
-          HTableInterface table = loader.getProvider().getTable(conf, LoadOptions.HBASE_TABLE.get(cli));
-          return new ExtractorState(table, handler.getExtractor(), new EnrichmentConverter());
-        } catch (IOException e1) {
-          throw new IllegalStateException("Unable to get table: " + e1);
-        }
-      }
-    };
-
-    loader.load(streamify(inputFiles, batchSize, lineByLine), state, LoadOptions.HBASE_CF.get(cli), numThreads);
 
     if(sensorEnrichmentUpdateConfig != null) {
       sensorEnrichmentUpdateConfig.updateSensorConfigs();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/ImportStrategy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/ImportStrategy.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/ImportStrategy.java
new file mode 100644
index 0000000..df88640
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/ImportStrategy.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.importer;
+
+import java.util.Optional;
+
+public enum ImportStrategy {
+  LOCAL(LocalImporter.INSTANCE),
+  MR(MapReduceImporter.INSTANCE)
+  ;
+  private Importer importer;
+
+  ImportStrategy(Importer importer) {
+    this.importer = importer;
+  }
+
+  public Importer getImporter() {
+    return importer;
+  }
+
+  public static Optional<ImportStrategy> getStrategy(String strategyName) {
+    if(strategyName == null) {
+      return Optional.empty();
+    }
+    for(ImportStrategy strategy : values()) {
+      if(strategy.name().equalsIgnoreCase(strategyName.trim())) {
+        return Optional.of(strategy);
+      }
+    }
+    return Optional.empty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java
new file mode 100644
index 0000000..81ede08
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.nonbulk.flatfile.importer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.nonbulk.flatfile.LoadOptions;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+
+import java.io.IOException;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Optional;
+
+public interface Importer {
+  void importData(EnumMap<LoadOptions, Optional<Object>> config, ExtractorHandler handler , final Configuration hadoopConfig) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java
new file mode 100644
index 0000000..652a4c3
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.importer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.metron.common.utils.file.ReaderSpliterator;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat;
+import org.apache.metron.dataloads.nonbulk.flatfile.ExtractorState;
+import org.apache.metron.dataloads.nonbulk.flatfile.LoadOptions;
+import org.apache.metron.dataloads.nonbulk.flatfile.location.Location;
+import org.apache.metron.dataloads.nonbulk.flatfile.location.LocationStrategy;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+import org.apache.metron.enrichment.converter.HbaseConverter;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.hbase.HTableProvider;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Imports data into HBase from within the current JVM.
+ *
+ * <p>Input read through a {@link WholeFileFormat} is extracted one whole file per record (files
+ * processed in parallel); any other input is extracted line by line, in parallel.
+ */
+public enum LocalImporter implements Importer {
+  INSTANCE;
+
+  /**
+   * Supplies the {@link HTableProvider} used to obtain tables, so callers of
+   * {@link #importData(EnumMap, ExtractorHandler, Configuration, HTableProviderRetriever)}
+   * can substitute a different provider.
+   */
+  @FunctionalInterface
+  public interface HTableProviderRetriever {
+    HTableProvider retrieve();
+  }
+
+  /**
+   * Imports the configured inputs using a default {@link HTableProvider}.
+   */
+  @Override
+  public void importData( final EnumMap<LoadOptions, Optional<Object>> config
+                        , final ExtractorHandler handler
+                        , final Configuration hadoopConfig
+                         ) throws IOException {
+    importData(config, handler, hadoopConfig, HTableProvider::new);
+  }
+
+  /**
+   * Imports the configured inputs, obtaining HBase tables from {@code provider}.
+   *
+   * <p>Each thread that processes data lazily creates its own {@link ExtractorState}
+   * (table, extractor, converter) on first use.
+   *
+   * @param config       load options; HBASE_TABLE, HBASE_CF, QUIET and INPUT must be present,
+   *                     plus BATCH_SIZE and NUM_THREADS for line-by-line input
+   * @param handler      supplies the extractor and the input format
+   * @param hadoopConfig the Hadoop configuration used to open tables
+   * @param provider     supplies the {@link HTableProvider}
+   * @throws IOException if the input cannot be read
+   */
+  public void importData( final EnumMap<LoadOptions, Optional<Object>> config
+                        , final ExtractorHandler handler
+                        , final Configuration hadoopConfig
+                        , final HTableProviderRetriever provider
+                         ) throws IOException {
+    ThreadLocal<ExtractorState> state = new ThreadLocal<ExtractorState>() {
+      @Override
+      protected ExtractorState initialValue() {
+        try {
+          HTableInterface table = provider.retrieve().getTable(hadoopConfig, (String) config.get(LoadOptions.HBASE_TABLE).get());
+          return new ExtractorState(table, handler.getExtractor(), new EnrichmentConverter(), hadoopConfig);
+        } catch (IOException e1) {
+          // Keep the cause so the underlying HBase failure is not lost.
+          throw new IllegalStateException("Unable to get table: " + e1, e1);
+        }
+      }
+    };
+    boolean quiet = (boolean) config.get(LoadOptions.QUIET).get();
+    boolean lineByLine = !handler.getInputFormat().getClass().equals(WholeFileFormat.class);
+    @SuppressWarnings("unchecked") // INPUT is always stored as a list of location strings
+    List<String> inputs = (List<String>) config.get(LoadOptions.INPUT).get();
+    String cf = (String) config.get(LoadOptions.HBASE_CF).get();
+    if(lineByLine) {
+      int batchSize = (int) config.get(LoadOptions.BATCH_SIZE).get();
+      int numThreads = (int) config.get(LoadOptions.NUM_THREADS).get();
+      extractLineByLine(inputs, state, cf, batchSize, numThreads, quiet);
+    }
+    else {
+      extractWholeFiles(inputs, state, cf, quiet);
+    }
+  }
+
+  /**
+   * Extracts each input line by line and writes the resulting puts to HBase.
+   *
+   * <p>Inputs are processed one after another; the lines of each input are streamed through
+   * {@link ReaderSpliterator} (using {@code batchSize}) and processed in parallel on a pool of
+   * {@code numThreads} threads.
+   *
+   * @throws IllegalStateException if any input cannot be read or written
+   */
+  public void extractLineByLine( List<String> inputs
+                               , ThreadLocal<ExtractorState> state
+                               , String cf
+                               , int batchSize
+                               , int numThreads
+                               , boolean quiet
+                               ) throws IOException {
+    inputs.stream().map(input -> LocationStrategy.getLocation(input, state.get().getFileSystem()))
+                   .forEach(loc -> extractLocation(loc, state, cf, batchSize, numThreads, quiet));
+  }
+
+  /**
+   * Extracts a single location line by line on a dedicated pool that is shut down afterwards.
+   */
+  private void extractLocation( Location loc
+                              , ThreadLocal<ExtractorState> state
+                              , String cf
+                              , int batchSize
+                              , int numThreads
+                              , boolean quiet
+                              ) {
+    final Progress progress = new Progress();
+    if(!quiet) {
+      System.out.println("\nProcessing " + loc.toString());
+    }
+    // The parallel stream is run from inside this pool so its work is executed by numThreads
+    // workers rather than the common pool (relies on JDK parallel-stream scheduling).
+    ForkJoinPool forkJoinPool = new ForkJoinPool(numThreads);
+    try (Stream<String> stream = ReaderSpliterator.lineStream(loc.openReader(), batchSize)) {
+      forkJoinPool.submit(() ->
+        stream.parallel().forEach(line -> {
+          ExtractorState es = state.get();
+          try {
+            es.getTable().put(extract(line, es.getExtractor(), cf, es.getConverter(), progress, quiet));
+          } catch (IOException e) {
+            throw new IllegalStateException("Unable to continue: " + e.getMessage(), e);
+          }
+        })
+      ).get();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(e.getMessage(), e);
+    } catch (Exception e) {
+      throw new IllegalStateException(e.getMessage(), e);
+    } finally {
+      // One pool is created per location; release its worker threads when done.
+      forkJoinPool.shutdown();
+    }
+  }
+
+  /**
+   * Extracts every file reachable from {@code inputs} as a single record and writes the
+   * resulting puts to HBase. Directories are descended recursively; files run in parallel.
+   *
+   * @throws IllegalStateException if a file cannot be read or written
+   */
+  public void extractWholeFiles( List<String> inputs, ThreadLocal<ExtractorState> state, String cf, boolean quiet) throws IOException {
+    final Progress progress = new Progress();
+    final List<Location> locations = new ArrayList<>();
+    Location.fileVisitor(inputs, locations::add, state.get().getFileSystem());
+    locations.parallelStream().forEach(loc -> {
+      try(BufferedReader br = loc.openReader()) {
+        // Re-insert line separators: the extractor gets the whole file, and joining with no
+        // separator would fuse the end of each line with the start of the next.
+        String s = br.lines().collect(Collectors.joining("\n"));
+        ExtractorState es = state.get();
+        es.getTable().put(extract(s, es.getExtractor(), cf, es.getConverter(), progress, quiet));
+      } catch (IOException e) {
+        throw new IllegalStateException("Unable to read " + loc + ": " + e.getMessage(), e);
+      }
+    });
+  }
+
+  /**
+   * Runs {@code extractor} over one record and converts every extracted key/value to a put.
+   *
+   * @param line      the record: a single line, or a whole file's contents
+   * @param extractor parses the record into lookup key/values
+   * @param cf        the HBase column family for the puts
+   * @param converter converts each key/value into a {@link Put}
+   * @param progress  updated once per record unless {@code quiet}
+   * @param quiet     suppresses progress output
+   * @return the puts for the record, possibly empty
+   * @throws IOException propagated from the extractor or converter
+   */
+  public List<Put> extract(String line
+                     , Extractor extractor
+                     , String cf
+                     , HbaseConverter converter
+                     , final Progress progress
+                     , final boolean quiet
+                     ) throws IOException
+  {
+    List<Put> ret = new ArrayList<>();
+    Iterable<LookupKV> kvs = extractor.extract(line);
+    for(LookupKV kv : kvs) {
+      Put put = converter.toPut(cf, kv.getKey(), kv.getValue());
+      ret.add(put);
+    }
+    if(!quiet) {
+      progress.update();
+    }
+    return ret;
+  }
+
+  /**
+   * Prints a running count of processed records, with a spinner, to stdout.
+   * {@link #update()} is synchronized so one instance can be shared by worker threads.
+   */
+  public static class Progress {
+    private final String anim = "|/-\\";
+    private int count = 0;
+
+    public synchronized void update() {
+      // Pre-increment so the count reflects the record just processed (first update prints 1).
+      int currentCount = ++count;
+      System.out.print("\rProcessed " + currentCount + " - " + anim.charAt(currentCount % anim.length()));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
new file mode 100644
index 0000000..e83bdd6
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.importer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.hbase.mr.BulkLoadMapper;
+import org.apache.metron.dataloads.nonbulk.flatfile.LoadOptions;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+
+import java.io.IOException;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+
+public enum MapReduceImporter implements Importer{
+  INSTANCE
+  ;
+
+  private static final Logger LOG = Logger.getLogger(MapReduceImporter.class);
+
+  @Override
+  public void importData(EnumMap<LoadOptions, Optional<Object>> config
+                        , ExtractorHandler handler
+                        , Configuration hadoopConfig
+                        ) throws IOException {
+    String table = (String) config.get(LoadOptions.HBASE_TABLE).get();
+    String cf = (String) config.get(LoadOptions.HBASE_CF).get();
+    String extractorConfigContents  = (String) config.get(LoadOptions.EXTRACTOR_CONFIG).get();
+    Job job = Job.getInstance(hadoopConfig);
+    List<String> inputs = (List<String>) config.get(LoadOptions.INPUT).get();
+    job.setJobName("MapReduceImporter: " + inputs.stream().collect(Collectors.joining(",")) + " => " +  table + ":" + cf);
+    LOG.info("Configuring " + job.getJobName());
+    job.setJarByClass(MapReduceImporter.class);
+    job.setMapperClass(org.apache.metron.dataloads.hbase.mr.BulkLoadMapper.class);
+    job.setOutputFormatClass(TableOutputFormat.class);
+    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
+    job.getConfiguration().set(BulkLoadMapper.COLUMN_FAMILY_KEY, cf);
+    job.getConfiguration().set(BulkLoadMapper.CONFIG_KEY, extractorConfigContents);
+    job.getConfiguration().set(BulkLoadMapper.CONVERTER_KEY, EnrichmentConverter.class.getName());
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(Put.class);
+    job.setNumReduceTasks(0);
+    List<Path> paths = inputs.stream().map(p -> new Path(p)).collect(Collectors.toList());
+    handler.getInputFormat().set(job, paths, handler.getConfig());
+    try {
+      job.waitForCompletion(true);
+    } catch (Exception e) {
+      throw new IllegalStateException("Unable to complete job: " + e.getMessage(), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/FileLocation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/FileLocation.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/FileLocation.java
new file mode 100644
index 0000000..267a6fb
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/FileLocation.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.location;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+public class FileLocation implements RawLocation {
+  @Override
+  public Optional<List<String>> list(String loc) {
+    List<String> children = new ArrayList<>();
+    for(File f : new File(loc).listFiles()) {
+        children.add(f.getPath());
+      }
+    return Optional.of(children);
+  }
+
+  @Override
+  public boolean exists(String loc) throws IOException {
+    return new File(loc).exists();
+  }
+
+  @Override
+  public boolean isDirectory(String loc) throws IOException {
+    return new File(loc).isDirectory();
+  }
+
+  @Override
+  public InputStream openInputStream(String loc) throws IOException {
+    return new FileInputStream(loc);
+  }
+
+  @Override
+  public boolean match(String loc) {
+    return new File(loc).exists();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/HDFSLocation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/HDFSLocation.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/HDFSLocation.java
new file mode 100644
index 0000000..bae6a82
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/HDFSLocation.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.location;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+public class HDFSLocation implements RawLocation<FileSystem> {
+
+  FileSystem fs = null;
+
+  @Override
+  public Optional<List<String>> list(String loc) throws IOException {
+    List<String> children = new ArrayList<>();
+    for(FileStatus f : fs.listStatus(new Path(loc)) ) {
+        children.add(f.getPath().toString());
+      }
+    return Optional.of(children);
+  }
+
+  @Override
+  public boolean exists(String loc) throws IOException {
+    return fs.exists(new Path(loc));
+  }
+
+  @Override
+  public boolean isDirectory(String loc) throws IOException {
+    return fs.isDirectory(new Path(loc));
+  }
+
+  @Override
+  public InputStream openInputStream(String loc) throws IOException {
+    return fs.open(new Path(loc));
+  }
+
+  @Override
+  public boolean match(String loc) {
+    try {
+      return loc.startsWith("hdfs://") && exists(loc);
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public void init(FileSystem state) {
+    this.fs = state;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/Location.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/Location.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/Location.java
new file mode 100644
index 0000000..81eada6
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/Location.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.location;
+
+import org.apache.hadoop.fs.*;
+import org.apache.metron.dataloads.nonbulk.flatfile.importer.LocalImporter;
+
+import java.io.*;
+import java.util.*;
+import java.util.function.Consumer;
+
+/**
+ * A location string (a local path, an HDFS path or a URL, per {@link LocationStrategy}) paired
+ * with the {@link RawLocation} used to access it.
+ */
+public class Location {
+
+  private final String loc;
+  private final RawLocation<?> rawLocation;
+
+  public Location(String loc, RawLocation<?> rawLocation) {
+    this.loc = loc;
+    this.rawLocation = rawLocation;
+  }
+
+  public RawLocation<?> getRawLocation() {
+    return rawLocation;
+  }
+
+  /**
+   * @return the immediate children if this location is an existing directory, otherwise empty
+   * @throws IOException if the location cannot be inspected or listed
+   */
+  public Optional<List<Location>> getChildren() throws IOException {
+    if(!exists() || !isDirectory()) {
+      return Optional.empty();
+    }
+    List<Location> children = new ArrayList<>();
+    for(String child : rawLocation.list(loc).orElse(Collections.emptyList())) {
+      children.add(new Location(child, rawLocation));
+    }
+    return Optional.of(children);
+  }
+
+
+  public boolean exists() throws IOException {
+    return rawLocation.exists(loc);
+  }
+
+  public boolean isDirectory() throws IOException {
+    return rawLocation.isDirectory(loc);
+  }
+
+  public BufferedReader openReader() throws IOException {
+    return rawLocation.openReader(loc);
+  }
+
+  @Override
+  public String toString() {
+    return loc;
+  }
+
+  /**
+   * Passes every non-directory location reachable from {@code inputs} to {@code importConsumer},
+   * descending into directories depth-first. Inputs that do not exist are skipped.
+   *
+   * @param inputs         the starting location strings
+   * @param importConsumer receives each non-directory location found
+   * @param fs             handed to {@link LocationStrategy} when resolving each input
+   * @throws IOException           if a location cannot be inspected or listed
+   * @throws IllegalStateException if an input matches no {@link LocationStrategy}
+   */
+  public static void fileVisitor(List<String> inputs
+                         , final Consumer<Location> importConsumer
+                         , final FileSystem fs
+                         ) throws IOException {
+    // ArrayDeque used as a LIFO stack; visiting order matches the former java.util.Stack.
+    Deque<Location> stack = new ArrayDeque<>();
+    for(String input : inputs) {
+      Location loc = LocationStrategy.getLocation(input, fs);
+      if(loc.exists()) {
+        stack.push(loc);
+      }
+    }
+    while(!stack.isEmpty()) {
+      Location loc = stack.pop();
+      if(loc.isDirectory()) {
+        for(Location child : loc.getChildren().orElse(Collections.emptyList())) {
+          stack.push(child);
+        }
+      }
+      else {
+        importConsumer.accept(loc);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/LocationStrategy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/LocationStrategy.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/LocationStrategy.java
new file mode 100644
index 0000000..338a1e2
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/LocationStrategy.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.location;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+public enum LocationStrategy {
+  HDFS(fs -> {
+    HDFSLocation location = new HDFSLocation();
+    location.init(fs);
+    return location;
+  })
+  ,FILE(fs -> {
+    FileLocation location = new FileLocation();
+    location.init(fs);
+    return location;
+  })
+  ,URL(fs -> {
+    URLLocation location = new URLLocation();
+    location.init(fs);
+    return location;
+  })
+  ;
+  Function<FileSystem, RawLocation<?>> locationCreator;
+
+  LocationStrategy(Function<FileSystem, RawLocation<?>> locationCreator) {
+    this.locationCreator = locationCreator;
+  }
+
+  public static Optional<RawLocation<?>> getRawLocation(String loc, FileSystem fs) {
+    for(LocationStrategy strategy : values()) {
+      RawLocation<?> location = strategy.locationCreator.apply(fs);
+      if(location.match(loc)) {
+        return Optional.of(location);
+      }
+    }
+    return Optional.empty();
+  }
+
+  public static Location getLocation(String loc, FileSystem fs) {
+    Optional<RawLocation<?>> rawLoc = getRawLocation(loc, fs);
+    if(rawLoc.isPresent()) {
+      return new Location(loc, rawLoc.get());
+    }
+    else {
+      throw new IllegalStateException("Unsupported type: " + loc);
+    }
+  }
+}