You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/21 18:06:02 UTC

[20/43] incubator-metron git commit: METRON-56 Create unified enrichment topology (merrimanr via cestella) closes apache/incubator-metron#33

METRON-56 Create unified enrichment topology (merrimanr via cestella) closes apache/incubator-metron#33


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/9f96399d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/9f96399d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/9f96399d

Branch: refs/heads/Metron_0.1BETA
Commit: 9f96399d9ecb252da13edf7bc44a366740945e85
Parents: 0e1055a
Author: merrimanr <me...@gmail.com>
Authored: Tue Mar 1 13:20:13 2016 -0500
Committer: cstella <ce...@gmail.com>
Committed: Tue Mar 1 13:20:13 2016 -0500

----------------------------------------------------------------------
 .../metron/alerts/TelemetryAlertsBolt.java      |    4 +-
 metron-streaming/Metron-Common/pom.xml          |   15 +
 .../main/java/org/apache/metron/Constants.java  |   27 +
 .../metron/bolt/BulkMessageWriterBolt.java      |  102 +
 .../org/apache/metron/bolt/ConfiguredBolt.java  |   91 +
 .../java/org/apache/metron/bolt/JoinBolt.java   |   39 +-
 .../java/org/apache/metron/bolt/SplitBolt.java  |   46 +-
 .../org/apache/metron/domain/Enrichment.java    |   21 +-
 .../org/apache/metron/domain/SourceConfig.java  |   88 +
 .../metron/enrichment/EnrichmentConstants.java  |   28 +
 .../enrichment/EnrichmentSplitterBolt.java      |  129 -
 .../java/org/apache/metron/hbase/HBaseBolt.java |    5 +-
 .../org/apache/metron/hbase/HTableProvider.java |    3 -
 .../metron/helpers/topology/ErrorGenerator.java |   54 -
 .../metron/helpers/topology/ErrorUtils.java     |   64 +
 .../metron/spout/pcap/HDFSWriterCallback.java   |  169 ++
 .../metron/spout/pcap/HDFSWriterConfig.java     |   97 +
 .../apache/metron/topology/TopologyUtils.java   |   28 +
 .../org/apache/metron/utils/ConfigUtils.java    |   48 +
 .../org/apache/metron/writer/HBaseWriter.java   |   88 +
 .../org/apache/metron/writer/PcapWriter.java    |   52 +
 .../writer/interfaces/BulkMessageWriter.java    |   30 +
 .../metron/writer/interfaces/MessageWriter.java |   27 +
 .../src/main/java/storm/kafka/Callback.java     |   26 +
 .../java/storm/kafka/CallbackCollector.java     |  182 ++
 .../java/storm/kafka/CallbackKafkaSpout.java    |   93 +
 .../src/main/java/storm/kafka/EmitContext.java  |  146 +
 .../resources/config/source/bro-config.json     |   13 +
 .../resources/config/source/pcap-config.json    |   13 +
 .../resources/config/source/snort-config.json   |   13 +
 .../resources/config/source/yaf-config.json     |   13 +
 .../adapters/host/HostFromJSONListAdapter.java  |    7 +-
 .../enrichment/bolt/EnrichmentJoinBolt.java     |   43 +-
 .../enrichment/bolt/EnrichmentSplitterBolt.java |  140 +
 .../enrichment/bolt/GenericEnrichmentBolt.java  |   67 +-
 .../enrichment/bolt/ThreatIntelJoinBolt.java    |   41 +
 .../bolt/ThreatIntelSplitterBolt.java           |   40 +
 .../enrichment/utils/EnrichmentUtils.java       |   32 +
 .../enrichment/utils/ThreatIntelUtils.java      |   32 +
 .../metron/indexing/AbstractIndexingBolt.java   |    7 +-
 .../metron/indexing/TelemetryIndexingBolt.java  |   23 +-
 .../adapters/ESTimedRotatingAdapter.java        |    3 +-
 .../metron/writer/ElasticSearchWriter.java      |   95 +
 .../org/apache/metron/writer/HdfsWriter.java    |   44 +
 metron-streaming/Metron-MessageParsers/pom.xml  |   13 +
 .../java/org/apache/metron/bolt/ParserBolt.java |   88 +
 .../org/apache/metron/bolt/PcapParserBolt.java  |   10 +-
 .../apache/metron/bolt/TelemetryParserBolt.java |   12 +-
 .../org/apache/metron/parser/MessageParser.java |   25 +
 .../metron/parsing/parsers/GrokParser.java      |  146 +
 .../metron/parsing/parsers/PcapParser.java      |   23 +-
 .../org/apache/metron/writer/KafkaWriter.java   |   79 +
 .../src/main/resources/patterns/common          |   96 +
 .../src/main/resources/patterns/yaf             |  113 +-
 metron-streaming/Metron-Testing/pom.xml         |   28 +-
 .../util/integration/ComponentRunner.java       |   15 +-
 .../components/ElasticSearchComponent.java      |    8 +-
 .../components/KafkaWithZKComponent.java        |  228 ++
 .../util/integration/util/KafkaUtil.java        |   41 +
 .../org/apache/metron/utils/KafkaLoader.java    |   88 +
 .../apache/metron/utils/SourceConfigUtils.java  |   95 +
 .../Metron_Configs/topologies/asa/local.yaml    |  401 ---
 .../Metron_Configs/topologies/asa/remote.yaml   |  385 +--
 .../Metron_Configs/topologies/asa/test.yaml     |   82 +
 .../Metron_Configs/topologies/bro/local.yaml    |  192 --
 .../Metron_Configs/topologies/bro/remote.yaml   |  176 +-
 .../Metron_Configs/topologies/bro/test.yaml     |   82 +
 .../topologies/enrichment/remote.yaml           |  331 +++
 .../topologies/enrichment/test.yaml             |  314 ++
 .../topologies/fireeye/local.yaml               |  401 ---
 .../topologies/fireeye/remote.yaml              |  382 +--
 .../Metron_Configs/topologies/fireeye/test.yaml |   79 +
 .../Metron_Configs/topologies/ise/local.yaml    |  192 --
 .../Metron_Configs/topologies/ise/remote.yaml   |  177 +-
 .../Metron_Configs/topologies/ise/test.yaml     |   79 +
 .../topologies/lancope/local.yaml               |  401 ---
 .../topologies/lancope/remote.yaml              |  382 +--
 .../Metron_Configs/topologies/lancope/test.yaml |   79 +
 .../topologies/paloalto/local.yaml              |  172 --
 .../topologies/paloalto/remote.yaml             |  155 +-
 .../topologies/paloalto/test.yaml               |   79 +
 .../Metron_Configs/topologies/pcap/local.yaml   |   22 +-
 .../Metron_Configs/topologies/pcap/parse.yaml   |   70 +
 .../Metron_Configs/topologies/pcap/remote.yaml  |    2 +-
 .../Metron_Configs/topologies/snort/local.yaml  |  195 --
 .../Metron_Configs/topologies/snort/remote.yaml |  175 +-
 .../Metron_Configs/topologies/snort/test.yaml   |   79 +
 .../topologies/sourcefire/local.yaml            |  401 ---
 .../topologies/sourcefire/remote.yaml           |  382 +--
 .../topologies/sourcefire/test.yaml             |   79 +
 .../Metron_Configs/topologies/yaf/local.yaml    |  192 --
 .../Metron_Configs/topologies/yaf/remote.yaml   |  185 +-
 .../Metron_Configs/topologies/yaf/test.yaml     |   95 +
 .../src/main/resources/SampleIndexed/YafIndexed |   10 +
 .../src/main/resources/SampleInput/SnortOutput  |    4 +-
 .../main/resources/SampleInput/YafExampleOutput | 2695 +-----------------
 .../src/main/resources/SampleParsed/SnortParsed |    3 +
 .../resources/SampleParsed/YafExampleParsed     |   10 +
 .../integration/EnrichmentIntegrationTest.java  |  195 ++
 .../integration/ParserIntegrationTest.java      |  155 +
 .../integration/SnortIntegrationTest.java       |   62 +
 .../metron/integration/YafIntegrationTest.java  |   62 +
 .../integration/pcap/PcapIntegrationTest.java   |  279 --
 .../metron/integration/util/TestUtils.java      |   38 +
 pom.xml                                         |    2 +-
 105 files changed, 5213 insertions(+), 8156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/alerts/TelemetryAlertsBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/alerts/TelemetryAlertsBolt.java b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/alerts/TelemetryAlertsBolt.java
