You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/05/31 21:00:29 UTC

[3/3] incubator-metron git commit: METRON-174 Storm consumption of hbase enrichment reference data. This closes apache/incubator-metron#127

METRON-174 Storm consumption of hbase enrichment reference data.  This closes apache/incubator-metron#127


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/d3efe3fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/d3efe3fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/d3efe3fb

Branch: refs/heads/master
Commit: d3efe3fb40da5707ac3b4d1059a6344176afd84b
Parents: ab8163b
Author: cstella <ce...@gmail.com>
Authored: Tue May 31 17:00:14 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Tue May 31 17:00:14 2016 -0400

----------------------------------------------------------------------
 metron-platform/metron-common/pom.xml           |    5 +
 .../metron/common/bolt/ConfiguredBolt.java      |    9 +-
 .../common/bolt/ConfiguredEnrichmentBolt.java   |   15 +-
 .../common/bolt/ConfiguredParserBolt.java       |   14 +-
 .../configuration/SensorParserConfig.java       |   17 +
 .../enrichment/SensorEnrichmentConfig.java      |   17 +-
 .../writer/EnrichmentWriterConfiguration.java   |   50 +
 .../writer/ParserWriterConfiguration.java       |   54 +
 .../writer/SingleBatchConfigurationFacade.java  |   48 +
 .../writer/WriterConfiguration.java             |   29 +
 .../apache/metron/common/csv/CSVConverter.java  |  124 ++
 .../common/interfaces/BulkMessageWriter.java    |   13 +-
 .../metron/common/interfaces/MessageWriter.java |    7 +-
 .../metron/common/utils/ConversionUtils.java    |   40 +
 .../metron/common/writer/AbstractWriter.java    |   26 +
 .../common/writer/BulkWriterComponent.java      |  115 ++
 .../common/writer/WriterToBulkWriter.java       |   54 +
 .../bolt/ConfiguredEnrichmentBoltTest.java      |    8 +-
 .../common/bolt/ConfiguredParserBoltTest.java   |    8 +-
 .../common/utils/ConversionUtilsTest.java       |   32 +
 .../dataloads/extractor/csv/CSVExtractor.java   |   83 +-
 .../extractor/csv/LookupConverter.java          |    2 +-
 .../extractor/csv/LookupConverters.java         |    2 +-
 .../extractor/stix/types/AddressHandler.java    |    2 +-
 .../extractor/stix/types/DomainHandler.java     |    2 +-
 .../extractor/stix/types/HostnameHandler.java   |    2 +-
 .../dataloads/extractor/ExtractorTest.java      |    2 +-
 .../hbase/HBaseEnrichmentConverterTest.java     |    2 +-
 .../LeastRecentlyUsedPrunerIntegrationTest.java |    6 +-
 .../writer/ElasticsearchWriter.java             |   10 +-
 .../simplehbase/SimpleHBaseAdapter.java         |    2 +-
 .../enrichment/bolt/BulkMessageWriterBolt.java  |   49 +-
 .../enrichment/bolt/EnrichmentJoinBolt.java     |    2 +-
 .../enrichment/bolt/EnrichmentSplitterBolt.java |    2 +-
 .../enrichment/bolt/GenericEnrichmentBolt.java  |    2 +-
 .../enrichment/bolt/ThreatIntelJoinBolt.java    |    4 +-
 .../bolt/ThreatIntelSplitterBolt.java           |    2 +-
 .../enrichment/converter/EnrichmentValue.java   |   12 +-
 .../hbase/SimpleHbaseEnrichmentWriter.java      |  282 ++++
 .../apache/metron/writer/hdfs/HdfsWriter.java   |    8 +-
 .../simplehbase/SimpleHBaseAdapterTest.java     |    4 +-
 .../threatintel/ThreatIntelAdapterTest.java     |    2 +-
 .../bolt/BulkMessageWriterBoltTest.java         |   11 +-
 .../converter/EnrichmentConverterTest.java      |    2 +-
 .../apache/metron/hbase/writer/HBaseWriter.java |    3 +-
 .../integration/EnrichmentIntegrationTest.java  |    6 +-
 .../components/ConfigUploadComponent.java       |   22 +-
 .../components/KafkaWithZKComponent.java        |    1 +
 .../integration/mock/MockTableProvider.java     |   45 +
 metron-platform/metron-parsers/pom.xml          |   10 +
 .../apache/metron/parsers/bolt/ParserBolt.java  |   74 +-
 .../apache/metron/parsers/csv/CSVParser.java    |   92 ++
 .../parsers/topology/ParserTopologyBuilder.java |   37 +-
 .../metron/parsers/writer/KafkaWriter.java      |   68 +-
 .../metron/parsers/bolt/ParserBoltTest.java     |  227 +++-
 .../metron/parsers/csv/CSVParserTest.java       |   99 ++
 .../SimpleHBaseEnrichmentWriterTest.java        |  178 +++
 ...pleHbaseEnrichmentWriterIntegrationTest.java |  169 +++
 .../apache/metron/solr/writer/SolrWriter.java   |    7 +-
 .../metron/solr/writer/SolrWriterTest.java      |   11 +-
 .../org/apache/metron/test/mock/MockHTable.java | 1224 +++++++++---------
 61 files changed, 2626 insertions(+), 829 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
index 172d387..10c192c 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -210,6 +210,11 @@
             <version>0.1BETA</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>commons-beanutils</groupId>
+            <artifactId>commons-beanutils</artifactId>
+            <version>1.8.3</version>
+        </dependency>
     </dependencies>
 
     <reporting>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
index 2d5e241..8c2ac14 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
@@ -30,11 +30,12 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.log4j.Logger;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.Configurations;
 
 import java.io.IOException;
 import java.util.Map;
 
