You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/07/20 13:14:25 UTC

[2/5] incubator-metron git commit: METRON-154: Decouple enrichment and indexing closes apache/incubator-metron#192

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
index ecd1ce8..e7ad755 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
@@ -26,8 +26,8 @@ import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFac
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.interfaces.BulkMessageWriter;
 import org.apache.metron.common.interfaces.MessageWriter;
-import org.apache.metron.common.writer.BulkWriterComponent;
-import org.apache.metron.common.writer.WriterToBulkWriter;
+import org.apache.metron.writer.BulkWriterComponent;
+import org.apache.metron.writer.WriterToBulkWriter;
 import org.json.simple.JSONObject;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index d9004d1..000b7ff 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -29,12 +29,12 @@ import org.apache.metron.common.interfaces.MessageWriter;
 import org.apache.metron.common.spout.kafka.SpoutConfig;
 import org.apache.metron.common.spout.kafka.SpoutConfigOptions;
 import org.apache.metron.common.utils.ReflectionUtils;
-import org.apache.metron.common.writer.AbstractWriter;
 import org.apache.metron.parsers.bolt.ParserBolt;
 import org.apache.metron.parsers.bolt.WriterBolt;
 import org.apache.metron.parsers.bolt.WriterHandler;
 import org.apache.metron.parsers.interfaces.MessageParser;
-import org.apache.metron.parsers.writer.KafkaWriter;
+import org.apache.metron.writer.AbstractWriter;
+import org.apache.metron.writer.kafka.KafkaWriter;
 import org.json.simple.JSONObject;
 import storm.kafka.KafkaSpout;
 import storm.kafka.ZkHosts;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
