You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/05/31 21:00:27 UTC

[1/3] incubator-metron git commit: METRON-174 Storm consumption of hbase enrichment reference data. This closes apache/incubator-metron#127

Repository: incubator-metron
Updated Branches:
  refs/heads/master ab8163bcc -> d3efe3fb4


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
index bdf61e8..96e10d9 100644
--- a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
+++ b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
@@ -20,6 +20,7 @@ package org.apache.metron.solr.writer;
 import backtype.storm.tuple.Tuple;
 import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.interfaces.BulkMessageWriter;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.response.UpdateResponse;
@@ -54,7 +55,7 @@ public class SolrWriter implements BulkMessageWriter<JSONObject>, Serializable {
   }
 
   @Override
-  public void init(Map stormConf, EnrichmentConfigurations configurations) throws IOException, SolrServerException {
+  public void init(Map stormConf, WriterConfiguration configurations) throws IOException, SolrServerException {
     Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
     if(solr == null) solr = new MetronSolrClient((String) globalConfiguration.get("solr.zookeeper"));
     String collection = getCollection(configurations);
@@ -63,7 +64,7 @@ public class SolrWriter implements BulkMessageWriter<JSONObject>, Serializable {
   }
 
   @Override
-  public void write(String sourceType, EnrichmentConfigurations configurations, List<Tuple> tuples, List<JSONObject> messages) throws Exception {
+  public void write(String sourceType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
     for(JSONObject message: messages) {
       SolrInputDocument document = new SolrInputDocument();
       document.addField("id", getIdValue(message));
@@ -79,7 +80,7 @@ public class SolrWriter implements BulkMessageWriter<JSONObject>, Serializable {
     }
   }
 
-  protected String getCollection(Configurations configurations) {
+  protected String getCollection(WriterConfiguration configurations) {
     String collection = (String) configurations.getGlobalConfig().get("solr.collection");
     return collection != null ? collection : DEFAULT_COLLECTION;
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
index 580fd31..0993e0d 100644
--- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
+++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
@@ -20,6 +20,7 @@ package org.apache.metron.solr.writer;
 import backtype.storm.tuple.Tuple;
 import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.configuration.writer.EnrichmentWriterConfiguration;
 import org.apache.metron.integration.utils.SampleUtil;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.SolrInputDocument;
@@ -107,7 +108,7 @@ public class SolrWriterTest {
     String collection = "metron";
     MetronSolrClient solr = Mockito.mock(MetronSolrClient.class);
     SolrWriter writer = new SolrWriter().withMetronSolrClient(solr);
-    writer.init(null, configurations);
+    writer.init(null, new EnrichmentWriterConfiguration(configurations));
     verify(solr, times(1)).createCollection(collection, 1, 1);
     verify(solr, times(1)).setDefaultCollection(collection);
 
@@ -120,18 +121,18 @@ public class SolrWriterTest {
     globalConfig.put("solr.replicationFactor", replicationFactor);
     configurations.updateGlobalConfig(globalConfig);
     writer = new SolrWriter().withMetronSolrClient(solr);
-    writer.init(null, configurations);
+    writer.init(null, new EnrichmentWriterConfiguration(configurations));
     verify(solr, times(1)).createCollection(collection, numShards, replicationFactor);
     verify(solr, times(1)).setDefaultCollection(collection);
 
-    writer.write("test", configurations, new ArrayList<Tuple>(), messages);
+    writer.write("test", new EnrichmentWriterConfiguration(configurations), new ArrayList<>(), messages);
     verify(solr, times(1)).add(argThat(new SolrInputDocumentMatcher(message1.toJSONString().hashCode(), "test", 100, 100.0)));
     verify(solr, times(1)).add(argThat(new SolrInputDocumentMatcher(message2.toJSONString().hashCode(), "test", 200, 200.0)));
     verify(solr, times(0)).commit(collection);
 
     writer = new SolrWriter().withMetronSolrClient(solr).withShouldCommit(true);
-    writer.init(null, configurations);
-    writer.write("test", configurations, new ArrayList<Tuple>(), messages);
+    writer.init(null, new EnrichmentWriterConfiguration(configurations));
+    writer.write("test", new EnrichmentWriterConfiguration(configurations), new ArrayList<>(), messages);
     verify(solr, times(2)).add(argThat(new SolrInputDocumentMatcher(message1.toJSONString().hashCode(), "test", 100, 100.0)));
     verify(solr, times(2)).add(argThat(new SolrInputDocumentMatcher(message2.toJSONString().hashCode(), "test", 200, 200.0)));
     verify(solr, times(1)).commit(collection);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
index f9f764e..6606fdc 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
@@ -45,628 +45,632 @@ import java.util.*;
  */
 public class MockHTable implements HTableInterface {
 
-    public static class Provider implements Serializable {
-        private static Map<String, HTableInterface> _cache = new HashMap<>();
-        public HTableInterface getTable(Configuration config, String tableName) throws IOException {
-            return _cache.get(tableName);
+  public static class Provider implements Serializable {
+    private static Map<String, HTableInterface> _cache = new HashMap<>();
+    public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+      HTableInterface ret = _cache.get(tableName);
+      return ret;
+    }
+    public static HTableInterface getFromCache(String tableName) {
+      return _cache.get(tableName);
+    }
+    public static HTableInterface addToCache(String tableName, String... columnFamilies) {
+      MockHTable ret =  new MockHTable(tableName, columnFamilies);
+      _cache.put(tableName, ret);
+      return ret;
+    }
+
+    public static void clear() {
+      _cache.clear();
+    }
+  }
+
+  private final String tableName;
+  private final List<String> columnFamilies = new ArrayList<>();
+  private HColumnDescriptor[] descriptors;
+
+  private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data
+          = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+  private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) {
+    return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions);
+  }
+
+  private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) {
+    List<KeyValue> ret = new ArrayList<KeyValue>();
+    for (byte[] family : rowdata.keySet())
+      for (byte[] qualifier : rowdata.get(family).keySet()) {
+        int versionsAdded = 0;
+        for (Map.Entry<Long, byte[]> tsToVal : rowdata.get(family).get(qualifier).descendingMap().entrySet()) {
+          if (versionsAdded++ == maxVersions)
+            break;
+          Long timestamp = tsToVal.getKey();
+          if (timestamp < timestampStart)
+            continue;
+          if (timestamp > timestampEnd)
+            continue;
+          byte[] value = tsToVal.getValue();
+          ret.add(new KeyValue(row, family, qualifier, timestamp, value));
         }
-        public static HTableInterface getFromCache(String tableName) {
-            return _cache.get(tableName);
+      }
+    return ret;
+  }
+  public MockHTable(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public MockHTable(String tableName, String... columnFamilies) {
+    this.tableName = tableName;
+    for(String cf : columnFamilies) {
+      addColumnFamily(cf);
+    }
+  }
+
+  public int size() {
+    return data.size();
+  }
+  public void addColumnFamily(String columnFamily) {
+    this.columnFamilies.add(columnFamily);
+    descriptors = new HColumnDescriptor[columnFamilies.size()];
+    int i = 0;
+    for(String cf : columnFamilies) {
+      descriptors[i++] = new HColumnDescriptor(cf);
+    }
+  }
+
+
+  @Override
+  public byte[] getTableName() {
+    return Bytes.toBytes(tableName);
+  }
+
+  @Override
+  public TableName getName() {
+    return TableName.valueOf(tableName);
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public HTableDescriptor getTableDescriptor() throws IOException {
+    HTableDescriptor ret = new HTableDescriptor(tableName);
+    for(HColumnDescriptor c : descriptors) {
+      ret.addFamily(c);
+    }
+    return ret;
+  }
+
+  @Override
+  public boolean exists(Get get) throws IOException {
+    if(get.getFamilyMap() == null || get.getFamilyMap().size() == 0) {
+      return data.containsKey(get.getRow());
+    } else {
+      byte[] row = get.getRow();
+      if(!data.containsKey(row)) {
+        return false;
+      }
+      for(byte[] family : get.getFamilyMap().keySet()) {
+        if(!data.get(row).containsKey(family)) {
+          return false;
+        } else {
+          return true;
         }
-        public static HTableInterface addToCache(String tableName, String... columnFamilies) {
-            MockHTable ret =  new MockHTable(tableName, columnFamilies);
-            _cache.put(tableName, ret);
-            return ret;
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Test for the existence of columns in the table, as specified by the Gets.
+   * <p/>
+   * <p/>
+   * This will return an array of booleans. Each value will be true if the related Get matches
+   * one or more keys, false if not.
+   * <p/>
+   * <p/>
+   * This is a server-side call so it prevents any data from being transferred to
+   * the client.
+   *
+   * @param gets the Gets
+   * @return Array of boolean.  True if the specified Get matches one or more keys, false if not.
+   * @throws IOException e
+   */
+  @Override
+  public boolean[] existsAll(List<Get> gets) throws IOException {
+    boolean[] ret = new boolean[gets.size()];
+    int i = 0;
+    for(boolean b : exists(gets)) {
+      ret[i++] = b;
+    }
+    return ret;
+  }
+
+  @Override
+  public Boolean[] exists(List<Get> list) throws IOException {
+    Boolean[] ret = new Boolean[list.size()];
+    int i = 0;
+    for(Get g : list) {
+      ret[i++] = exists(g);
+    }
+    return ret;
+  }
+
+  @Override
+  public void batch(List<? extends Row> list, Object[] objects) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+
+  }
+
+  /**
+   * @param actions
+   * @deprecated
+   */
+  @Override
+  public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
+    List<Result> results = new ArrayList<Result>();
+    for (Row r : actions) {
+      if (r instanceof Delete) {
+        delete((Delete) r);
+        continue;
+      }
+      if (r instanceof Put) {
+        put((Put) r);
+        continue;
+      }
+      if (r instanceof Get) {
+        results.add(get((Get) r));
+      }
+    }
+    return results.toArray();
+  }
+
+  @Override
+  public <R> void batchCallback(List<? extends Row> list, Object[] objects, Batch.Callback<R> callback) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+
+  }
+
+  /**
+   * @param list
+   * @param callback
+   * @deprecated
+   */
+  @Override
+  public <R> Object[] batchCallback(List<? extends Row> list, Batch.Callback<R> callback) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Result get(Get get) throws IOException {
+    if (!data.containsKey(get.getRow()))
+      return new Result();
+    byte[] row = get.getRow();
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    if (!get.hasFamilies()) {
+      kvs = toKeyValue(row, data.get(row), get.getMaxVersions());
+    } else {
+      for (byte[] family : get.getFamilyMap().keySet()){
+        if (data.get(row).get(family) == null)
+          continue;
+        NavigableSet<byte[]> qualifiers = get.getFamilyMap().get(family);
+        if (qualifiers == null || qualifiers.isEmpty())
+          qualifiers = data.get(row).get(family).navigableKeySet();
+        for (byte[] qualifier : qualifiers){
+          if (qualifier == null)
+            qualifier = "".getBytes();
+          if (!data.get(row).containsKey(family) ||
+                  !data.get(row).get(family).containsKey(qualifier) ||
+                  data.get(row).get(family).get(qualifier).isEmpty())
+            continue;
+          Map.Entry<Long, byte[]> timestampAndValue = data.get(row).get(family).get(qualifier).lastEntry();
+          kvs.add(new KeyValue(row,family, qualifier, timestampAndValue.getKey(), timestampAndValue.getValue()));
         }
-
-        public static void clear() {
-            _cache.clear();
+      }
+    }
+    Filter filter = get.getFilter();
+    if (filter != null) {
+      filter.reset();
+      List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
+      for (KeyValue kv : kvs) {
+        if (filter.filterAllRemaining()) {
+          break;
         }
-    }
-
-    private final String tableName;
-    private final List<String> columnFamilies = new ArrayList<>();
-    private HColumnDescriptor[] descriptors;
-
-    private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data
-            = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-
-    private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) {
-        return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions);
-    }
-
-    private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) {
-        List<KeyValue> ret = new ArrayList<KeyValue>();
-        for (byte[] family : rowdata.keySet())
-            for (byte[] qualifier : rowdata.get(family).keySet()) {
-                int versionsAdded = 0;
-                for (Map.Entry<Long, byte[]> tsToVal : rowdata.get(family).get(qualifier).descendingMap().entrySet()) {
-                    if (versionsAdded++ == maxVersions)
-                        break;
-                    Long timestamp = tsToVal.getKey();
-                    if (timestamp < timestampStart)
-                        continue;
-                    if (timestamp > timestampEnd)
-                        continue;
-                    byte[] value = tsToVal.getValue();
-                    ret.add(new KeyValue(row, family, qualifier, timestamp, value));
-                }
-            }
-        return ret;
-    }
-    public MockHTable(String tableName) {
-        this.tableName = tableName;
-    }
-
-    public MockHTable(String tableName, String... columnFamilies) {
-        this.tableName = tableName;
-        for(String cf : columnFamilies) {
-            addColumnFamily(cf);
+        if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
+          continue;
         }
-    }
-
-    public void addColumnFamily(String columnFamily) {
-        this.columnFamilies.add(columnFamily);
-        descriptors = new HColumnDescriptor[columnFamilies.size()];
-        int i = 0;
-        for(String cf : columnFamilies) {
-            descriptors[i++] = new HColumnDescriptor(cf);
-        }
-    }
-
-
-    @Override
-    public byte[] getTableName() {
-        return Bytes.toBytes(tableName);
-    }
-
-    @Override
-    public TableName getName() {
-        return TableName.valueOf(tableName);
-    }
-
-    @Override
-    public Configuration getConfiguration() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public HTableDescriptor getTableDescriptor() throws IOException {
-        HTableDescriptor ret = new HTableDescriptor(tableName);
-        for(HColumnDescriptor c : descriptors) {
-            ret.addFamily(c);
-        }
-        return ret;
-    }
-
-    @Override
-    public boolean exists(Get get) throws IOException {
-        if(get.getFamilyMap() == null || get.getFamilyMap().size() == 0) {
-            return data.containsKey(get.getRow());
-        } else {
-            byte[] row = get.getRow();
-            if(!data.containsKey(row)) {
-                return false;
-            }
-            for(byte[] family : get.getFamilyMap().keySet()) {
-                if(!data.get(row).containsKey(family)) {
-                    return false;
-                } else {
-                    return true;
-                }
-            }
-            return true;
-        }
-    }
-
-    /**
-     * Test for the existence of columns in the table, as specified by the Gets.
-     * <p/>
-     * <p/>
-     * This will return an array of booleans. Each value will be true if the related Get matches
-     * one or more keys, false if not.
-     * <p/>
-     * <p/>
-     * This is a server-side call so it prevents any data from being transferred to
-     * the client.
-     *
-     * @param gets the Gets
-     * @return Array of boolean.  True if the specified Get matches one or more keys, false if not.
-     * @throws IOException e
-     */
-    @Override
-    public boolean[] existsAll(List<Get> gets) throws IOException {
-        boolean[] ret = new boolean[gets.size()];
-        int i = 0;
-        for(boolean b : exists(gets)) {
-           ret[i++] = b;
-        }
-        return ret;
-    }
-
-    @Override
-    public Boolean[] exists(List<Get> list) throws IOException {
-        Boolean[] ret = new Boolean[list.size()];
-        int i = 0;
-        for(Get g : list) {
-           ret[i++] = exists(g);
+        if (filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE) {
+          nkvs.add(kv);
         }
-        return ret;
-    }
-
-    @Override
-    public void batch(List<? extends Row> list, Object[] objects) throws IOException, InterruptedException {
-        throw new UnsupportedOperationException();
-
-    }
-
-    /**
-     * @param actions
-     * @deprecated
-     */
-    @Override
-    public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
-        List<Result> results = new ArrayList<Result>();
-        for (Row r : actions) {
-            if (r instanceof Delete) {
-                delete((Delete) r);
+        // ignoring next key hint which is a optimization to reduce file system IO
+      }
+      if (filter.hasFilterRow()) {
+        filter.filterRow();
+      }
+      kvs = nkvs;
+    }
+
+    return new Result(kvs);
+  }
+
+  @Override
+  public Result[] get(List<Get> list) throws IOException {
+    Result[] ret = new Result[list.size()];
+    int i = 0;
+    for(Get g : list) {
+      ret[i++] = get(g);
+    }
+    return ret;
+  }
+
+  /**
+   * @param bytes
+   * @param bytes1
+   * @deprecated
+   */
+  @Override
+  public Result getRowOrBefore(byte[] bytes, byte[] bytes1) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ResultScanner getScanner(Scan scan) throws IOException {
+    final List<Result> ret = new ArrayList<Result>();
+    byte[] st = scan.getStartRow();
+    byte[] sp = scan.getStopRow();
+    Filter filter = scan.getFilter();
+
+    for (byte[] row : data.keySet()){
+      // if row is equal to startRow emit it. When startRow (inclusive) and
+      // stopRow (exclusive) is the same, it should not be excluded which would
+      // happen w/o this control.
+      if (st != null && st.length > 0 &&
+              Bytes.BYTES_COMPARATOR.compare(st, row) != 0) {
+        // if row is before startRow do not emit, pass to next row
+        if (st != null && st.length > 0 &&
+                Bytes.BYTES_COMPARATOR.compare(st, row) > 0)
+          continue;
+        // if row is equal to stopRow or after it do not emit, stop iteration
+        if (sp != null && sp.length > 0 &&
+                Bytes.BYTES_COMPARATOR.compare(sp, row) <= 0)
+          break;
+      }
+
+      List<KeyValue> kvs = null;
+      if (!scan.hasFamilies()) {
+        kvs = toKeyValue(row, data.get(row), scan.getTimeRange().getMin(), scan.getTimeRange().getMax(), scan.getMaxVersions());
+      } else {
+        kvs = new ArrayList<KeyValue>();
+        for (byte[] family : scan.getFamilyMap().keySet()){
+          if (data.get(row).get(family) == null)
+            continue;
+          NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
+          if (qualifiers == null || qualifiers.isEmpty())
+            qualifiers = data.get(row).get(family).navigableKeySet();
+          for (byte[] qualifier : qualifiers){
+            if (data.get(row).get(family).get(qualifier) == null)
+              continue;
+            for (Long timestamp : data.get(row).get(family).get(qualifier).descendingKeySet()){
+              if (timestamp < scan.getTimeRange().getMin())
                 continue;
-            }
-            if (r instanceof Put) {
-                put((Put) r);
+              if (timestamp > scan.getTimeRange().getMax())
                 continue;
+              byte[] value = data.get(row).get(family).get(qualifier).get(timestamp);
+              kvs.add(new KeyValue(row, family, qualifier, timestamp, value));
+              if(kvs.size() == scan.getMaxVersions()) {
+                break;
+              }
             }
-            if (r instanceof Get) {
-                results.add(get((Get) r));
-            }
+          }
         }
-        return results.toArray();
-    }
-
-    @Override
-    public <R> void batchCallback(List<? extends Row> list, Object[] objects, Batch.Callback<R> callback) throws IOException, InterruptedException {
-        throw new UnsupportedOperationException();
-
-    }
-
-    /**
-     * @param list
-     * @param callback
-     * @deprecated
-     */
-    @Override
-    public <R> Object[] batchCallback(List<? extends Row> list, Batch.Callback<R> callback) throws IOException, InterruptedException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Result get(Get get) throws IOException {
-        if (!data.containsKey(get.getRow()))
-            return new Result();
-        byte[] row = get.getRow();
-        List<KeyValue> kvs = new ArrayList<KeyValue>();
-        if (!get.hasFamilies()) {
-            kvs = toKeyValue(row, data.get(row), get.getMaxVersions());
-        } else {
-            for (byte[] family : get.getFamilyMap().keySet()){
-                if (data.get(row).get(family) == null)
-                    continue;
-                NavigableSet<byte[]> qualifiers = get.getFamilyMap().get(family);
-                if (qualifiers == null || qualifiers.isEmpty())
-                    qualifiers = data.get(row).get(family).navigableKeySet();
-                for (byte[] qualifier : qualifiers){
-                    if (qualifier == null)
-                        qualifier = "".getBytes();
-                    if (!data.get(row).containsKey(family) ||
-                            !data.get(row).get(family).containsKey(qualifier) ||
-                            data.get(row).get(family).get(qualifier).isEmpty())
-                        continue;
-                    Map.Entry<Long, byte[]> timestampAndValue = data.get(row).get(family).get(qualifier).lastEntry();
-                    kvs.add(new KeyValue(row,family, qualifier, timestampAndValue.getKey(), timestampAndValue.getValue()));
-                }
-            }
+      }
+      if (filter != null) {
+        filter.reset();
+        List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
+        for (KeyValue kv : kvs) {
+          if (filter.filterAllRemaining()) {
+            break;
+          }
+          if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
+            continue;
+          }
+          Filter.ReturnCode filterResult = filter.filterKeyValue(kv);
+          if (filterResult == Filter.ReturnCode.INCLUDE) {
+            nkvs.add(kv);
+          } else if (filterResult == Filter.ReturnCode.NEXT_ROW) {
+            break;
+          }
+          // ignoring next key hint which is a optimization to reduce file system IO
         }
-        Filter filter = get.getFilter();
-        if (filter != null) {
-            filter.reset();
-            List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
-            for (KeyValue kv : kvs) {
-                if (filter.filterAllRemaining()) {
-                    break;
-                }
-                if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
-                    continue;
-                }
-                if (filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE) {
-                    nkvs.add(kv);
-                }
-                // ignoring next key hint which is a optimization to reduce file system IO
-            }
-            if (filter.hasFilterRow()) {
-                filter.filterRow();
-            }
-            kvs = nkvs;
+        if (filter.hasFilterRow()) {
+          filter.filterRow();
         }
-
-        return new Result(kvs);
-    }
-
-    @Override
-    public Result[] get(List<Get> list) throws IOException {
-        Result[] ret = new Result[list.size()];
-        int i = 0;
-        for(Get g : list) {
-            ret[i++] = get(g);
+        kvs = nkvs;
+      }
+      if (!kvs.isEmpty()) {
+        ret.add(new Result(kvs));
+      }
+    }
+
+    return new ResultScanner() {
+      private final Iterator<Result> iterator = ret.iterator();
+      public Iterator<Result> iterator() {
+        return iterator;
+      }
+      public Result[] next(int nbRows) throws IOException {
+        ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
+        for(int i = 0; i < nbRows; i++) {
+          Result next = next();
+          if (next != null) {
+            resultSets.add(next);
+          } else {
+            break;
+          }
         }
-        return ret;
-    }
-
-    /**
-     * @param bytes
-     * @param bytes1
-     * @deprecated
-     */
-    @Override
-    public Result getRowOrBefore(byte[] bytes, byte[] bytes1) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public ResultScanner getScanner(Scan scan) throws IOException {
-        final List<Result> ret = new ArrayList<Result>();
-        byte[] st = scan.getStartRow();
-        byte[] sp = scan.getStopRow();
-        Filter filter = scan.getFilter();
-
-        for (byte[] row : data.keySet()){
-            // if row is equal to startRow emit it. When startRow (inclusive) and
-            // stopRow (exclusive) is the same, it should not be excluded which would
-            // happen w/o this control.
-            if (st != null && st.length > 0 &&
-                    Bytes.BYTES_COMPARATOR.compare(st, row) != 0) {
-                // if row is before startRow do not emit, pass to next row
-                if (st != null && st.length > 0 &&
-                        Bytes.BYTES_COMPARATOR.compare(st, row) > 0)
-                    continue;
-                // if row is equal to stopRow or after it do not emit, stop iteration
-                if (sp != null && sp.length > 0 &&
-                        Bytes.BYTES_COMPARATOR.compare(sp, row) <= 0)
-                    break;
-            }
-
-            List<KeyValue> kvs = null;
-            if (!scan.hasFamilies()) {
-                kvs = toKeyValue(row, data.get(row), scan.getTimeRange().getMin(), scan.getTimeRange().getMax(), scan.getMaxVersions());
-            } else {
-                kvs = new ArrayList<KeyValue>();
-                for (byte[] family : scan.getFamilyMap().keySet()){
-                    if (data.get(row).get(family) == null)
-                        continue;
-                    NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
-                    if (qualifiers == null || qualifiers.isEmpty())
-                        qualifiers = data.get(row).get(family).navigableKeySet();
-                    for (byte[] qualifier : qualifiers){
-                        if (data.get(row).get(family).get(qualifier) == null)
-                            continue;
-                        for (Long timestamp : data.get(row).get(family).get(qualifier).descendingKeySet()){
-                            if (timestamp < scan.getTimeRange().getMin())
-                                continue;
-                            if (timestamp > scan.getTimeRange().getMax())
-                                continue;
-                            byte[] value = data.get(row).get(family).get(qualifier).get(timestamp);
-                            kvs.add(new KeyValue(row, family, qualifier, timestamp, value));
-                            if(kvs.size() == scan.getMaxVersions()) {
-                                break;
-                            }
-                        }
-                    }
-                }
-            }
-            if (filter != null) {
-                filter.reset();
-                List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
-                for (KeyValue kv : kvs) {
-                    if (filter.filterAllRemaining()) {
-                        break;
-                    }
-                    if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
-                        continue;
-                    }
-                    Filter.ReturnCode filterResult = filter.filterKeyValue(kv);
-                    if (filterResult == Filter.ReturnCode.INCLUDE) {
-                        nkvs.add(kv);
-                    } else if (filterResult == Filter.ReturnCode.NEXT_ROW) {
-                        break;
-                    }
-                    // ignoring next key hint which is a optimization to reduce file system IO
-                }
-                if (filter.hasFilterRow()) {
-                    filter.filterRow();
-                }
-                kvs = nkvs;
-            }
-            if (!kvs.isEmpty()) {
-                ret.add(new Result(kvs));
-            }
+        return resultSets.toArray(new Result[resultSets.size()]);
+      }
+      public Result next() throws IOException {
+        try {
+          return iterator().next();
+        } catch (NoSuchElementException e) {
+          return null;
         }
-
-        return new ResultScanner() {
-            private final Iterator<Result> iterator = ret.iterator();
-            public Iterator<Result> iterator() {
-                return iterator;
-            }
-            public Result[] next(int nbRows) throws IOException {
-                ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
-                for(int i = 0; i < nbRows; i++) {
-                    Result next = next();
-                    if (next != null) {
-                        resultSets.add(next);
-                    } else {
-                        break;
-                    }
-                }
-                return resultSets.toArray(new Result[resultSets.size()]);
-            }
-            public Result next() throws IOException {
-                try {
-                    return iterator().next();
-                } catch (NoSuchElementException e) {
-                    return null;
-                }
-            }
-            public void close() {}
-        };
-    }
-    @Override
-    public ResultScanner getScanner(byte[] family) throws IOException {
-        Scan scan = new Scan();
-        scan.addFamily(family);
-        return getScanner(scan);
-    }
-
-    @Override
-    public ResultScanner getScanner(byte[] family, byte[] qualifier)
-            throws IOException {
-        Scan scan = new Scan();
-        scan.addColumn(family, qualifier);
-        return getScanner(scan);
-    }
-
-    List<Put> putLog = new ArrayList<>();
-
-    public List<Put> getPutLog() {
-        return putLog;
-    }
-
-    @Override
-    public void put(Put put) throws IOException {
-        putLog.add(put);
-        byte[] row = put.getRow();
-        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR));
-        for (byte[] family : put.getFamilyMap().keySet()){
-            NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, family, new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR));
-            for (KeyValue kv : put.getFamilyMap().get(family)){
-                kv.updateLatestStamp(Bytes.toBytes(System.currentTimeMillis()));
-                byte[] qualifier = kv.getQualifier();
-                NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new TreeMap<Long, byte[]>());
-                qualifierData.put(kv.getTimestamp(), kv.getValue());
-            }
-        }
-    }
-
-    /**
-     * Helper method to find a key in a map. If key is not found, newObject is
-     * added to map and returned
-     *
-     * @param map
-     *          map to extract value from
-     * @param key
-     *          key to look for
-     * @param newObject
-     *          set key to this if not found
-     * @return found value or newObject if not found
-     */
-    private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject){
-        V data = map.get(key);
-        if (data == null){
-            data = newObject;
-            map.put(key, data);
-        }
-        return data;
-    }
-
-    @Override
-    public void put(List<Put> puts) throws IOException {
-        for (Put put : puts)
-            put(put);
-    }
-
-    @Override
-    public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, byte[] bytes3, Put put) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
-     * Atomically checks if a row/family/qualifier value matches the expected
-     * value. If it does, it adds the put.  If the passed value is null, the check
-     * is for the lack of column (ie: non-existance)
-     *
-     * @param row       to check
-     * @param family    column family to check
-     * @param qualifier column qualifier to check
-     * @param compareOp comparison operator to use
-     * @param value     the expected value
-     * @param put       data to put if check succeeds
-     * @return true if the new put was executed, false otherwise
-     * @throws IOException e
-     */
-    @Override
-    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
-        return false;
-    }
-
-    @Override
-    public void delete(Delete delete) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void delete(List<Delete> list) throws IOException {
-        throw new UnsupportedOperationException();
-
-    }
-
-    @Override
-    public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, byte[] bytes3, Delete delete) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
-     * Atomically checks if a row/family/qualifier value matches the expected
-     * value. If it does, it adds the delete.  If the passed value is null, the
-     * check is for the lack of column (ie: non-existance)
-     *
-     * @param row       to check
-     * @param family    column family to check
-     * @param qualifier column qualifier to check
-     * @param compareOp comparison operator to use
-     * @param value     the expected value
-     * @param delete    data to delete if check succeeds
-     * @return true if the new delete was executed, false otherwise
-     * @throws IOException e
-     */
-    @Override
-    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException {
-        return false;
-    }
-
-    @Override
-    public void mutateRow(RowMutations rowMutations) throws IOException {
-        throw new UnsupportedOperationException();
-
-    }
-
-    @Override
-    public Result append(Append append) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Result increment(Increment increment) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l, Durability durability) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
-     * @param bytes
-     * @param bytes1
-     * @param bytes2
-     * @param l
-     * @param b
-     * @deprecated
-     */
-    @Override
-    public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l, boolean b) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean isAutoFlush() {
-        return autoflush;
-    }
-
-    @Override
-    public void flushCommits() throws IOException {
-
-    }
-
-    @Override
-    public void close() throws IOException {
-
-    }
-
-    @Override
-    public CoprocessorRpcChannel coprocessorService(byte[] bytes) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> aClass, byte[] bytes, byte[] bytes1, Batch.Call<T, R> call) throws ServiceException, Throwable {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public <T extends Service, R> void coprocessorService(Class<T> aClass, byte[] bytes, byte[] bytes1, Batch.Call<T, R> call, Batch.Callback<R> callback) throws ServiceException, Throwable {
-        throw new UnsupportedOperationException();
-    }
-
-    boolean autoflush = true;
-
-    /**
-     * @param b
-     * @deprecated
-     */
-    @Override
-    public void setAutoFlush(boolean b) {
-        autoflush = b;
-    }
-
-    @Override
-    public void setAutoFlush(boolean b, boolean b1) {
-        autoflush = b;
-    }
-
-    @Override
-    public void setAutoFlushTo(boolean b) {
-        autoflush = b;
-    }
-
-    long writeBufferSize = 0;
-    @Override
-    public long getWriteBufferSize() {
-        return writeBufferSize;
-    }
-
-    @Override
-    public void setWriteBufferSize(long l) throws IOException {
-        writeBufferSize = l;
-    }
-
-    @Override
-    public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r) throws ServiceException, Throwable {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r, Batch.Callback<R> callback) throws ServiceException, Throwable {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
-     * Atomically checks if a row/family/qualifier value matches the expected value.
-     * If it does, it performs the row mutations.  If the passed value is null, the check
-     * is for the lack of column (ie: non-existence)
-     *
-     * @param row       to check
-     * @param family    column family to check
-     * @param qualifier column qualifier to check
-     * @param compareOp the comparison operator
-     * @param value     the expected value
-     * @param mutation  mutations to perform if check succeeds
-     * @return true if the new put was executed, false otherwise
-     * @throws IOException e
-     */
-    @Override
-    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
-        return false;
-    }
+      }
+      public void close() {}
+    };
+  }
+  @Override
+  public ResultScanner getScanner(byte[] family) throws IOException {
+    Scan scan = new Scan();
+    scan.addFamily(family);
+    return getScanner(scan);
+  }
+
+  @Override
+  public ResultScanner getScanner(byte[] family, byte[] qualifier)
+          throws IOException {
+    Scan scan = new Scan();
+    scan.addColumn(family, qualifier);
+    return getScanner(scan);
+  }
+
+  List<Put> putLog = new ArrayList<>();
+
+  public List<Put> getPutLog() {
+    return putLog;
+  }
+
+  @Override
+  public void put(Put put) throws IOException {
+    putLog.add(put);
+    byte[] row = put.getRow();
+    NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR));
+    for (byte[] family : put.getFamilyMap().keySet()){
+      NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, family, new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR));
+      for (KeyValue kv : put.getFamilyMap().get(family)){
+        kv.updateLatestStamp(Bytes.toBytes(System.currentTimeMillis()));
+        byte[] qualifier = kv.getQualifier();
+        NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new TreeMap<Long, byte[]>());
+        qualifierData.put(kv.getTimestamp(), kv.getValue());
+      }
+    }
+  }
+
+  /**
+   * Helper method to find a key in a map. If key is not found, newObject is
+   * added to map and returned
+   *
+   * @param map
+   *          map to extract value from
+   * @param key
+   *          key to look for
+   * @param newObject
+   *          set key to this if not found
+   * @return found value or newObject if not found
+   */
+  private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject){
+    V data = map.get(key);
+    if (data == null){
+      data = newObject;
+      map.put(key, data);
+    }
+    return data;
+  }
+
+  @Override
+  public void put(List<Put> puts) throws IOException {
+    for (Put put : puts)
+      put(put);
+  }
+
+  @Override
+  public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, byte[] bytes3, Put put) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Atomically checks if a row/family/qualifier value matches the expected
+   * value. If it does, it adds the put.  If the passed value is null, the check
+   * is for the lack of column (ie: non-existance)
+   *
+   * @param row       to check
+   * @param family    column family to check
+   * @param qualifier column qualifier to check
+   * @param compareOp comparison operator to use
+   * @param value     the expected value
+   * @param put       data to put if check succeeds
+   * @return true if the new put was executed, false otherwise
+   * @throws IOException e
+   */
+  @Override
+  public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
+    return false;
+  }
+
+  @Override
+  public void delete(Delete delete) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void delete(List<Delete> list) throws IOException {
+    throw new UnsupportedOperationException();
+
+  }
+
+  @Override
+  public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, byte[] bytes3, Delete delete) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Atomically checks if a row/family/qualifier value matches the expected
+   * value. If it does, it adds the delete.  If the passed value is null, the
+   * check is for the lack of column (ie: non-existance)
+   *
+   * @param row       to check
+   * @param family    column family to check
+   * @param qualifier column qualifier to check
+   * @param compareOp comparison operator to use
+   * @param value     the expected value
+   * @param delete    data to delete if check succeeds
+   * @return true if the new delete was executed, false otherwise
+   * @throws IOException e
+   */
+  @Override
+  public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException {
+    return false;
+  }
+
+  @Override
+  public void mutateRow(RowMutations rowMutations) throws IOException {
+    throw new UnsupportedOperationException();
+
+  }
+
+  @Override
+  public Result append(Append append) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Result increment(Increment increment) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l, Durability durability) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * @param bytes
+   * @param bytes1
+   * @param bytes2
+   * @param l
+   * @param b
+   * @deprecated
+   */
+  @Override
+  public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l, boolean b) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isAutoFlush() {
+    return autoflush;
+  }
+
+  @Override
+  public void flushCommits() throws IOException {
+
+  }
+
+  @Override
+  public void close() throws IOException {
+
+  }
+
+  @Override
+  public CoprocessorRpcChannel coprocessorService(byte[] bytes) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> aClass, byte[] bytes, byte[] bytes1, Batch.Call<T, R> call) throws ServiceException, Throwable {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T extends Service, R> void coprocessorService(Class<T> aClass, byte[] bytes, byte[] bytes1, Batch.Call<T, R> call, Batch.Callback<R> callback) throws ServiceException, Throwable {
+    throw new UnsupportedOperationException();
+  }
+
+  boolean autoflush = true;
+
+  /**
+   * @param b
+   * @deprecated
+   */
+  @Override
+  public void setAutoFlush(boolean b) {
+    autoflush = b;
+  }
+
+  @Override
+  public void setAutoFlush(boolean b, boolean b1) {
+    autoflush = b;
+  }
+
+  @Override
+  public void setAutoFlushTo(boolean b) {
+    autoflush = b;
+  }
+
+  long writeBufferSize = 0;
+  @Override
+  public long getWriteBufferSize() {
+    return writeBufferSize;
+  }
+
+  @Override
+  public void setWriteBufferSize(long l) throws IOException {
+    writeBufferSize = l;
+  }
+
+  @Override
+  public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r) throws ServiceException, Throwable {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r, Batch.Callback<R> callback) throws ServiceException, Throwable {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Atomically checks if a row/family/qualifier value matches the expected value.
+   * If it does, it performs the row mutations.  If the passed value is null, the check
+   * is for the lack of column (ie: non-existence)
+   *
+   * @param row       to check
+   * @param family    column family to check
+   * @param qualifier column qualifier to check
+   * @param compareOp the comparison operator
+   * @param value     the expected value
+   * @param mutation  mutations to perform if check succeeds
+   * @return true if the new put was executed, false otherwise
+   * @throws IOException e
+   */
+  @Override
+  public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
+    return false;
+  }
 }