index fd898e3..663ae40 100644
--- a/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/alerts/TelemetryAlertsBolt.java
+++ b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/alerts/TelemetryAlertsBolt.java
@@ -34,7 +34,7 @@ import backtype.storm.tuple.Values;
 
 import com.google.common.cache.CacheBuilder;
 import org.apache.metron.alerts.interfaces.AlertsAdapter;
-import org.apache.metron.helpers.topology.ErrorGenerator;
+import org.apache.metron.helpers.topology.ErrorUtils;
 import org.apache.metron.json.serialization.JSONEncoderHelper;
 import org.apache.metron.metrics.MetricReporter;
 
@@ -245,7 +245,7 @@ public class TelemetryAlertsBolt extends AbstractAlertBolt {
 			 */
 
 
-			JSONObject error = ErrorGenerator.generateErrorMessage(
+			JSONObject error = ErrorUtils.generateErrorMessage(
 					"Alerts problem: " + original_message, e);
 			_collector.emit("error", new Values(error));
 		}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/pom.xml b/metron-streaming/Metron-Common/pom.xml
index 57a58d7..c4fc5aa 100644
--- a/metron-streaming/Metron-Common/pom.xml
+++ b/metron-streaming/Metron-Common/pom.xml
@@ -161,6 +161,21 @@
             <artifactId>json-schema-validator</artifactId>
             <version>${global_json_schema_validator_version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <version>2.7.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>flux-core</artifactId>
+            <version>${global_flux_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka</artifactId>
+            <version>${global_storm_version}</version>
+        </dependency>
     </dependencies>
 
     <reporting>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/Constants.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/Constants.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/Constants.java
new file mode 100644
index 0000000..c6eafe9
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/Constants.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron;
+
+public class Constants {
+
+  public static final String ZOOKEEPER_ROOT = "/metron";
+  public static final String ZOOKEEPER_TOPOLOGY_ROOT = ZOOKEEPER_ROOT + "/topology";
+  public static final String SOURCE_TYPE = "source.type";
+  public static final String ENRICHMENT_TOPIC = "enrichments";
+  public static final String ERROR_STREAM = "error";
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
new file mode 100644
index 0000000..6d094ee
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.metron.Constants;
+import org.apache.metron.domain.SourceConfig;
+import org.apache.metron.helpers.topology.ErrorUtils;
+import org.apache.metron.topology.TopologyUtils;
+import org.apache.metron.writer.interfaces.BulkMessageWriter;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class BulkMessageWriterBolt extends ConfiguredBolt {
+
+  int count = 0;
+
+  private static final Logger LOG = LoggerFactory
+          .getLogger(BulkMessageWriterBolt.class);
+  private OutputCollector collector;
+  private BulkMessageWriter<JSONObject> bulkMessageWriter;
+  private Map<String, List<Tuple>> sourceTupleMap = new HashMap<>();
+  private Map<String, List<JSONObject>> sourceMessageMap = new HashMap<>();
+
+
+  public BulkMessageWriterBolt(String zookeeperUrl) {
+    super(zookeeperUrl);
+  }
+
+  public BulkMessageWriterBolt withBulkMessageWriter(BulkMessageWriter<JSONObject> bulkMessageWriter) {
+    this.bulkMessageWriter = bulkMessageWriter;
+    return this;
+  }
+
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    this.collector = collector;
+    super.prepare(stormConf, context, collector);
+    bulkMessageWriter.init();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void execute(Tuple tuple) {
+    JSONObject message = (JSONObject) tuple.getValueByField("message");
+    String sourceType = TopologyUtils.getSourceType(message);
+    SourceConfig configuration = configurations.get(sourceType);
+    int batchSize = configuration != null ? configuration.getBatchSize() : 1;
+    List<Tuple> tupleList = sourceTupleMap.get(sourceType);
+    if (tupleList == null) tupleList = new ArrayList<>();
+    tupleList.add(tuple);
+    List<JSONObject> messageList = sourceMessageMap.get(sourceType);
+    if (messageList == null) messageList = new ArrayList<>();
+    messageList.add(message);
+    if (messageList.size() < batchSize) {
+      sourceTupleMap.put(sourceType, tupleList);
+      sourceMessageMap.put(sourceType, messageList);
+    } else {
+      try {
+        bulkMessageWriter.write(sourceType, configuration, tupleList, messageList);
+        for(Tuple t: tupleList) {
+          collector.ack(t);
+        }
+      } catch (Exception e) {
+        for(Tuple t: tupleList) {
+          collector.fail(t);
+        }
+        ErrorUtils.handleError(collector, e, Constants.ERROR_STREAM);
+      }
+      sourceTupleMap.remove(sourceType);
+      sourceMessageMap.remove(sourceType);
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declareStream("error", new Fields("message"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
new file mode 100644
index 0000000..30c8e23
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseRichBolt;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.log4j.Logger;
+import org.apache.metron.Constants;
+import org.apache.metron.domain.SourceConfig;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class ConfiguredBolt extends BaseRichBolt {
+
+  private static final Logger LOG = Logger.getLogger(ConfiguredBolt.class);
+
+  private String zookeeperUrl;
+
+  protected Map<String, SourceConfig> configurations = Collections.synchronizedMap(new HashMap<String, SourceConfig>());
+  private CuratorFramework client;
+  private PathChildrenCache cache;
+
+  public ConfiguredBolt(String zookeeperUrl) {
+    this.zookeeperUrl = zookeeperUrl;
+  }
+
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+    client = CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
+    client.start();
+    cache = new PathChildrenCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT, true);
+    PathChildrenCacheListener listener = new PathChildrenCacheListener() {
+      @Override
+      public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
+        if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) || event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
+          byte[] data = event.getData().getData();
+          if (data != null) {
+            SourceConfig temp = SourceConfig.load(data);
+            if (temp != null) {
+              String[] path = event.getData().getPath().split("/");
+              configurations.put(path[path.length - 1], temp);
+            }
+          }
+        }
+      }
+    };
+    cache.getListenable().addListener(listener);
+    try {
+      cache.start();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void cleanup() {
+    try {
+      cache.close();
+      client.close();
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java
index dc84473..dac1c0a 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java
@@ -20,14 +20,12 @@ package org.apache.metron.bolt;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,18 +35,21 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-public abstract class JoinBolt<V> extends BaseRichBolt {
+public abstract class JoinBolt<V> extends ConfiguredBolt {
 
   private static final Logger LOG = LoggerFactory
           .getLogger(JoinBolt.class);
   protected OutputCollector collector;
-  protected ImmutableSet<String> streamIds;
 
   protected transient CacheLoader<String, Map<String, V>> loader;
   protected transient LoadingCache<String, Map<String, V>> cache;
   protected Long maxCacheSize;
   protected Long maxTimeRetain;
 
+  public JoinBolt(String zookeeperUrl) {
+    super(zookeeperUrl);
+  }
+
   public JoinBolt withMaxCacheSize(long maxCacheSize) {
     this.maxCacheSize = maxCacheSize;
     return this;
@@ -61,6 +62,7 @@ public abstract class JoinBolt<V> extends BaseRichBolt {
 
   @Override
   public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+    super.prepare(map, topologyContext, outputCollector);
     this.collector = outputCollector;
     if (this.maxCacheSize == null)
       throw new IllegalStateException("maxCacheSize must be specified");
@@ -74,9 +76,6 @@ public abstract class JoinBolt<V> extends BaseRichBolt {
     cache = CacheBuilder.newBuilder().maximumSize(maxCacheSize)
             .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES)
             .build(loader);
-    Set<String> temp = getStreamIds();
-    temp.add("message");
-    streamIds = ImmutableSet.copyOf(temp);
     prepare(map, topologyContext);
   }
 
@@ -85,26 +84,28 @@ public abstract class JoinBolt<V> extends BaseRichBolt {
   public void execute(Tuple tuple) {
     String streamId = tuple.getSourceStreamId();
     String key = (String) tuple.getValueByField("key");
-    V value = (V) tuple.getValueByField("message");
+    V message = (V) tuple.getValueByField("message");
     try {
-      Map<String, V> streamValueMap = cache.get(key);
-      if (streamValueMap.containsKey(streamId)) {
+      Map<String, V> streamMessageMap = cache.get(key);
+      if (streamMessageMap.containsKey(streamId)) {
         LOG.warn(String.format("Received key %s twice for " +
                 "stream %s", key, streamId));
       }
-      streamValueMap.put(streamId, value);
-      Set<String> streamValueKeys = streamValueMap.keySet();
-      if (streamValueKeys.size() == streamIds.size() && Sets.symmetricDifference
-              (streamValueKeys, streamIds)
+      streamMessageMap.put(streamId, message);
+      Set<String> streamIds = getStreamIds(message);
+      Set<String> streamMessageKeys = streamMessageMap.keySet();
+      if (streamMessageKeys.size() == streamIds.size() && Sets.symmetricDifference
+              (streamMessageKeys, streamIds)
               .isEmpty()) {
-        collector.emit("message", tuple, new Values(key, joinValues
-                (streamValueMap)));
+        collector.emit("message", tuple, new Values(key, joinMessages
+                (streamMessageMap)));
         collector.ack(tuple);
         cache.invalidate(key);
       } else {
-        cache.put(key, streamValueMap);
+        cache.put(key, streamMessageMap);
       }
     } catch (ExecutionException e) {
+      collector.reportError(e);
       LOG.error(e.getMessage(), e);
     }
   }
@@ -116,7 +117,7 @@ public abstract class JoinBolt<V> extends BaseRichBolt {
 
   public abstract void prepare(Map map, TopologyContext topologyContext);
 
-  public abstract Set<String> getStreamIds();
+  public abstract Set<String> getStreamIds(V value);
 
-  public abstract V joinValues(Map<String, V> streamValueMap);
+  public abstract V joinMessages(Map<String, V> streamMessageMap);
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
index d3d2cf3..89e13a4 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
@@ -20,34 +20,33 @@ package org.apache.metron.bolt;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
-import com.google.common.collect.ImmutableSet;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 
 public abstract class SplitBolt<T> extends
-        BaseRichBolt {
+        ConfiguredBolt {
 
   protected OutputCollector collector;
-  private Set<String> streamIds;
+
+  public SplitBolt(String zookeeperUrl) {
+    super(zookeeperUrl);
+  }
 
   @Override
   public final void prepare(Map map, TopologyContext topologyContext,
                        OutputCollector outputCollector) {
+    super.prepare(map, topologyContext, outputCollector);
     collector = outputCollector;
-    streamIds = ImmutableSet.copyOf(getStreamIds());
     prepare(map, topologyContext);
   }
 
   @Override
   public final void execute(Tuple tuple) {
-    emit(tuple, generateMessages(tuple));
+    emit(tuple, generateMessage(tuple));
   }
 
   @Override
@@ -60,24 +59,23 @@ public abstract class SplitBolt<T> extends
     declareOther(declarer);
   }
 
-  public void emit(Tuple tuple, List<T> messages) {
-    for(T message: messages) {
-      String key = getKey(tuple, message);
-      collector.emit("message", tuple, new Values(key, message));
-      Map<String, T> streamValueMap = splitMessage(message);
-      for (String streamId : streamIds) {
-        T streamValue = streamValueMap.get(streamId);
-        if (streamValue == null) {
-          streamValue = getDefaultValue(streamId);
-        }
-        collector.emit(streamId, new Values(key, streamValue));
+  public void emit(Tuple tuple, T message) {
+    if (message == null) return;
+    String key = getKey(tuple, message);
+    collector.emit("message", tuple, new Values(key, message));
+    Map<String, T> streamMessageMap = splitMessage(message);
+    for (String streamId : streamMessageMap.keySet()) {
+      T streamMessage = streamMessageMap.get(streamId);
+      if (streamMessage == null) {
+        streamMessage = getDefaultMessage(streamId);
       }
-      collector.ack(tuple);
+      collector.emit(streamId, new Values(key, streamMessage));
     }
-    emitOther(tuple, messages);
+    collector.ack(tuple);
+    emitOther(tuple, message);
   }
 
-  protected T getDefaultValue(String streamId) {
+  protected T getDefaultMessage(String streamId) {
     throw new IllegalArgumentException("Could not find a message for" +
             " stream: " + streamId);
   }
@@ -88,13 +86,13 @@ public abstract class SplitBolt<T> extends
 
   public abstract String getKey(Tuple tuple, T message);
 
-  public abstract List<T> generateMessages(Tuple tuple);
+  public abstract T generateMessage(Tuple tuple);
 
   public abstract Map<String, T> splitMessage(T message);
 
   public abstract void declareOther(OutputFieldsDeclarer declarer);
 
-  public abstract void emitOther(Tuple tuple, List<T> messages);
+  public abstract void emitOther(Tuple tuple, T message);
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
index d75e9a3..7079d5c 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
@@ -20,28 +20,25 @@ package org.apache.metron.domain;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 
 import java.io.Serializable;
-import java.util.List;
 
 public class Enrichment<T extends EnrichmentAdapter> implements Serializable {
 
-  private String name;
-  private List<String> fields;
+  private String type;
   private T adapter;
 
-  public String getName() {
-    return name;
-  }
+  public Enrichment() {}
 
-  public void setName(String name) {
-    this.name = name;
+  public Enrichment(String type, T adapter) {
+    this.type = type;
+    this.adapter = adapter;
   }
 
-  public List<String> getFields() {
-    return fields;
+  public String getType() {
+    return type;
   }
 
-  public void setFields(List<String> fields) {
-    this.fields = fields;
+  public void setType(String type) {
+    this.type = type;
   }
 
   public T getAdapter() {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SourceConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SourceConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SourceConfig.java
new file mode 100644
index 0000000..8e1a960
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SourceConfig.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.domain;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.yaml.snakeyaml.TypeDescription;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.Constructor;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+public class SourceConfig {
+
+  final static ObjectMapper _mapper = new ObjectMapper();
+
+  private String index;
+  private Map<String, List<String>> enrichmentFieldMap;
+  private Map<String, List<String>> threatIntelFieldMap;
+  private int batchSize;
+
+  public String getIndex() {
+    return index;
+  }
+
+  public void setIndex(String index) {
+    this.index = index;
+  }
+
+  public Map<String, List<String>> getEnrichmentFieldMap() {
+    return enrichmentFieldMap;
+  }
+
+  public void setEnrichmentFieldMap(Map<String, List<String>> enrichmentFieldMap) {
+    this.enrichmentFieldMap = enrichmentFieldMap;
+  }
+
+  public Map<String, List<String>> getThreatIntelFieldMap() {
+    return threatIntelFieldMap;
+  }
+
+  public void setThreatIntelFieldMap(Map<String, List<String>> threatIntelFieldMap) {
+    this.threatIntelFieldMap = threatIntelFieldMap;
+  }
+
+  public int getBatchSize() {
+    return batchSize;
+  }
+
+  public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  public static synchronized SourceConfig load(InputStream is) throws IOException {
+    SourceConfig ret = _mapper.readValue(is, SourceConfig.class);
+    return ret;
+  }
+
+  public static synchronized SourceConfig load(byte[] data) throws IOException {
+    return load( new ByteArrayInputStream(data));
+  }
+
+  public static synchronized SourceConfig load(String s, Charset c) throws IOException {
+    return load( s.getBytes(c));
+  }
+  public static synchronized SourceConfig load(String s) throws IOException {
+    return load( s, Charset.defaultCharset());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConstants.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConstants.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConstants.java
new file mode 100644
index 0000000..4f7be3b
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConstants.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment;
+
+public class EnrichmentConstants {
+
+
+
+  public static final String INDEX_NAME = "index.name";
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentSplitterBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentSplitterBolt.java
deleted file mode 100644
index 967970f..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentSplitterBolt.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import com.google.common.base.Splitter;
-import org.apache.metron.bolt.SplitBolt;
-import org.apache.metron.domain.Enrichment;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * Created by cstella on 2/10/16.
- */
-public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
-    protected static final Logger LOG = LoggerFactory.getLogger(EnrichmentSplitterBolt.class);
-    protected List<Enrichment> enrichments = new ArrayList<>();
-    protected String messageFieldName = "message";
-    /**
-     * @param enrichments A class for sending tuples to enrichment bolt
-     * @return Instance of this class
-     */
-    public EnrichmentSplitterBolt withEnrichments(List<Enrichment> enrichments) {
-        this.enrichments = enrichments;
-        return this;
-    }
-    public EnrichmentSplitterBolt withMessageFieldName(String messageFieldName) {
-        this.messageFieldName = messageFieldName;
-        return this;
-    }
-    @Override
-    public void prepare(Map map, TopologyContext topologyContext) {
-
-    }
-    @Override
-    public String getKey(Tuple tuple, JSONObject message) {
-        String key = null;
-        try {
-            key = tuple.getStringByField("key");
-        }
-        catch(Throwable t) {
-            //swallowing this just in case.
-        }
-        if(key != null) {
-            return key;
-        }
-        else {
-            return UUID.randomUUID().toString();
-        }
-    }
-
-    @Override
-    public List<JSONObject> generateMessages(Tuple tuple) {
-        return Arrays.asList((JSONObject)tuple.getValueByField(messageFieldName));
-    }
-
-    @Override
-    public Set<String> getStreamIds() {
-        Set<String> streamIds = new HashSet<>();
-        for(Enrichment enrichment: enrichments) {
-            streamIds.add(enrichment.getName());
-        }
-        return streamIds;
-    }
-    @SuppressWarnings("unchecked")
-    @Override
-    public Map<String, JSONObject> splitMessage(JSONObject message) {
-
-        Map<String, JSONObject> streamMessageMap = new HashMap<>();
-        for (Enrichment enrichment : enrichments) {
-            List<String> fields = enrichment.getFields();
-            if (fields != null && fields.size() > 0) {
-                JSONObject enrichmentObject = new JSONObject();
-                for (String field : fields) {
-                    enrichmentObject.put(field, getField(message,field));
-                }
-                streamMessageMap.put(enrichment.getName(), enrichmentObject);
-            }
-        }
-        /*if(message != null && enrichments.size() != 1) {
-            throw new RuntimeException("JSON: " + message.toJSONString() + " => " + streamMessageMap);
-        }*/
-        return streamMessageMap;
-    }
-
-    public Object getField(JSONObject object, String path) {
-        Map ret = object;
-        for(String node: Splitter.on('/').split(path))  {
-            Object o = ret.get(node);
-            if(o instanceof Map) {
-                ret = (Map) o;
-            }
-            else {
-                return o;
-            }
-        }
-        return ret;
-    }
-
-    @Override
-    public void declareOther(OutputFieldsDeclarer declarer) {
-
-    }
-
-    @Override
-    public void emitOther(Tuple tuple, List<JSONObject> messages) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
index 7aa02c5..6caa016 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
@@ -20,7 +20,6 @@ package org.apache.metron.hbase;
 
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 import java.util.Map;
 
 import com.google.common.base.Function;
@@ -40,7 +39,7 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 
-import org.apache.metron.helpers.topology.ErrorGenerator;
+import org.apache.metron.helpers.topology.ErrorUtils;
 
 /**
  * A Storm bolt for putting data into HBase.
@@ -136,7 +135,7 @@ public class HBaseBolt implements IRichBolt {
       this.connector.put(p);
     } catch (IOException ex) {
 
-  		JSONObject error = ErrorGenerator.generateErrorMessage(
+  		JSONObject error = ErrorUtils.generateErrorMessage(
   				"Alerts problem: " + input.toString(), ex);
   		collector.emit("error", new Values(error));
   		

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
index 9055837..e454f04 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
@@ -23,9 +23,6 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 
 import java.io.IOException;
 
-/**
- * Created by cstella on 2/11/16.
- */
 public class HTableProvider implements TableProvider {
     @Override
     public HTableInterface getTable(Configuration config, String tableName) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorGenerator.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorGenerator.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorGenerator.java
deleted file mode 100644
index 8ec940a..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorGenerator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.helpers.topology;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.json.simple.JSONObject;
-
-public class ErrorGenerator {
-
-	@SuppressWarnings("unchecked")
-	public static JSONObject generateErrorMessage(String message, Exception e)
-	{
-		JSONObject error_message = new JSONObject();
-		
-		/*
-		 * Save full stack trace in object.
-		 */
-		String stackTrace = ExceptionUtils.getStackTrace(e);
-		
-		String exception = e.toString();
-		
-		error_message.put("time", System.currentTimeMillis());
-		try {
-			error_message.put("hostname", InetAddress.getLocalHost().getHostName());
-		} catch (UnknownHostException ex) {
-			// TODO Auto-generated catch block
-			ex.printStackTrace();
-		}
-		
-		error_message.put("message", message);
-		error_message.put("exception", exception);
-		error_message.put("stack", stackTrace);
-		
-		return error_message;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorUtils.java
new file mode 100644
index 0000000..b02cbaf
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorUtils.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.helpers.topology;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Values;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.metron.Constants;
+import org.json.simple.JSONObject;
+
+public class ErrorUtils {
+
+	@SuppressWarnings("unchecked")
+	public static JSONObject generateErrorMessage(String message, Throwable t)
+	{
+		JSONObject error_message = new JSONObject();
+		
+		/*
+		 * Save full stack trace in object.
+		 */
+		String stackTrace = ExceptionUtils.getStackTrace(t);
+		
+		String exception = t.toString();
+		
+		error_message.put("time", System.currentTimeMillis());
+		try {
+			error_message.put("hostname", InetAddress.getLocalHost().getHostName());
+		} catch (UnknownHostException ex) {
+			// TODO Auto-generated catch block
+			ex.printStackTrace();
+		}
+		
+		error_message.put("message", message);
+		error_message.put(Constants.SOURCE_TYPE, "error");
+		error_message.put("exception", exception);
+		error_message.put("stack", stackTrace);
+		
+		return error_message;
+	}
+
+	public static void handleError(OutputCollector collector, Throwable t, String errorStream) {
+		JSONObject error = ErrorUtils.generateErrorMessage(t.getMessage(), t);
+		collector.emit(errorStream, new Values(error));
+		collector.reportError(t);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
new file mode 100644
index 0000000..2c430d3
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.spout.pcap;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.log4j.Logger;
+import storm.kafka.Callback;
+import storm.kafka.EmitContext;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+public class HDFSWriterCallback implements Callback {
+  static final long serialVersionUID = 0xDEADBEEFL;
+  private static final Logger LOG = Logger.getLogger(HDFSWriterCallback.class);
+  public static final byte[] PCAP_GLOBAL_HEADER = new byte[] {
+          (byte) 0xd4, (byte) 0xc3, (byte) 0xb2, (byte) 0xa1, 0x02, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00
+          ,0x00, 0x00, 0x00, 0x00, (byte) 0xff, (byte) 0xff, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00
+  };
+
+  private static final List<Object> RET_TUPLE = ImmutableList.of((Object)Byte.valueOf((byte) 0x00), Byte.valueOf((byte)0x00));
+  private FileSystem fs;
+  private SequenceFile.Writer writer;
+  private HDFSWriterConfig config;
+  private long batchStartTime;
+  private long numWritten;
+  private EmitContext context;
+
+  public HDFSWriterCallback() {
+    //this.config = config;
+  }
+
+  public HDFSWriterCallback withConfig(HDFSWriterConfig config) {
+    LOG.info("Configured: " + config);
+    this.config = config;
+    return this;
+  }
+
+  @Override
+  public List<Object> apply(List<Object> tuple, EmitContext context) {
+
+    LongWritable ts = (LongWritable) tuple.get(0);
+    BytesWritable rawPacket = (BytesWritable)tuple.get(1);
+    try {
+      turnoverIfNecessary(ts.get());
+      writer.append(ts, headerize(rawPacket.getBytes()));
+      writer.hflush();
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+      //drop?  not sure..
+    }
+    return RET_TUPLE;
+  }
+
+  private static BytesWritable headerize(byte[] packet) {
+    byte[] ret = new byte[packet.length + PCAP_GLOBAL_HEADER.length];
+    int offset = 0;
+    System.arraycopy(PCAP_GLOBAL_HEADER, 0, ret, offset, PCAP_GLOBAL_HEADER.length);
+    offset += PCAP_GLOBAL_HEADER.length;
+    System.arraycopy(packet, 0, ret, offset, packet.length);
+    return new BytesWritable(ret);
+  }
+
+
+  private synchronized void turnoverIfNecessary(long ts) throws IOException {
+    long duration = ts - batchStartTime;
+    if(batchStartTime == 0L || duration > config.getMaxTimeMS() || numWritten > config.getNumPackets()) {
+      //turnover
+      Path path = getPath(ts);
+      if(writer != null) {
+        writer.close();
+      }
+      writer = SequenceFile.createWriter(new Configuration()
+              , SequenceFile.Writer.file(path)
+              , SequenceFile.Writer.keyClass(LongWritable.class)
+              , SequenceFile.Writer.valueClass(BytesWritable.class)
+      );
+      //reset state
+      LOG.info("Turning over and writing to " + path);
+      batchStartTime = ts;
+      numWritten = 0;
+    }
+  }
+
+  private Path getPath(long ts) {
+    String fileName = Joiner.on("_").join("pcap"
+            , "" + ts
+            , context.get(EmitContext.Type.UUID)
+    );
+    return new Path(config.getOutputPath(), fileName);
+  }
+
+  @Override
+  public void initialize(EmitContext context) {
+    this.context = context;
+    try {
+      fs = FileSystem.get(new Configuration());
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to create filesystem", e);
+    }
+  }
+
+  /**
+   * Closes this resource, relinquishing any underlying resources.
+   * This method is invoked automatically on objects managed by the
+   * {@code try}-with-resources statement.
+   * <p/>
+   * <p>While this interface method is declared to throw {@code
+   * Exception}, implementers are <em>strongly</em> encouraged to
+   * declare concrete implementations of the {@code close} method to
+   * throw more specific exceptions, or to throw no exception at all
+   * if the close operation cannot fail.
+   * <p/>
+   * <p><em>Implementers of this interface are also strongly advised
+   * to not have the {@code close} method throw {@link
+   * InterruptedException}.</em>
+   * <p/>
+   * This exception interacts with a thread's interrupted status,
+   * and runtime misbehavior is likely to occur if an {@code
+   * InterruptedException} is {@linkplain Throwable#addSuppressed
+   * suppressed}.
+   * <p/>
+   * More generally, if it would cause problems for an
+   * exception to be suppressed, the {@code AutoCloseable.close}
+   * method should not throw it.
+   * <p/>
+   * <p>Note that unlike the {@link Closeable#close close}
+   * method of {@link Closeable}, this {@code close} method
+   * is <em>not</em> required to be idempotent.  In other words,
+   * calling this {@code close} method more than once may have some
+   * visible side effect, unlike {@code Closeable.close} which is
+   * required to have no effect if called more than once.
+   * <p/>
+   * However, implementers of this interface are strongly encouraged
+   * to make their {@code close} methods idempotent.
+   *
+   * @throws Exception if this resource cannot be closed
+   */
+  @Override
+  public void close() throws Exception {
+    if(writer != null) {
+      writer.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
new file mode 100644
index 0000000..ccfc884
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.spout.pcap;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class HDFSWriterConfig implements Serializable {
+  static final long serialVersionUID = 0xDEADBEEFL;
+  private long numPackets;
+  private long maxTimeMS;
+  private String outputPath;
+  private String zookeeperQuorum;
+
+  public HDFSWriterConfig withOutputPath(String path) {
+    outputPath = path;
+    return this;
+  }
+
+  public HDFSWriterConfig withNumPackets(long n) {
+    numPackets = n;
+    return this;
+  }
+
+  public HDFSWriterConfig withMaxTimeMS(long t) {
+    maxTimeMS = t;
+    return this;
+  }
+
+  public HDFSWriterConfig withZookeeperQuorum(String zookeeperQuorum) {
+    this.zookeeperQuorum = zookeeperQuorum;
+    return this;
+  }
+
+  public List<String> getZookeeperServers() {
+    List<String> out = new ArrayList<>();
+    if(zookeeperQuorum != null) {
+      for (String hostPort : Splitter.on(',').split(zookeeperQuorum)) {
+        Iterable<String> tokens = Splitter.on(':').split(hostPort);
+        String host = Iterables.getFirst(tokens, null);
+        if(host != null) {
+          out.add(host);
+        }
+      }
+    }
+    return out;
+  }
+
+  public Integer getZookeeperPort() {
+    if(zookeeperQuorum != null) {
+      String hostPort = Iterables.getFirst(Splitter.on(',').split(zookeeperQuorum), null);
+      String portStr = Iterables.getLast(Splitter.on(':').split(hostPort));
+      return Integer.parseInt(portStr);
+    }
+    return  null;
+  }
+
+  public String getOutputPath() {
+    return outputPath;
+  }
+
+  public long getNumPackets() {
+    return numPackets;
+  }
+
+  public long getMaxTimeMS() {
+    return maxTimeMS;
+  }
+
+  @Override
+  public String toString() {
+    return "HDFSWriterConfig{" +
+            "numPackets=" + numPackets +
+            ", maxTimeMS=" + maxTimeMS +
+            ", outputPath='" + outputPath + '\'' +
+            '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/topology/TopologyUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/topology/TopologyUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/topology/TopologyUtils.java
new file mode 100644
index 0000000..581d74f
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/topology/TopologyUtils.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.topology;
+
+import org.apache.metron.Constants;
+import org.json.simple.JSONObject;
+
+public class TopologyUtils {
+
+  public static String getSourceType(JSONObject message) {
+    return (String) message.get(Constants.SOURCE_TYPE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigUtils.java
new file mode 100644
index 0000000..7f5afe9
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigUtils.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.utils;
+
+import java.lang.reflect.InvocationTargetException;
+
+public class ConfigUtils<T> {
+
+  public static <T> T createInstance(String className, T defaultClass) {
+    T instance;
+    if(className == null || className.length() == 0 || className.charAt(0) == '$') {
+      return defaultClass;
+    }
+    else {
+      try {
+        Class<? extends T> clazz = (Class<? extends T>) Class.forName(className);
+        instance = clazz.getConstructor().newInstance();
+      } catch (InstantiationException e) {
+        throw new IllegalStateException("Unable to instantiate connector.", e);
+      } catch (IllegalAccessException e) {
+        throw new IllegalStateException("Unable to instantiate connector: illegal access", e);
+      } catch (InvocationTargetException e) {
+        throw new IllegalStateException("Unable to instantiate connector", e);
+      } catch (NoSuchMethodException e) {
+        throw new IllegalStateException("Unable to instantiate connector: no such method", e);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalStateException("Unable to instantiate connector: class not found", e);
+      }
+    }
+    return instance;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/HBaseWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/HBaseWriter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/HBaseWriter.java
new file mode 100644
index 0000000..b257b24
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/HBaseWriter.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.domain.SourceConfig;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.utils.ConfigUtils;
+import org.apache.metron.writer.interfaces.MessageWriter;
+import org.json.simple.JSONObject;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+public abstract class HBaseWriter implements MessageWriter<JSONObject>, Serializable {
+
+  private String tableName;
+  private String connectorImpl;
+  private TableProvider provider;
+  private HTableInterface table;
+
+  public HBaseWriter(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public HBaseWriter withProviderImpl(String connectorImpl) {
+    this.connectorImpl = connectorImpl;
+    return this;
+  }
+
+  @Override
+  public void init() {
+    final Configuration config = HBaseConfiguration.create();
+    try {
+      provider = ConfigUtils.createInstance(connectorImpl, new HTableProvider());
+      table = provider.getTable(config, tableName);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Override
+  public void write(String sourceType, SourceConfig configuration, Tuple tuple, JSONObject message) throws Exception {
+    Put put = new Put(getKey(tuple, message));
+    Map<String, byte[]> values = getValues(tuple, message);
+    for(String column: values.keySet()) {
+      String[] columnParts = column.split(":");
+      long timestamp = getTimestamp(tuple, message);
+      if (timestamp > -1) {
+        put.addColumn(Bytes.toBytes(columnParts[0]), Bytes.toBytes(columnParts[1]), timestamp, values.get(column));
+      } else {
+        put.addColumn(Bytes.toBytes(columnParts[0]), Bytes.toBytes(columnParts[1]), values.get(column));
+      }
+    }
+    table.put(put);
+  }
+
+  @Override
+  public void close() throws Exception {
+    table.close();
+  }
+
+  public abstract byte[] getKey(Tuple tuple, JSONObject message);
+  public abstract long getTimestamp(Tuple tuple, JSONObject message);
+  public abstract Map<String, byte[]> getValues(Tuple tuple, JSONObject message);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/PcapWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/PcapWriter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/PcapWriter.java
new file mode 100644
index 0000000..b5ab587
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/PcapWriter.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer;
+
+import backtype.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class PcapWriter extends HBaseWriter {
+
+  private String column;
+
+  public PcapWriter(String tableName, String column) {
+    super(tableName);
+    this.column = column;
+  }
+
+  @Override
+  public byte[] getKey(Tuple tuple, JSONObject message) {
+    String key = (String) message.get("pcap_id");
+    return key.getBytes();
+  }
+
+  @Override
+  public long getTimestamp(Tuple tuple, JSONObject message) {
+    return (long) message.get("ts_micro");
+  }
+
+  @Override
+  public Map<String, byte[]> getValues(Tuple tuple, JSONObject message) {
+    Map<String, byte[]> values = new HashMap<>();
+    values.put(column, tuple.getBinary(0));
+    return values;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
new file mode 100644
index 0000000..90c0261
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.interfaces;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.domain.SourceConfig;
+
+import java.util.List;
+
+public interface BulkMessageWriter<T> extends AutoCloseable {
+
+  void init();
+  void write(String sourceType, SourceConfig configuration, List<Tuple> tuples, List<T> messages) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/MessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/MessageWriter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/MessageWriter.java
new file mode 100644
index 0000000..12de836
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/MessageWriter.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.interfaces;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.domain.SourceConfig;
+
+public interface MessageWriter<T> extends AutoCloseable {
+
+  void init();
+  void write(String sourceType, SourceConfig configuration, Tuple tuple, T message) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/storm/kafka/Callback.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/storm/kafka/Callback.java b/metron-streaming/Metron-Common/src/main/java/storm/kafka/Callback.java
new file mode 100644
index 0000000..ff05c29
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/storm/kafka/Callback.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface Callback extends AutoCloseable, Serializable {
+  List<Object> apply(List<Object> tuple, EmitContext context);
+  void initialize(EmitContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/storm/kafka/CallbackCollector.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/storm/kafka/CallbackCollector.java b/metron-streaming/Metron-Common/src/main/java/storm/kafka/CallbackCollector.java
new file mode 100644
index 0000000..485da5a
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/storm/kafka/CallbackCollector.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+import backtype.storm.spout.ISpoutOutputCollector;
+import backtype.storm.spout.SpoutOutputCollector;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class CallbackCollector extends SpoutOutputCollector implements Serializable {
+  static final long serialVersionUID = 0xDEADBEEFL;
+  Callback _callback;
+  SpoutOutputCollector _delegate;
+  EmitContext _context;
+  public CallbackCollector(Callback callback, SpoutOutputCollector collector, EmitContext context) {
+    super(collector);
+    this._callback = callback;
+    this._delegate = collector;
+    this._context = context;
+  }
+
+
+  /**
+   * Emits a new tuple to the specified output stream with the given message ID.
+   * When Storm detects that this tuple has been fully processed, or has failed
+   * to be fully processed, the spout will receive an ack or fail callback respectively
+   * with the messageId as long as the messageId was not null. If the messageId was null,
+   * Storm will not track the tuple and no callback will be received. The emitted values must be
+   * immutable.
+   *
+   * @param streamId
+   * @param tuple
+   * @param messageId
+   * @return the list of task ids that this tuple was sent to
+   */
+  @Override
+  public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+    List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.MESSAGE_ID, messageId)
+            .with(EmitContext.Type.STREAM_ID, streamId)
+    );
+    return _delegate.emit(streamId, t, messageId);
+  }
+
+  /**
+   * Emits a new tuple to the default output stream with the given message ID.
+   * When Storm detects that this tuple has been fully processed, or has failed
+   * to be fully processed, the spout will receive an ack or fail callback respectively
+   * with the messageId as long as the messageId was not null. If the messageId was null,
+   * Storm will not track the tuple and no callback will be received. The emitted values must be
+   * immutable.
+   *
+   * @param tuple
+   * @param messageId
+   * @return the list of task ids that this tuple was sent to
+   */
+  @Override
+  public List<Integer> emit(List<Object> tuple, Object messageId) {
+    List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.MESSAGE_ID, messageId));
+    return super.emit(t, messageId);
+  }
+
+  /**
+   * Emits a tuple to the default output stream with a null message id. Storm will
+   * not track this message so ack and fail will never be called for this tuple. The
+   * emitted values must be immutable.
+   *
+   * @param tuple
+   */
+  @Override
+  public List<Integer> emit(List<Object> tuple) {
+    List<Object> t = _callback.apply(tuple, _context.cloneContext());
+    return super.emit(t);
+  }
+
+  /**
+   * Emits a tuple to the specified output stream with a null message id. Storm will
+   * not track this message so ack and fail will never be called for this tuple. The
+   * emitted values must be immutable.
+   *
+   * @param streamId
+   * @param tuple
+   */
+  @Override
+  public List<Integer> emit(String streamId, List<Object> tuple) {
+    List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId));
+    return super.emit(streamId, t);
+  }
+
+  /**
+   * Emits a tuple to the specified task on the specified output stream. This output
+   * stream must have been declared as a direct stream, and the specified task must
+   * use a direct grouping on this stream to receive the message. The emitted values must be
+   * immutable.
+   *
+   * @param taskId
+   * @param streamId
+   * @param tuple
+   * @param messageId
+   */
+  @Override
+  public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
+    List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId)
+            .with(EmitContext.Type.MESSAGE_ID, messageId)
+            .with(EmitContext.Type.TASK_ID, new Integer(taskId))
+    );
+    super.emitDirect(taskId, streamId, t, messageId);
+  }
+
+  /**
+   * Emits a tuple to the specified task on the default output stream. This output
+   * stream must have been declared as a direct stream, and the specified task must
+   * use a direct grouping on this stream to receive the message. The emitted values must be
+   * immutable.
+   *
+   * @param taskId
+   * @param tuple
+   * @param messageId
+   */
+  @Override
+  public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
+    List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.MESSAGE_ID, messageId)
+            .with(EmitContext.Type.TASK_ID, new Integer(taskId))
+    );
+    super.emitDirect(taskId, t, messageId);
+  }
+
+  /**
+   * Emits a tuple to the specified task on the specified output stream. This output
+   * stream must have been declared as a direct stream, and the specified task must
+   * use a direct grouping on this stream to receive the message. The emitted values must be
+   * immutable.
+   * <p/>
+   * <p> Because no message id is specified, Storm will not track this message
+   * so ack and fail will never be called for this tuple.</p>
+   *
+   * @param taskId
+   * @param streamId
+   * @param tuple
+   */
+  @Override
+  public void emitDirect(int taskId, String streamId, List<Object> tuple) {
+    List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId)
+            .with(EmitContext.Type.TASK_ID, new Integer(taskId))
+    );
+    super.emitDirect(taskId, streamId, t);
+  }
+
+  /**
+   * Emits a tuple to the specified task on the default output stream. This output
+   * stream must have been declared as a direct stream, and the specified task must
+   * use a direct grouping on this stream to receive the message. The emitted values must be
+   * immutable.
+   * <p/>
+   * <p> Because no message id is specified, Storm will not track this message
+   * so ack and fail will never be called for this tuple.</p>
+   *
+   * @param taskId
+   * @param tuple
+   */
+  @Override
+  public void emitDirect(int taskId, List<Object> tuple) {
+
+    List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.TASK_ID, new Integer(taskId)));
+    super.emitDirect(taskId, t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/storm/kafka/CallbackKafkaSpout.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/storm/kafka/CallbackKafkaSpout.java b/metron-streaming/Metron-Common/src/main/java/storm/kafka/CallbackKafkaSpout.java
new file mode 100644
index 0000000..431bdf9
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/storm/kafka/CallbackKafkaSpout.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+import backtype.storm.Config;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import storm.kafka.*;
+
+import java.util.*;
+
+public class CallbackKafkaSpout extends KafkaSpout {
+  static final long serialVersionUID = 0xDEADBEEFL;
+  Class<? extends Callback> callbackClazz;
+  Callback _callback;
+  EmitContext _context;
+  public CallbackKafkaSpout(SpoutConfig spoutConfig, String callbackClass) {
+    this(spoutConfig, toCallbackClass(callbackClass));
+  }
+
+  public CallbackKafkaSpout(SpoutConfig spoutConf, Class<? extends Callback> callback) {
+    super(spoutConf);
+    callbackClazz = callback;
+  }
+
+  public void initialize() {
+    _callback = createCallback(callbackClazz);
+    _context = new EmitContext().with(EmitContext.Type.SPOUT_CONFIG, _spoutConfig)
+            .with(EmitContext.Type.UUID, _uuid);
+    _callback.initialize(_context);
+  }
+
+
+  private static Class<? extends Callback> toCallbackClass(String callbackClass)  {
+    try{
+      return (Class<? extends Callback>) Callback.class.forName(callbackClass);
+    }
+    catch (ClassNotFoundException e) {
+      throw new RuntimeException(callbackClass + " not found", e);
+    }
+  }
+
+  protected Callback createCallback(Class<? extends Callback> callbackClass)  {
+    try {
+      return callbackClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new RuntimeException("Unable to instantiate callback", e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException("Illegal access", e);
+    }
+  }
+
+  @Override
+  public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+    if(_callback == null) {
+      initialize();
+    }
+    super.open( conf, context
+            , new CallbackCollector(_callback, collector
+                    ,_context.cloneContext().with(EmitContext.Type.OPEN_CONFIG, conf)
+                    .with(EmitContext.Type.TOPOLOGY_CONTEXT, context)
+            )
+    );
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    if(_callback != null) {
+      try {
+        _callback.close();
+      } catch (Exception e) {
+        throw new IllegalStateException("Unable to close callback", e);
+      }
+    }
+  }
+}