-public abstract class ConfiguredBolt extends BaseRichBolt {
+public abstract class ConfiguredBolt<CONFIG_T extends Configurations> extends BaseRichBolt {
 
   private static final Logger LOG = Logger.getLogger(ConfiguredBolt.class);
 
@@ -42,7 +43,7 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
 
   protected CuratorFramework client;
   protected TreeCache cache;
-
+  private final CONFIG_T configurations = defaultConfigurations();
   public ConfiguredBolt(String zookeeperUrl) {
     this.zookeeperUrl = zookeeperUrl;
   }
@@ -57,6 +58,10 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
 
   public void reloadCallback(String name, ConfigurationType type) {
   }
+  public CONFIG_T getConfigurations() {
+    return configurations;
+  }
+  protected abstract CONFIG_T defaultConfigurations();
 
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
index e03e793..6fed7d4 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
@@ -25,24 +25,23 @@ import org.apache.metron.common.configuration.EnrichmentConfigurations;
 
 import java.io.IOException;
 
-public abstract class ConfiguredEnrichmentBolt extends ConfiguredBolt {
+public abstract class ConfiguredEnrichmentBolt extends ConfiguredBolt<EnrichmentConfigurations> {
 
   private static final Logger LOG = Logger.getLogger(ConfiguredEnrichmentBolt.class);
 
-  protected final EnrichmentConfigurations configurations = new EnrichmentConfigurations();
-
   public ConfiguredEnrichmentBolt(String zookeeperUrl) {
     super(zookeeperUrl);
   }
 
-  public EnrichmentConfigurations getConfigurations() {
-    return configurations;
+  @Override
+  protected EnrichmentConfigurations defaultConfigurations() {
+    return new EnrichmentConfigurations();
   }
 
   @Override
   public void loadConfig() {
     try {
-      ConfigurationsUtils.updateEnrichmentConfigsFromZookeeper(configurations, client);
+      ConfigurationsUtils.updateEnrichmentConfigsFromZookeeper(getConfigurations(), client);
     } catch (Exception e) {
       LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
     }
@@ -53,10 +52,10 @@ public abstract class ConfiguredEnrichmentBolt extends ConfiguredBolt {
     if (data.length != 0) {
       String name = path.substring(path.lastIndexOf("/") + 1);
       if (path.startsWith(ConfigurationType.ENRICHMENT.getZookeeperRoot())) {
-        configurations.updateSensorEnrichmentConfig(name, data);
+        getConfigurations().updateSensorEnrichmentConfig(name, data);
         reloadCallback(name, ConfigurationType.ENRICHMENT);
       } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
-        configurations.updateGlobalConfig(data);
+        getConfigurations().updateGlobalConfig(data);
         reloadCallback(name, ConfigurationType.GLOBAL);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
index feab40e..cd379e7 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
@@ -26,10 +26,11 @@ import org.apache.metron.common.configuration.SensorParserConfig;
 
 import java.io.IOException;
 
-public abstract class ConfiguredParserBolt extends ConfiguredBolt {
+public abstract class ConfiguredParserBolt extends ConfiguredBolt<ParserConfigurations> {
 
   private static final Logger LOG = Logger.getLogger(ConfiguredEnrichmentBolt.class);
 
+
   protected final ParserConfigurations configurations = new ParserConfigurations();
   private String sensorType;
   public ConfiguredParserBolt(String zookeeperUrl, String sensorType) {
@@ -41,8 +42,9 @@ public abstract class ConfiguredParserBolt extends ConfiguredBolt {
     return getConfigurations().getSensorParserConfig(sensorType);
   }
 
-  public ParserConfigurations getConfigurations() {
-    return configurations;
+  @Override
+  protected ParserConfigurations defaultConfigurations() {
+    return new ParserConfigurations();
   }
 
   public String getSensorType() {
@@ -51,7 +53,7 @@ public abstract class ConfiguredParserBolt extends ConfiguredBolt {
   @Override
   public void loadConfig() {
     try {
-      ConfigurationsUtils.updateParserConfigsFromZookeeper(configurations, client);
+      ConfigurationsUtils.updateParserConfigsFromZookeeper(getConfigurations(), client);
     } catch (Exception e) {
       LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
     }
@@ -62,10 +64,10 @@ public abstract class ConfiguredParserBolt extends ConfiguredBolt {
     if (data.length != 0) {
       String name = path.substring(path.lastIndexOf("/") + 1);
       if (path.startsWith(ConfigurationType.PARSER.getZookeeperRoot())) {
-        configurations.updateSensorParserConfig(name, data);
+        getConfigurations().updateSensorParserConfig(name, data);
         reloadCallback(name, ConfigurationType.PARSER);
       } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
-        configurations.updateGlobalConfig(data);
+        getConfigurations().updateGlobalConfig(data);
         reloadCallback(name, ConfigurationType.GLOBAL);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
index 5d1bda9..82b407f 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
@@ -18,6 +18,7 @@
 package org.apache.metron.common.configuration;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableList;
 import org.apache.metron.common.utils.JSONUtils;
 
 import java.io.IOException;
@@ -31,6 +32,14 @@ public class SensorParserConfig {
   private String parserClassName;
   private String filterClassName;
   private String sensorTopic;
+  private String writerClassName;
+
+  public String getWriterClassName() {
+    return writerClassName;
+  }
+  public void setWriterClassName(String classNames) {
+    this.writerClassName = classNames;
+  }
   private Map<String, Object> parserConfig = new HashMap<>();
   private List<FieldTransformer> fieldTransformations = new ArrayList<>();
 
@@ -97,6 +106,8 @@ public class SensorParserConfig {
             "parserClassName='" + parserClassName + '\'' +
             ", filterClassName='" + filterClassName + '\'' +
             ", sensorTopic='" + sensorTopic + '\'' +
+            ", writerClassName='" + writerClassName + '\'' +
+            ", parserConfig=" + parserConfig +
             ", parserConfig=" + parserConfig +
             ", fieldTransformations=" + fieldTransformations +
             '}';
@@ -115,6 +126,10 @@ public class SensorParserConfig {
       return false;
     if (getSensorTopic() != null ? !getSensorTopic().equals(that.getSensorTopic()) : that.getSensorTopic() != null)
       return false;
+    if (getWriterClassName() != null ? !getWriterClassName().equals(that.getWriterClassName()) : that.getWriterClassName() != null)
+      return false;
+    if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null)
+      return false;
     if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null)
       return false;
     return getFieldTransformations() != null ? getFieldTransformations().equals(that.getFieldTransformations()) : that.getFieldTransformations() == null;
@@ -126,6 +141,8 @@ public class SensorParserConfig {
     int result = getParserClassName() != null ? getParserClassName().hashCode() : 0;
     result = 31 * result + (getFilterClassName() != null ? getFilterClassName().hashCode() : 0);
     result = 31 * result + (getSensorTopic() != null ? getSensorTopic().hashCode() : 0);
+    result = 31 * result + (getWriterClassName() != null ? getWriterClassName().hashCode() : 0);
+    result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0);
     result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0);
     result = 31 * result + (getFieldTransformations() != null ? getFieldTransformations().hashCode() : 0);
     return result;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentConfig.java
index 562a928..c5538b9 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentConfig.java
@@ -22,6 +22,8 @@ import org.apache.metron.common.configuration.enrichment.threatintel.ThreatIntel
 import org.apache.metron.common.utils.JSONUtils;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 public class SensorEnrichmentConfig {
 
@@ -29,6 +31,15 @@ public class SensorEnrichmentConfig {
   private int batchSize;
   private EnrichmentConfig enrichment = new EnrichmentConfig();
   private ThreatIntelConfig threatIntel = new ThreatIntelConfig();
+  private Map<String, Object> configuration = new HashMap<>();
+
+  public Map<String, Object> getConfiguration() {
+    return configuration;
+  }
+
+  public void setConfiguration(Map<String, Object> configuration) {
+    this.configuration = configuration;
+  }
 
   public EnrichmentConfig getEnrichment() {
     return enrichment;
@@ -70,6 +81,7 @@ public class SensorEnrichmentConfig {
             ", batchSize=" + batchSize +
             ", enrichment=" + enrichment +
             ", threatIntel=" + threatIntel +
+            ", configuration=" + configuration +
             '}';
   }
 
@@ -84,7 +96,9 @@ public class SensorEnrichmentConfig {
     if (getIndex() != null ? !getIndex().equals(that.getIndex()) : that.getIndex() != null) return false;
     if (getEnrichment() != null ? !getEnrichment().equals(that.getEnrichment()) : that.getEnrichment() != null)
       return false;
-    return getThreatIntel() != null ? getThreatIntel().equals(that.getThreatIntel()) : that.getThreatIntel() == null;
+    if (getThreatIntel() != null ? !getThreatIntel().equals(that.getThreatIntel()) : that.getThreatIntel() != null)
+      return false;
+    return getConfiguration() != null ? getConfiguration().equals(that.getConfiguration()) : that.getConfiguration() == null;
 
   }
 
@@ -94,6 +108,7 @@ public class SensorEnrichmentConfig {
     result = 31 * result + getBatchSize();
     result = 31 * result + (getEnrichment() != null ? getEnrichment().hashCode() : 0);
     result = 31 * result + (getThreatIntel() != null ? getThreatIntel().hashCode() : 0);
+    result = 31 * result + (getConfiguration() != null ? getConfiguration().hashCode() : 0);
     return result;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java
new file mode 100644
index 0000000..a8a667a
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration.writer;
+
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+
+import java.util.Map;
+
+public class EnrichmentWriterConfiguration implements WriterConfiguration{
+  private EnrichmentConfigurations config;
+
+  public EnrichmentWriterConfiguration(EnrichmentConfigurations config) {
+    this.config = config;
+  }
+
+  @Override
+  public int getBatchSize(String sensorName) {
+    return config.getSensorEnrichmentConfig(sensorName).getBatchSize();
+  }
+
+  @Override
+  public String getIndex(String sensorName) {
+    return config.getSensorEnrichmentConfig(sensorName).getIndex();
+  }
+
+  @Override
+  public Map<String, Object> getSensorConfig(String sensorName) {
+    return config.getSensorEnrichmentConfig(sensorName).getConfiguration();
+  }
+  @Override
+  public Map<String, Object> getGlobalConfig() {
+    return config.getGlobalConfig();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
new file mode 100644
index 0000000..fba8e65
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration.writer;
+
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.utils.ConversionUtils;
+
+import java.util.Map;
+
+public class ParserWriterConfiguration implements WriterConfiguration {
+  public static final String BATCH_CONF = "batchSize";
+  public static final String INDEX_CONF = "indexName";
+  private ParserConfigurations config;
+  public ParserWriterConfiguration(ParserConfigurations config) {
+    this.config = config;
+  }
+  @Override
+  public int getBatchSize(String sensorName) {
+    Object batchObj = config.getSensorParserConfig(sensorName).getParserConfig().get(BATCH_CONF);
+    return batchObj == null?1:ConversionUtils.convert(batchObj, Integer.class);
+  }
+
+  @Override
+  public String getIndex(String sensorName) {
+    Object indexObj = config.getSensorParserConfig(sensorName).getParserConfig().get(INDEX_CONF);
+    return indexObj.toString();
+  }
+
+  @Override
+  public Map<String, Object> getSensorConfig(String sensorName) {
+    return config.getSensorParserConfig(sensorName).getParserConfig();
+  }
+
+  @Override
+  public Map<String, Object> getGlobalConfig() {
+    return config.getGlobalConfig();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java
new file mode 100644
index 0000000..3ee25d0
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration.writer;
+
+import java.util.Map;
+
+public class SingleBatchConfigurationFacade implements WriterConfiguration {
+  private WriterConfiguration config;
+  public SingleBatchConfigurationFacade(WriterConfiguration config) {
+    this.config = config;
+  }
+
+  @Override
+  public int getBatchSize(String sensorName) {
+    return 1;
+  }
+
+  @Override
+  public String getIndex(String sensorName) {
+    return config.getIndex(sensorName);
+  }
+
+  @Override
+  public Map<String, Object> getSensorConfig(String sensorName) {
+    return config.getSensorConfig(sensorName);
+  }
+
+  @Override
+  public Map<String, Object> getGlobalConfig() {
+    return config.getGlobalConfig();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
new file mode 100644
index 0000000..f155302
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration.writer;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public interface WriterConfiguration extends Serializable {
+  int getBatchSize(String sensorName);
+  String getIndex(String sensorName);
+  Map<String, Object> getSensorConfig(String sensorName);
+  Map<String, Object> getGlobalConfig();
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/csv/CSVConverter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/csv/CSVConverter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/csv/CSVConverter.java
new file mode 100644
index 0000000..ce23deb
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/csv/CSVConverter.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.csv;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
+public class CSVConverter implements Serializable {
+  public static final String COLUMNS_KEY="columns";
+  public static final String SEPARATOR_KEY="separator";
+  protected Map<String, Integer> columnMap = new HashMap<>();
+  protected CSVParser parser;
+
+  public Map<String, Integer> getColumnMap() {
+    return columnMap;
+  }
+
+  public CSVParser getParser() {
+    return parser;
+  }
+
+  public Map<String, String> toMap(String line) throws IOException {
+    if(ignore(line)) {
+      return null;
+    }
+    String[] tokens = parser.parseLine(line);
+    Map<String, String> values = new HashMap<>();
+    for(Map.Entry<String, Integer> kv : columnMap.entrySet()) {
+      values.put(kv.getKey(), tokens[kv.getValue()]);
+    }
+    return values;
+  }
+
+  public void initialize(Map<String, Object> config) {
+    if(config.containsKey(COLUMNS_KEY)) {
+      columnMap = getColumnMap(config);
+    }
+    else {
+      throw new IllegalStateException("CSVExtractor requires " + COLUMNS_KEY + " configuration");
+    }
+    char separator = ',';
+    if(config.containsKey(SEPARATOR_KEY)) {
+      separator = config.get(SEPARATOR_KEY).toString().charAt(0);
+
+    }
+    parser = new CSVParserBuilder().withSeparator(separator)
+              .build();
+  }
+  protected boolean ignore(String line) {
+    if(null == line) {
+      return true;
+    }
+    String trimmedLine = line.trim();
+    return trimmedLine.startsWith("#") || isEmpty(trimmedLine);
+  }
+  public static Map.Entry<String, Integer> getColumnMapEntry(String column, int i) {
+    if(column.contains(":")) {
+      Iterable<String> tokens = Splitter.on(':').split(column);
+      String col = Iterables.getFirst(tokens, null);
+      Integer pos = Integer.parseInt(Iterables.getLast(tokens));
+      return new AbstractMap.SimpleEntry<>(col, pos);
+    }
+    else {
+      return new AbstractMap.SimpleEntry<>(column, i);
+    }
+
+  }
+  public static Map<String, Integer> getColumnMap(Map<String, Object> config) {
+    Map<String, Integer> columnMap = new HashMap<>();
+    if(config.containsKey(COLUMNS_KEY)) {
+      Object columnsObj = config.get(COLUMNS_KEY);
+      if(columnsObj instanceof String) {
+        String columns = (String)columnsObj;
+        int i = 0;
+        for (String column : Splitter.on(',').split(columns)) {
+          Map.Entry<String, Integer> e = getColumnMapEntry(column, i++);
+          columnMap.put(e.getKey(), e.getValue());
+        }
+      }
+      else if(columnsObj instanceof List) {
+        List columns = (List)columnsObj;
+        int i = 0;
+        for(Object column : columns) {
+          Map.Entry<String, Integer> e = getColumnMapEntry(column.toString(), i++);
+          columnMap.put(e.getKey(), e.getValue());
+        }
+      }
+      else if(columnsObj instanceof Map) {
+        Map<Object, Object> map = (Map<Object, Object>)columnsObj;
+        for(Map.Entry<Object, Object> e : map.entrySet()) {
+          columnMap.put(e.getKey().toString(), Integer.parseInt(e.getValue().toString()));
+        }
+      }
+    }
+    return columnMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java
index aaa6c51..24cb823 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java
@@ -18,14 +18,21 @@
 package org.apache.metron.common.interfaces;
 
 import backtype.storm.tuple.Tuple;
+import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
-public interface BulkMessageWriter<T> extends AutoCloseable {
+public interface BulkMessageWriter<MESSAGE_T> extends AutoCloseable, Serializable {
 
-  void init(Map stormConf, EnrichmentConfigurations configuration) throws Exception;
-  void write(String sensorType, EnrichmentConfigurations configurations, List<Tuple> tuples, List<T> messages) throws Exception;
+  void init(Map stormConf, WriterConfiguration config) throws Exception;
+  void write( String sensorType
+            , WriterConfiguration configurations
+            , Iterable<Tuple> tuples
+            , List<MESSAGE_T> messages
+            ) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/MessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/MessageWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/MessageWriter.java
index a90a8cb..827bf8f 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/MessageWriter.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/MessageWriter.java
@@ -19,9 +19,12 @@ package org.apache.metron.common.interfaces;
 
 import backtype.storm.tuple.Tuple;
 import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
 
-public interface MessageWriter<T> extends AutoCloseable {
+import java.io.Serializable;
+
+public interface MessageWriter<T> extends AutoCloseable, Serializable {
 
   void init();
-  void write(String sensorType, Configurations configurations, Tuple tuple, T message) throws Exception;
+  void write(String sensorType, WriterConfiguration configurations, Tuple tuple, T message) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ConversionUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ConversionUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ConversionUtils.java
new file mode 100644
index 0000000..29ec908
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ConversionUtils.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import org.apache.commons.beanutils.BeanUtilsBean2;
+import org.apache.commons.beanutils.ConvertUtilsBean;
+
+public class ConversionUtils {
+  private static ThreadLocal<ConvertUtilsBean> UTILS_BEAN  = new ThreadLocal<ConvertUtilsBean>() {
+    @Override
+    protected ConvertUtilsBean initialValue() {
+      ConvertUtilsBean ret = BeanUtilsBean2.getInstance().getConvertUtils();
+      ret.deregister();
+      ret.register(false,true, 1);
+      return ret;
+    }
+  };
+  public static <T> T convert(Object o, Class<T> clazz) {
+    if(o == null) {
+      return null;
+    }
+    return clazz.cast(UTILS_BEAN.get().convert(o, clazz));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/AbstractWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/AbstractWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/AbstractWriter.java
new file mode 100644
index 0000000..56a4e48
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/AbstractWriter.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.writer;
+
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+
+public abstract class AbstractWriter {
+  public AbstractWriter() {}
+  public abstract void configure(String sensorName, WriterConfiguration configuration);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
new file mode 100644
index 0000000..320d497
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.writer;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import com.google.common.collect.Iterables;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.common.utils.MessageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.function.Function;
+
+public class BulkWriterComponent<MESSAGE_T> {
+  public static final Logger LOG = LoggerFactory
+            .getLogger(BulkWriterComponent.class);
+  private Map<String, Collection<Tuple>> sensorTupleMap = new HashMap<>();
+  private Map<String, List<MESSAGE_T>> sensorMessageMap = new HashMap<>();
+  private OutputCollector collector;
+  private boolean handleCommit = true;
+  private boolean handleError = true;
+  public BulkWriterComponent(OutputCollector collector) {
+    this.collector = collector;
+  }
+
+  public BulkWriterComponent(OutputCollector collector, boolean handleCommit, boolean handleError) {
+    this(collector);
+    this.handleCommit = handleCommit;
+    this.handleError = handleError;
+  }
+
+  public void commit(Iterable<Tuple> tuples) {
+    tuples.forEach(t -> collector.ack(t));
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Acking " + Iterables.size(tuples) + " tuples");
+    }
+  }
+
+  public void error(Exception e, Iterable<Tuple> tuples) {
+    tuples.forEach(t -> collector.fail(t));
+    LOG.error("Failing " + Iterables.size(tuples) + " tuples", e);
+    ErrorUtils.handleError(collector, e, Constants.ERROR_STREAM);
+  }
+
+  protected Collection<Tuple> createTupleCollection() {
+    return new ArrayList<>();
+  }
+
+
+  public void write( String sensorType
+                   , Tuple tuple
+                   , MESSAGE_T message
+                   , BulkMessageWriter<MESSAGE_T> bulkMessageWriter
+                   , WriterConfiguration configurations
+                   ) throws Exception
+  {
+    int batchSize = configurations.getBatchSize(sensorType);
+    Collection<Tuple> tupleList = sensorTupleMap.get(sensorType);
+    if (tupleList == null) {
+      tupleList = createTupleCollection();
+    }
+    tupleList.add(tuple);
+    List<MESSAGE_T> messageList = sensorMessageMap.get(sensorType);
+    if (messageList == null) {
+      messageList = new ArrayList<>();
+    }
+    messageList.add(message);
+
+    if (tupleList.size() < batchSize) {
+      sensorTupleMap.put(sensorType, tupleList);
+      sensorMessageMap.put(sensorType, messageList);
+    } else {
+      try {
+        bulkMessageWriter.write(sensorType, configurations, tupleList, messageList);
+        if(handleCommit) {
+          commit(tupleList);
+        }
+
+      } catch (Exception e) {
+        if(handleError) {
+          error(e, tupleList);
+        }
+        else {
+          throw e;
+        }
+      }
+      finally {
+        sensorTupleMap.remove(sensorType);
+        sensorMessageMap.remove(sensorType);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/WriterToBulkWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/WriterToBulkWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/WriterToBulkWriter.java
new file mode 100644
index 0000000..b0bde6c
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/WriterToBulkWriter.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.writer;
+
+import backtype.storm.tuple.Tuple;
+import com.google.common.collect.Iterables;
+import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.common.interfaces.MessageWriter;
+
+import java.util.List;
+import java.util.Map;
+
+public class WriterToBulkWriter<MESSAGE_T> implements BulkMessageWriter<MESSAGE_T> {
+  MessageWriter<MESSAGE_T> messageWriter;
+
+  public WriterToBulkWriter(MessageWriter<MESSAGE_T> messageWriter) {
+    this.messageWriter = messageWriter;
+  }
+  @Override
+  public void init(Map stormConf, WriterConfiguration config) throws Exception {
+    messageWriter.init();
+  }
+
+  @Override
+  public void write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<MESSAGE_T> messages) throws Exception {
+    if(messages.size() > 1) {
+      throw new IllegalStateException("WriterToBulkWriter expects a batch of exactly 1");
+    }
+    messageWriter.write(sensorType, configurations, Iterables.getFirst(tuples, null), Iterables.getFirst(messages, null));
+  }
+
+  @Override
+  public void close() throws Exception {
+    messageWriter.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
index c5f2304..520b430 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
@@ -98,20 +98,20 @@ public class ConfiguredEnrichmentBoltTest extends BaseConfiguredBoltTest {
     StandAloneConfiguredEnrichmentBolt configuredBolt = new StandAloneConfiguredEnrichmentBolt(zookeeperUrl);
     configuredBolt.prepare(new HashMap(), topologyContext, outputCollector);
     waitForConfigUpdate(enrichmentConfigurationTypes);
-    Assert.assertEquals(sampleConfigurations, configuredBolt.configurations);
+    Assert.assertEquals(sampleConfigurations, configuredBolt.getConfigurations());
 
     configsUpdated = new HashSet<>();
     Map<String, Object> sampleGlobalConfig = sampleConfigurations.getGlobalConfig();
     sampleGlobalConfig.put("newGlobalField", "newGlobalValue");
     ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
     waitForConfigUpdate(ConfigurationType.GLOBAL.getName());
-    Assert.assertEquals("Add global config field", sampleConfigurations.getGlobalConfig(), configuredBolt.configurations.getGlobalConfig());
+    Assert.assertEquals("Add global config field", sampleConfigurations.getGlobalConfig(), configuredBolt.getConfigurations().getGlobalConfig());
 
     configsUpdated = new HashSet<>();
     sampleGlobalConfig.remove("newGlobalField");
     ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
     waitForConfigUpdate(ConfigurationType.GLOBAL.getName());
-    Assert.assertEquals("Remove global config field", sampleConfigurations, configuredBolt.configurations);
+    Assert.assertEquals("Remove global config field", sampleConfigurations, configuredBolt.getConfigurations());
 
     configsUpdated = new HashSet<>();
     String sensorType = "testSensorConfig";
@@ -131,7 +131,7 @@ public class ConfiguredEnrichmentBoltTest extends BaseConfiguredBoltTest {
     sampleConfigurations.updateSensorEnrichmentConfig(sensorType, testSensorConfig);
     ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, testSensorConfig, zookeeperUrl);
     waitForConfigUpdate(sensorType);
-    Assert.assertEquals("Add new sensor config", sampleConfigurations, configuredBolt.configurations);
+    Assert.assertEquals("Add new sensor config", sampleConfigurations, configuredBolt.getConfigurations());
     configuredBolt.cleanup();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
index 3010ed8..a1bbc13 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
@@ -97,20 +97,20 @@ public class ConfiguredParserBoltTest extends BaseConfiguredBoltTest {
     StandAloneConfiguredParserBolt configuredBolt = new StandAloneConfiguredParserBolt(zookeeperUrl);
     configuredBolt.prepare(new HashMap(), topologyContext, outputCollector);
     waitForConfigUpdate(parserConfigurationTypes);
-    Assert.assertEquals(sampleConfigurations, configuredBolt.configurations);
+    Assert.assertEquals(sampleConfigurations, configuredBolt.getConfigurations());
 
     configsUpdated = new HashSet<>();
     Map<String, Object> sampleGlobalConfig = sampleConfigurations.getGlobalConfig();
     sampleGlobalConfig.put("newGlobalField", "newGlobalValue");
     ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
     waitForConfigUpdate(ConfigurationType.GLOBAL.getName());
-    Assert.assertEquals("Add global config field", sampleConfigurations.getGlobalConfig(), configuredBolt.configurations.getGlobalConfig());
+    Assert.assertEquals("Add global config field", sampleConfigurations.getGlobalConfig(), configuredBolt.getConfigurations().getGlobalConfig());
 
     configsUpdated = new HashSet<>();
     sampleGlobalConfig.remove("newGlobalField");
     ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
     waitForConfigUpdate(ConfigurationType.GLOBAL.getName());
-    Assert.assertEquals("Remove global config field", sampleConfigurations, configuredBolt.configurations);
+    Assert.assertEquals("Remove global config field", sampleConfigurations, configuredBolt.getConfigurations());
 
     configsUpdated = new HashSet<>();
     String sensorType = "testSensorConfig";
@@ -123,7 +123,7 @@ public class ConfiguredParserBoltTest extends BaseConfiguredBoltTest {
     sampleConfigurations.updateSensorParserConfig(sensorType, testSensorConfig);
     ConfigurationsUtils.writeSensorParserConfigToZookeeper(sensorType, testSensorConfig, zookeeperUrl);
     waitForConfigUpdate(sensorType);
-    Assert.assertEquals("Add new sensor config", sampleConfigurations, configuredBolt.configurations);
+    Assert.assertEquals("Add new sensor config", sampleConfigurations, configuredBolt.getConfigurations());
     configuredBolt.cleanup();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ConversionUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ConversionUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ConversionUtilsTest.java
new file mode 100644
index 0000000..acffeeb
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ConversionUtilsTest.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConversionUtilsTest {
+  @Test
+  public void testIntegerConversions() {
+    Object o = new Integer(1);
+    Assert.assertEquals(new Integer(1), ConversionUtils.convert(o, Integer.class));
+    Assert.assertEquals(new Integer(1), ConversionUtils.convert("1", Integer.class));
+    Assert.assertNull(ConversionUtils.convert("foo", Integer.class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
index 28c3ece..502b46a 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
@@ -17,10 +17,7 @@
  */
 package org.apache.metron.dataloads.extractor.csv;
 
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import com.opencsv.CSVParser;
-import com.opencsv.CSVParserBuilder;
+import org.apache.metron.common.csv.CSVConverter;
 import org.apache.metron.dataloads.extractor.Extractor;
 import org.apache.metron.enrichment.lookup.LookupKV;
 import org.apache.metron.enrichment.lookup.LookupKey;
@@ -28,21 +25,17 @@ import org.apache.metron.enrichment.lookup.LookupKey;
 import java.io.IOException;
 import java.util.*;
 
-import static org.apache.commons.lang3.StringUtils.isEmpty;
 
-public class CSVExtractor implements Extractor {
-  public static final String COLUMNS_KEY="columns";
+public class CSVExtractor extends CSVConverter implements Extractor {
   public static final String INDICATOR_COLUMN_KEY="indicator_column";
   public static final String TYPE_COLUMN_KEY="type_column";
   public static final String TYPE_KEY="type";
-  public static final String SEPARATOR_KEY="separator";
   public static final String LOOKUP_CONVERTER = "lookup_converter";
 
   private int typeColumn;
   private String type;
   private int indicatorColumn;
-  private Map<String, Integer> columnMap = new HashMap<>();
-  private CSVParser parser;
+
   private LookupConverter converter = LookupConverters.ENRICHMENT.getConverter();
 
   public int getTypeColumn() {
@@ -57,13 +50,6 @@ public class CSVExtractor implements Extractor {
     return indicatorColumn;
   }
 
-  public Map<String, Integer> getColumnMap() {
-    return columnMap;
-  }
-
-  public CSVParser getParser() {
-    return parser;
-  }
 
   public LookupConverter getConverter() {
     return converter;
@@ -76,20 +62,14 @@ public class CSVExtractor implements Extractor {
     String[] tokens = parser.parseLine(line);
 
     LookupKey key = converter.toKey(getType(tokens), tokens[indicatorColumn]);
-    Map<String, String> values = new HashMap<>();
+    Map<String, Object> values = new HashMap<>();
     for(Map.Entry<String, Integer> kv : columnMap.entrySet()) {
       values.put(kv.getKey(), tokens[kv.getValue()]);
     }
     return Arrays.asList(new LookupKV(key, converter.toValue(values)));
   }
 
-  private boolean ignore(String line) {
-    if(null == line) {
-      return true;
-    }
-    String trimmedLine = line.trim();
-    return trimmedLine.startsWith("#") || isEmpty(trimmedLine);
-  }
+
 
   private String getType(String[] tokens) {
     if(type == null) {
@@ -100,56 +80,12 @@ public class CSVExtractor implements Extractor {
     }
   }
 
-  private static Map.Entry<String, Integer> getColumnMapEntry(String column, int i) {
-    if(column.contains(":")) {
-      Iterable<String> tokens = Splitter.on(':').split(column);
-      String col = Iterables.getFirst(tokens, null);
-      Integer pos = Integer.parseInt(Iterables.getLast(tokens));
-      return new AbstractMap.SimpleEntry<>(col, pos);
-    }
-    else {
-      return new AbstractMap.SimpleEntry<>(column, i);
-    }
 
-  }
-  private static Map<String, Integer> getColumnMap(Map<String, Object> config) {
-    Map<String, Integer> columnMap = new HashMap<>();
-    if(config.containsKey(COLUMNS_KEY)) {
-      Object columnsObj = config.get(COLUMNS_KEY);
-      if(columnsObj instanceof String) {
-        String columns = (String)columnsObj;
-        int i = 0;
-        for (String column : Splitter.on(',').split(columns)) {
-          Map.Entry<String, Integer> e = getColumnMapEntry(column, i++);
-          columnMap.put(e.getKey(), e.getValue());
-        }
-      }
-      else if(columnsObj instanceof List) {
-        List columns = (List)columnsObj;
-        int i = 0;
-        for(Object column : columns) {
-          Map.Entry<String, Integer> e = getColumnMapEntry(column.toString(), i++);
-          columnMap.put(e.getKey(), e.getValue());
-        }
-      }
-      else if(columnsObj instanceof Map) {
-        Map<Object, Object> map = (Map<Object, Object>)columnsObj;
-        for(Map.Entry<Object, Object> e : map.entrySet()) {
-          columnMap.put(e.getKey().toString(), Integer.parseInt(e.getValue().toString()));
-        }
-      }
-    }
-    return columnMap;
-  }
 
   @Override
   public void initialize(Map<String, Object> config) {
-    if(config.containsKey(COLUMNS_KEY)) {
-      columnMap = getColumnMap(config);
-    }
-    else {
-      throw new IllegalStateException("CSVExtractor requires " + COLUMNS_KEY + " configuration");
-    }
+    super.initialize(config);
+
     if(config.containsKey(INDICATOR_COLUMN_KEY)) {
       indicatorColumn = columnMap.get(config.get(INDICATOR_COLUMN_KEY).toString());
     }
@@ -159,11 +95,6 @@ public class CSVExtractor implements Extractor {
     else if(config.containsKey(TYPE_COLUMN_KEY)) {
       typeColumn = columnMap.get(config.get(TYPE_COLUMN_KEY).toString());
     }
-    if(config.containsKey(SEPARATOR_KEY)) {
-      char separator = config.get(SEPARATOR_KEY).toString().charAt(0);
-      parser = new CSVParserBuilder().withSeparator(separator)
-              .build();
-    }
     if(config.containsKey(LOOKUP_CONVERTER)) {
       converter = LookupConverters.getConverter((String) config.get(LOOKUP_CONVERTER));
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
index e0ca4ee..29beb22 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
@@ -25,5 +25,5 @@ import java.util.Map;
 
 public interface LookupConverter {
     LookupKey toKey(String type, String indicator);
-    LookupValue toValue(Map<String, String> metadata);
+    LookupValue toValue(Map<String, Object> metadata);
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
index bd58ba7..abced09 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
@@ -35,7 +35,7 @@ public enum LookupConverters {
         }
 
         @Override
-        public LookupValue toValue(Map<String, String> metadata) {
+        public LookupValue toValue(Map<String, Object> metadata) {
             return new EnrichmentValue(metadata);
         }
     })

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
index ffcff43..610f2cc 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
@@ -69,7 +69,7 @@ public class AddressHandler extends AbstractObjectTypeHandler<Address> {
       final String indicatorType = typeStr + ":" + category;
       LookupKV results = new LookupKV(new EnrichmentKey(indicatorType, token)
               , new EnrichmentValue(
-              new HashMap<String, String>() {{
+              new HashMap<String, Object>() {{
                 put("source-type", "STIX");
                 put("indicator-type", indicatorType);
                 put("source", type.toXMLString());

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
index 755cddd..4a3688d 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
@@ -52,7 +52,7 @@ public class DomainHandler extends AbstractObjectTypeHandler<DomainName> {
         final String indicatorType = typeStr + ":" + DomainNameTypeEnum.FQDN;
         LookupKV results = new LookupKV(new EnrichmentKey(indicatorType, token)
                 , new EnrichmentValue(
-                new HashMap<String, String>() {{
+                new HashMap<String, Object>() {{
                   put("source-type", "STIX");
                   put("indicator-type", indicatorType);
                   put("source", type.toXMLString());

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
index c7b05eb..2f22eed 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
@@ -52,7 +52,7 @@ public class HostnameHandler  extends AbstractObjectTypeHandler<Hostname>{
     for(String token : StixExtractor.split(value)) {
       final String indicatorType = typeStr;
       LookupKV results = new LookupKV(new EnrichmentKey(indicatorType, token)
-              , new EnrichmentValue(new HashMap<String, String>() {{
+              , new EnrichmentValue(new HashMap<String, Object>() {{
         put("source-type", "STIX");
         put("indicator-type", indicatorType);
         put("source", type.toXMLString());

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/ExtractorTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/ExtractorTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/ExtractorTest.java
index 0179193..eac6ad2 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/ExtractorTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/ExtractorTest.java
@@ -38,7 +38,7 @@ public class ExtractorTest {
             EnrichmentKey key = new EnrichmentKey();
             key.indicator = "dummy";
             key.type = "type";
-            Map<String, String> value = new HashMap<>();
+            Map<String, Object> value = new HashMap<>();
             value.put("indicator", "dummy");
             return Arrays.asList(new LookupKV(key, new EnrichmentValue(value)));
         }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
index 28b3e26..a018e27 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
@@ -36,7 +36,7 @@ import java.util.HashMap;
 public class HBaseEnrichmentConverterTest {
     EnrichmentKey key = new EnrichmentKey("domain", "google");
     EnrichmentValue value = new EnrichmentValue(
-            new HashMap<String, String>() {{
+            new HashMap<String, Object>() {{
                 put("foo", "bar");
                 put("grok", "baz");
             }});

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
index 93c216c..3a15f8b 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
@@ -113,7 +113,7 @@ public class LeastRecentlyUsedPrunerIntegrationTest {
         for(LookupKey k : goodKeysHalf) {
             testTable.put(converter.toPut(cf, (EnrichmentKey) k
                                             , new EnrichmentValue(
-                                                  new HashMap<String, String>() {{
+                                                  new HashMap<String, Object>() {{
                                                     put("k", "dummy");
                                                     }}
                                                   )
@@ -124,7 +124,7 @@ public class LeastRecentlyUsedPrunerIntegrationTest {
         pat.persist(true);
         for(LookupKey k : goodKeysOtherHalf) {
             testTable.put(converter.toPut(cf, (EnrichmentKey) k
-                                            , new EnrichmentValue(new HashMap<String, String>() {{
+                                            , new EnrichmentValue(new HashMap<String, Object>() {{
                                                     put("k", "dummy");
                                                     }}
                                                                   )
@@ -140,7 +140,7 @@ public class LeastRecentlyUsedPrunerIntegrationTest {
         pat.persist(true);
         {
             testTable.put(converter.toPut(cf, (EnrichmentKey) badKey.get(0)
-                    , new EnrichmentValue(new HashMap<String, String>() {{
+                    , new EnrichmentValue(new HashMap<String, Object>() {{
                         put("k", "dummy");
                     }}
                     )

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index f06850b..c982d29 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -20,6 +20,7 @@ package org.apache.metron.elasticsearch.writer;
 import backtype.storm.tuple.Tuple;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.interfaces.BulkMessageWriter;
 import org.apache.metron.common.interfaces.FieldNameConverter;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -55,7 +56,7 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
   }
 
   @Override
-  public void init(Map stormConf, EnrichmentConfigurations configurations) {
+  public void init(Map stormConf, WriterConfiguration configurations) {
     Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
 
     Settings.Builder settingsBuilder = Settings.settingsBuilder();
@@ -88,8 +89,7 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
   }
 
   @Override
-  public void write(String sensorType, EnrichmentConfigurations configurations, List<Tuple> tuples, List<JSONObject> messages) throws Exception {
-    SensorEnrichmentConfig sensorEnrichmentConfig = configurations.getSensorEnrichmentConfig(sensorType);
+  public void write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
     String indexPostfix = dateFormat.format(new Date());
     BulkRequestBuilder bulkRequest = client.prepareBulk();
 
@@ -97,8 +97,8 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
 
       String indexName = sensorType;
 
-      if (sensorEnrichmentConfig != null) {
-        indexName = sensorEnrichmentConfig.getIndex();
+      if (configurations != null) {
+        indexName = configurations.getIndex(sensorType);
       }
 
       indexName = indexName + "_index_" + indexPostfix;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
index 65b095e..a152d26 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
@@ -86,7 +86,7 @@ public class SimpleHBaseAdapter implements EnrichmentAdapter<CacheKey>,Serializa
             )
         {
           if (kv != null && kv.getValue() != null && kv.getValue().getMetadata() != null) {
-            for (Map.Entry<String, String> values : kv.getValue().getMetadata().entrySet()) {
+            for (Map.Entry<String, Object> values : kv.getValue().getMetadata().entrySet()) {
               enriched.put(kv.getKey().type + "." + values.getKey(), values.getValue());
             }
             _LOG.trace("Enriched type " + kv.getKey().type + " => " + enriched);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
index 3e407c7..1d49807 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
@@ -23,12 +23,11 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.bolt.ConfiguredBolt;
-import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
 import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
-import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.common.configuration.writer.EnrichmentWriterConfiguration;
 import org.apache.metron.common.utils.MessageUtils;
 import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkWriterComponent;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,26 +38,23 @@ public class BulkMessageWriterBolt extends ConfiguredEnrichmentBolt {
 
   private static final Logger LOG = LoggerFactory
           .getLogger(BulkMessageWriterBolt.class);
-  private OutputCollector collector;
   private BulkMessageWriter<JSONObject> bulkMessageWriter;
-  private Map<String, List<Tuple>> sensorTupleMap = new HashMap<>();
-  private Map<String, List<JSONObject>> sensorMessageMap = new HashMap<>();
-
+  private BulkWriterComponent<JSONObject> writerComponent;
   public BulkMessageWriterBolt(String zookeeperUrl) {
     super(zookeeperUrl);
   }
 
-  public BulkMessageWriterBolt withBulkMessageWriter(BulkMessageWriter<JSONObject> bulkMessageWriter) {
+  public BulkMessageWriterBolt withBulkMessageWriter(BulkMessageWriter<JSONObject > bulkMessageWriter) {
     this.bulkMessageWriter = bulkMessageWriter;
     return this;
   }
 
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-    this.collector = collector;
+    this.writerComponent = new BulkWriterComponent<>(collector);
     super.prepare(stormConf, context, collector);
     try {
-      bulkMessageWriter.init(stormConf, configurations);
+      bulkMessageWriter.init(stormConf, new EnrichmentWriterConfiguration(getConfigurations()));
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -70,36 +66,17 @@ public class BulkMessageWriterBolt extends ConfiguredEnrichmentBolt {
     JSONObject message = (JSONObject)((JSONObject) tuple.getValueByField("message")).clone();
     message.put("index." + bulkMessageWriter.getClass().getSimpleName().toLowerCase() + ".ts", "" + System.currentTimeMillis());
     String sensorType = MessageUtils.getSensorType(message);
-    SensorEnrichmentConfig sensorEnrichmentConfig = configurations.getSensorEnrichmentConfig(sensorType);
-    int batchSize = sensorEnrichmentConfig != null ? sensorEnrichmentConfig.getBatchSize() : 1;
-    List<Tuple> tupleList = sensorTupleMap.get(sensorType);
-    if (tupleList == null) tupleList = new ArrayList<>();
-    tupleList.add(tuple);
-    List<JSONObject> messageList = sensorMessageMap.get(sensorType);
-    if (messageList == null) messageList = new ArrayList<>();
-    messageList.add(message);
-    if (messageList.size() < batchSize) {
-      sensorTupleMap.put(sensorType, tupleList);
-      sensorMessageMap.put(sensorType, messageList);
-    } else {
-      try {
-        bulkMessageWriter.write(sensorType, configurations, tupleList, messageList);
-        for(Tuple t: tupleList) {
-          collector.ack(t);
-        }
-      } catch (Exception e) {
-        for(Tuple t: tupleList) {
-          collector.fail(t);
-        }
-        ErrorUtils.handleError(collector, e, Constants.ERROR_STREAM);
-      }
-      sensorTupleMap.remove(sensorType);
-      sensorMessageMap.remove(sensorType);
+    try
+    {
+      writerComponent.write(sensorType, tuple, message, bulkMessageWriter, new EnrichmentWriterConfiguration(getConfigurations()));
+    }
+    catch(Exception e) {
+      throw new RuntimeException("This should have been caught in the writerComponent.  If you see this, file a JIRA");
     }
   }
 
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declareStream("error", new Fields("message"));
+    declarer.declareStream(Constants.ERROR_STREAM, new Fields("message"));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
index 48e09f8..7d05c00 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -86,7 +86,7 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
 
   public Map<String, List<String>> getFieldMap(String sourceType) {
     if(sourceType != null) {
-      SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sourceType);
+      SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
       if (config != null && config.getEnrichment() != null) {
         return config.getEnrichment().getFieldMap();
       }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
index c367173..4b5c7bb 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
@@ -125,7 +125,7 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
 
     protected Map<String, List<String>> getFieldMap(String sensorType) {
         if(sensorType != null) {
-            SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sensorType);
+            SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sensorType);
             if (config != null) {
                 return config.getEnrichment().getFieldMap();
             } else {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 3a4b67d..d4acd08 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -185,7 +185,7 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
         } else {
           JSONObject enrichedField = new JSONObject();
           if (value != null && value.length() != 0) {
-            SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sourceType);
+            SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
             if(config == null) {
               LOG.error("Unable to find " + config);
               error = true;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
index c08bd0d..ec1ce7a 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
@@ -41,7 +41,7 @@ public class ThreatIntelJoinBolt extends EnrichmentJoinBolt {
 
   @Override
   public Map<String, List<String>> getFieldMap(String sourceType) {
-    SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sourceType);
+    SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
     if(config != null) {
       return config.getThreatIntel().getFieldMap();
     }
@@ -66,7 +66,7 @@ public class ThreatIntelJoinBolt extends EnrichmentJoinBolt {
     if(isAlert) {
       ret.put("is_alert" , "true");
       String sourceType = MessageUtils.getSensorType(ret);
-      SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sourceType);
+      SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
       ThreatTriageConfig triageConfig = null;
       if(config != null) {
         triageConfig = config.getThreatIntel().getTriageConfig();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
index 3cd1780..f5b6399 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
@@ -33,7 +33,7 @@ public class ThreatIntelSplitterBolt extends EnrichmentSplitterBolt {
   @Override
   protected Map<String, List<String>> getFieldMap(String sensorType) {
     if (sensorType != null) {
-      SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sensorType);
+      SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sensorType);
       if (config != null) {
         return config.getThreatIntel().getFieldMap();
       } else {