[2/3] incubator-metron git commit: METRON-174 Storm consumption of hbase enrichment reference data. This closes apache/incubator-metron#127

Posted by ce...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentValue.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentValue.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentValue.java
index d9b7b38..bd019a0 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentValue.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentValue.java
@@ -35,20 +35,20 @@ public class EnrichmentValue implements LookupValue {
     public static final String VALUE_COLUMN_NAME = "v";
     public static final byte[] VALUE_COLUMN_NAME_B = Bytes.toBytes(VALUE_COLUMN_NAME);
 
-    private Map<String, String> metadata = null;
+    private Map<String, Object> metadata = null;
 
     public EnrichmentValue()
     {
 
     }
 
-    public EnrichmentValue(Map<String, String> metadata) {
+    public EnrichmentValue(Map<String, Object> metadata) {
         this.metadata = metadata;
     }
 
 
 
-    public Map<String, String> getMetadata() {
+    public Map<String, Object> getMetadata() {
         return metadata;
     }
 
@@ -66,14 +66,14 @@ public class EnrichmentValue implements LookupValue {
             }
         }
     }
-    public Map<String, String> stringToValue(String s){
+    public Map<String, Object> stringToValue(String s){
         try {
-            return _mapper.get().readValue(s, new TypeReference<Map<String, String>>(){});
+            return _mapper.get().readValue(s, new TypeReference<Map<String, Object>>(){});
         } catch (IOException e) {
             throw new RuntimeException("Unable to convert string to metadata: " + s);
         }
     }
-    public String valueToString(Map<String, String> value) {
+    public String valueToString(Map<String, Object> value) {
         try {
             return _mapper.get().writeValueAsString(value);
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hbase/SimpleHbaseEnrichmentWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hbase/SimpleHbaseEnrichmentWriter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hbase/SimpleHbaseEnrichmentWriter.java
new file mode 100644
index 0000000..f7e7aeb
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hbase/SimpleHbaseEnrichmentWriter.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.hbase;
+
+import backtype.storm.tuple.Tuple;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.ReflectionUtils;
+import org.apache.metron.common.writer.AbstractWriter;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.converter.HbaseConverter;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+import org.json.simple.JSONObject;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class SimpleHbaseEnrichmentWriter extends AbstractWriter implements BulkMessageWriter<JSONObject>, Serializable {
+  public enum Configurations {
+    HBASE_TABLE("shew.table")
+    ,HBASE_CF("shew.cf")
+    ,KEY_COLUMNS("shew.keyColumns")
+    ,KEY_DELIM("shew.keyDelim")
+    ,ENRICHMENT_TYPE("shew.enrichmentType")
+    ,VALUE_COLUMNS("shew.valueColumns")
+    ,HBASE_PROVIDER("shew.hbaseProvider")
+    ;
+    String key;
+    Configurations(String key) {
+      this.key = key;
+    }
+    public String getKey() {
+      return key;
+    }
+    public Object get(Map<String, Object> config) {
+      return config.get(key);
+    }
+    public <T> T getAndConvert(Map<String, Object> config, Class<T> clazz) {
+      Object o = get(config);
+      if(o != null) {
+        return ConversionUtils.convert(o, clazz);
+      }
+      return null;
+    }
+  }
+  public static class KeyTransformer {
+    List<String> keys = new ArrayList<>();
+    Set<String> keySet;
+    private String delim = ":";
+    public KeyTransformer(String key) {
+      this(key, null);
+    }
+    public KeyTransformer(String key, String delim) {
+      keys.add(key);
+      keySet = new HashSet<>(this.keys);
+      this.delim = delim == null?this.delim:delim;
+    }
+    public KeyTransformer(Iterable<String> keys) {
+      this(keys, null);
+    }
+    public KeyTransformer(Iterable<String> keys, String delim) {
+      Iterables.addAll(this.keys, keys);
+      keySet = new HashSet<>(this.keys);
+      this.delim = delim == null?this.delim:delim;
+    }
+
+    public String transform(final JSONObject message) {
+      return
+      keys.stream().map( x -> {
+        Object o = message.get(x);
+        return o == null?"":o.toString();
+      }).collect(Collectors.joining(delim));
+    }
+  }
+  private transient EnrichmentConverter converter;
+  private String tableName;
+  private String cf;
+  private HTableInterface table;
+  private TableProvider provider;
+  private Map.Entry<Object, KeyTransformer> keyTransformer;
+
+  public SimpleHbaseEnrichmentWriter() {
+  }
+
+  @Override
+  public void configure(String sensorName, WriterConfiguration configuration) {
+    String hbaseProviderImpl = Configurations.HBASE_PROVIDER.getAndConvert(configuration.getSensorConfig(sensorName),String.class);
+    if(hbaseProviderImpl != null) {
+      provider = ReflectionUtils.createInstance(hbaseProviderImpl);
+    }
+    if(converter == null) {
+      converter = new EnrichmentConverter();
+    }
+  }
+
+  @Override
+  public void init(Map stormConf, WriterConfiguration configuration) throws Exception {
+    if(converter == null) {
+      converter = new EnrichmentConverter();
+    }
+  }
+
+  protected synchronized TableProvider getProvider() {
+    if(provider == null) {
+
+      provider = new HTableProvider();
+    }
+    return provider;
+  }
+
+  public HTableInterface getTable(String tableName, String cf) throws IOException {
+    synchronized(this) {
+      boolean isInitial = this.tableName == null || this.cf == null;
+      boolean isValid = tableName != null && cf != null;
+
+      if( isInitial || (isValid && (!this.tableName.equals(tableName) || !this.cf.equals(cf)) )
+        )
+      {
+        Configuration conf = HBaseConfiguration.create();
+        //new table connection
+        if(table != null) {
+          table.close();
+        }
+        table = getProvider().getTable(conf, tableName);
+        this.tableName = tableName;
+        this.cf = cf;
+      }
+      return table;
+    }
+  }
+
+  public HTableInterface getTable(Map<String, Object> config) throws IOException {
+    return getTable(Configurations.HBASE_TABLE.getAndConvert(config, String.class)
+                   ,Configurations.HBASE_CF.getAndConvert(config, String.class)
+                   );
+
+  }
+
+
+  private List<String> getColumns(Object keyColumnsObj, boolean allowNull) {
+    Object o = keyColumnsObj;
+    if(allowNull && keyColumnsObj == null) {
+      return Collections.emptyList();
+    }
+    if(o instanceof String) {
+      return ImmutableList.of(o.toString());
+    }
+    else if (o instanceof List) {
+      List<String> keyCols = new ArrayList<>();
+      for(Object key : (List)o) {
+        keyCols.add(key.toString());
+      }
+      return keyCols;
+    }
+    else {
+      throw new RuntimeException("Unable to get columns: " + o);
+    }
+  }
+
+  private KeyTransformer getTransformer(Map<String, Object> config) {
+    Object o = Configurations.KEY_COLUMNS.get(config);
+    KeyTransformer transformer = null;
+    if(keyTransformer != null && keyTransformer.getKey() == o) {
+      return keyTransformer.getValue();
+    }
+    else {
+      List<String> keys = getColumns(o, false);
+      Object delimObj = Configurations.KEY_DELIM.get(config);
+      String delim = (delimObj == null || !(delimObj instanceof String))?null:delimObj.toString();
+      transformer = new KeyTransformer(keys, delim);
+      keyTransformer = new AbstractMap.SimpleEntry<>(o, transformer);
+      return transformer;
+    }
+  }
+
+
+  private EnrichmentValue getValue( JSONObject message
+                                  , Set<String> keyColumns
+                                  , Set<String> valueColumns
+                                  )
+  {
+    Map<String, Object> metadata = new HashMap<>();
+    if(valueColumns == null || valueColumns.isEmpty()) {
+      for (Object kv : message.entrySet()) {
+        Map.Entry<Object, Object> entry = (Map.Entry<Object, Object>) kv;
+        if (!keyColumns.contains(entry.getKey())) {
+          metadata.put(entry.getKey().toString(), entry.getValue());
+        }
+      }
+      return new EnrichmentValue(metadata);
+    }
+    else {
+      for (Object kv : message.entrySet()) {
+        Map.Entry<Object, Object> entry = (Map.Entry<Object, Object>) kv;
+        if (valueColumns.contains(entry.getKey())) {
+          metadata.put(entry.getKey().toString(), entry.getValue());
+        }
+      }
+      return new EnrichmentValue(metadata);
+    }
+  }
+
+  private EnrichmentKey getKey(JSONObject message, KeyTransformer transformer, String enrichmentType) {
+    if(enrichmentType != null) {
+      return new EnrichmentKey(enrichmentType, transformer.transform(message));
+    }
+    else {
+      return null;
+    }
+  }
+
+  @Override
+  public void write( String sensorType
+                    , WriterConfiguration configurations
+                    , Iterable<Tuple> tuples
+                    , List<JSONObject> messages
+                    ) throws Exception
+  {
+    Map<String, Object> sensorConfig = configurations.getSensorConfig(sensorType);
+    HTableInterface table = getTable(sensorConfig);
+    KeyTransformer transformer = getTransformer(sensorConfig);
+    Object enrichmentTypeObj = Configurations.ENRICHMENT_TYPE.get(sensorConfig);
+    String enrichmentType = enrichmentTypeObj == null?null:enrichmentTypeObj.toString();
+    Set<String> valueColumns = new HashSet<>(getColumns(Configurations.VALUE_COLUMNS.get(sensorConfig), true));
+    List<Put> puts = new ArrayList<>();
+    for(JSONObject message : messages) {
+      EnrichmentKey key = getKey(message, transformer, enrichmentType);
+      EnrichmentValue value = getValue(message, transformer.keySet, valueColumns);
+      if(key == null || value == null) {
+        continue;
+      }
+      Put put = converter.toPut(this.cf, key, value);
+      if(put != null) {
+        puts.add(put);
+      }
+    }
+    table.put(puts);
+  }
+
+  @Override
+  public void close() throws Exception {
+    synchronized(this) {
+      if(table != null) {
+        table.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
index d8f0e73..01f1245 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -19,6 +19,7 @@ package org.apache.metron.writer.hdfs;
 
 import backtype.storm.tuple.Tuple;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.interfaces.BulkMessageWriter;
 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
@@ -62,14 +63,15 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
   }
 
   @Override
-  public void init(Map stormConfig, EnrichmentConfigurations configurations) {
+  public void init(Map stormConfig, WriterConfiguration configurations) {
     this.stormConfig = stormConfig;
   }
 
+
   @Override
   public void write(String sourceType
-                   , EnrichmentConfigurations configurations
-                   , List<Tuple> tuples
+                   , WriterConfiguration configurations
+                   , Iterable<Tuple> tuples
                    , List<JSONObject> messages
                    ) throws Exception
   {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
index 1a958df..bf72219 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
@@ -50,10 +50,10 @@ public class SimpleHBaseAdapterTest {
   private EnrichmentLookup lookup;
   private static final String PLAYFUL_CLASSIFICATION_TYPE = "playful_classification";
   private static final String CF1_CLASSIFICATION_TYPE = "cf1";
-  private static final Map<String, String> CF1_ENRICHMENT = new HashMap<String, String>() {{
+  private static final Map<String, Object> CF1_ENRICHMENT = new HashMap<String, Object>() {{
     put("key", "value");
   }};
-  private static final Map<String, String> PLAYFUL_ENRICHMENT = new HashMap<String, String>() {{
+  private static final Map<String, Object> PLAYFUL_ENRICHMENT = new HashMap<String, Object>() {{
     put("orientation", "north");
   }};
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
index f420b01..46216b2 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
@@ -101,7 +101,7 @@ public class ThreatIntelAdapterTest {
     final MockHTable trackerTable = (MockHTable) MockHTable.Provider.addToCache(atTableName, cf);
     final MockHTable threatIntelTable = (MockHTable) MockHTable.Provider.addToCache(threatIntelTableName, cf);
     EnrichmentHelper.INSTANCE.load(threatIntelTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>() {{
-      add(new LookupKV<>(new EnrichmentKey("10.0.2.3", "10.0.2.3"), new EnrichmentValue(new HashMap<String, String>())));
+      add(new LookupKV<>(new EnrichmentKey("10.0.2.3", "10.0.2.3"), new EnrichmentValue(new HashMap<>())));
     }});
 
     BloomAccessTracker bat = new BloomAccessTracker(threatIntelTableName, 100, 0.03);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
index 61961a7..332ded0 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
@@ -22,6 +22,7 @@ import backtype.storm.tuple.Values;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
 import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.interfaces.BulkMessageWriter;
@@ -117,28 +118,28 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
     bulkMessageWriterBolt.declareOutputFields(declarer);
     verify(declarer, times(1)).declareStream(eq("error"), argThat(new FieldsMatcher("message")));
     Map stormConf = new HashMap();
-    doThrow(new Exception()).when(bulkMessageWriter).init(eq(stormConf), any(EnrichmentConfigurations.class));
+    doThrow(new Exception()).when(bulkMessageWriter).init(eq(stormConf), any(WriterConfiguration.class));
     try {
       bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector);
       fail("A runtime exception should be thrown when bulkMessageWriter.init throws an exception");
     } catch(RuntimeException e) {}
     reset(bulkMessageWriter);
     bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector);
-    verify(bulkMessageWriter, times(1)).init(eq(stormConf), any(EnrichmentConfigurations.class));
+    verify(bulkMessageWriter, times(1)).init(eq(stormConf), any(WriterConfiguration.class));
     tupleList = new ArrayList<>();
     for(int i = 0; i < 4; i++) {
       when(tuple.getValueByField("message")).thenReturn(messageList.get(i));
       tupleList.add(tuple);
       bulkMessageWriterBolt.execute(tuple);
-      verify(bulkMessageWriter, times(0)).write(eq(sensorType), any(EnrichmentConfigurations.class), eq(tupleList), eq(messageList));
+      verify(bulkMessageWriter, times(0)).write(eq(sensorType), any(WriterConfiguration.class), eq(tupleList), eq(messageList));
     }
     when(tuple.getValueByField("message")).thenReturn(messageList.get(4));
     tupleList.add(tuple);
     bulkMessageWriterBolt.execute(tuple);
-    verify(bulkMessageWriter, times(1)).write(eq(sensorType), any(EnrichmentConfigurations.class), eq(tupleList), argThat(new MessageListMatcher(messageList)));
+    verify(bulkMessageWriter, times(1)).write(eq(sensorType), any(WriterConfiguration.class), eq(tupleList), argThat(new MessageListMatcher(messageList)));
     verify(outputCollector, times(5)).ack(tuple);
     reset(outputCollector);
-    doThrow(new Exception()).when(bulkMessageWriter).write(eq(sensorType), any(EnrichmentConfigurations.class), Matchers.anyListOf(Tuple.class), Matchers.anyListOf(JSONObject.class));
+    doThrow(new Exception()).when(bulkMessageWriter).write(eq(sensorType), any(WriterConfiguration.class), Matchers.anyListOf(Tuple.class), Matchers.anyListOf(JSONObject.class));
     when(tuple.getValueByField("message")).thenReturn(messageList.get(0));
     for(int i = 0; i < 5; i++) {
       bulkMessageWriterBolt.execute(tuple);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/converter/EnrichmentConverterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/converter/EnrichmentConverterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/converter/EnrichmentConverterTest.java
index 20ec64c..e506c97 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/converter/EnrichmentConverterTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/converter/EnrichmentConverterTest.java
@@ -39,7 +39,7 @@ public class EnrichmentConverterTest {
   public void testValueConversion() throws IOException {
     EnrichmentConverter converter = new EnrichmentConverter();
     EnrichmentKey k1 = new EnrichmentKey("type", "indicator");
-    EnrichmentValue v1 = new EnrichmentValue(new HashMap<String, String>() {{
+    EnrichmentValue v1 = new EnrichmentValue(new HashMap<String, Object>() {{
       put("k1", "v1");
       put("k2", "v2");
     }});

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/writer/HBaseWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/writer/HBaseWriter.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/writer/HBaseWriter.java
index 1fd69b3..e4d48dc 100644
--- a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/writer/HBaseWriter.java
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/writer/HBaseWriter.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.hbase.HTableProvider;
 import org.apache.metron.hbase.TableProvider;
 import org.apache.metron.common.utils.ReflectionUtils;
@@ -62,7 +63,7 @@ public abstract class HBaseWriter implements MessageWriter<JSONObject>, Serializ
   }
 
   @Override
-  public void write(String sourceType, Configurations configurations, Tuple tuple, JSONObject message) throws Exception {
+  public void write(String sourceType, WriterConfiguration configurations, Tuple tuple, JSONObject message) throws Exception {
     Put put = new Put(getKey(tuple, message));
     Map<String, byte[]> values = getValues(tuple, message);
     for(String column: values.keySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
index 521ffdf..483f2b5 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
@@ -65,7 +65,7 @@ public abstract class EnrichmentIntegrationTest extends BaseIntegrationTest {
   private static final String DST_IP = "ip_dst_addr";
   private static final String MALICIOUS_IP_TYPE = "malicious_ip";
   private static final String PLAYFUL_CLASSIFICATION_TYPE = "playful_classification";
-  private static final Map<String, String> PLAYFUL_ENRICHMENT = new HashMap<String, String>() {{
+  private static final Map<String, Object> PLAYFUL_ENRICHMENT = new HashMap<String, Object>() {{
     put("orientation", "north");
   }};
   protected String testSensorType = "test";
@@ -183,7 +183,7 @@ public abstract class EnrichmentIntegrationTest extends BaseIntegrationTest {
     final MockHTable trackerTable = (MockHTable)MockHTable.Provider.addToCache(trackerHBaseTableName, cf);
     final MockHTable threatIntelTable = (MockHTable)MockHTable.Provider.addToCache(threatIntelTableName, cf);
     EnrichmentHelper.INSTANCE.load(threatIntelTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>(){{
-      add(new LookupKV<>(new EnrichmentKey(MALICIOUS_IP_TYPE, "10.0.2.3"), new EnrichmentValue(new HashMap<String, String>())));
+      add(new LookupKV<>(new EnrichmentKey(MALICIOUS_IP_TYPE, "10.0.2.3"), new EnrichmentValue(new HashMap<>())));
     }});
     final MockHTable enrichmentTable = (MockHTable)MockHTable.Provider.addToCache(enrichmentsTableName, cf);
     EnrichmentHelper.INSTANCE.load(enrichmentTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>(){{
@@ -206,7 +206,7 @@ public abstract class EnrichmentIntegrationTest extends BaseIntegrationTest {
             .withComponent("config", configUploadComponent)
             .withComponent("search", searchComponent)
             .withComponent("storm", fluxComponent)
-            .withMillisecondsBetweenAttempts(10000)
+            .withMillisecondsBetweenAttempts(15000)
             .withNumRetries(10)
             .build();
     runner.start();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ConfigUploadComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ConfigUploadComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ConfigUploadComponent.java
index 0a088cb..865b017 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ConfigUploadComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ConfigUploadComponent.java
@@ -23,6 +23,8 @@ import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.UnableToStartException;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 public class ConfigUploadComponent implements InMemoryComponent {
@@ -31,7 +33,7 @@ public class ConfigUploadComponent implements InMemoryComponent {
   private String globalConfigPath;
   private String parserConfigsPath;
   private String enrichmentConfigsPath;
-
+  private Map<String, SensorParserConfig> parserSensorConfigs = new HashMap<>();
   public ConfigUploadComponent withTopologyProperties(Properties topologyProperties) {
     this.topologyProperties = topologyProperties;
     return this;
@@ -51,11 +53,27 @@ public class ConfigUploadComponent implements InMemoryComponent {
     return this;
   }
 
+  public ConfigUploadComponent withParserSensorConfig(String name, SensorParserConfig config) {
+    parserSensorConfigs.put(name, config);
+    return this;
+  }
+
 
   @Override
   public void start() throws UnableToStartException {
     try {
-      ConfigurationsUtils.uploadConfigsToZookeeper(globalConfigPath, parserConfigsPath, enrichmentConfigsPath, topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY));
+      ConfigurationsUtils.uploadConfigsToZookeeper( globalConfigPath
+                                                  , parserConfigsPath
+                                                  , enrichmentConfigsPath
+                                                  , topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY)
+                                                  );
+      for(Map.Entry<String, SensorParserConfig> kv : parserSensorConfigs.entrySet()) {
+        ConfigurationsUtils.writeSensorParserConfigToZookeeper( kv.getKey()
+                                                              , kv.getValue()
+                                                              , topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY)
+                                                              );
+      }
+
     } catch (Exception e) {
       throw new UnableToStartException(e.getMessage(), e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
index f991fe2..99f916d 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
@@ -136,6 +136,7 @@ public class KafkaWithZKComponent implements InMemoryComponent {
 
     // setup Broker
     Properties props = TestUtils.createBrokerConfig(0, brokerPort, true);
+    props.setProperty("zookeeper.connection.timeout.ms","1000000");
     KafkaConfig config = new KafkaConfig(props);
     Time mock = new MockTime();
     kafkaServer = TestUtils.createServer(config, mock);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/mock/MockTableProvider.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/mock/MockTableProvider.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/mock/MockTableProvider.java
new file mode 100644
index 0000000..becadce
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/mock/MockTableProvider.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.integration.mock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.test.mock.MockHTable;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+public class MockTableProvider implements TableProvider, Serializable {
+  static MockHTable.Provider provider = new MockHTable.Provider();
+  @Override
+  public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+    return provider.getTable(config, tableName);
+  }
+  public static void addTable(String tableName, String... cf) {
+    provider.addToCache(tableName, cf);
+  }
+  public static MockHTable getTable(String tableName) {
+    try {
+      return (MockHTable) provider.getTable(null, tableName);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to get table: " + tableName);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-parsers/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/pom.xml b/metron-platform/metron-parsers/pom.xml
index b42c2ed..9efcd56 100644
--- a/metron-platform/metron-parsers/pom.xml
+++ b/metron-platform/metron-parsers/pom.xml
@@ -32,6 +32,16 @@
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-enrichment</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-pcap</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <version>${global_hadoop_version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index addeaab..b68bac5 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -20,9 +20,17 @@ package org.apache.metron.parsers.bolt;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredParserBolt;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
+import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkWriterComponent;
+import org.apache.metron.common.writer.WriterToBulkWriter;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.parsers.filters.Filters;
 import org.apache.metron.common.configuration.FieldTransformer;
@@ -34,20 +42,46 @@ import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.common.interfaces.MessageWriter;
 import org.json.simple.JSONObject;
 
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
-public class ParserBolt extends ConfiguredParserBolt {
+public class ParserBolt extends ConfiguredParserBolt implements Serializable {
 
   private OutputCollector collector;
   private MessageParser<JSONObject> parser;
-  private MessageFilter<JSONObject> filter;
-  private MessageWriter<JSONObject> writer;
+  private MessageFilter<JSONObject> filter = new GenericMessageFilter();
+  private transient Function<ParserConfigurations, WriterConfiguration> writerTransformer;
+  private BulkMessageWriter<JSONObject> messageWriter;
+  private BulkWriterComponent<JSONObject> writerComponent;
+  private boolean isBulk = false;
+  public ParserBolt( String zookeeperUrl
+                   , String sensorType
+                   , MessageParser<JSONObject> parser
+                   , MessageWriter<JSONObject> writer
+  )
+  {
+    super(zookeeperUrl, sensorType);
+    isBulk = false;
+    this.parser = parser;
+    messageWriter = new WriterToBulkWriter<>(writer);
+  }
 
-  public ParserBolt(String zookeeperUrl, String sensorType, MessageParser<JSONObject> parser, MessageWriter<JSONObject> writer) {
+  public ParserBolt( String zookeeperUrl
+                   , String sensorType
+                   , MessageParser<JSONObject> parser
+                   , BulkMessageWriter<JSONObject> writer
+  )
+  {
     super(zookeeperUrl, sensorType);
+    isBulk = true;
     this.parser = parser;
-    this.writer = writer;
+    messageWriter = writer;
+
+
   }
 
   public ParserBolt withMessageFilter(MessageFilter<JSONObject> filter) {
@@ -69,7 +103,24 @@ public class ParserBolt extends ConfiguredParserBolt {
       );
     }
     parser.init();
-    writer.init();
+
+    if(isBulk) {
+      writerTransformer = config -> new ParserWriterConfiguration(config);
+    }
+    else {
+      writerTransformer = config -> new SingleBatchConfigurationFacade(new ParserWriterConfiguration(config));
+    }
+    try {
+      messageWriter.init(stormConf, writerTransformer.apply(getConfigurations()));
+    } catch (Exception e) {
+      throw new IllegalStateException("Unable to initialize message writer", e);
+    }
+    this.writerComponent = new BulkWriterComponent<JSONObject>(collector, isBulk, isBulk) {
+      @Override
+      protected Collection<Tuple> createTupleCollection() {
+        return new HashSet<>();
+      }
+    };
     SensorParserConfig config = getSensorParserConfig();
     if(config != null) {
       config.init();
@@ -77,6 +128,7 @@ public class ParserBolt extends ConfiguredParserBolt {
     else {
       throw new IllegalStateException("Unable to retrieve a parser config for " + getSensorType());
     }
+    parser.configure(config.getParserConfig());
   }
 
 
@@ -86,23 +138,27 @@ public class ParserBolt extends ConfiguredParserBolt {
     byte[] originalMessage = tuple.getBinary(0);
     SensorParserConfig sensorParserConfig = getSensorParserConfig();
     try {
+      boolean ackTuple = true;
       if(sensorParserConfig != null) {
         List<JSONObject> messages = parser.parse(originalMessage);
         for (JSONObject message : messages) {
           if (parser.validate(message)) {
             if (filter != null && filter.emitTuple(message)) {
+              ackTuple = !isBulk;
               message.put(Constants.SENSOR_TYPE, getSensorType());
               for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
                 if (handler != null) {
                   handler.transformAndUpdate(message, sensorParserConfig.getParserConfig());
                 }
               }
-              writer.write(getSensorType(), configurations, tuple, message);
+              writerComponent.write(getSensorType(), tuple, message, messageWriter, writerTransformer.apply(getConfigurations()));
             }
           }
         }
       }
-      collector.ack(tuple);
+      if(ackTuple) {
+        collector.ack(tuple);
+      }
     } catch (Throwable ex) {
       ErrorUtils.handleError(collector, ex, Constants.ERROR_STREAM);
       collector.fail(tuple);
@@ -111,6 +167,6 @@ public class ParserBolt extends ConfiguredParserBolt {
 
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
+    declarer.declareStream(Constants.ERROR_STREAM, new Fields("message"));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/csv/CSVParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/csv/CSVParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/csv/CSVParser.java
new file mode 100644
index 0000000..a0097ed
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/csv/CSVParser.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.csv;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.metron.common.csv.CSVConverter;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.parsers.BasicParser;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class CSVParser extends BasicParser {
+  protected static final Logger LOG = LoggerFactory.getLogger(CSVParser.class);
+  public static final String TIMESTAMP_FORMAT_CONF = "timestampFormat";
+  private transient CSVConverter converter;
+  private SimpleDateFormat timestampFormat;
+  @Override
+  public void configure(Map<String, Object> parserConfig) {
+    converter = new CSVConverter();
+    converter.initialize(parserConfig);
+    Object tsFormatObj = parserConfig.get(TIMESTAMP_FORMAT_CONF);
+    if(tsFormatObj != null) {
+      timestampFormat = new SimpleDateFormat(tsFormatObj.toString());
+    }
+  }
+
+  @Override
+  public void init() {
+
+  }
+
+
+  @Override
+  public List<JSONObject> parse(byte[] rawMessage) {
+    try {
+      String msg = new String(rawMessage, "UTF-8");
+      Map<String, String> value = converter.toMap(msg);
+      if(value != null) {
+        value.put("original_string", msg);
+        Object timestampObj = value.get("timestamp");
+        Long timestamp = null;
+        if(timestampObj == null) {
+          timestamp = System.currentTimeMillis();
+        }
+        else {
+          if(timestampFormat == null) {
+            timestamp = ConversionUtils.convert(timestampObj, Long.class);
+          }
+          else {
+            try {
+              timestamp = timestampFormat.parse(timestampObj.toString()).getTime();
+            }
+            catch(Exception e) {
+              LOG.error("Unable to format " + timestampObj.toString());
+            }
+          }
+        }
+        JSONObject jsonVal = new JSONObject(value);
+        jsonVal.put("timestamp", timestamp);
+        return ImmutableList.of(jsonVal);
+      }
+      else {
+        return Collections.emptyList();
+      }
+    } catch (Exception e) {
+      LOG.error("Unable to parse " + new String(rawMessage), e);
+      return Collections.emptyList();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index 598de55..ad6068f 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -20,9 +20,14 @@ package org.apache.metron.parsers.topology;
 import backtype.storm.topology.TopologyBuilder;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.common.interfaces.MessageWriter;
 import org.apache.metron.common.spout.kafka.SpoutConfig;
 import org.apache.metron.common.utils.ReflectionUtils;
+import org.apache.metron.common.writer.AbstractWriter;
 import org.apache.metron.parsers.bolt.ParserBolt;
 import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.parsers.writer.KafkaWriter;
@@ -45,7 +50,13 @@ public class ParserTopologyBuilder {
                                      ) throws Exception {
     CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl);
     client.start();
-    SensorParserConfig sensorParserConfig = ConfigurationsUtils.readSensorParserConfigFromZookeeper(sensorType, client);
+    ParserConfigurations configurations = new ParserConfigurations();
+    ConfigurationsUtils.updateParserConfigsFromZookeeper(configurations, client);
+    SensorParserConfig sensorParserConfig = configurations.getSensorParserConfig(sensorType);
+    if(sensorParserConfig == null) {
+      throw new IllegalStateException("Cannot find the parser configuration in zookeeper for " + sensorType + "." +
+              "  Please check that it exists in zookeeper by using the zk_load_configs.sh -m DUMP command.");
+    }
     client.close();
     String sensorTopic = sensorParserConfig.getSensorTopic() != null ? sensorParserConfig.getSensorTopic() : sensorType;
     TopologyBuilder builder = new TopologyBuilder();
@@ -56,11 +67,31 @@ public class ParserTopologyBuilder {
            .setNumTasks(parserNumTasks);
     MessageParser<JSONObject> parser = ReflectionUtils.createInstance(sensorParserConfig.getParserClassName());
     parser.configure(sensorParserConfig.getParserConfig());
-    KafkaWriter writer = new KafkaWriter(brokerUrl);
-    ParserBolt parserBolt = new ParserBolt(zookeeperUrl, sensorType, parser, writer);
+    ParserBolt parserBolt = null;
+    {
+      if(sensorParserConfig.getWriterClassName() == null) {
+        KafkaWriter writer = new KafkaWriter(brokerUrl);
+        writer.configure(sensorType, new ParserWriterConfiguration(configurations));
+        parserBolt = new ParserBolt(zookeeperUrl, sensorType, parser, writer);
+      }
+      else {
+        AbstractWriter writer = ReflectionUtils.createInstance(sensorParserConfig.getWriterClassName());
+        writer.configure(sensorType, new ParserWriterConfiguration(configurations));
+        if(writer instanceof BulkMessageWriter) {
+          parserBolt = new ParserBolt(zookeeperUrl, sensorType, parser, (BulkMessageWriter<JSONObject>)writer);
+        }
+        else if(writer instanceof MessageWriter) {
+          parserBolt = new ParserBolt(zookeeperUrl, sensorType, parser, (MessageWriter<JSONObject>)writer);
+        }
+        else {
+          throw new IllegalStateException("Unable to create parser bolt: writer must be a MessageWriter or a BulkMessageWriter");
+        }
+      }
+    }
     builder.setBolt("parserBolt", parserBolt, parserParallelism)
            .setNumTasks(spoutNumTasks)
            .shuffleGrouping("kafkaSpout");
     return builder;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
index d168e29..6262dc1 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
@@ -22,21 +22,50 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.interfaces.MessageWriter;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.common.writer.AbstractWriter;
 import org.json.simple.JSONObject;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
-public class KafkaWriter implements MessageWriter<JSONObject>, Serializable {
-
+public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObject>, Serializable {
+  public enum Configurations {
+     BROKER("kafka.brokerUrl")
+    ,KEY_SERIALIZER("kafka.keySerializer")
+    ,VALUE_SERIALIZER("kafka.valueSerializer")
+    ,REQUIRED_ACKS("kafka.requiredAcks")
+    ,TOPIC("kafka.topic");
+    ;
+    String key;
+    Configurations(String key) {
+      this.key = key;
+    }
+    public Object get(Map<String, Object> config) {
+      return config.get(key);
+    }
+    public <T> T getAndConvert(Map<String, Object> config, Class<T> clazz) {
+      Object o = get(config);
+      if(o != null) {
+        return ConversionUtils.convert(o, clazz);
+      }
+      return null;
+    }
+  }
   private String brokerUrl;
   private String keySerializer = "org.apache.kafka.common.serialization.StringSerializer";
   private String valueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
   private int requiredAcks = 1;
+  private String kafkaTopic = Constants.ENRICHMENT_TOPIC;
   private KafkaProducer kafkaProducer;
 
+  public KafkaWriter() {}
+
+
+
   public KafkaWriter(String brokerUrl) {
     this.brokerUrl = brokerUrl;
   }
@@ -51,11 +80,40 @@ public class KafkaWriter implements MessageWriter<JSONObject>, Serializable {
     return this;
   }
 
-  public KafkaWriter withRequiredAcks(int requiredAcks) {
+  public KafkaWriter withRequiredAcks(Integer requiredAcks) {
     this.requiredAcks = requiredAcks;
     return this;
   }
 
+  public KafkaWriter withTopic(String topic) {
+    this.kafkaTopic= topic;
+    return this;
+  }
+  @Override
+  public void configure(String sensorName, WriterConfiguration configuration) {
+    Map<String, Object> configMap = configuration.getSensorConfig(sensorName);
+    String brokerUrl = Configurations.BROKER.getAndConvert(configMap, String.class);
+    if(brokerUrl != null) {
+      this.brokerUrl = brokerUrl;
+    }
+    String keySerializer = Configurations.KEY_SERIALIZER.getAndConvert(configMap, String.class);
+    if(keySerializer != null) {
+      withKeySerializer(keySerializer);
+    }
+    String valueSerializer = Configurations.VALUE_SERIALIZER.getAndConvert(configMap, String.class);
+    if(valueSerializer != null) {
+      withValueSerializer(keySerializer);
+    }
+    Integer requiredAcks = Configurations.REQUIRED_ACKS.getAndConvert(configMap, Integer.class);
+    if(requiredAcks!= null) {
+      withRequiredAcks(requiredAcks);
+    }
+    String topic = Configurations.TOPIC.getAndConvert(configMap, String.class);
+    if(topic != null) {
+      withTopic(topic);
+    }
+  }
+
   @Override
   public void init() {
     Map<String, Object> producerConfig = new HashMap<>();
@@ -68,8 +126,8 @@ public class KafkaWriter implements MessageWriter<JSONObject>, Serializable {
 
   @SuppressWarnings("unchecked")
   @Override
-  public void write(String sourceType, Configurations configurations, Tuple tuple, JSONObject message) throws Exception {
-    kafkaProducer.send(new ProducerRecord<String, String>(Constants.ENRICHMENT_TOPIC, message.toJSONString()));
+  public void write(String sourceType, WriterConfiguration configurations, Tuple tuple, JSONObject message) throws Exception {
+    kafkaProducer.send(new ProducerRecord<String, String>(kafkaTopic, message.toJSONString()));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 658d52e..73aad23 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -17,6 +17,16 @@
  */
 package org.apache.metron.parsers.bolt;
 
+import org.apache.metron.common.configuration.SensorParserConfig;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.configuration.ParserConfigurations;
@@ -28,6 +38,7 @@ import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.common.interfaces.MessageWriter;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mock;
 
@@ -35,6 +46,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
@@ -52,18 +64,42 @@ public class ParserBoltTest extends BaseBoltTest {
   private MessageWriter<JSONObject> writer;
 
   @Mock
+  private BulkMessageWriter<JSONObject> batchWriter;
+
+  @Mock
   private MessageFilter<JSONObject> filter;
 
+  @Mock
+  private Tuple t1;
+
+  @Mock
+  private Tuple t2;
+
+  @Mock
+  private Tuple t3;
+
+  @Mock
+  private Tuple t4;
+
+  @Mock
+  private Tuple t5;
+
   @Test
   public void test() throws Exception {
     String sensorType = "yaf";
     ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, writer) {
       @Override
-      public ParserConfigurations getConfigurations() {
+      protected ParserConfigurations defaultConfigurations() {
         return new ParserConfigurations() {
           @Override
           public SensorParserConfig getSensorParserConfig(String sensorType) {
-            return new SensorParserConfig();
+            return new SensorParserConfig() {
+              @Override
+              public Map<String, Object> getParserConfig() {
+                return new HashMap<String, Object>() {{
+                }};
+              }
+            };
           }
         };
       }
@@ -88,7 +124,7 @@ public class ParserBoltTest extends BaseBoltTest {
     when(parser.validate(eq(messages.get(0)))).thenReturn(true);
     when(parser.validate(eq(messages.get(1)))).thenReturn(false);
     parserBolt.execute(tuple);
-    verify(writer, times(1)).write(eq(sensorType), any(Configurations.class), eq(tuple), eq(finalMessage1));
+    verify(writer, times(1)).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), eq(finalMessage1));
     verify(outputCollector, times(1)).ack(tuple);
     when(parser.validate(eq(messages.get(0)))).thenReturn(true);
     when(parser.validate(eq(messages.get(1)))).thenReturn(true);
@@ -96,12 +132,46 @@ public class ParserBoltTest extends BaseBoltTest {
     when(filter.emitTuple(messages.get(1))).thenReturn(true);
     parserBolt.withMessageFilter(filter);
     parserBolt.execute(tuple);
-    verify(writer, times(1)).write(eq(sensorType), any(Configurations.class), eq(tuple), eq(finalMessage2));
+    verify(writer, times(1)).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), eq(finalMessage2));
     verify(outputCollector, times(2)).ack(tuple);
-    doThrow(new Exception()).when(writer).write(eq(sensorType), any(Configurations.class), eq(tuple), eq(finalMessage2));
+    doThrow(new Exception()).when(writer).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), eq(finalMessage2));
     parserBolt.execute(tuple);
     verify(outputCollector, times(1)).reportError(any(Throwable.class));
   }
+@Test
+public void testImplicitBatchOfOne() throws Exception {
+
+  String sensorType = "yaf";
+
+  ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, batchWriter) {
+    @Override
+    protected ParserConfigurations defaultConfigurations() {
+      return new ParserConfigurations() {
+        @Override
+        public SensorParserConfig getSensorParserConfig(String sensorType) {
+          return new SensorParserConfig() {
+            @Override
+            public Map<String, Object> getParserConfig() {
+              return new HashMap<String, Object>() {{
+              }};
+            }
+          };
+        }
+      };
+    }
+  };
+  parserBolt.setCuratorFramework(client);
+  parserBolt.setTreeCache(cache);
+  parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
+  verify(parser, times(1)).init();
+  verify(batchWriter, times(1)).init(any(), any());
+  when(parser.validate(any())).thenReturn(true);
+  when(parser.parse(any())).thenReturn(ImmutableList.of(new JSONObject()));
+  when(filter.emitTuple(any())).thenReturn(true);
+  parserBolt.withMessageFilter(filter);
+  parserBolt.execute(t1);
+  verify(outputCollector, times(1)).ack(t1);
+}
 
   /**
    {
@@ -116,7 +186,7 @@ public class ParserBoltTest extends BaseBoltTest {
   @Test
   public void testFilter() throws Exception {
     String sensorType = "yaf";
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, writer) {
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, batchWriter) {
       @Override
       protected SensorParserConfig getSensorParserConfig() {
         try {
@@ -130,6 +200,150 @@ public class ParserBoltTest extends BaseBoltTest {
     parserBolt.setTreeCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
+    verify(batchWriter, times(1)).init(any(), any());
+    when(parser.validate(any())).thenReturn(true);
+    when(parser.parse(any())).thenReturn(ImmutableList.of(new JSONObject()));
+    parserBolt.withMessageFilter(filter);
+    parserBolt.execute(t1);
+    verify(outputCollector, times(1)).ack(t1);
+  }
+  @Test
+  public void testBatchOfOne() throws Exception {
+
+    String sensorType = "yaf";
+
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, batchWriter) {
+      @Override
+      protected ParserConfigurations defaultConfigurations() {
+        return new ParserConfigurations() {
+          @Override
+          public SensorParserConfig getSensorParserConfig(String sensorType) {
+            return new SensorParserConfig() {
+              @Override
+              public Map<String, Object> getParserConfig() {
+                return new HashMap<String, Object>() {{
+                  put(ParserWriterConfiguration.BATCH_CONF, "1");
+                }};
+              }
+            };
+          }
+        };
+      }
+    };
+    parserBolt.setCuratorFramework(client);
+    parserBolt.setTreeCache(cache);
+    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
+    verify(parser, times(1)).init();
+    verify(batchWriter, times(1)).init(any(), any());
+    when(parser.validate(any())).thenReturn(true);
+    when(parser.parse(any())).thenReturn(ImmutableList.of(new JSONObject()));
+    when(filter.emitTuple(any())).thenReturn(true);
+    parserBolt.withMessageFilter(filter);
+    parserBolt.execute(t1);
+    verify(outputCollector, times(1)).ack(t1);
+  }
+  @Test
+  public void testBatchOfFive() throws Exception {
+
+    String sensorType = "yaf";
+
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, batchWriter) {
+      @Override
+      protected ParserConfigurations defaultConfigurations() {
+        return new ParserConfigurations() {
+          @Override
+          public SensorParserConfig getSensorParserConfig(String sensorType) {
+            return new SensorParserConfig() {
+              @Override
+              public Map<String, Object> getParserConfig() {
+                return new HashMap<String, Object>() {{
+                  put(ParserWriterConfiguration.BATCH_CONF, 5);
+                }};
+              }
+            };
+          }
+        };
+      }
+    };
+    parserBolt.setCuratorFramework(client);
+    parserBolt.setTreeCache(cache);
+    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
+    verify(parser, times(1)).init();
+    verify(batchWriter, times(1)).init(any(), any());
+    when(parser.validate(any())).thenReturn(true);
+    when(parser.parse(any())).thenReturn(ImmutableList.of(new JSONObject()));
+    when(filter.emitTuple(any())).thenReturn(true);
+    parserBolt.withMessageFilter(filter);
+    writeNonBatch(outputCollector, parserBolt, t1);
+    writeNonBatch(outputCollector, parserBolt, t2);
+    writeNonBatch(outputCollector, parserBolt, t3);
+    writeNonBatch(outputCollector, parserBolt, t4);
+    parserBolt.execute(t5);
+    verify(outputCollector, times(1)).ack(t1);
+    verify(outputCollector, times(1)).ack(t2);
+    verify(outputCollector, times(1)).ack(t3);
+    verify(outputCollector, times(1)).ack(t4);
+    verify(outputCollector, times(1)).ack(t5);
+
+
+  }
+  @Test
+  public void testBatchOfFiveWithError() throws Exception {
+
+    String sensorType = "yaf";
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, batchWriter) {
+      @Override
+      protected ParserConfigurations defaultConfigurations() {
+        return new ParserConfigurations() {
+          @Override
+          public SensorParserConfig getSensorParserConfig(String sensorType) {
+            return new SensorParserConfig() {
+              @Override
+              public Map<String, Object> getParserConfig() {
+                return new HashMap<String, Object>() {{
+                  put(ParserWriterConfiguration.BATCH_CONF, 5);
+                }};
+              }
+            };
+          }
+        };
+      }
+    };
+    parserBolt.setCuratorFramework(client);
+    parserBolt.setTreeCache(cache);
+    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
+    verify(parser, times(1)).init();
+    verify(batchWriter, times(1)).init(any(), any());
+
+    doThrow(new Exception()).when(batchWriter).write(any(), any(), any(), any());
+    when(parser.validate(any())).thenReturn(true);
+    when(parser.parse(any())).thenReturn(ImmutableList.of(new JSONObject()));
+    when(filter.emitTuple(any())).thenReturn(true);
+    parserBolt.withMessageFilter(filter);
+    writeNonBatch(outputCollector, parserBolt, t1);
+    writeNonBatch(outputCollector, parserBolt, t2);
+    writeNonBatch(outputCollector, parserBolt, t3);
+    writeNonBatch(outputCollector, parserBolt, t4);
+    parserBolt.execute(t5);
+    verify(outputCollector, times(0)).ack(t1);
+    verify(outputCollector, times(1)).fail(t1);
+    verify(outputCollector, times(0)).ack(t2);
+    verify(outputCollector, times(1)).fail(t2);
+    verify(outputCollector, times(0)).ack(t3);
+    verify(outputCollector, times(1)).fail(t3);
+    verify(outputCollector, times(0)).ack(t4);
+    verify(outputCollector, times(1)).fail(t4);
+    verify(outputCollector, times(0)).ack(t5);
+    verify(outputCollector, times(1)).fail(t5);
+
+
+  }
+  private static void writeNonBatch(OutputCollector collector, ParserBolt bolt, Tuple t) {
+    bolt.execute(t);
+    verify(collector, times(0)).ack(t);
+  }
+
+/*=======
     verify(writer, times(1)).init();
     byte[] sampleBinary = "some binary message".getBytes();
     JSONParser jsonParser = new JSONParser();
@@ -147,4 +361,5 @@ public class ParserBoltTest extends BaseBoltTest {
     verify(writer, times(1)).write(eq(sensorType), any(Configurations.class), eq(tuple), eq(finalMessage1));
     verify(outputCollector, times(1)).ack(tuple);
   }
+>>>>>>> master*/
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/csv/CSVParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/csv/CSVParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/csv/CSVParserTest.java
new file mode 100644
index 0000000..5f314fa
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/csv/CSVParserTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.csv;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+public class CSVParserTest {
+  /**
+   {
+    "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
+   ,"sensorTopic":"dummy"
+   ,"parserConfig":
+   {
+    "columns" : {
+                "col1" : 0
+               ,"col2" : 1
+               ,"col3" : 2
+                 }
+   }
+   }
+   */
+  @Multiline
+  public static String parserConfig;
+
+  @Test
+  public void test() throws IOException {
+    CSVParser parser = new CSVParser();
+
+    SensorParserConfig config = JSONUtils.INSTANCE.load(parserConfig, SensorParserConfig.class);
+    parser.init();
+    parser.configure(config.getParserConfig());
+    {
+      String line = "#foo,bar,grok";
+      Assert.assertEquals(0, parser.parse(Bytes.toBytes(line)).size());
+    }
+    {
+      String line = "";
+      Assert.assertEquals(0, parser.parse(Bytes.toBytes(line)).size());
+    }
+    {
+      String line = "foo,bar,grok";
+      List<JSONObject> results = parser.parse(Bytes.toBytes(line));
+      Assert.assertEquals(1, results.size());
+      JSONObject o = results.get(0);
+      Assert.assertTrue(parser.validate(o));
+      Assert.assertEquals(5, o.size());
+      Assert.assertEquals("foo", o.get("col1"));
+      Assert.assertEquals("bar", o.get("col2"));
+      Assert.assertEquals("grok", o.get("col3"));
+    }
+    {
+      String line = "\"foo\", \"bar\",\"grok\"";
+      List<JSONObject> results = parser.parse(Bytes.toBytes(line));
+      Assert.assertEquals(1, results.size());
+      JSONObject o = results.get(0);
+      Assert.assertTrue(parser.validate(o));
+      Assert.assertEquals(5, o.size());
+      Assert.assertEquals("foo", o.get("col1"));
+      Assert.assertEquals("bar", o.get("col2"));
+      Assert.assertEquals("grok", o.get("col3"));
+    }
+    {
+      String line = "foo, bar, grok";
+      List<JSONObject> results = parser.parse(Bytes.toBytes(line));
+      Assert.assertEquals(1, results.size());
+      JSONObject o = results.get(0);
+      Assert.assertTrue(parser.validate(o));
+      Assert.assertEquals(5, o.size());
+      Assert.assertEquals("foo", o.get("col1"));
+      Assert.assertEquals(" bar", o.get("col2"));
+      Assert.assertEquals(" grok", o.get("col3"));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
new file mode 100644
index 0000000..cea635f
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writers;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.integration.mock.MockTableProvider;
+import org.apache.metron.test.mock.MockHTable;
+import org.apache.metron.writer.hbase.SimpleHbaseEnrichmentWriter;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SimpleHBaseEnrichmentWriterTest {
+  private static final String SENSOR_TYPE= "dummy";
+  private static final String TABLE_NAME= SENSOR_TYPE;
+  private static final String TABLE_CF= "cf";
+  private static final String ENRICHMENT_TYPE = "et";
+  private static final Map<String, Object> BASE_WRITER_CONFIG = new HashMap<String, Object>() {{
+    put(SimpleHbaseEnrichmentWriter.Configurations.HBASE_TABLE.getKey(), TABLE_NAME);
+    put(SimpleHbaseEnrichmentWriter.Configurations.HBASE_CF.getKey(), TABLE_CF);
+    put(SimpleHbaseEnrichmentWriter.Configurations.ENRICHMENT_TYPE.getKey(), ENRICHMENT_TYPE);
+    put(SimpleHbaseEnrichmentWriter.Configurations.HBASE_PROVIDER.getKey(), MockTableProvider.class.getName());
+  }};
+  @Before
+  public void setupMockTable() {
+    MockTableProvider.addTable(TABLE_NAME, TABLE_CF);
+  }
+  @Test
+  public void testBatchOneNormalPath() throws Exception {
+    final String sensorType = "dummy";
+    SimpleHbaseEnrichmentWriter writer = new SimpleHbaseEnrichmentWriter();
+
+    WriterConfiguration configuration = createConfig(1,
+            new HashMap<String, Object>(BASE_WRITER_CONFIG) {{
+              put(SimpleHbaseEnrichmentWriter.Configurations.KEY_COLUMNS.getKey(), "ip");
+            }}
+    );
+    writer.configure(sensorType,configuration);
+
+    writer.write( SENSOR_TYPE
+            , configuration
+            , null
+            , new ArrayList<JSONObject>() {{
+              add(new JSONObject(ImmutableMap.of("ip", "localhost", "user", "cstella", "foo", "bar")));
+            }}
+    );
+    List<LookupKV<EnrichmentKey, EnrichmentValue>> values = getValues();
+    Assert.assertEquals(1, values.size());
+    Assert.assertEquals("localhost", values.get(0).getKey().indicator);
+    Assert.assertEquals("cstella", values.get(0).getValue().getMetadata().get("user"));
+    Assert.assertEquals("bar", values.get(0).getValue().getMetadata().get("foo"));
+    Assert.assertEquals(2, values.get(0).getValue().getMetadata().size());
+  }
+
+  @Test
+  public void testFilteredKey() throws Exception {
+    final String sensorType = "dummy";
+    SimpleHbaseEnrichmentWriter writer = new SimpleHbaseEnrichmentWriter();
+
+    WriterConfiguration configuration = createConfig(1,
+            new HashMap<String, Object>(BASE_WRITER_CONFIG) {{
+              put(SimpleHbaseEnrichmentWriter.Configurations.KEY_COLUMNS.getKey(), "ip");
+              put(SimpleHbaseEnrichmentWriter.Configurations.VALUE_COLUMNS.getKey(), "user");
+            }}
+    );
+    writer.configure(sensorType,configuration);
+
+    writer.write( SENSOR_TYPE
+            , configuration
+            , null
+            , new ArrayList<JSONObject>() {{
+              add(new JSONObject(ImmutableMap.of("ip", "localhost", "user", "cstella", "foo", "bar")));
+            }}
+    );
+    List<LookupKV<EnrichmentKey, EnrichmentValue>> values = getValues();
+    Assert.assertEquals(1, values.size());
+    Assert.assertEquals("localhost", values.get(0).getKey().indicator);
+    Assert.assertEquals("cstella", values.get(0).getValue().getMetadata().get("user"));
+    Assert.assertNull(values.get(0).getValue().getMetadata().get("foo"));
+    Assert.assertEquals(1, values.get(0).getValue().getMetadata().size());
+  }
+
+  @Test
+  public void testFilteredKeys() throws Exception {
+    final String sensorType = "dummy";
+    SimpleHbaseEnrichmentWriter writer = new SimpleHbaseEnrichmentWriter();
+
+    WriterConfiguration configuration = createConfig(1,
+            new HashMap<String, Object>(BASE_WRITER_CONFIG) {{
+              put(SimpleHbaseEnrichmentWriter.Configurations.KEY_COLUMNS.getKey(), "ip");
+              put(SimpleHbaseEnrichmentWriter.Configurations.VALUE_COLUMNS.getKey(), ImmutableList.of("user", "ip"));
+            }}
+    );
+    writer.configure(sensorType,configuration);
+
+    writer.write( SENSOR_TYPE
+            , configuration
+            , null
+            , new ArrayList<JSONObject>() {{
+              add(new JSONObject(ImmutableMap.of("ip", "localhost", "user", "cstella", "foo", "bar")));
+            }}
+    );
+    List<LookupKV<EnrichmentKey, EnrichmentValue>> values = getValues();
+    Assert.assertEquals(1, values.size());
+    Assert.assertEquals("localhost", values.get(0).getKey().indicator);
+    Assert.assertEquals("cstella", values.get(0).getValue().getMetadata().get("user"));
+    Assert.assertEquals("localhost", values.get(0).getValue().getMetadata().get("ip"));
+    Assert.assertNull(values.get(0).getValue().getMetadata().get("foo"));
+    Assert.assertEquals(2, values.get(0).getValue().getMetadata().size());
+  }
+  public static List<LookupKV<EnrichmentKey, EnrichmentValue>> getValues() throws IOException {
+    MockHTable table = MockTableProvider.getTable(TABLE_NAME);
+    Assert.assertNotNull(table);
+    List<LookupKV<EnrichmentKey, EnrichmentValue>> ret = new ArrayList<>();
+    EnrichmentConverter converter = new EnrichmentConverter();
+    for(Result r : table.getScanner(Bytes.toBytes(TABLE_CF))) {
+      ret.add(converter.fromResult(r, TABLE_CF));
+    }
+    return ret;
+  }
+  public static WriterConfiguration createConfig(final int batchSize, final Map<String, Object> sensorConfig)
+  {
+    return new WriterConfiguration() {
+      @Override
+      public int getBatchSize(String sensorName) {
+        return batchSize;
+      }
+
+      @Override
+      public String getIndex(String sensorName) {
+        return SENSOR_TYPE;
+      }
+
+      @Override
+      public Map<String, Object> getSensorConfig(String sensorName) {
+        return sensorConfig;
+
+      }
+
+      @Override
+      public Map<String, Object> getGlobalConfig() {
+        return null;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
new file mode 100644
index 0000000..f6436b9
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writers.integration;
+
+import com.google.common.collect.ImmutableList;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.TestConstants;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.integration.*;
+import org.apache.metron.integration.components.ConfigUploadComponent;
+import org.apache.metron.integration.components.KafkaWithZKComponent;
+import org.apache.metron.integration.mock.MockTableProvider;
+import org.apache.metron.integration.utils.TestUtils;
+import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
+import org.apache.metron.test.TestDataType;
+import org.apache.metron.test.mock.MockHTable;
+import org.apache.metron.test.utils.SampleDataUtils;
+import org.apache.metron.test.utils.UnitTestHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.*;
+
+public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationTest {
+
+  /**
+   {
+    "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
+   ,"writerClassName" : "org.apache.metron.writer.hbase.SimpleHbaseEnrichmentWriter"
+   ,"sensorTopic":"dummy"
+   ,"parserConfig":
+   {
+     "shew.table" : "dummy"
+    ,"shew.cf" : "cf"
+    ,"shew.keyColumns" : "col2"
+    ,"shew.enrichmentType" : "et"
+    ,"shew.hbaseProvider" : "org.apache.metron.integration.mock.MockTableProvider"
+    ,"columns" : {
+                "col1" : 0
+               ,"col2" : 1
+               ,"col3" : 2
+                 }
+   }
+   }
+   */
+  @Multiline
+  public static String parserConfig;
+
+  @Test
+  public void test() throws UnableToStartException, IOException {
+    final String sensorType = "dummy";
+    final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
+      add(Bytes.toBytes("col11,col12,col13"));
+      add(Bytes.toBytes("col21,col22,col23"));
+      add(Bytes.toBytes("col31,col32,col33"));
+    }};
+    MockTableProvider.addTable(sensorType, "cf");
+    final Properties topologyProperties = new Properties();
+    final KafkaWithZKComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaWithZKComponent.Topic>() {{
+      add(new KafkaWithZKComponent.Topic(sensorType, 1));
+    }});
+    topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
+
+    ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
+            .withTopologyProperties(topologyProperties)
+            .withGlobalConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
+            .withParserSensorConfig(sensorType, JSONUtils.INSTANCE.load(parserConfig, SensorParserConfig.class));
+
+    ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
+            .withSensorType(sensorType)
+            .withTopologyProperties(topologyProperties)
+            .withBrokerUrl(kafkaComponent.getBrokerList()).build();
+
+    UnitTestHelper.verboseLogging();
+    ComponentRunner runner = new ComponentRunner.Builder()
+            .withComponent("kafka", kafkaComponent)
+            .withComponent("config", configUploadComponent)
+            .withComponent("storm", parserTopologyComponent)
+            .withMillisecondsBetweenAttempts(5000)
+            .withNumRetries(10)
+            .build();
+    try {
+      runner.start();
+      kafkaComponent.writeMessages(sensorType, inputMessages);
+      List<LookupKV<EnrichmentKey, EnrichmentValue>> outputMessages =
+              runner.process(new Processor<List<LookupKV<EnrichmentKey, EnrichmentValue>>>() {
+                List<LookupKV<EnrichmentKey, EnrichmentValue>> messages = null;
+
+                public ReadinessState process(ComponentRunner runner) {
+                  MockHTable table = MockTableProvider.getTable(sensorType);
+                  if (table != null && table.size() == inputMessages.size()) {
+                    EnrichmentConverter converter = new EnrichmentConverter();
+                    messages = new ArrayList<>();
+                    try {
+                      for (Result r : table.getScanner(Bytes.toBytes("cf"))) {
+                        messages.add(converter.fromResult(r, "cf"));
+                      }
+                    } catch (IOException e) {
+                    }
+                    return ReadinessState.READY;
+                  }
+                  return ReadinessState.NOT_READY;
+                }
+
+                public List<LookupKV<EnrichmentKey, EnrichmentValue>> getResult() {
+                  return messages;
+                }
+              });
+      Set<String> validIndicators = new HashSet<>(ImmutableList.of("col12", "col22", "col32"));
+      Map<String, Map<String, String>> validMetadata = new HashMap<String, Map<String, String>>() {{
+        put("col12", new HashMap<String, String>() {{
+          put("col1", "col11");
+          put("col3", "col13");
+        }});
+        put("col22", new HashMap<String, String>() {{
+          put("col1", "col21");
+          put("col3", "col23");
+        }});
+        put("col32", new HashMap<String, String>() {{
+          put("col1", "col31");
+          put("col3", "col33");
+        }});
+      }};
+      for (LookupKV<EnrichmentKey, EnrichmentValue> kv : outputMessages) {
+        Assert.assertTrue(validIndicators.contains(kv.getKey().indicator));
+        Assert.assertEquals(kv.getValue().getMetadata().get("source.type"), "dummy");
+        Assert.assertNotNull(kv.getValue().getMetadata().get("timestamp"));
+        Assert.assertNotNull(kv.getValue().getMetadata().get("original_string"));
+        Map<String, String> metadata = validMetadata.get(kv.getKey().indicator);
+        for (Map.Entry<String, String> x : metadata.entrySet()) {
+          Assert.assertEquals(kv.getValue().getMetadata().get(x.getKey()), x.getValue());
+        }
+        Assert.assertEquals(metadata.size() + 3, kv.getValue().getMetadata().size());
+      }
+    }
+    finally {
+      if(runner != null) {
+        runner.stop();
+      }
+    }
+  }
+}


[3/3] incubator-metron git commit: METRON-174 Storm consumption of hbase enrichment reference data. This closes apache/incubator-metron#127

Posted by ce...@apache.org.
METRON-174 Storm consumption of hbase enrichment reference data.  This closes apache/incubator-metron#127


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/d3efe3fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/d3efe3fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/d3efe3fb

Branch: refs/heads/master
Commit: d3efe3fb40da5707ac3b4d1059a6344176afd84b
Parents: ab8163b
Author: cstella <ce...@gmail.com>
Authored: Tue May 31 17:00:14 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Tue May 31 17:00:14 2016 -0400

----------------------------------------------------------------------
 metron-platform/metron-common/pom.xml           |    5 +
 .../metron/common/bolt/ConfiguredBolt.java      |    9 +-
 .../common/bolt/ConfiguredEnrichmentBolt.java   |   15 +-
 .../common/bolt/ConfiguredParserBolt.java       |   14 +-
 .../configuration/SensorParserConfig.java       |   17 +
 .../enrichment/SensorEnrichmentConfig.java      |   17 +-
 .../writer/EnrichmentWriterConfiguration.java   |   50 +
 .../writer/ParserWriterConfiguration.java       |   54 +
 .../writer/SingleBatchConfigurationFacade.java  |   48 +
 .../writer/WriterConfiguration.java             |   29 +
 .../apache/metron/common/csv/CSVConverter.java  |  124 ++
 .../common/interfaces/BulkMessageWriter.java    |   13 +-
 .../metron/common/interfaces/MessageWriter.java |    7 +-
 .../metron/common/utils/ConversionUtils.java    |   40 +
 .../metron/common/writer/AbstractWriter.java    |   26 +
 .../common/writer/BulkWriterComponent.java      |  115 ++
 .../common/writer/WriterToBulkWriter.java       |   54 +
 .../bolt/ConfiguredEnrichmentBoltTest.java      |    8 +-
 .../common/bolt/ConfiguredParserBoltTest.java   |    8 +-
 .../common/utils/ConversionUtilsTest.java       |   32 +
 .../dataloads/extractor/csv/CSVExtractor.java   |   83 +-
 .../extractor/csv/LookupConverter.java          |    2 +-
 .../extractor/csv/LookupConverters.java         |    2 +-
 .../extractor/stix/types/AddressHandler.java    |    2 +-
 .../extractor/stix/types/DomainHandler.java     |    2 +-
 .../extractor/stix/types/HostnameHandler.java   |    2 +-
 .../dataloads/extractor/ExtractorTest.java      |    2 +-
 .../hbase/HBaseEnrichmentConverterTest.java     |    2 +-
 .../LeastRecentlyUsedPrunerIntegrationTest.java |    6 +-
 .../writer/ElasticsearchWriter.java             |   10 +-
 .../simplehbase/SimpleHBaseAdapter.java         |    2 +-
 .../enrichment/bolt/BulkMessageWriterBolt.java  |   49 +-
 .../enrichment/bolt/EnrichmentJoinBolt.java     |    2 +-
 .../enrichment/bolt/EnrichmentSplitterBolt.java |    2 +-
 .../enrichment/bolt/GenericEnrichmentBolt.java  |    2 +-
 .../enrichment/bolt/ThreatIntelJoinBolt.java    |    4 +-
 .../bolt/ThreatIntelSplitterBolt.java           |    2 +-
 .../enrichment/converter/EnrichmentValue.java   |   12 +-
 .../hbase/SimpleHbaseEnrichmentWriter.java      |  282 ++++
 .../apache/metron/writer/hdfs/HdfsWriter.java   |    8 +-
 .../simplehbase/SimpleHBaseAdapterTest.java     |    4 +-
 .../threatintel/ThreatIntelAdapterTest.java     |    2 +-
 .../bolt/BulkMessageWriterBoltTest.java         |   11 +-
 .../converter/EnrichmentConverterTest.java      |    2 +-
 .../apache/metron/hbase/writer/HBaseWriter.java |    3 +-
 .../integration/EnrichmentIntegrationTest.java  |    6 +-
 .../components/ConfigUploadComponent.java       |   22 +-
 .../components/KafkaWithZKComponent.java        |    1 +
 .../integration/mock/MockTableProvider.java     |   45 +
 metron-platform/metron-parsers/pom.xml          |   10 +
 .../apache/metron/parsers/bolt/ParserBolt.java  |   74 +-
 .../apache/metron/parsers/csv/CSVParser.java    |   92 ++
 .../parsers/topology/ParserTopologyBuilder.java |   37 +-
 .../metron/parsers/writer/KafkaWriter.java      |   68 +-
 .../metron/parsers/bolt/ParserBoltTest.java     |  227 +++-
 .../metron/parsers/csv/CSVParserTest.java       |   99 ++
 .../SimpleHBaseEnrichmentWriterTest.java        |  178 +++
 ...pleHbaseEnrichmentWriterIntegrationTest.java |  169 +++
 .../apache/metron/solr/writer/SolrWriter.java   |    7 +-
 .../metron/solr/writer/SolrWriterTest.java      |   11 +-
 .../org/apache/metron/test/mock/MockHTable.java | 1224 +++++++++---------
 61 files changed, 2626 insertions(+), 829 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
index 172d387..10c192c 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -210,6 +210,11 @@
             <version>0.1BETA</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>commons-beanutils</groupId>
+            <artifactId>commons-beanutils</artifactId>
+            <version>1.8.3</version>
+        </dependency>
     </dependencies>
 
     <reporting>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
index 2d5e241..8c2ac14 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
@@ -30,11 +30,12 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.log4j.Logger;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.Configurations;
 
 import java.io.IOException;
 import java.util.Map;
 
-public abstract class ConfiguredBolt extends BaseRichBolt {
+public abstract class ConfiguredBolt<CONFIG_T extends Configurations> extends BaseRichBolt {
 
   private static final Logger LOG = Logger.getLogger(ConfiguredBolt.class);
 
@@ -42,7 +43,7 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
 
   protected CuratorFramework client;
   protected TreeCache cache;
-
+  private final CONFIG_T configurations = defaultConfigurations();
   public ConfiguredBolt(String zookeeperUrl) {
     this.zookeeperUrl = zookeeperUrl;
   }
@@ -57,6 +58,10 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
 
   public void reloadCallback(String name, ConfigurationType type) {
   }
+  public CONFIG_T getConfigurations() {
+    return configurations;
+  }
+  protected abstract CONFIG_T defaultConfigurations();
 
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
index e03e793..6fed7d4 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
@@ -25,24 +25,23 @@ import org.apache.metron.common.configuration.EnrichmentConfigurations;
 
 import java.io.IOException;
 
-public abstract class ConfiguredEnrichmentBolt extends ConfiguredBolt {
+public abstract class ConfiguredEnrichmentBolt extends ConfiguredBolt<EnrichmentConfigurations> {
 
   private static final Logger LOG = Logger.getLogger(ConfiguredEnrichmentBolt.class);
 
-  protected final EnrichmentConfigurations configurations = new EnrichmentConfigurations();
-
   public ConfiguredEnrichmentBolt(String zookeeperUrl) {
     super(zookeeperUrl);
   }
 
-  public EnrichmentConfigurations getConfigurations() {
-    return configurations;
+  @Override
+  protected EnrichmentConfigurations defaultConfigurations() {
+    return new EnrichmentConfigurations();
   }
 
   @Override
   public void loadConfig() {
     try {
-      ConfigurationsUtils.updateEnrichmentConfigsFromZookeeper(configurations, client);
+      ConfigurationsUtils.updateEnrichmentConfigsFromZookeeper(getConfigurations(), client);
     } catch (Exception e) {
       LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
     }
@@ -53,10 +52,10 @@ public abstract class ConfiguredEnrichmentBolt extends ConfiguredBolt {
     if (data.length != 0) {
       String name = path.substring(path.lastIndexOf("/") + 1);
       if (path.startsWith(ConfigurationType.ENRICHMENT.getZookeeperRoot())) {
-        configurations.updateSensorEnrichmentConfig(name, data);
+        getConfigurations().updateSensorEnrichmentConfig(name, data);
         reloadCallback(name, ConfigurationType.ENRICHMENT);
       } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
-        configurations.updateGlobalConfig(data);
+        getConfigurations().updateGlobalConfig(data);
         reloadCallback(name, ConfigurationType.GLOBAL);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
index feab40e..cd379e7 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
@@ -26,10 +26,11 @@ import org.apache.metron.common.configuration.SensorParserConfig;
 
 import java.io.IOException;
 
-public abstract class ConfiguredParserBolt extends ConfiguredBolt {
+public abstract class ConfiguredParserBolt extends ConfiguredBolt<ParserConfigurations> {
 
   private static final Logger LOG = Logger.getLogger(ConfiguredEnrichmentBolt.class);
 
+
   protected final ParserConfigurations configurations = new ParserConfigurations();
   private String sensorType;
   public ConfiguredParserBolt(String zookeeperUrl, String sensorType) {
@@ -41,8 +42,9 @@ public abstract class ConfiguredParserBolt extends ConfiguredBolt {
     return getConfigurations().getSensorParserConfig(sensorType);
   }
 
-  public ParserConfigurations getConfigurations() {
-    return configurations;
+  @Override
+  protected ParserConfigurations defaultConfigurations() {
+    return new ParserConfigurations();
   }
 
   public String getSensorType() {
@@ -51,7 +53,7 @@ public abstract class ConfiguredParserBolt extends ConfiguredBolt {
   @Override
   public void loadConfig() {
     try {
-      ConfigurationsUtils.updateParserConfigsFromZookeeper(configurations, client);
+      ConfigurationsUtils.updateParserConfigsFromZookeeper(getConfigurations(), client);
     } catch (Exception e) {
       LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
     }
@@ -62,10 +64,10 @@ public abstract class ConfiguredParserBolt extends ConfiguredBolt {
     if (data.length != 0) {
       String name = path.substring(path.lastIndexOf("/") + 1);
       if (path.startsWith(ConfigurationType.PARSER.getZookeeperRoot())) {
-        configurations.updateSensorParserConfig(name, data);
+        getConfigurations().updateSensorParserConfig(name, data);
         reloadCallback(name, ConfigurationType.PARSER);
       } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
-        configurations.updateGlobalConfig(data);
+        getConfigurations().updateGlobalConfig(data);
         reloadCallback(name, ConfigurationType.GLOBAL);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
index 5d1bda9..82b407f 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
@@ -18,6 +18,7 @@
 package org.apache.metron.common.configuration;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableList;
 import org.apache.metron.common.utils.JSONUtils;
 
 import java.io.IOException;
@@ -31,6 +32,14 @@ public class SensorParserConfig {
   private String parserClassName;
   private String filterClassName;
   private String sensorTopic;
+  private String writerClassName;
+
+  public String getWriterClassName() {
+    return writerClassName;
+  }
+  public void setWriterClassName(String classNames) {
+    this.writerClassName = classNames;
+  }
   private Map<String, Object> parserConfig = new HashMap<>();
   private List<FieldTransformer> fieldTransformations = new ArrayList<>();
 
@@ -97,6 +106,8 @@ public class SensorParserConfig {
             "parserClassName='" + parserClassName + '\'' +
             ", filterClassName='" + filterClassName + '\'' +
             ", sensorTopic='" + sensorTopic + '\'' +
+            ", writerClassName='" + writerClassName + '\'' +
+            ", parserConfig=" + parserConfig +
             ", parserConfig=" + parserConfig +
             ", fieldTransformations=" + fieldTransformations +
             '}';
@@ -115,6 +126,10 @@ public class SensorParserConfig {
       return false;
     if (getSensorTopic() != null ? !getSensorTopic().equals(that.getSensorTopic()) : that.getSensorTopic() != null)
       return false;
+    if (getWriterClassName() != null ? !getWriterClassName().equals(that.getWriterClassName()) : that.getWriterClassName() != null)
+      return false;
+    if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null)
+      return false;
     if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null)
       return false;
     return getFieldTransformations() != null ? getFieldTransformations().equals(that.getFieldTransformations()) : that.getFieldTransformations() == null;
@@ -126,6 +141,8 @@ public class SensorParserConfig {
     int result = getParserClassName() != null ? getParserClassName().hashCode() : 0;
     result = 31 * result + (getFilterClassName() != null ? getFilterClassName().hashCode() : 0);
     result = 31 * result + (getSensorTopic() != null ? getSensorTopic().hashCode() : 0);
+    result = 31 * result + (getWriterClassName() != null ? getWriterClassName().hashCode() : 0);
+    result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0);
     result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0);
     result = 31 * result + (getFieldTransformations() != null ? getFieldTransformations().hashCode() : 0);
     return result;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentConfig.java
index 562a928..c5538b9 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentConfig.java
@@ -22,6 +22,8 @@ import org.apache.metron.common.configuration.enrichment.threatintel.ThreatIntel
 import org.apache.metron.common.utils.JSONUtils;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 public class SensorEnrichmentConfig {
 
@@ -29,6 +31,15 @@ public class SensorEnrichmentConfig {
   private int batchSize;
   private EnrichmentConfig enrichment = new EnrichmentConfig();
   private ThreatIntelConfig threatIntel = new ThreatIntelConfig();
+  private Map<String, Object> configuration = new HashMap<>();
+
+  public Map<String, Object> getConfiguration() {
+    return configuration;
+  }
+
+  public void setConfiguration(Map<String, Object> configuration) {
+    this.configuration = configuration;
+  }
 
   public EnrichmentConfig getEnrichment() {
     return enrichment;
@@ -70,6 +81,7 @@ public class SensorEnrichmentConfig {
             ", batchSize=" + batchSize +
             ", enrichment=" + enrichment +
             ", threatIntel=" + threatIntel +
+            ", configuration=" + configuration +
             '}';
   }
 
@@ -84,7 +96,9 @@ public class SensorEnrichmentConfig {
     if (getIndex() != null ? !getIndex().equals(that.getIndex()) : that.getIndex() != null) return false;
     if (getEnrichment() != null ? !getEnrichment().equals(that.getEnrichment()) : that.getEnrichment() != null)
       return false;
-    return getThreatIntel() != null ? getThreatIntel().equals(that.getThreatIntel()) : that.getThreatIntel() == null;
+    if (getThreatIntel() != null ? !getThreatIntel().equals(that.getThreatIntel()) : that.getThreatIntel() != null)
+      return false;
+    return getConfiguration() != null ? getConfiguration().equals(that.getConfiguration()) : that.getConfiguration() == null;
 
   }
 
@@ -94,6 +108,7 @@ public class SensorEnrichmentConfig {
     result = 31 * result + getBatchSize();
     result = 31 * result + (getEnrichment() != null ? getEnrichment().hashCode() : 0);
     result = 31 * result + (getThreatIntel() != null ? getThreatIntel().hashCode() : 0);
+    result = 31 * result + (getConfiguration() != null ? getConfiguration().hashCode() : 0);
     return result;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java
new file mode 100644
index 0000000..a8a667a
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration.writer;
+
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+
+import java.util.Map;
+
+public class EnrichmentWriterConfiguration implements WriterConfiguration{
+  private EnrichmentConfigurations config;
+
+  public EnrichmentWriterConfiguration(EnrichmentConfigurations config) {
+    this.config = config;
+  }
+
+  @Override
+  public int getBatchSize(String sensorName) {
+    return config.getSensorEnrichmentConfig(sensorName).getBatchSize();
+  }
+
+  @Override
+  public String getIndex(String sensorName) {
+    return config.getSensorEnrichmentConfig(sensorName).getIndex();
+  }
+
+  @Override
+  public Map<String, Object> getSensorConfig(String sensorName) {
+    return config.getSensorEnrichmentConfig(sensorName).getConfiguration();
+  }
+  @Override
+  public Map<String, Object> getGlobalConfig() {
+    return config.getGlobalConfig();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
new file mode 100644
index 0000000..fba8e65
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration.writer;
+
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.utils.ConversionUtils;
+
+import java.util.Map;
+
+public class ParserWriterConfiguration implements WriterConfiguration {
+  public static final String BATCH_CONF = "batchSize";
+  public static final String INDEX_CONF = "indexName";
+  private ParserConfigurations config;
+  public ParserWriterConfiguration(ParserConfigurations config) {
+    this.config = config;
+  }
+  @Override
+  public int getBatchSize(String sensorName) {
+    Object batchObj = config.getSensorParserConfig(sensorName).getParserConfig().get(BATCH_CONF);
+    return batchObj == null?1:ConversionUtils.convert(batchObj, Integer.class);
+  }
+
+  @Override
+  public String getIndex(String sensorName) {
+    Object indexObj = config.getSensorParserConfig(sensorName).getParserConfig().get(INDEX_CONF);
+    return indexObj.toString();
+  }
+
+  @Override
+  public Map<String, Object> getSensorConfig(String sensorName) {
+    return config.getSensorParserConfig(sensorName).getParserConfig();
+  }
+
+  @Override
+  public Map<String, Object> getGlobalConfig() {
+    return config.getGlobalConfig();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java
new file mode 100644
index 0000000..3ee25d0
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration.writer;
+
+import java.util.Map;
+
+public class SingleBatchConfigurationFacade implements WriterConfiguration {
+  private WriterConfiguration config;
+  public SingleBatchConfigurationFacade(WriterConfiguration config) {
+    this.config = config;
+  }
+
+  @Override
+  public int getBatchSize(String sensorName) {
+    return 1;
+  }
+
+  @Override
+  public String getIndex(String sensorName) {
+    return config.getIndex(sensorName);
+  }
+
+  @Override
+  public Map<String, Object> getSensorConfig(String sensorName) {
+    return config.getSensorConfig(sensorName);
+  }
+
+  @Override
+  public Map<String, Object> getGlobalConfig() {
+    return config.getGlobalConfig();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
new file mode 100644
index 0000000..f155302
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration.writer;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public interface WriterConfiguration extends Serializable {
+  int getBatchSize(String sensorName);
+  String getIndex(String sensorName);
+  Map<String, Object> getSensorConfig(String sensorName);
+  Map<String, Object> getGlobalConfig();
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/csv/CSVConverter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/csv/CSVConverter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/csv/CSVConverter.java
new file mode 100644
index 0000000..ce23deb
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/csv/CSVConverter.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.csv;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
+public class CSVConverter implements Serializable {
+  public static final String COLUMNS_KEY="columns";
+  public static final String SEPARATOR_KEY="separator";
+  protected Map<String, Integer> columnMap = new HashMap<>();
+  protected CSVParser parser;
+
+  public Map<String, Integer> getColumnMap() {
+    return columnMap;
+  }
+
+  public CSVParser getParser() {
+    return parser;
+  }
+
+  public Map<String, String> toMap(String line) throws IOException {
+    if(ignore(line)) {
+      return null;
+    }
+    String[] tokens = parser.parseLine(line);
+    Map<String, String> values = new HashMap<>();
+    for(Map.Entry<String, Integer> kv : columnMap.entrySet()) {
+      values.put(kv.getKey(), tokens[kv.getValue()]);
+    }
+    return values;
+  }
+
+  public void initialize(Map<String, Object> config) {
+    if(config.containsKey(COLUMNS_KEY)) {
+      columnMap = getColumnMap(config);
+    }
+    else {
+      throw new IllegalStateException("CSVExtractor requires " + COLUMNS_KEY + " configuration");
+    }
+    char separator = ',';
+    if(config.containsKey(SEPARATOR_KEY)) {
+      separator = config.get(SEPARATOR_KEY).toString().charAt(0);
+
+    }
+    parser = new CSVParserBuilder().withSeparator(separator)
+              .build();
+  }
+  protected boolean ignore(String line) {
+    if(null == line) {
+      return true;
+    }
+    String trimmedLine = line.trim();
+    return trimmedLine.startsWith("#") || isEmpty(trimmedLine);
+  }
+  public static Map.Entry<String, Integer> getColumnMapEntry(String column, int i) {
+    if(column.contains(":")) {
+      Iterable<String> tokens = Splitter.on(':').split(column);
+      String col = Iterables.getFirst(tokens, null);
+      Integer pos = Integer.parseInt(Iterables.getLast(tokens));
+      return new AbstractMap.SimpleEntry<>(col, pos);
+    }
+    else {
+      return new AbstractMap.SimpleEntry<>(column, i);
+    }
+
+  }
+  public static Map<String, Integer> getColumnMap(Map<String, Object> config) {
+    Map<String, Integer> columnMap = new HashMap<>();
+    if(config.containsKey(COLUMNS_KEY)) {
+      Object columnsObj = config.get(COLUMNS_KEY);
+      if(columnsObj instanceof String) {
+        String columns = (String)columnsObj;
+        int i = 0;
+        for (String column : Splitter.on(',').split(columns)) {
+          Map.Entry<String, Integer> e = getColumnMapEntry(column, i++);
+          columnMap.put(e.getKey(), e.getValue());
+        }
+      }
+      else if(columnsObj instanceof List) {
+        List columns = (List)columnsObj;
+        int i = 0;
+        for(Object column : columns) {
+          Map.Entry<String, Integer> e = getColumnMapEntry(column.toString(), i++);
+          columnMap.put(e.getKey(), e.getValue());
+        }
+      }
+      else if(columnsObj instanceof Map) {
+        Map<Object, Object> map = (Map<Object, Object>)columnsObj;
+        for(Map.Entry<Object, Object> e : map.entrySet()) {
+          columnMap.put(e.getKey().toString(), Integer.parseInt(e.getValue().toString()));
+        }
+      }
+    }
+    return columnMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java
index aaa6c51..24cb823 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java
@@ -18,14 +18,21 @@
 package org.apache.metron.common.interfaces;
 
 import backtype.storm.tuple.Tuple;
+import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
-public interface BulkMessageWriter<T> extends AutoCloseable {
+public interface BulkMessageWriter<MESSAGE_T> extends AutoCloseable, Serializable {
 
-  void init(Map stormConf, EnrichmentConfigurations configuration) throws Exception;
-  void write(String sensorType, EnrichmentConfigurations configurations, List<Tuple> tuples, List<T> messages) throws Exception;
+  void init(Map stormConf, WriterConfiguration config) throws Exception;
+  void write( String sensorType
+            , WriterConfiguration configurations
+            , Iterable<Tuple> tuples
+            , List<MESSAGE_T> messages
+            ) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/MessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/MessageWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/MessageWriter.java
index a90a8cb..827bf8f 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/MessageWriter.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/MessageWriter.java
@@ -19,9 +19,12 @@ package org.apache.metron.common.interfaces;
 
 import backtype.storm.tuple.Tuple;
 import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
 
-public interface MessageWriter<T> extends AutoCloseable {
+import java.io.Serializable;
+
+public interface MessageWriter<T> extends AutoCloseable, Serializable {
 
   void init();
-  void write(String sensorType, Configurations configurations, Tuple tuple, T message) throws Exception;
+  void write(String sensorType, WriterConfiguration configurations, Tuple tuple, T message) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ConversionUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ConversionUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ConversionUtils.java
new file mode 100644
index 0000000..29ec908
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ConversionUtils.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import org.apache.commons.beanutils.BeanUtilsBean2;
+import org.apache.commons.beanutils.ConvertUtilsBean;
+
+public class ConversionUtils {
+  private static ThreadLocal<ConvertUtilsBean> UTILS_BEAN  = new ThreadLocal<ConvertUtilsBean>() {
+    @Override
+    protected ConvertUtilsBean initialValue() {
+      ConvertUtilsBean ret = BeanUtilsBean2.getInstance().getConvertUtils();
+      ret.deregister();
+      ret.register(false,true, 1);
+      return ret;
+    }
+  };
+  public static <T> T convert(Object o, Class<T> clazz) {
+    if(o == null) {
+      return null;
+    }
+    return clazz.cast(UTILS_BEAN.get().convert(o, clazz));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/AbstractWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/AbstractWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/AbstractWriter.java
new file mode 100644
index 0000000..56a4e48
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/AbstractWriter.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.writer;
+
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+
+public abstract class AbstractWriter {
+  public AbstractWriter() {}
+  public abstract void configure(String sensorName, WriterConfiguration configuration);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
new file mode 100644
index 0000000..320d497
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.writer;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import com.google.common.collect.Iterables;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.common.utils.MessageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.function.Function;
+
+public class BulkWriterComponent<MESSAGE_T> {
+  public static final Logger LOG = LoggerFactory
+            .getLogger(BulkWriterComponent.class);
+  private Map<String, Collection<Tuple>> sensorTupleMap = new HashMap<>();
+  private Map<String, List<MESSAGE_T>> sensorMessageMap = new HashMap<>();
+  private OutputCollector collector;
+  private boolean handleCommit = true;
+  private boolean handleError = true;
+  public BulkWriterComponent(OutputCollector collector) {
+    this.collector = collector;
+  }
+
+  public BulkWriterComponent(OutputCollector collector, boolean handleCommit, boolean handleError) {
+    this(collector);
+    this.handleCommit = handleCommit;
+    this.handleError = handleError;
+  }
+
+  public void commit(Iterable<Tuple> tuples) {
+    tuples.forEach(t -> collector.ack(t));
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Acking " + Iterables.size(tuples) + " tuples");
+    }
+  }
+
+  public void error(Exception e, Iterable<Tuple> tuples) {
+    tuples.forEach(t -> collector.fail(t));
+    LOG.error("Failing " + Iterables.size(tuples) + " tuples", e);
+    ErrorUtils.handleError(collector, e, Constants.ERROR_STREAM);
+  }
+
+  protected Collection<Tuple> createTupleCollection() {
+    return new ArrayList<>();
+  }
+
+
+  public void write( String sensorType
+                   , Tuple tuple
+                   , MESSAGE_T message
+                   , BulkMessageWriter<MESSAGE_T> bulkMessageWriter
+                   , WriterConfiguration configurations
+                   ) throws Exception
+  {
+    int batchSize = configurations.getBatchSize(sensorType);
+    Collection<Tuple> tupleList = sensorTupleMap.get(sensorType);
+    if (tupleList == null) {
+      tupleList = createTupleCollection();
+    }
+    tupleList.add(tuple);
+    List<MESSAGE_T> messageList = sensorMessageMap.get(sensorType);
+    if (messageList == null) {
+      messageList = new ArrayList<>();
+    }
+    messageList.add(message);
+
+    if (tupleList.size() < batchSize) {
+      sensorTupleMap.put(sensorType, tupleList);
+      sensorMessageMap.put(sensorType, messageList);
+    } else {
+      try {
+        bulkMessageWriter.write(sensorType, configurations, tupleList, messageList);
+        if(handleCommit) {
+          commit(tupleList);
+        }
+
+      } catch (Exception e) {
+        if(handleError) {
+          error(e, tupleList);
+        }
+        else {
+          throw e;
+        }
+      }
+      finally {
+        sensorTupleMap.remove(sensorType);
+        sensorMessageMap.remove(sensorType);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/WriterToBulkWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/WriterToBulkWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/WriterToBulkWriter.java
new file mode 100644
index 0000000..b0bde6c
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/WriterToBulkWriter.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.writer;
+
+import backtype.storm.tuple.Tuple;
+import com.google.common.collect.Iterables;
+import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.common.interfaces.MessageWriter;
+
+import java.util.List;
+import java.util.Map;
+
+public class WriterToBulkWriter<MESSAGE_T> implements BulkMessageWriter<MESSAGE_T> {
+  MessageWriter<MESSAGE_T> messageWriter;
+
+  public WriterToBulkWriter(MessageWriter<MESSAGE_T> messageWriter) {
+    this.messageWriter = messageWriter;
+  }
+  @Override
+  public void init(Map stormConf, WriterConfiguration config) throws Exception {
+    messageWriter.init();
+  }
+
+  @Override
+  public void write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<MESSAGE_T> messages) throws Exception {
+    if(messages.size() > 1) {
+      throw new IllegalStateException("WriterToBulkWriter expects a batch of exactly 1");
+    }
+    messageWriter.write(sensorType, configurations, Iterables.getFirst(tuples, null), Iterables.getFirst(messages, null));
+  }
+
+  @Override
+  public void close() throws Exception {
+    messageWriter.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
index c5f2304..520b430 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
@@ -98,20 +98,20 @@ public class ConfiguredEnrichmentBoltTest extends BaseConfiguredBoltTest {
     StandAloneConfiguredEnrichmentBolt configuredBolt = new StandAloneConfiguredEnrichmentBolt(zookeeperUrl);
     configuredBolt.prepare(new HashMap(), topologyContext, outputCollector);
     waitForConfigUpdate(enrichmentConfigurationTypes);
-    Assert.assertEquals(sampleConfigurations, configuredBolt.configurations);
+    Assert.assertEquals(sampleConfigurations, configuredBolt.getConfigurations());
 
     configsUpdated = new HashSet<>();
     Map<String, Object> sampleGlobalConfig = sampleConfigurations.getGlobalConfig();
     sampleGlobalConfig.put("newGlobalField", "newGlobalValue");
     ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
     waitForConfigUpdate(ConfigurationType.GLOBAL.getName());
-    Assert.assertEquals("Add global config field", sampleConfigurations.getGlobalConfig(), configuredBolt.configurations.getGlobalConfig());
+    Assert.assertEquals("Add global config field", sampleConfigurations.getGlobalConfig(), configuredBolt.getConfigurations().getGlobalConfig());
 
     configsUpdated = new HashSet<>();
     sampleGlobalConfig.remove("newGlobalField");
     ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
     waitForConfigUpdate(ConfigurationType.GLOBAL.getName());
-    Assert.assertEquals("Remove global config field", sampleConfigurations, configuredBolt.configurations);
+    Assert.assertEquals("Remove global config field", sampleConfigurations, configuredBolt.getConfigurations());
 
     configsUpdated = new HashSet<>();
     String sensorType = "testSensorConfig";
@@ -131,7 +131,7 @@ public class ConfiguredEnrichmentBoltTest extends BaseConfiguredBoltTest {
     sampleConfigurations.updateSensorEnrichmentConfig(sensorType, testSensorConfig);
     ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, testSensorConfig, zookeeperUrl);
     waitForConfigUpdate(sensorType);
-    Assert.assertEquals("Add new sensor config", sampleConfigurations, configuredBolt.configurations);
+    Assert.assertEquals("Add new sensor config", sampleConfigurations, configuredBolt.getConfigurations());
     configuredBolt.cleanup();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
index 3010ed8..a1bbc13 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
@@ -97,20 +97,20 @@ public class ConfiguredParserBoltTest extends BaseConfiguredBoltTest {
     StandAloneConfiguredParserBolt configuredBolt = new StandAloneConfiguredParserBolt(zookeeperUrl);
     configuredBolt.prepare(new HashMap(), topologyContext, outputCollector);
     waitForConfigUpdate(parserConfigurationTypes);
-    Assert.assertEquals(sampleConfigurations, configuredBolt.configurations);
+    Assert.assertEquals(sampleConfigurations, configuredBolt.getConfigurations());
 
     configsUpdated = new HashSet<>();
     Map<String, Object> sampleGlobalConfig = sampleConfigurations.getGlobalConfig();
     sampleGlobalConfig.put("newGlobalField", "newGlobalValue");
     ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
     waitForConfigUpdate(ConfigurationType.GLOBAL.getName());
-    Assert.assertEquals("Add global config field", sampleConfigurations.getGlobalConfig(), configuredBolt.configurations.getGlobalConfig());
+    Assert.assertEquals("Add global config field", sampleConfigurations.getGlobalConfig(), configuredBolt.getConfigurations().getGlobalConfig());
 
     configsUpdated = new HashSet<>();
     sampleGlobalConfig.remove("newGlobalField");
     ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
     waitForConfigUpdate(ConfigurationType.GLOBAL.getName());
-    Assert.assertEquals("Remove global config field", sampleConfigurations, configuredBolt.configurations);
+    Assert.assertEquals("Remove global config field", sampleConfigurations, configuredBolt.getConfigurations());
 
     configsUpdated = new HashSet<>();
     String sensorType = "testSensorConfig";
@@ -123,7 +123,7 @@ public class ConfiguredParserBoltTest extends BaseConfiguredBoltTest {
     sampleConfigurations.updateSensorParserConfig(sensorType, testSensorConfig);
     ConfigurationsUtils.writeSensorParserConfigToZookeeper(sensorType, testSensorConfig, zookeeperUrl);
     waitForConfigUpdate(sensorType);
-    Assert.assertEquals("Add new sensor config", sampleConfigurations, configuredBolt.configurations);
+    Assert.assertEquals("Add new sensor config", sampleConfigurations, configuredBolt.getConfigurations());
     configuredBolt.cleanup();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ConversionUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ConversionUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ConversionUtilsTest.java
new file mode 100644
index 0000000..acffeeb
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ConversionUtilsTest.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConversionUtilsTest {
+  @Test
+  public void testIntegerConversions() {
+    Object o = new Integer(1);
+    Assert.assertEquals(new Integer(1), ConversionUtils.convert(o, Integer.class));
+    Assert.assertEquals(new Integer(1), ConversionUtils.convert("1", Integer.class));
+    Assert.assertNull(ConversionUtils.convert("foo", Integer.class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
index 28c3ece..502b46a 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
@@ -17,10 +17,7 @@
  */
 package org.apache.metron.dataloads.extractor.csv;
 
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import com.opencsv.CSVParser;
-import com.opencsv.CSVParserBuilder;
+import org.apache.metron.common.csv.CSVConverter;
 import org.apache.metron.dataloads.extractor.Extractor;
 import org.apache.metron.enrichment.lookup.LookupKV;
 import org.apache.metron.enrichment.lookup.LookupKey;
@@ -28,21 +25,17 @@ import org.apache.metron.enrichment.lookup.LookupKey;
 import java.io.IOException;
 import java.util.*;
 
-import static org.apache.commons.lang3.StringUtils.isEmpty;
 
-public class CSVExtractor implements Extractor {
-  public static final String COLUMNS_KEY="columns";
+public class CSVExtractor extends CSVConverter implements Extractor {
   public static final String INDICATOR_COLUMN_KEY="indicator_column";
   public static final String TYPE_COLUMN_KEY="type_column";
   public static final String TYPE_KEY="type";
-  public static final String SEPARATOR_KEY="separator";
   public static final String LOOKUP_CONVERTER = "lookup_converter";
 
   private int typeColumn;
   private String type;
   private int indicatorColumn;
-  private Map<String, Integer> columnMap = new HashMap<>();
-  private CSVParser parser;
+
   private LookupConverter converter = LookupConverters.ENRICHMENT.getConverter();
 
   public int getTypeColumn() {
@@ -57,13 +50,6 @@ public class CSVExtractor implements Extractor {
     return indicatorColumn;
   }
 
-  public Map<String, Integer> getColumnMap() {
-    return columnMap;
-  }
-
-  public CSVParser getParser() {
-    return parser;
-  }
 
   public LookupConverter getConverter() {
     return converter;
@@ -76,20 +62,14 @@ public class CSVExtractor implements Extractor {
     String[] tokens = parser.parseLine(line);
 
     LookupKey key = converter.toKey(getType(tokens), tokens[indicatorColumn]);
-    Map<String, String> values = new HashMap<>();
+    Map<String, Object> values = new HashMap<>();
     for(Map.Entry<String, Integer> kv : columnMap.entrySet()) {
       values.put(kv.getKey(), tokens[kv.getValue()]);
     }
     return Arrays.asList(new LookupKV(key, converter.toValue(values)));
   }
 
-  private boolean ignore(String line) {
-    if(null == line) {
-      return true;
-    }
-    String trimmedLine = line.trim();
-    return trimmedLine.startsWith("#") || isEmpty(trimmedLine);
-  }
+
 
   private String getType(String[] tokens) {
     if(type == null) {
@@ -100,56 +80,12 @@ public class CSVExtractor implements Extractor {
     }
   }
 
-  private static Map.Entry<String, Integer> getColumnMapEntry(String column, int i) {
-    if(column.contains(":")) {
-      Iterable<String> tokens = Splitter.on(':').split(column);
-      String col = Iterables.getFirst(tokens, null);
-      Integer pos = Integer.parseInt(Iterables.getLast(tokens));
-      return new AbstractMap.SimpleEntry<>(col, pos);
-    }
-    else {
-      return new AbstractMap.SimpleEntry<>(column, i);
-    }
 
-  }
-  private static Map<String, Integer> getColumnMap(Map<String, Object> config) {
-    Map<String, Integer> columnMap = new HashMap<>();
-    if(config.containsKey(COLUMNS_KEY)) {
-      Object columnsObj = config.get(COLUMNS_KEY);
-      if(columnsObj instanceof String) {
-        String columns = (String)columnsObj;
-        int i = 0;
-        for (String column : Splitter.on(',').split(columns)) {
-          Map.Entry<String, Integer> e = getColumnMapEntry(column, i++);
-          columnMap.put(e.getKey(), e.getValue());
-        }
-      }
-      else if(columnsObj instanceof List) {
-        List columns = (List)columnsObj;
-        int i = 0;
-        for(Object column : columns) {
-          Map.Entry<String, Integer> e = getColumnMapEntry(column.toString(), i++);
-          columnMap.put(e.getKey(), e.getValue());
-        }
-      }
-      else if(columnsObj instanceof Map) {
-        Map<Object, Object> map = (Map<Object, Object>)columnsObj;
-        for(Map.Entry<Object, Object> e : map.entrySet()) {
-          columnMap.put(e.getKey().toString(), Integer.parseInt(e.getValue().toString()));
-        }
-      }
-    }
-    return columnMap;
-  }
 
   @Override
   public void initialize(Map<String, Object> config) {
-    if(config.containsKey(COLUMNS_KEY)) {
-      columnMap = getColumnMap(config);
-    }
-    else {
-      throw new IllegalStateException("CSVExtractor requires " + COLUMNS_KEY + " configuration");
-    }
+    super.initialize(config);
+
     if(config.containsKey(INDICATOR_COLUMN_KEY)) {
       indicatorColumn = columnMap.get(config.get(INDICATOR_COLUMN_KEY).toString());
     }
@@ -159,11 +95,6 @@ public class CSVExtractor implements Extractor {
     else if(config.containsKey(TYPE_COLUMN_KEY)) {
       typeColumn = columnMap.get(config.get(TYPE_COLUMN_KEY).toString());
     }
-    if(config.containsKey(SEPARATOR_KEY)) {
-      char separator = config.get(SEPARATOR_KEY).toString().charAt(0);
-      parser = new CSVParserBuilder().withSeparator(separator)
-              .build();
-    }
     if(config.containsKey(LOOKUP_CONVERTER)) {
       converter = LookupConverters.getConverter((String) config.get(LOOKUP_CONVERTER));
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
index e0ca4ee..29beb22 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
@@ -25,5 +25,5 @@ import java.util.Map;
 
 public interface LookupConverter {
     LookupKey toKey(String type, String indicator);
-    LookupValue toValue(Map<String, String> metadata);
+    LookupValue toValue(Map<String, Object> metadata);
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
index bd58ba7..abced09 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
@@ -35,7 +35,7 @@ public enum LookupConverters {
         }
 
         @Override
-        public LookupValue toValue(Map<String, String> metadata) {
+        public LookupValue toValue(Map<String, Object> metadata) {
             return new EnrichmentValue(metadata);
         }
     })

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
index ffcff43..610f2cc 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
@@ -69,7 +69,7 @@ public class AddressHandler extends AbstractObjectTypeHandler<Address> {
       final String indicatorType = typeStr + ":" + category;
       LookupKV results = new LookupKV(new EnrichmentKey(indicatorType, token)
               , new EnrichmentValue(
-              new HashMap<String, String>() {{
+              new HashMap<String, Object>() {{
                 put("source-type", "STIX");
                 put("indicator-type", indicatorType);
                 put("source", type.toXMLString());

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
index 755cddd..4a3688d 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
@@ -52,7 +52,7 @@ public class DomainHandler extends AbstractObjectTypeHandler<DomainName> {
         final String indicatorType = typeStr + ":" + DomainNameTypeEnum.FQDN;
         LookupKV results = new LookupKV(new EnrichmentKey(indicatorType, token)
                 , new EnrichmentValue(
-                new HashMap<String, String>() {{
+                new HashMap<String, Object>() {{
                   put("source-type", "STIX");
                   put("indicator-type", indicatorType);
                   put("source", type.toXMLString());

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
index c7b05eb..2f22eed 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
@@ -52,7 +52,7 @@ public class HostnameHandler  extends AbstractObjectTypeHandler<Hostname>{
     for(String token : StixExtractor.split(value)) {
       final String indicatorType = typeStr;
       LookupKV results = new LookupKV(new EnrichmentKey(indicatorType, token)
-              , new EnrichmentValue(new HashMap<String, String>() {{
+              , new EnrichmentValue(new HashMap<String, Object>() {{
         put("source-type", "STIX");
         put("indicator-type", indicatorType);
         put("source", type.toXMLString());

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/ExtractorTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/ExtractorTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/ExtractorTest.java
index 0179193..eac6ad2 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/ExtractorTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/ExtractorTest.java
@@ -38,7 +38,7 @@ public class ExtractorTest {
             EnrichmentKey key = new EnrichmentKey();
             key.indicator = "dummy";
             key.type = "type";
-            Map<String, String> value = new HashMap<>();
+            Map<String, Object> value = new HashMap<>();
             value.put("indicator", "dummy");
             return Arrays.asList(new LookupKV(key, new EnrichmentValue(value)));
         }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
index 28b3e26..a018e27 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
@@ -36,7 +36,7 @@ import java.util.HashMap;
 public class HBaseEnrichmentConverterTest {
     EnrichmentKey key = new EnrichmentKey("domain", "google");
     EnrichmentValue value = new EnrichmentValue(
-            new HashMap<String, String>() {{
+            new HashMap<String, Object>() {{
                 put("foo", "bar");
                 put("grok", "baz");
             }});

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
index 93c216c..3a15f8b 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
@@ -113,7 +113,7 @@ public class LeastRecentlyUsedPrunerIntegrationTest {
         for(LookupKey k : goodKeysHalf) {
             testTable.put(converter.toPut(cf, (EnrichmentKey) k
                                             , new EnrichmentValue(
-                                                  new HashMap<String, String>() {{
+                                                  new HashMap<String, Object>() {{
                                                     put("k", "dummy");
                                                     }}
                                                   )
@@ -124,7 +124,7 @@ public class LeastRecentlyUsedPrunerIntegrationTest {
         pat.persist(true);
         for(LookupKey k : goodKeysOtherHalf) {
             testTable.put(converter.toPut(cf, (EnrichmentKey) k
-                                            , new EnrichmentValue(new HashMap<String, String>() {{
+                                            , new EnrichmentValue(new HashMap<String, Object>() {{
                                                     put("k", "dummy");
                                                     }}
                                                                   )
@@ -140,7 +140,7 @@ public class LeastRecentlyUsedPrunerIntegrationTest {
         pat.persist(true);
         {
             testTable.put(converter.toPut(cf, (EnrichmentKey) badKey.get(0)
-                    , new EnrichmentValue(new HashMap<String, String>() {{
+                    , new EnrichmentValue(new HashMap<String, Object>() {{
                         put("k", "dummy");
                     }}
                     )

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index f06850b..c982d29 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -20,6 +20,7 @@ package org.apache.metron.elasticsearch.writer;
 import backtype.storm.tuple.Tuple;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.interfaces.BulkMessageWriter;
 import org.apache.metron.common.interfaces.FieldNameConverter;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -55,7 +56,7 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
   }
 
   @Override
-  public void init(Map stormConf, EnrichmentConfigurations configurations) {
+  public void init(Map stormConf, WriterConfiguration configurations) {
     Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
 
     Settings.Builder settingsBuilder = Settings.settingsBuilder();
@@ -88,8 +89,7 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
   }
 
   @Override
-  public void write(String sensorType, EnrichmentConfigurations configurations, List<Tuple> tuples, List<JSONObject> messages) throws Exception {
-    SensorEnrichmentConfig sensorEnrichmentConfig = configurations.getSensorEnrichmentConfig(sensorType);
+  public void write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
     String indexPostfix = dateFormat.format(new Date());
     BulkRequestBuilder bulkRequest = client.prepareBulk();
 
@@ -97,8 +97,8 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
 
       String indexName = sensorType;
 
-      if (sensorEnrichmentConfig != null) {
-        indexName = sensorEnrichmentConfig.getIndex();
+      if (configurations != null) {
+        indexName = configurations.getIndex(sensorType);
       }
 
       indexName = indexName + "_index_" + indexPostfix;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
index 65b095e..a152d26 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
@@ -86,7 +86,7 @@ public class SimpleHBaseAdapter implements EnrichmentAdapter<CacheKey>,Serializa
             )
         {
           if (kv != null && kv.getValue() != null && kv.getValue().getMetadata() != null) {
-            for (Map.Entry<String, String> values : kv.getValue().getMetadata().entrySet()) {
+            for (Map.Entry<String, Object> values : kv.getValue().getMetadata().entrySet()) {
               enriched.put(kv.getKey().type + "." + values.getKey(), values.getValue());
             }
             _LOG.trace("Enriched type " + kv.getKey().type + " => " + enriched);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
index 3e407c7..1d49807 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
@@ -23,12 +23,11 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.bolt.ConfiguredBolt;
-import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
 import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
-import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.common.configuration.writer.EnrichmentWriterConfiguration;
 import org.apache.metron.common.utils.MessageUtils;
 import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkWriterComponent;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,26 +38,23 @@ public class BulkMessageWriterBolt extends ConfiguredEnrichmentBolt {
 
   private static final Logger LOG = LoggerFactory
           .getLogger(BulkMessageWriterBolt.class);
-  private OutputCollector collector;
   private BulkMessageWriter<JSONObject> bulkMessageWriter;
-  private Map<String, List<Tuple>> sensorTupleMap = new HashMap<>();
-  private Map<String, List<JSONObject>> sensorMessageMap = new HashMap<>();
-
+  private BulkWriterComponent<JSONObject> writerComponent;
   public BulkMessageWriterBolt(String zookeeperUrl) {
     super(zookeeperUrl);
   }
 
-  public BulkMessageWriterBolt withBulkMessageWriter(BulkMessageWriter<JSONObject> bulkMessageWriter) {
+  public BulkMessageWriterBolt withBulkMessageWriter(BulkMessageWriter<JSONObject > bulkMessageWriter) {
     this.bulkMessageWriter = bulkMessageWriter;
     return this;
   }
 
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-    this.collector = collector;
+    this.writerComponent = new BulkWriterComponent<>(collector);
     super.prepare(stormConf, context, collector);
     try {
-      bulkMessageWriter.init(stormConf, configurations);
+      bulkMessageWriter.init(stormConf, new EnrichmentWriterConfiguration(getConfigurations()));
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -70,36 +66,17 @@ public class BulkMessageWriterBolt extends ConfiguredEnrichmentBolt {
     JSONObject message = (JSONObject)((JSONObject) tuple.getValueByField("message")).clone();
     message.put("index." + bulkMessageWriter.getClass().getSimpleName().toLowerCase() + ".ts", "" + System.currentTimeMillis());
     String sensorType = MessageUtils.getSensorType(message);
-    SensorEnrichmentConfig sensorEnrichmentConfig = configurations.getSensorEnrichmentConfig(sensorType);
-    int batchSize = sensorEnrichmentConfig != null ? sensorEnrichmentConfig.getBatchSize() : 1;
-    List<Tuple> tupleList = sensorTupleMap.get(sensorType);
-    if (tupleList == null) tupleList = new ArrayList<>();
-    tupleList.add(tuple);
-    List<JSONObject> messageList = sensorMessageMap.get(sensorType);
-    if (messageList == null) messageList = new ArrayList<>();
-    messageList.add(message);
-    if (messageList.size() < batchSize) {
-      sensorTupleMap.put(sensorType, tupleList);
-      sensorMessageMap.put(sensorType, messageList);
-    } else {
-      try {
-        bulkMessageWriter.write(sensorType, configurations, tupleList, messageList);
-        for(Tuple t: tupleList) {
-          collector.ack(t);
-        }
-      } catch (Exception e) {
-        for(Tuple t: tupleList) {
-          collector.fail(t);
-        }
-        ErrorUtils.handleError(collector, e, Constants.ERROR_STREAM);
-      }
-      sensorTupleMap.remove(sensorType);
-      sensorMessageMap.remove(sensorType);
+    try
+    {
+      writerComponent.write(sensorType, tuple, message, bulkMessageWriter, new EnrichmentWriterConfiguration(getConfigurations()));
+    }
+    catch(Exception e) {
+      throw new RuntimeException("This should have been caught in the writerComponent.  If you see this, file a JIRA");
     }
   }
 
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declareStream("error", new Fields("message"));
+    declarer.declareStream(Constants.ERROR_STREAM, new Fields("message"));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
index 48e09f8..7d05c00 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -86,7 +86,7 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
 
   public Map<String, List<String>> getFieldMap(String sourceType) {
     if(sourceType != null) {
-      SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sourceType);
+      SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
       if (config != null && config.getEnrichment() != null) {
         return config.getEnrichment().getFieldMap();
       }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
index c367173..4b5c7bb 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
@@ -125,7 +125,7 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
 
     protected Map<String, List<String>> getFieldMap(String sensorType) {
         if(sensorType != null) {
-            SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sensorType);
+            SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sensorType);
             if (config != null) {
                 return config.getEnrichment().getFieldMap();
             } else {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 3a4b67d..d4acd08 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -185,7 +185,7 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
         } else {
           JSONObject enrichedField = new JSONObject();
           if (value != null && value.length() != 0) {
-            SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sourceType);
+            SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
             if(config == null) {
               LOG.error("Unable to find " + config);
               error = true;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
index c08bd0d..ec1ce7a 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
@@ -41,7 +41,7 @@ public class ThreatIntelJoinBolt extends EnrichmentJoinBolt {
 
   @Override
   public Map<String, List<String>> getFieldMap(String sourceType) {
-    SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sourceType);
+    SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
     if(config != null) {
       return config.getThreatIntel().getFieldMap();
     }
@@ -66,7 +66,7 @@ public class ThreatIntelJoinBolt extends EnrichmentJoinBolt {
     if(isAlert) {
       ret.put("is_alert" , "true");
       String sourceType = MessageUtils.getSensorType(ret);
-      SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sourceType);
+      SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
       ThreatTriageConfig triageConfig = null;
       if(config != null) {
         triageConfig = config.getThreatIntel().getTriageConfig();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d3efe3fb/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
index 3cd1780..f5b6399 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
@@ -33,7 +33,7 @@ public class ThreatIntelSplitterBolt extends EnrichmentSplitterBolt {
   @Override
   protected Map<String, List<String>> getFieldMap(String sensorType) {
     if (sensorType != null) {
-      SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sensorType);
+      SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sensorType);
       if (config != null) {
         return config.getThreatIntel().getFieldMap();
       } else {