deleted file mode 100644
index 6090491..0000000
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.parsers.writer;
-
-import backtype.storm.tuple.Tuple;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.Configurations;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.interfaces.MessageWriter;
-import org.apache.metron.common.utils.ConversionUtils;
-import org.apache.metron.common.utils.StringUtils;
-import org.apache.metron.common.writer.AbstractWriter;
-import org.json.simple.JSONObject;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObject>, Serializable {
-  public enum Configurations {
-     BROKER("kafka.brokerUrl")
-    ,KEY_SERIALIZER("kafka.keySerializer")
-    ,VALUE_SERIALIZER("kafka.valueSerializer")
-    ,REQUIRED_ACKS("kafka.requiredAcks")
-    ,TOPIC("kafka.topic")
-    ,PRODUCER_CONFIGS("kafka.producerConfigs");
-    ;
-    String key;
-    Configurations(String key) {
-      this.key = key;
-    }
-    public Object get(Optional<String> configPrefix, Map<String, Object> config) {
-      return config.get(StringUtils.join(".", configPrefix, Optional.of(key)));
-    }
-    public <T> T getAndConvert(Optional<String> configPrefix, Map<String, Object> config, Class<T> clazz) {
-      Object o = get(configPrefix, config);
-      if(o != null) {
-        return ConversionUtils.convert(o, clazz);
-      }
-      return null;
-    }
-  }
-  private String brokerUrl;
-  private String keySerializer = "org.apache.kafka.common.serialization.StringSerializer";
-  private String valueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
-  private int requiredAcks = 1;
-  private String kafkaTopic = Constants.ENRICHMENT_TOPIC;
-  private KafkaProducer kafkaProducer;
-  private String configPrefix = null;
-  private Map<String, Object> producerConfigs = new HashMap<>();
-
-  public KafkaWriter() {}
-
-  public KafkaWriter(String brokerUrl) {
-    this.brokerUrl = brokerUrl;
-  }
-
-  public KafkaWriter withKeySerializer(String keySerializer) {
-    this.keySerializer = keySerializer;
-    return this;
-  }
-
-  public KafkaWriter withValueSerializer(String valueSerializer) {
-    this.valueSerializer = valueSerializer;
-    return this;
-  }
-
-  public KafkaWriter withRequiredAcks(Integer requiredAcks) {
-    this.requiredAcks = requiredAcks;
-    return this;
-  }
-
-  public KafkaWriter withTopic(String topic) {
-    this.kafkaTopic= topic;
-    return this;
-  }
-  public KafkaWriter withConfigPrefix(String prefix) {
-    this.configPrefix = prefix;
-    return this;
-  }
-
-  public KafkaWriter withProducerConfigs(Map<String, Object> extraConfigs) {
-    this.producerConfigs = extraConfigs;
-    return this;
-  }
-
-  public Optional<String> getConfigPrefix() {
-    return Optional.ofNullable(configPrefix);
-  }
-
-  @Override
-  public void configure(String sensorName, WriterConfiguration configuration) {
-    Map<String, Object> configMap = configuration.getSensorConfig(sensorName);
-    String brokerUrl = Configurations.BROKER.getAndConvert(getConfigPrefix(), configMap, String.class);
-    if(brokerUrl != null) {
-      this.brokerUrl = brokerUrl;
-    }
-    String keySerializer = Configurations.KEY_SERIALIZER.getAndConvert(getConfigPrefix(), configMap, String.class);
-    if(keySerializer != null) {
-      withKeySerializer(keySerializer);
-    }
-    String valueSerializer = Configurations.VALUE_SERIALIZER.getAndConvert(getConfigPrefix(), configMap, String.class);
-    if(valueSerializer != null) {
-      withValueSerializer(keySerializer);
-    }
-    Integer requiredAcks = Configurations.REQUIRED_ACKS.getAndConvert(getConfigPrefix(), configMap, Integer.class);
-    if(requiredAcks!= null) {
-      withRequiredAcks(requiredAcks);
-    }
-    String topic = Configurations.TOPIC.getAndConvert(getConfigPrefix(), configMap, String.class);
-    if(topic != null) {
-      withTopic(topic);
-    }
-    Map<String, Object> producerConfigs = (Map)Configurations.PRODUCER_CONFIGS.get(getConfigPrefix(), configMap);
-    if(producerConfigs != null) {
-      withProducerConfigs(producerConfigs);
-    }
-  }
-
-  public Map<String, Object> createProducerConfigs() {
-    Map<String, Object> producerConfig = new HashMap<>();
-    producerConfig.put("bootstrap.servers", brokerUrl);
-    producerConfig.put("key.serializer", keySerializer);
-    producerConfig.put("value.serializer", valueSerializer);
-    producerConfig.put("request.required.acks", requiredAcks);
-    producerConfig.putAll(producerConfigs == null?new HashMap<>():producerConfigs);
-    return producerConfig;
-  }
-
-  @Override
-  public void init() {
-
-    this.kafkaProducer = new KafkaProducer<>(createProducerConfigs());
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void write(String sourceType, WriterConfiguration configurations, Tuple tuple, JSONObject message) throws Exception {
-    kafkaProducer.send(new ProducerRecord<String, String>(kafkaTopic, message.toJSONString()));
-  }
-
-  @Override
-  public void close() throws Exception {
-    kafkaProducer.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
index 116e262..b3d09d0 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
@@ -18,25 +18,21 @@
 package org.apache.metron.parsers.integration;
 
 import junit.framework.Assert;
-import org.apache.commons.io.FilenameUtils;
 import org.apache.metron.TestConstants;
 import org.apache.metron.common.Constants;
+import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
 import org.apache.metron.integration.BaseIntegrationTest;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.Processor;
 import org.apache.metron.integration.ReadinessState;
-import org.apache.metron.integration.components.ConfigUploadComponent;
 import org.apache.metron.integration.components.KafkaWithZKComponent;
 import org.apache.metron.integration.utils.TestUtils;
 import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
-import org.apache.metron.parsers.integration.validation.SampleDataValidation;
 import org.apache.metron.test.TestDataType;
 import org.apache.metron.test.utils.SampleDataUtils;
 import org.apache.metron.test.utils.UnitTestHelper;
-import org.junit.Before;
 import org.junit.Test;
 
-import java.io.File;
 import java.util.*;
 
 public abstract class ParserIntegrationTest extends BaseIntegrationTest {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/writer/KafkaWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/writer/KafkaWriterTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/writer/KafkaWriterTest.java
deleted file mode 100644
index 57dc3e2..0000000
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/writer/KafkaWriterTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.parsers.writer;
-
-import com.google.common.collect.ImmutableMap;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.test.TestingServer;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
-import org.apache.metron.common.configuration.ParserConfigurations;
-import org.apache.metron.common.configuration.SensorParserConfig;
-import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-public class KafkaWriterTest {
-
-
-  public static final String SENSOR_TYPE = "test";
-  public WriterConfiguration createConfiguration(final Map<String, Object> parserConfig) {
-    ParserConfigurations configurations = new ParserConfigurations();
-    configurations.updateSensorParserConfig( SENSOR_TYPE
-                                           , new SensorParserConfig() {{
-                                              setParserConfig(parserConfig);
-                                                                      }}
-                                           );
-    return new ParserWriterConfiguration(configurations);
-  }
-
-  @Test
-  public void testHappyPathGlobalConfig() throws Exception {
-    KafkaWriter writer = new KafkaWriter();
-    WriterConfiguration configuration = createConfiguration(
-            new HashMap<String, Object>() {{
-              put("kafka.brokerUrl" , "localhost:6667");
-              put("kafka.topic" , SENSOR_TYPE);
-              put("kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", "value2"));
-            }}
-    );
-
-    writer.configure(SENSOR_TYPE, configuration);
-    Map<String, Object> producerConfigs = writer.createProducerConfigs();
-    Assert.assertEquals(producerConfigs.get("bootstrap.servers"), "localhost:6667");
-    Assert.assertEquals(producerConfigs.get("key.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
-    Assert.assertEquals(producerConfigs.get("value.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
-    Assert.assertEquals(producerConfigs.get("request.required.acks"), 1);
-    Assert.assertEquals(producerConfigs.get("key1"), 1);
-    Assert.assertEquals(producerConfigs.get("key2"), "value2");
-  }
-
-  @Test
-  public void testHappyPathGlobalConfigWithPrefix() throws Exception {
-    KafkaWriter writer = new KafkaWriter();
-    writer.withConfigPrefix("prefix");
-    WriterConfiguration configuration = createConfiguration(
-            new HashMap<String, Object>() {{
-              put("prefix.kafka.brokerUrl" , "localhost:6667");
-              put("prefix.kafka.topic" , SENSOR_TYPE);
-              put("prefix.kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", "value2"));
-            }}
-    );
-
-    writer.configure(SENSOR_TYPE, configuration);
-    Map<String, Object> producerConfigs = writer.createProducerConfigs();
-    Assert.assertEquals(producerConfigs.get("bootstrap.servers"), "localhost:6667");
-    Assert.assertEquals(producerConfigs.get("key.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
-    Assert.assertEquals(producerConfigs.get("value.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
-    Assert.assertEquals(producerConfigs.get("request.required.acks"), 1);
-    Assert.assertEquals(producerConfigs.get("key1"), 1);
-    Assert.assertEquals(producerConfigs.get("key2"), "value2");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
index cea635f..a21d45b 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
@@ -20,17 +20,16 @@ package org.apache.metron.writers;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.enrichment.converter.EnrichmentConverter;
 import org.apache.metron.enrichment.converter.EnrichmentKey;
 import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.integration.mock.MockTableProvider;
 import org.apache.metron.enrichment.lookup.LookupKV;
-import org.apache.metron.integration.mock.MockTableProvider;
 import org.apache.metron.test.mock.MockHTable;
-import org.apache.metron.writer.hbase.SimpleHbaseEnrichmentWriter;
+import org.apache.metron.enrichment.writer.SimpleHbaseEnrichmentWriter;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
index f6436b9..2f359ef 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
@@ -20,31 +20,25 @@ package org.apache.metron.writers.integration;
 
 import com.google.common.collect.ImmutableList;
 import org.adrianwalker.multilinestring.Multiline;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.TestConstants;
-import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.enrichment.converter.EnrichmentConverter;
 import org.apache.metron.enrichment.converter.EnrichmentKey;
 import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
+import org.apache.metron.enrichment.integration.mock.MockTableProvider;
 import org.apache.metron.enrichment.lookup.LookupKV;
 import org.apache.metron.integration.*;
-import org.apache.metron.integration.components.ConfigUploadComponent;
 import org.apache.metron.integration.components.KafkaWithZKComponent;
-import org.apache.metron.integration.mock.MockTableProvider;
-import org.apache.metron.integration.utils.TestUtils;
 import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
-import org.apache.metron.test.TestDataType;
 import org.apache.metron.test.mock.MockHTable;
-import org.apache.metron.test.utils.SampleDataUtils;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.*;
 
@@ -53,7 +47,7 @@ public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationT
   /**
    {
     "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
-   ,"writerClassName" : "org.apache.metron.writer.hbase.SimpleHbaseEnrichmentWriter"
+   ,"writerClassName" : "org.apache.metron.enrichment.writer.SimpleHbaseEnrichmentWriter"
    ,"sensorTopic":"dummy"
    ,"parserConfig":
    {
@@ -61,7 +55,7 @@ public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationT
     ,"shew.cf" : "cf"
     ,"shew.keyColumns" : "col2"
     ,"shew.enrichmentType" : "et"
-    ,"shew.hbaseProvider" : "org.apache.metron.integration.mock.MockTableProvider"
+    ,"shew.hbaseProvider" : "org.apache.metron.enrichment.integration.mock.MockTableProvider"
     ,"columns" : {
                 "col1" : 0
                ,"col2" : 1

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
index a6e83f1..afdcf04 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
@@ -18,37 +18,22 @@
 
 package org.apache.metron.writers.integration;
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import org.adrianwalker.multilinestring.Multiline;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.TestConstants;
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.FieldValidator;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.field.validation.FieldValidation;
 import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.enrichment.converter.EnrichmentConverter;
-import org.apache.metron.enrichment.converter.EnrichmentKey;
-import org.apache.metron.enrichment.converter.EnrichmentValue;
-import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
 import org.apache.metron.integration.*;
-import org.apache.metron.integration.components.ConfigUploadComponent;
 import org.apache.metron.integration.components.KafkaWithZKComponent;
-import org.apache.metron.integration.mock.MockTableProvider;
-import org.apache.metron.integration.utils.TestUtils;
 import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
-import org.apache.metron.test.TestDataType;
-import org.apache.metron.test.mock.MockHTable;
-import org.apache.metron.test.utils.SampleDataUtils;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.*;
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-solr/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/pom.xml b/metron-platform/metron-solr/pom.xml
index 54b50f4..3aceda0 100644
--- a/metron-platform/metron-solr/pom.xml
+++ b/metron-platform/metron-solr/pom.xml
@@ -32,16 +32,16 @@
             <version>${global_hbase_guava_version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.metron</groupId>
-            <artifactId>metron-enrichment</artifactId>
-            <version>${project.parent.version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.apache.solr</groupId>
             <artifactId>solr-solrj</artifactId>
             <version>${global_solr_version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-enrichment</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.solr</groupId>
             <artifactId>solr-test-framework</artifactId>
             <version>${global_solr_version}</version>
@@ -98,6 +98,10 @@
                     <artifactId>servlet-api</artifactId>
                     <groupId>javax.servlet</groupId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>commons-httpclient</artifactId>
+                    <groupId>commons-httpclient</groupId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -122,8 +126,8 @@
                     <groupId>javax.servlet</groupId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.apache.httpcomponents</groupId>
-                    <artifactId>httpclient</artifactId>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
@@ -133,6 +137,37 @@
             <version>${global_mockito_version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-indexing</artifactId>
+            <version>${project.parent.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-enrichment</artifactId>
+            <version>${project.parent.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-indexing</artifactId>
+            <version>${project.parent.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpclient</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
 
     </dependencies>
 
@@ -201,6 +236,7 @@
                             <goal>shade</goal>
                         </goals>
                         <configuration>
+
                             <artifactSet>
                                 <excludes>
                                     <exclude>storm:storm-core:*</exclude>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-solr/src/main/config/solr.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/config/solr.properties b/metron-platform/metron-solr/src/main/config/solr.properties
index cdfe25a..8489d8e 100644
--- a/metron-platform/metron-solr/src/main/config/solr.properties
+++ b/metron-platform/metron-solr/src/main/config/solr.properties
@@ -14,37 +14,21 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
+##### Storm #####
+indexing.workers=1
+indexing.executors=0
 
 ##### Kafka #####
 
 kafka.zk=node1:2181
 kafka.broker=node1:6667
-spout.kafka.topic.asa=asa
-spout.kafka.topic.bro=bro
-spout.kafka.topic.fireeye=fireeye
-spout.kafka.topic.ise=ise
-spout.kafka.topic.lancope=lancope
-spout.kafka.topic.paloalto=paloalto
-spout.kafka.topic.pcap=pcap
-spout.kafka.topic.snort=snort
-spout.kafka.topic.yaf=yaf
+kafka.start=WHERE_I_LEFT_OFF
 
 ##### Indexing #####
+index.input.topic=indexing
+index.error.topic=indexing_error
 writer.class.name=org.apache.metron.solr.writer.SolrWriter
 
-##### ElasticSearch #####
-
-es.ip=10.22.0.214
-es.port=9300
-es.clustername=elasticsearch
-
-##### MySQL #####
-
-mysql.ip=10.22.0.214
-mysql.port=3306
-mysql.username=root
-mysql.password=hadoop123
-
 ##### Metrics #####
 
 #reporters
@@ -63,23 +47,6 @@ org.apache.metron.metrics.TelemetryParserBolt.emits=true
 org.apache.metron.metrics.TelemetryParserBolt.fails=true
 
 
-#GenericEnrichmentBolt
-org.apache.metron.metrics.GenericEnrichmentBolt.acks=true
-org.apache.metron.metrics.GenericEnrichmentBolt.emits=true
-org.apache.metron.metrics.GenericEnrichmentBolt.fails=true
-
-
-#TelemetryIndexingBolt
-org.apache.metron.metrics.TelemetryIndexingBolt.acks=true
-org.apache.metron.metrics.TelemetryIndexingBolt.emits=true
-org.apache.metron.metrics.TelemetryIndexingBolt.fails=true
-
-##### Host Enrichment #####
-
-org.apache.metron.enrichment.host.known_hosts=[{"ip":"10.1.128.236", "local":"YES", "type":"webserver", "asset_value" : "important"},\
-{"ip":"10.1.128.237", "local":"UNKNOWN", "type":"unknown", "asset_value" : "important"},\
-{"ip":"10.60.10.254", "local":"YES", "type":"printer", "asset_value" : "important"}]
-
 ##### HDFS #####
 
 bolt.hdfs.batch.size=5000
@@ -91,19 +58,3 @@ bolt.hdfs.finished.file.path=/paloalto/rotated
 bolt.hdfs.compression.codec.class=org.apache.hadoop.io.compress.SnappyCodec
 index.hdfs.output=/tmp/metron/enriched
 
-##### HBase #####
-bolt.hbase.table.name=pcap
-bolt.hbase.table.fields=t:value
-bolt.hbase.table.key.tuple.field.name=key
-bolt.hbase.table.timestamp.tuple.field.name=timestamp
-bolt.hbase.enable.batching=false
-bolt.hbase.write.buffer.size.in.bytes=2000000
-bolt.hbase.durability=SKIP_WAL
-bolt.hbase.partitioner.region.info.refresh.interval.mins=60
-
-##### Threat Intel #####
-
-threat.intel.tracker.table=
-threat.intel.tracker.cf=
-threat.intel.ip.table=
-threat.intel.ip.cf=

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh b/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh
index 14628d2..7a98fc7 100755
--- a/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh
+++ b/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh
@@ -19,4 +19,4 @@
 METRON_VERSION=${project.version}
 METRON_HOME=/usr/metron/$METRON_VERSION
 TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION.jar
-storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/enrichment/remote.yaml --filter $METRON_HOME/config/solr.properties
+storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/indexing/remote.yaml --filter $METRON_HOME/config/solr.properties

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrEnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrEnrichmentIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrEnrichmentIntegrationTest.java
deleted file mode 100644
index 31e8d49..0000000
--- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrEnrichmentIntegrationTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.solr.integration;
-
-import com.google.common.base.Function;
-import org.apache.metron.common.configuration.Configurations;
-import org.apache.metron.common.interfaces.FieldNameConverter;
-import org.apache.metron.integration.EnrichmentIntegrationTest;
-import org.apache.metron.integration.ComponentRunner;
-import org.apache.metron.integration.InMemoryComponent;
-import org.apache.metron.integration.Processor;
-import org.apache.metron.integration.ReadinessState;
-import org.apache.metron.integration.components.KafkaWithZKComponent;
-import org.apache.metron.solr.integration.components.SolrComponent;
-import org.apache.metron.integration.utils.SampleUtil;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
-import org.apache.metron.common.utils.JSONUtils;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-public class SolrEnrichmentIntegrationTest extends EnrichmentIntegrationTest {
-
-  private String collection = "metron";
-  private FieldNameConverter fieldNameConverter = fieldName -> fieldName;
-  @Override
-  public FieldNameConverter getFieldNameConverter() {
-    return fieldNameConverter;
-  }
-
-  @Override
-  public InMemoryComponent getSearchComponent(final Properties topologyProperties) throws Exception {
-    SolrComponent solrComponent = new SolrComponent.Builder()
-            .addCollection(collection, "../metron-solr/src/test/resources/solr/conf")
-            .withPostStartCallback(new Function<SolrComponent, Void>() {
-              @Nullable
-              @Override
-              public Void apply(@Nullable SolrComponent solrComponent) {
-                topologyProperties.setProperty("solr.zk", solrComponent.getZookeeperUrl());
-                try {
-                  String testZookeeperUrl = topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY);
-                  Configurations configurations = SampleUtil.getSampleConfigs();
-                  Map<String, Object> globalConfig = configurations.getGlobalConfig();
-                  globalConfig.put("solr.zookeeper", solrComponent.getZookeeperUrl());
-                  ConfigurationsUtils.writeGlobalConfigToZookeeper(JSONUtils.INSTANCE.toJSON(globalConfig), testZookeeperUrl);
-                } catch (Exception e) {
-                  e.printStackTrace();
-                }
-                return null;
-              }
-            })
-            .build();
-    return solrComponent;
-  }
-
-  @Override
-  public Processor<List<Map<String, Object>>> getProcessor(final List<byte[]> inputMessages) {
-    return new Processor<List<Map<String, Object>>>() {
-      List<Map<String, Object>> docs = null;
-      public ReadinessState process(ComponentRunner runner) {
-        SolrComponent solrComponent = runner.getComponent("search", SolrComponent.class);
-        if (solrComponent.hasCollection(collection)) {
-          List<Map<String, Object>> docsFromDisk;
-          try {
-            docs = solrComponent.getAllIndexedDocs(collection);
-            docsFromDisk = EnrichmentIntegrationTest.readDocsFromDisk(hdfsDir);
-            System.out.println(docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size());
-          } catch (IOException e) {
-            throw new IllegalStateException("Unable to retrieve indexed documents.", e);
-          }
-          if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) {
-            return ReadinessState.NOT_READY;
-          } else {
-            return ReadinessState.READY;
-          }
-        } else {
-          return ReadinessState.NOT_READY;
-        }
-      }
-
-      public List<Map<String, Object>> getResult() {
-        return docs;
-      }
-    };
-  }
-
-  @Override
-  public void setAdditionalProperties(Properties topologyProperties) {
-    topologyProperties.setProperty("writer.class.name", "org.apache.metron.solr.writer.SolrWriter");
-  }
-
-  @Override
-  public String cleanField(String field) {
-    return field.replaceFirst("_[dfils]$", "");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
new file mode 100644
index 0000000..dd9c559
--- /dev/null
+++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.solr.integration;
+
+import com.google.common.base.Function;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.interfaces.FieldNameConverter;
+import org.apache.metron.enrichment.integration.utils.SampleUtil;
+import org.apache.metron.indexing.integration.IndexingIntegrationTest;
+import org.apache.metron.integration.ComponentRunner;
+import org.apache.metron.integration.InMemoryComponent;
+import org.apache.metron.integration.Processor;
+import org.apache.metron.integration.ReadinessState;
+import org.apache.metron.integration.components.KafkaWithZKComponent;
+import org.apache.metron.solr.integration.components.SolrComponent;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.utils.JSONUtils;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class SolrIndexingIntegrationTest extends IndexingIntegrationTest {
+
+  private String collection = "metron";
+  private FieldNameConverter fieldNameConverter = fieldName -> fieldName;
+  @Override
+  public FieldNameConverter getFieldNameConverter() {
+    return fieldNameConverter;
+  }
+
+  @Override
+  public InMemoryComponent getSearchComponent(final Properties topologyProperties) throws Exception {
+    SolrComponent solrComponent = new SolrComponent.Builder()
+            .addCollection(collection, "../metron-solr/src/test/resources/solr/conf")
+            .withPostStartCallback(new Function<SolrComponent, Void>() {
+              @Nullable
+              @Override
+              public Void apply(@Nullable SolrComponent solrComponent) {
+                topologyProperties.setProperty("solr.zk", solrComponent.getZookeeperUrl());
+                try {
+                  String testZookeeperUrl = topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY);
+                  Configurations configurations = SampleUtil.getSampleConfigs();
+                  Map<String, Object> globalConfig = configurations.getGlobalConfig();
+                  globalConfig.put("solr.zookeeper", solrComponent.getZookeeperUrl());
+                  ConfigurationsUtils.writeGlobalConfigToZookeeper(JSONUtils.INSTANCE.toJSON(globalConfig), testZookeeperUrl);
+                } catch (Exception e) {
+                  e.printStackTrace();
+                }
+                return null;
+              }
+            })
+            .build();
+    return solrComponent;
+  }
+
+  @Override
+  public Processor<List<Map<String, Object>>> getProcessor(final List<byte[]> inputMessages) {
+    return new Processor<List<Map<String, Object>>>() {
+      List<Map<String, Object>> docs = null;
+      public ReadinessState process(ComponentRunner runner) {
+        SolrComponent solrComponent = runner.getComponent("search", SolrComponent.class);
+        if (solrComponent.hasCollection(collection)) {
+          List<Map<String, Object>> docsFromDisk;
+          try {
+            docs = solrComponent.getAllIndexedDocs(collection);
+            docsFromDisk = readDocsFromDisk(hdfsDir);
+            System.out.println(docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size());
+          } catch (IOException e) {
+            throw new IllegalStateException("Unable to retrieve indexed documents.", e);
+          }
+          if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) {
+            return ReadinessState.NOT_READY;
+          } else {
+            return ReadinessState.READY;
+          }
+        } else {
+          return ReadinessState.NOT_READY;
+        }
+      }
+
+      public List<Map<String, Object>> getResult() {
+        return docs;
+      }
+    };
+  }
+
+  @Override
+  public void setAdditionalProperties(Properties topologyProperties) {
+    topologyProperties.setProperty("writer.class.name", "org.apache.metron.solr.writer.SolrWriter");
+  }
+
+  @Override
+  public String cleanField(String field) {
+    return field.replaceFirst("_[dfils]$", "");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
index 0993e0d..91c34af 100644
--- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
+++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
@@ -17,11 +17,9 @@
  */
 package org.apache.metron.solr.writer;
 
-import backtype.storm.tuple.Tuple;
-import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.writer.EnrichmentWriterConfiguration;
-import org.apache.metron.integration.utils.SampleUtil;
+import org.apache.metron.enrichment.integration.utils.SampleUtil;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.SolrInputDocument;
 import org.hamcrest.Description;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/pom.xml b/metron-platform/metron-writer/pom.xml
new file mode 100644
index 0000000..d088c4b
--- /dev/null
+++ b/metron-platform/metron-writer/pom.xml
@@ -0,0 +1,231 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software
+	Foundation (ASF) under one or more contributor license agreements. See the
+	NOTICE file distributed with this work for additional information regarding
+	copyright ownership. The ASF licenses this file to You under the Apache License,
+	Version 2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>metron-platform</artifactId>
+        <version>0.2.0BETA</version>
+    </parent>
+    <artifactId>metron-writer</artifactId>
+    <name>metron-writer</name>
+    <description>Components common to all enrichments</description>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <commons.config.version>1.10</commons.config.version>
+        <antlr.version>4.5</antlr.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <version>${global_hbase_version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${global_hbase_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${global_storm_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.9.2</artifactId>
+            <version>${global_kafka_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.sun.jmx</groupId>
+                    <artifactId>jmxri</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jdmk</groupId>
+                    <artifactId>jmxtools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-hdfs</artifactId>
+            <version>${global_storm_version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.storm</groupId>
+                    <artifactId>storm-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${global_hadoop_version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+            </exclusions>
+            <scope>provided</scope>
+        </dependency>
+         <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+    </dependencies>
+
+    <reporting>
+        <plugins>
+            <!-- Normally, dependency report takes time, skip it -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-project-info-reports-plugin</artifactId>
+                <version>2.7</version>
+
+                <configuration>
+                    <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>emma-maven-plugin</artifactId>
+                <version>1.0-alpha-3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-pmd-plugin</artifactId>
+                <configuration>
+                  <targetJdk>${global_java_version}</targetJdk>
+                </configuration>
+            </plugin>
+        </plugins>
+    </reporting>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>${global_java_version}</source>
+                    <compilerArgument>-Xlint:unchecked</compilerArgument>
+                    <target>${global_java_version}</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.antlr</groupId>
+                <artifactId>antlr4-maven-plugin</artifactId>
+                <version>${antlr.version}</version>
+                <configuration>
+                  <outputDirectory>${basedir}/src/main/java</outputDirectory>
+                </configuration>
+                <executions>
+                  <execution>
+                    <goals>
+                      <goal>antlr4</goal>
+                    </goals>
+                  </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${global_shade_version}</version>
+                <configuration>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                    <artifactSet>
+                        <excludes>
+                            <exclude>*slf4j*</exclude>
+                        </excludes>
+                    </artifactSet>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.google.common</pattern>
+                                    <shadedPattern>org.apache.metron.guava</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                                    <resource>.yaml</resource>
+                                </transformer>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass></mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/AbstractWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/AbstractWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/AbstractWriter.java
new file mode 100644
index 0000000..0fa596a
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/AbstractWriter.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer;
+
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+
+public abstract class AbstractWriter {
+  public AbstractWriter() {}
+  public abstract void configure(String sensorName, WriterConfiguration configuration);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
new file mode 100644
index 0000000..ba647bd
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import com.google.common.collect.Iterables;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.common.utils.ErrorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class BulkWriterComponent<MESSAGE_T> {
+  public static final Logger LOG = LoggerFactory
+            .getLogger(BulkWriterComponent.class);
+  private Map<String, Collection<Tuple>> sensorTupleMap = new HashMap<>();
+  private Map<String, List<MESSAGE_T>> sensorMessageMap = new HashMap<>();
+  private OutputCollector collector;
+  private boolean handleCommit = true;
+  private boolean handleError = true;
+  public BulkWriterComponent(OutputCollector collector) {
+    this.collector = collector;
+  }
+
+  public BulkWriterComponent(OutputCollector collector, boolean handleCommit, boolean handleError) {
+    this(collector);
+    this.handleCommit = handleCommit;
+    this.handleError = handleError;
+  }
+
+  public void commit(Iterable<Tuple> tuples) {
+    tuples.forEach(t -> collector.ack(t));
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Acking " + Iterables.size(tuples) + " tuples");
+    }
+  }
+
+  public void error(Throwable e, Iterable<Tuple> tuples) {
+    tuples.forEach(t -> collector.ack(t));
+    if(!Iterables.isEmpty(tuples)) {
+      LOG.error("Failing " + Iterables.size(tuples) + " tuples", e);
+      ErrorUtils.handleError(collector, e, Constants.ERROR_STREAM);
+    }
+  }
+
+  protected Collection<Tuple> createTupleCollection() {
+    return new ArrayList<>();
+  }
+
+
+  public void errorAll(Throwable e) {
+    for(Map.Entry<String, Collection<Tuple>> kv : sensorTupleMap.entrySet()) {
+      error(e, kv.getValue());
+      sensorTupleMap.remove(kv.getKey());
+      sensorMessageMap.remove(kv.getKey());
+    }
+  }
+
+  public void errorAll(String sensorType, Throwable e) {
+    error(e, Optional.ofNullable(sensorTupleMap.get(sensorType)).orElse(new ArrayList<>()));
+    sensorTupleMap.remove(sensorType);
+    sensorMessageMap.remove(sensorType);
+  }
+  public void write( String sensorType
+                   , Tuple tuple
+                   , MESSAGE_T message
+                   , BulkMessageWriter<MESSAGE_T> bulkMessageWriter
+                   , WriterConfiguration configurations
+                   ) throws Exception
+  {
+    int batchSize = configurations.getBatchSize(sensorType);
+    Collection<Tuple> tupleList = sensorTupleMap.get(sensorType);
+    if (tupleList == null) {
+      tupleList = createTupleCollection();
+    }
+    tupleList.add(tuple);
+    List<MESSAGE_T> messageList = sensorMessageMap.get(sensorType);
+    if (messageList == null) {
+      messageList = new ArrayList<>();
+    }
+    messageList.add(message);
+
+    if (tupleList.size() < batchSize) {
+      sensorTupleMap.put(sensorType, tupleList);
+      sensorMessageMap.put(sensorType, messageList);
+    } else {
+      try {
+        bulkMessageWriter.write(sensorType, configurations, tupleList, messageList);
+        if(handleCommit) {
+          commit(tupleList);
+        }
+
+      } catch (Throwable e) {
+        if(handleError) {
+          error(e, tupleList);
+        }
+        else {
+          throw e;
+        }
+      }
+      finally {
+        sensorTupleMap.remove(sensorType);
+        sensorMessageMap.remove(sensorType);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
new file mode 100644
index 0000000..b9aee7e
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer;
+
+import backtype.storm.tuple.Tuple;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.common.utils.ConversionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+
+public class NoopWriter extends AbstractWriter implements BulkMessageWriter<Object> {
+
+  public static class RandomLatency implements Function<Void, Void> {
+    private int min;
+    private int max;
+
+    public RandomLatency(int min, int max) {
+      this.min = min;
+      this.max = max;
+    }
+
+    public int getMin() {
+      return min;
+    }
+    public int getMax() {
+      return max;
+    }
+
+    @Override
+    public Void apply(Void aVoid) {
+      int sleepMs = ThreadLocalRandom.current().nextInt(min, max + 1);
+      try {
+        Thread.sleep(sleepMs);
+      } catch (InterruptedException e) {
+      }
+      return null;
+    }
+  }
+
+  public static class FixedLatency implements Function<Void, Void> {
+    private int latency;
+    public FixedLatency(int latency) {
+      this.latency = latency;
+    }
+    public int getLatency() {
+      return latency;
+    }
+
+    @Override
+    public Void apply(Void aVoid) {
+      if(latency > 0) {
+        try {
+          Thread.sleep(latency);
+        } catch (InterruptedException e) {
+        }
+      }
+      return null;
+    }
+  }
+  Function<Void, Void> sleepFunction = null;
+
+  public NoopWriter withLatency(String sleepConfig) {
+    sleepFunction = getSleepFunction(sleepConfig);
+    return this;
+  }
+
+
+  private Function<Void, Void> getSleepFunction(String sleepConfig) {
+    String usageMessage = "Unexpected: " + sleepConfig + " Expected value: integer for a fixed sleep duration in milliseconds (e.g. 10) " +
+            "or a range of latencies separated by a comma (e.g. \"10, 20\") to sleep a random amount in that range.";
+    try {
+      if (sleepConfig.contains(",")) {
+        // random latency within a range.
+        Iterable<String> it = Splitter.on(',').split(sleepConfig);
+        Integer min = ConversionUtils.convert(Iterables.getFirst(it, "").trim(), Integer.class);
+        Integer max= ConversionUtils.convert(Iterables.getLast(it, "").trim(), Integer.class);
+        if (min != null && max != null) {
+          return new RandomLatency(min, max);
+        }
+      } else {
+        //fixed latency
+        Integer latency = ConversionUtils.convert(sleepConfig.trim(), Integer.class);
+        if(latency != null) {
+          return new FixedLatency(latency);
+        }
+      }
+    }
+    catch(Throwable t) {
+      throw new IllegalArgumentException(usageMessage, t);
+    }
+    throw new IllegalArgumentException(usageMessage);
+  }
+
+  @Override
+  public void configure(String sensorName, WriterConfiguration configuration) {
+    Map<String, Object> config = configuration.getSensorConfig(sensorName);
+    if(config != null) {
+      Object noopLatency = config.get("noopLatency");
+      if(noopLatency != null) {
+        sleepFunction = getSleepFunction(noopLatency.toString());
+      }
+    }
+  }
+
+  @Override
+  public void init(Map stormConf, WriterConfiguration config) throws Exception {
+  }
+
+  @Override
+  public void write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<Object> messages) throws Exception {
+    if(sleepFunction != null) {
+      sleepFunction.apply(null);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
new file mode 100644
index 0000000..3df3e9d
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer;
+
+import backtype.storm.tuple.Tuple;
+import com.google.common.collect.Iterables;
+import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.common.interfaces.MessageWriter;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+public class WriterToBulkWriter<MESSAGE_T> implements BulkMessageWriter<MESSAGE_T>, Serializable {
+  MessageWriter<MESSAGE_T> messageWriter;
+
+  public static transient Function<WriterConfiguration, WriterConfiguration> TRANSFORMATION = config -> new SingleBatchConfigurationFacade(config);
+
+  public WriterToBulkWriter(MessageWriter<MESSAGE_T> messageWriter) {
+    this.messageWriter = messageWriter;
+  }
+  @Override
+  public void init(Map stormConf, WriterConfiguration config) throws Exception {
+    messageWriter.init();
+  }
+
+  @Override
+  public void write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<MESSAGE_T> messages) throws Exception {
+    if(messages.size() > 1) {
+      throw new IllegalStateException("WriterToBulkWriter expects a batch of exactly 1");
+    }
+    messageWriter.write(sensorType, configurations, Iterables.getFirst(tuples, null), Iterables.getFirst(messages, null));
+  }
+
+  @Override
+  public void close() throws Exception {
+    messageWriter.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
new file mode 100644
index 0000000..91e8446
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
+import org.apache.metron.common.configuration.writer.EnrichmentWriterConfiguration;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.MessageWriter;
+import org.apache.metron.common.utils.MessageUtils;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.writer.BulkWriterComponent;
+import org.apache.metron.writer.WriterToBulkWriter;
+import org.apache.metron.writer.message.MessageGetter;
+import org.apache.metron.writer.message.MessageGetters;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.function.Function;
+
+public class BulkMessageWriterBolt extends ConfiguredEnrichmentBolt {
+
+  private static final Logger LOG = LoggerFactory
+          .getLogger(BulkMessageWriterBolt.class);
+  private BulkMessageWriter<JSONObject> bulkMessageWriter;
+  private BulkWriterComponent<JSONObject> writerComponent;
+  private String messageGetterStr = MessageGetters.NAMED.name();
+  private transient MessageGetter messageGetter = null;
+  private transient Function<WriterConfiguration, WriterConfiguration> configurationTransformation;
+  public BulkMessageWriterBolt(String zookeeperUrl) {
+    super(zookeeperUrl);
+  }
+
+  public BulkMessageWriterBolt withBulkMessageWriter(BulkMessageWriter<JSONObject > bulkMessageWriter) {
+    this.bulkMessageWriter = bulkMessageWriter;
+    return this;
+  }
+
+  public BulkMessageWriterBolt withMessageWriter(MessageWriter<JSONObject> messageWriter) {
+    this.bulkMessageWriter = new WriterToBulkWriter<>(messageWriter);
+    return this;
+  }
+
+  public BulkMessageWriterBolt withMessageGetter(String messageGetter) {
+    this.messageGetterStr = messageGetter;
+    return this;
+  }
+
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    this.writerComponent = new BulkWriterComponent<>(collector);
+    super.prepare(stormConf, context, collector);
+    messageGetter = MessageGetters.valueOf(messageGetterStr);
+    if(bulkMessageWriter instanceof WriterToBulkWriter) {
+      configurationTransformation = WriterToBulkWriter.TRANSFORMATION;
+    }
+    else {
+      configurationTransformation = x -> x;
+    }
+    try {
+      bulkMessageWriter.init(stormConf
+                            , configurationTransformation.apply(new EnrichmentWriterConfiguration(getConfigurations()))
+                            );
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void execute(Tuple tuple) {
+    JSONObject message = messageGetter.getMessage(tuple);
+    String sensorType = MessageUtils.getSensorType(message);
+    try
+    {
+      writerComponent.write(sensorType
+                           , tuple
+                           , message
+                           , bulkMessageWriter
+                           , configurationTransformation.apply(new EnrichmentWriterConfiguration(getConfigurations()))
+                           );
+    }
+    catch(Exception e) {
+      throw new RuntimeException("This should have been caught in the writerComponent.  If you see this, file a JIRA", e);
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declareStream(Constants.ERROR_STREAM, new Fields("message"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
new file mode 100644
index 0000000..01f1245
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.hdfs;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.NoRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.rotation.RotationAction;
+import org.json.simple.JSONObject;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
+  List<RotationAction> rotationActions = new ArrayList<>();
+  FileRotationPolicy rotationPolicy = new NoRotationPolicy();
+  SyncPolicy syncPolicy = new CountSyncPolicy(1); //sync every time, duh.
+  FileNameFormat fileNameFormat;
+  Map<String, SourceHandler> sourceHandlerMap = new HashMap<>();
+  transient Map stormConfig;
+  public HdfsWriter withFileNameFormat(FileNameFormat fileNameFormat){
+    this.fileNameFormat = fileNameFormat;
+    return this;
+  }
+
+  public HdfsWriter withSyncPolicy(SyncPolicy syncPolicy){
+    this.syncPolicy = syncPolicy;
+    return this;
+  }
+  public HdfsWriter withRotationPolicy(FileRotationPolicy rotationPolicy){
+    this.rotationPolicy = rotationPolicy;
+    return this;
+  }
+
+  public HdfsWriter addRotationAction(RotationAction action){
+    this.rotationActions.add(action);
+    return this;
+  }
+
+  @Override
+  public void init(Map stormConfig, WriterConfiguration configurations) {
+    this.stormConfig = stormConfig;
+  }
+
+
+  @Override
+  public void write(String sourceType
+                   , WriterConfiguration configurations
+                   , Iterable<Tuple> tuples
+                   , List<JSONObject> messages
+                   ) throws Exception
+  {
+    SourceHandler handler = getSourceHandler(sourceType);
+    handler.handle(messages);
+  }
+
+  @Override
+  public void close() {
+    for(SourceHandler handler : sourceHandlerMap.values()) {
+      handler.close();
+    }
+  }
+  private synchronized SourceHandler getSourceHandler(String sourceType) throws IOException {
+    SourceHandler ret = sourceHandlerMap.get(sourceType);
+    if(ret == null) {
+      ret = new SourceHandler(rotationActions, rotationPolicy, syncPolicy, new SourceFileNameFormat(sourceType, fileNameFormat), stormConfig);
+      sourceHandlerMap.put(sourceType, ret);
+    }
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
new file mode 100644
index 0000000..1c345b4
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.hdfs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.storm.hdfs.common.rotation.RotationAction;
+
+import java.io.IOException;
+
+public class SourceAwareMoveAction implements RotationAction{
+  private static final Logger LOG = Logger.getLogger(SourceHandler.class);
+  private String destination;
+
+  public SourceAwareMoveAction toDestination(String destDir){
+    destination = destDir;
+    return this;
+  }
+
+  private static String getSource(Path filePath) {
+    return filePath.getParent().getName();
+  }
+
+  @Override
+  public void execute(FileSystem fileSystem, Path filePath) throws IOException {
+    Path destPath = new Path(new Path(destination, getSource(filePath)), filePath.getName());
+    LOG.info("Moving file " + filePath + " to " + destPath);
+    boolean success = fileSystem.rename(filePath, destPath);
+    return;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
new file mode 100644
index 0000000..ae0242d
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.hdfs;
+
+import backtype.storm.task.TopologyContext;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+
+import java.util.Map;
+
+public class SourceFileNameFormat implements FileNameFormat {
+  FileNameFormat delegate;
+  String sourceType;
+  public SourceFileNameFormat(String sourceType, FileNameFormat delegate) {
+    this.delegate = delegate;
+    this.sourceType = sourceType;
+  }
+
+  @Override
+  public void prepare(Map map, TopologyContext topologyContext) {
+    this.delegate.prepare(map, topologyContext);
+  }
+
+  @Override
+  public String getName(long l, long l1) {
+    return delegate.getName(l, l1);
+  }
+
+  @Override
+  public String getPath() {
+    return delegate.getPath() + "/" + sourceType;
+  }
+}