You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2016/04/19 17:37:38 UTC

incubator-metron git commit: METRON-103 Unit Tests for storm bolts (merrimanr) closes apache/incubator-metron#81

Repository: incubator-metron
Updated Branches:
  refs/heads/master 2b67b1f28 -> 25e732661


METRON-103 Unit Tests for storm bolts (merrimanr) closes apache/incubator-metron#81


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/25e73266
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/25e73266
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/25e73266

Branch: refs/heads/master
Commit: 25e7326612c8a76a8cf18f9d9988457d860f4e5c
Parents: 2b67b1f
Author: merrimanr <me...@gmail.com>
Authored: Tue Apr 19 10:36:13 2016 -0500
Committer: rmerriman <rm...@hortonworks.com>
Committed: Tue Apr 19 10:36:13 2016 -0500

----------------------------------------------------------------------
 deployment/roles/metron_ui/tasks/main.yml       |   2 +-
 metron-streaming/Metron-Common/pom.xml          |   6 +
 .../org/apache/metron/bolt/ConfiguredBolt.java  |  53 +++--
 .../org/apache/metron/bolt/BaseBoltTest.java    |  48 -----
 .../metron/bolt/BulkMessageWriterBoltTest.java  | 149 ++++++++++++++
 .../apache/metron/bolt/ConfiguredBoltTest.java  |  27 ++-
 .../org/apache/metron/bolt/JoinBoltTest.java    | 120 ++++++++++++
 .../org/apache/metron/bolt/SplitBoltTest.java   | 124 ++++++++++++
 .../metron/utils/ConfigurationsUtilsTest.java   |   1 -
 .../enrichment/bolt/EnrichmentJoinBolt.java     |   7 -
 .../enrichment/bolt/EnrichmentJoinBoltTest.java |  86 ++++++++
 .../bolt/EnrichmentSplitterBoltTest.java        |  97 +++++++++
 .../bolt/GenericEnrichmentBoltTest.java         | 195 +++++++++++++++++++
 .../bolt/ThreatIntelJoinBoltTest.java           | 105 ++++++++++
 .../bolt/ThreatIntelSplitterBoltTest.java       |  45 +++++
 metron-streaming/Metron-MessageParsers/pom.xml  |   6 +
 .../org/apache/metron/bolt/ParserBoltTest.java  |  90 +++++++++
 .../Metron-TestingUtilities/pom.xml             |  48 ++++-
 .../org/apache/metron/bolt/BaseBoltTest.java    |  93 +++++++++
 .../metron/bolt/BaseEnrichmentBoltTest.java     |  93 +++++++++
 .../topologies/enrichment/remote.yaml           |   6 -
 .../topologies/enrichment/test.yaml             |   6 -
 22 files changed, 1309 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/deployment/roles/metron_ui/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_ui/tasks/main.yml b/deployment/roles/metron_ui/tasks/main.yml
index f4e0513..fd3422b 100644
--- a/deployment/roles/metron_ui/tasks/main.yml
+++ b/deployment/roles/metron_ui/tasks/main.yml
@@ -53,4 +53,4 @@
     production: no
 
 - name: Start Metron UI
-  shell: "pm2 restart {{ metron_ui_directory }}/lib/metron-ui.js --name metron"
+  shell: "pm2 start {{ metron_ui_directory }}/lib/metron-ui.js --name metron"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/pom.xml b/metron-streaming/Metron-Common/pom.xml
index 80b6a17..6985e6d 100644
--- a/metron-streaming/Metron-Common/pom.xml
+++ b/metron-streaming/Metron-Common/pom.xml
@@ -194,6 +194,12 @@
             <version>2.0.2</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>Metron-TestingUtilities</artifactId>
+            <version>0.1BETA</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <reporting>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
index 40a1f08..2a1cb13 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
@@ -42,39 +42,54 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
   private String zookeeperUrl;
 
   protected final Configurations configurations = new Configurations();
-  private CuratorFramework client;
-  private TreeCache cache;
+  protected CuratorFramework client;
+  protected TreeCache cache;
 
   public ConfiguredBolt(String zookeeperUrl) {
     this.zookeeperUrl = zookeeperUrl;
   }
 
+  public Configurations getConfigurations() {
+    return configurations;
+  }
+
+  public void setCuratorFramework(CuratorFramework client) {
+    this.client = client;
+  }
+
+  public void setTreeCache(TreeCache cache) {
+    this.cache = cache;
+  }
+
   protected void reloadCallback(String name, Configurations.Type type) {
   }
 
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
     try {
-      RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
-      client = CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
+      if (client == null) {
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        client = CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
+      }
       client.start();
-      cache = new TreeCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT);
-      TreeCacheListener listener = new TreeCacheListener() {
-        @Override
-        public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
-          if (event.getType().equals(TreeCacheEvent.Type.NODE_ADDED) || event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) {
-            String path = event.getData().getPath();
-            byte[] data = event.getData().getData();
-            updateConfig(path, data);
+      if (cache == null) {
+        cache = new TreeCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT);
+        TreeCacheListener listener = new TreeCacheListener() {
+          @Override
+          public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+            if (event.getType().equals(TreeCacheEvent.Type.NODE_ADDED) || event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) {
+              String path = event.getData().getPath();
+              byte[] data = event.getData().getData();
+              updateConfig(path, data);
+            }
           }
+        };
+        cache.getListenable().addListener(listener);
+        try {
+          ConfigurationsUtils.updateConfigsFromZookeeper(configurations, client);
+        } catch (Exception e) {
+          LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
         }
-      };
-      cache.getListenable().addListener(listener);
-      try {
-        ConfigurationsUtils.updateConfigsFromZookeeper(configurations, client);
-      }
-      catch(Exception e) {
-        LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
       }
       cache.start();
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BaseBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BaseBoltTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BaseBoltTest.java
deleted file mode 100644
index ada3854..0000000
--- a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BaseBoltTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.bolt;
-
-import org.apache.curator.test.TestingServer;
-import org.apache.metron.Constants;
-import org.apache.metron.utils.ConfigurationsUtils;
-import org.junit.Before;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-public abstract class BaseBoltTest {
-
-  public String sampleConfigRoot = "../Metron-Testing/src/main/resources/sample/config/";
-  protected String zookeeperUrl;
-  protected Set<String> allConfigurationTypes = new HashSet<>();
-
-  @Before
-  public void setupConfiguration() throws Exception {
-    TestingServer testZkServer = new TestingServer(true);
-    this.zookeeperUrl = testZkServer.getConnectString();
-    byte[] globalConfig = ConfigurationsUtils.readGlobalConfigFromFile(sampleConfigRoot);
-    ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig, zookeeperUrl);
-    allConfigurationTypes.add(Constants.GLOBAL_CONFIG_NAME);
-    Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(sampleConfigRoot);
-    for (String sensorType : sensorEnrichmentConfigs.keySet()) {
-      ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
-      allConfigurationTypes.add(sensorType);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BulkMessageWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BulkMessageWriterBoltTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BulkMessageWriterBoltTest.java
new file mode 100644
index 0000000..a66acf2
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BulkMessageWriterBoltTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.Constants;
+import org.apache.metron.domain.Configurations;
+import org.apache.metron.writer.interfaces.BulkMessageWriter;
+import org.hamcrest.Description;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+
+import java.io.FileInputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
+
+  protected class MessageListMatcher extends ArgumentMatcher<List<JSONObject>> {
+
+    private List<JSONObject> expectedMessageList;
+
+    public MessageListMatcher(List<JSONObject> expectedMessageList) {
+      this.expectedMessageList = expectedMessageList;
+    }
+
+    @Override
+    public boolean matches(Object o) {
+      List<JSONObject> actualMessageList = (List<JSONObject>) o;
+      for(JSONObject message: actualMessageList) removeTimingFields(message);
+      return expectedMessageList.equals(actualMessageList);
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText(String.format("[%s]", expectedMessageList));
+    }
+
+  }
+
+  /**
+   * {
+   * "field": "value",
+   * "source.type": "yaf"
+   * }
+   */
+  @Multiline
+  private String sampleMessageString;
+
+  private JSONObject sampleMessage;
+  private List<JSONObject> messageList;
+  private List<Tuple> tupleList;
+
+  @Before
+  public void parseMessages() throws ParseException {
+    JSONParser parser = new JSONParser();
+    sampleMessage = (JSONObject) parser.parse(sampleMessageString);
+    sampleMessage.put("field", "value1");
+    messageList = new ArrayList<>();
+    messageList.add(((JSONObject) sampleMessage.clone()));
+    sampleMessage.put("field", "value2");
+    messageList.add(((JSONObject) sampleMessage.clone()));
+    sampleMessage.put("field", "value3");
+    messageList.add(((JSONObject) sampleMessage.clone()));
+    sampleMessage.put("field", "value4");
+    messageList.add(((JSONObject) sampleMessage.clone()));
+    sampleMessage.put("field", "value5");
+    messageList.add(((JSONObject) sampleMessage.clone()));
+  }
+
+  @Mock
+  private BulkMessageWriter<JSONObject> bulkMessageWriter;
+
+  @Test // Exercises declareOutputFields, prepare (failing and succeeding), one full batch write, and the write-failure path.
+  public void test() throws Exception {
+    BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl").withBulkMessageWriter(bulkMessageWriter);
+    bulkMessageWriterBolt.setCuratorFramework(client); // inject client and cache up front; ConfiguredBolt.prepare only builds its own when they are null
+    bulkMessageWriterBolt.setTreeCache(cache);
+    bulkMessageWriterBolt.configurations.updateSensorEnrichmentConfig(sensorType, new FileInputStream(sampleSensorEnrichmentConfigPath));
+    bulkMessageWriterBolt.declareOutputFields(declarer);
+    verify(declarer, times(1)).declareStream(eq("error"), argThat(new FieldsMatcher("message")));
+    Map stormConf = new HashMap();
+    doThrow(new Exception()).when(bulkMessageWriter).init(eq(stormConf), any(Configurations.class)); // first prepare: writer init is stubbed to throw
+    try {
+      bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector);
+      fail("A runtime exception should be thrown when bulkMessageWriter.init throws an exception");
+    } catch(RuntimeException e) {} // expected: the init failure must surface as a RuntimeException
+    reset(bulkMessageWriter); // drop the throwing stub so the second prepare can succeed
+    bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector);
+    verify(bulkMessageWriter, times(1)).init(eq(stormConf), any(Configurations.class));
+    tupleList = new ArrayList<>();
+    for(int i = 0; i < 4; i++) { // first four tuples: no write matching the full batch may have happened yet
+      when(tuple.getValueByField("message")).thenReturn(messageList.get(i));
+      tupleList.add(tuple);
+      bulkMessageWriterBolt.execute(tuple);
+      verify(bulkMessageWriter, times(0)).write(eq(sensorType), any(Configurations.class), eq(tupleList), eq(messageList));
+    }
+    when(tuple.getValueByField("message")).thenReturn(messageList.get(4));
+    tupleList.add(tuple);
+    bulkMessageWriterBolt.execute(tuple);
+    verify(bulkMessageWriter, times(1)).write(eq(sensorType), any(Configurations.class), eq(tupleList), argThat(new MessageListMatcher(messageList))); // fifth tuple: one bulk write of all five messages; the matcher strips timing fields before comparing
+    verify(outputCollector, times(5)).ack(tuple); // ack was called once per executed tuple
+    reset(outputCollector); // start the failure phase with clean interaction counts
+    doThrow(new Exception()).when(bulkMessageWriter).write(eq(sensorType), any(Configurations.class), Matchers.anyListOf(Tuple.class), Matchers.anyListOf(JSONObject.class)); // from here on every write throws
+    when(tuple.getValueByField("message")).thenReturn(messageList.get(0));
+    for(int i = 0; i < 5; i++) {
+      bulkMessageWriterBolt.execute(tuple);
+    }
+    verify(outputCollector, times(0)).ack(tuple); // failed batch: nothing acked, five fails, one error-stream emit, one reportError
+    verify(outputCollector, times(5)).fail(tuple);
+    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any(Values.class));
+    verify(outputCollector, times(1)).reportError(any(Throwable.class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/ConfiguredBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/ConfiguredBoltTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/ConfiguredBoltTest.java
index 6c538f0..9b12592 100644
--- a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/ConfiguredBoltTest.java
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/ConfiguredBoltTest.java
@@ -17,15 +17,15 @@
  */
 package org.apache.metron.bolt;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Tuple;
+import org.apache.curator.test.TestingServer;
 import org.apache.metron.Constants;
 import org.apache.metron.domain.Configurations;
 import org.apache.metron.domain.SensorEnrichmentConfig;
 import org.apache.metron.utils.ConfigurationsUtils;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -35,11 +35,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.mockito.Mockito.mock;
-
-public class ConfiguredBoltTest extends BaseBoltTest {
-
+public class ConfiguredBoltTest extends BaseEnrichmentBoltTest {
   private static Set<String> configsUpdated = new HashSet<>();
+  private Set<String> allConfigurationTypes = new HashSet<>();
+  private String zookeeperUrl;
 
   public static class StandAloneConfiguredBolt extends ConfiguredBolt {
 
@@ -61,11 +60,23 @@ public class ConfiguredBoltTest extends BaseBoltTest {
     }
   }
 
+  @Before
+  public void setupConfiguration() throws Exception {
+    TestingServer testZkServer = new TestingServer(true);
+    this.zookeeperUrl = testZkServer.getConnectString();
+    byte[] globalConfig = ConfigurationsUtils.readGlobalConfigFromFile(sampleConfigRoot);
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig, zookeeperUrl);
+    allConfigurationTypes.add(Constants.GLOBAL_CONFIG_NAME);
+    Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(sampleConfigRoot);
+    for (String sensorType : sensorEnrichmentConfigs.keySet()) {
+      ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
+      allConfigurationTypes.add(sensorType);
+    }
+  }
+
   @Test
   public void test() throws Exception {
     Configurations sampleConfigurations = new Configurations();
-    TopologyContext topologyContext = mock(TopologyContext.class);
-    OutputCollector outputCollector = mock(OutputCollector.class);
     try {
       StandAloneConfiguredBolt configuredBolt = new StandAloneConfiguredBolt(null);
       configuredBolt.prepare(new HashMap(), topologyContext, outputCollector);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/JoinBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/JoinBoltTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/JoinBoltTest.java
new file mode 100644
index 0000000..49f5c73
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/JoinBoltTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Values;
+import org.adrianwalker.multilinestring.Multiline;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class JoinBoltTest extends BaseEnrichmentBoltTest {
+
+  public class StandAloneJoinBolt extends JoinBolt<JSONObject> {
+
+    public StandAloneJoinBolt(String zookeeperUrl) {
+      super(zookeeperUrl);
+    }
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext) {
+
+    }
+
+    @Override
+    public Set<String> getStreamIds(JSONObject value) {
+      return streamIds;
+    }
+
+    @Override
+    public JSONObject joinMessages(Map<String, JSONObject> streamMessageMap) {
+      return joinedMessage;
+    }
+  }
+
+  /**
+   {
+   "joinField": "joinValue"
+   }
+   */
+  @Multiline
+  private String joinedMessageString;
+
+  private JSONObject joinedMessage;
+
+  @Before // Parses the @Multiline JSON fixture into joinedMessage before each test.
+  public void parseMessages() {
+    JSONParser parser = new JSONParser();
+    try {
+      joinedMessage = (JSONObject) parser.parse(joinedMessageString);
+    } catch (ParseException e) {
+      throw new IllegalStateException("Unable to parse the joinedMessageString fixture", e); // fail setup loudly instead of leaving joinedMessage null
+    }
+  }
+
+  @Test // Checks prepare's required-property validation, the declared output stream, and that a join is emitted only after the third stream's message.
+  public void test() {
+    StandAloneJoinBolt joinBolt = new StandAloneJoinBolt("zookeeperUrl");
+    joinBolt.setCuratorFramework(client); // inject client and cache up front; ConfiguredBolt.prepare only builds its own when they are null
+    joinBolt.setTreeCache(cache);
+    try {
+      joinBolt.prepare(new HashMap(), topologyContext, outputCollector);
+      fail("Should fail if a maxCacheSize property is not set");
+    } catch(IllegalStateException e) {} // expected: maxCacheSize has not been configured yet
+    joinBolt.withMaxCacheSize(100);
+    try {
+      joinBolt.prepare(new HashMap(), topologyContext, outputCollector);
+      fail("Should fail if a maxTimeRetain property is not set");
+    } catch(IllegalStateException e) {} // expected: maxTimeRetain has not been configured yet
+    joinBolt.withMaxTimeRetain(10000);
+    joinBolt.prepare(new HashMap(), topologyContext, outputCollector); // both properties set: prepare must now succeed
+    joinBolt.declareOutputFields(declarer);
+    verify(declarer, times(1)).declareStream(eq("message"), argThat(new FieldsMatcher("key", "message")));
+    when(tuple.getValueByField("key")).thenReturn(key);
+    when(tuple.getSourceStreamId()).thenReturn("geo"); // first partial message, from the "geo" stream
+    when(tuple.getValueByField("message")).thenReturn(geoMessage);
+    joinBolt.execute(tuple);
+    verify(outputCollector, times(0)).emit(eq("message"), any(tuple.getClass()), any(Values.class)); // nothing emitted or acked yet
+    verify(outputCollector, times(0)).ack(tuple);
+    when(tuple.getSourceStreamId()).thenReturn("host"); // second partial message, from the "host" stream
+    when(tuple.getValueByField("message")).thenReturn(hostMessage);
+    joinBolt.execute(tuple);
+    verify(outputCollector, times(0)).emit(eq("message"), any(tuple.getClass()), any(Values.class)); // still nothing emitted or acked
+    verify(outputCollector, times(0)).ack(tuple);
+    when(tuple.getSourceStreamId()).thenReturn("hbaseEnrichment"); // third partial message, from the "hbaseEnrichment" stream
+    when(tuple.getValueByField("message")).thenReturn(hbaseEnrichmentMessage);
+    joinBolt.execute(tuple);
+    verify(outputCollector, times(1)).emit(eq("message"), any(tuple.getClass()), eq(new Values(key, joinedMessage))); // joined message emitted exactly once, then acked
+    verify(outputCollector, times(1)).ack(tuple);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/SplitBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/SplitBoltTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/SplitBoltTest.java
new file mode 100644
index 0000000..1091ffd
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/SplitBoltTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import junit.framework.Assert;
+import org.apache.metron.domain.Configurations;
+import org.json.simple.JSONObject;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class SplitBoltTest extends BaseEnrichmentBoltTest {
+
+  public class StandAloneSplitBolt extends SplitBolt<JSONObject> {
+
+    public StandAloneSplitBolt(String zookeeperUrl) {
+      super(zookeeperUrl);
+    }
+
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext) {
+
+    }
+
+    @Override
+    public Set<String> getStreamIds() {
+      return streamIds;
+    }
+
+    @Override
+    public String getKey(Tuple tuple, JSONObject message) {
+      return key;
+    }
+
+    @Override
+    public JSONObject generateMessage(Tuple tuple) {
+      return sampleMessage;
+    }
+
+    @Override
+    public Map<String, JSONObject> splitMessage(JSONObject message) {
+      return null;
+    }
+
+    @Override
+    public void declareOther(OutputFieldsDeclarer declarer) {
+
+    }
+
+    @Override
+    public void emitOther(Tuple tuple, JSONObject message) {
+
+    }
+  }
+
+  @Test // Checks the declared streams, the per-stream emits for a split message, and rejection of a null split value.
+  public void test() {
+    StandAloneSplitBolt splitBolt = spy(new StandAloneSplitBolt("zookeeperUrl")); // spy so splitMessage (the stub returns null) can be re-stubbed per phase
+    splitBolt.setCuratorFramework(client); // inject client and cache up front; ConfiguredBolt.prepare only builds its own when they are null
+    splitBolt.setTreeCache(cache);
+    doCallRealMethod().when(splitBolt).reloadCallback(anyString(), any(Configurations.Type.class));
+    splitBolt.prepare(new HashMap(), topologyContext, outputCollector);
+    splitBolt.declareOutputFields(declarer);
+    verify(declarer, times(1)).declareStream(eq("message"), argThat(new FieldsMatcher("key", "message")));
+    for(String streamId: streamIds) { // one (key, message) stream declared per stream id
+      verify(declarer, times(1)).declareStream(eq(streamId), argThat(new FieldsMatcher("key", "message")));
+    }
+    verify(declarer, times(1)).declareStream(eq("error"), argThat(new FieldsMatcher("message")));
+
+    JSONObject sampleMessage = splitBolt.generateMessage(tuple);
+    Map<String, JSONObject> streamMessageMap = new HashMap<>();
+    streamMessageMap.put("geo", geoMessage);
+    streamMessageMap.put("host", hostMessage);
+    streamMessageMap.put("hbaseEnrichment", hbaseEnrichmentMessage);
+    doReturn(streamMessageMap).when(splitBolt).splitMessage(sampleMessage); // happy path: three sub-messages, one per stream
+    splitBolt.execute(tuple);
+    verify(outputCollector, times(1)).emit(eq("message"), any(tuple.getClass()), eq(new Values(key, sampleMessage))); // the "message" emit is verified with the three-argument emit overload
+    verify(outputCollector, times(1)).emit(eq("geo"), eq(new Values(key, geoMessage))); // per-stream emits are verified with the two-argument emit overload
+    verify(outputCollector, times(1)).emit(eq("host"), eq(new Values(key, hostMessage)));
+    verify(outputCollector, times(1)).emit(eq("hbaseEnrichment"), eq(new Values(key, hbaseEnrichmentMessage)));
+    verify(outputCollector, times(1)).ack(tuple);
+    streamMessageMap = new HashMap<>();
+    streamMessageMap.put("host", null); // failure path: a stream mapped to a null message
+    doReturn(streamMessageMap).when(splitBolt).splitMessage(sampleMessage);
+    try {
+      splitBolt.execute(tuple);
+      Assert.fail("An exception should be thrown when splitMessage produces a null value for a stream");
+    }catch (IllegalArgumentException e) {} // expected: a null split value must be rejected
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Common/src/test/java/org/apache/metron/utils/ConfigurationsUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/utils/ConfigurationsUtilsTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/utils/ConfigurationsUtilsTest.java
index 3214f0e..65bef92 100644
--- a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/utils/ConfigurationsUtilsTest.java
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/utils/ConfigurationsUtilsTest.java
@@ -62,7 +62,6 @@ public class ConfigurationsUtilsTest {
     ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(testSensorType, testSensorConfigBytes, zookeeperUrl);
     byte[] readSensorConfigBytes = ConfigurationsUtils.readSensorEnrichmentConfigBytesFromZookeeper(testSensorType, client);
     Assert.assertTrue(Arrays.equals(testSensorConfigBytes, readSensorConfigBytes));
-
     String name = "testConfig";
     Map<String, Object> testConfig = new HashMap<>();
     testConfig.put("stringField", "value");

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
index 9b47d71..ee79a70 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -37,17 +37,10 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
   protected static final Logger LOG = LoggerFactory
           .getLogger(EnrichmentJoinBolt.class);
 
-  private List<Enrichment> enrichments;
-
   public EnrichmentJoinBolt(String zookeeperUrl) {
     super(zookeeperUrl);
   }
 
-  public EnrichmentJoinBolt withEnrichments(List<Enrichment> enrichments) {
-    this.enrichments = enrichments;
-    return this;
-  }
-
   @Override
   public void prepare(Map map, TopologyContext topologyContext) {
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
new file mode 100644
index 0000000..ff69b58
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.bolt.BaseEnrichmentBoltTest;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class EnrichmentJoinBoltTest extends BaseEnrichmentBoltTest {
+  /**
+   * {
+   * "enrichedField": "enrichedValue",
+   * "emptyEnrichedField": ""
+   * }
+   */
+  @Multiline
+  private String enrichedMessageString; // @Multiline fills this from the Javadoc above, so that comment is test data
+
+  /**
+   * {
+   * "ip_src_addr": "ip1",
+   * "ip_dst_addr": "ip2",
+   * "source.type": "yaf",
+   * "enrichedField": "enrichedValue"
+   * }
+   */
+  @Multiline
+  private String expectedJoinedMessageString; // expected join result; note it has no "emptyEnrichedField"
+
+  private JSONObject enrichedMessage;
+  private JSONObject expectedJoinedMessage;
+
+  @Before
+  public void parseMessages() throws ParseException {
+    JSONParser parser = new JSONParser();
+    enrichedMessage = (JSONObject) parser.parse(enrichedMessageString);
+    expectedJoinedMessage = (JSONObject) parser.parse(expectedJoinedMessageString);
+  }
+  // Checks the stream ids reported by EnrichmentJoinBolt and the message built by joinMessages().
+  @Test
+  public void test() throws IOException {
+    EnrichmentJoinBolt enrichmentJoinBolt = new EnrichmentJoinBolt("zookeeperUrl");
+    enrichmentJoinBolt.setCuratorFramework(client);
+    enrichmentJoinBolt.setTreeCache(cache);
+    try (FileInputStream configStream = new FileInputStream(sampleSensorEnrichmentConfigPath)) {
+      enrichmentJoinBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, configStream);
+    } // the test closes the stream itself instead of relying on the callee
+    enrichmentJoinBolt.withMaxCacheSize(100);
+    enrichmentJoinBolt.withMaxTimeRetain(10000);
+    enrichmentJoinBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
+    streamIds.add("message"); // expected ids are the inherited streamIds plus "message"
+    Assert.assertEquals(streamIds, enrichmentJoinBolt.getStreamIds(sampleMessage));
+    Map<String, JSONObject> streamMessageMap = new HashMap<>();
+    streamMessageMap.put("message", sampleMessage);
+    streamMessageMap.put("enriched", enrichedMessage);
+    JSONObject joinedMessage = enrichmentJoinBolt.joinMessages(streamMessageMap);
+    removeTimingFields(joinedMessage);
+    Assert.assertEquals(expectedJoinedMessage, joinedMessage);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java
new file mode 100644
index 0000000..40ae25d
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.metron.bolt.BaseEnrichmentBoltTest;
+import org.apache.metron.domain.Enrichment;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.ParseException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.when;
+
+
+public class EnrichmentSplitterBoltTest extends BaseEnrichmentBoltTest {
+
+  @Test
+  public void test() throws ParseException, IOException {
+    final Enrichment geo = new Enrichment();
+    geo.setType("geo");
+    final Enrichment host = new Enrichment();
+    host.setType("host");
+    final Enrichment hbaseEnrichment = new Enrichment();
+    hbaseEnrichment.setType("hbaseEnrichment");
+    List<Enrichment> enrichments = new ArrayList<Enrichment>() {{
+      add(geo);
+      add(host);
+      add(hbaseEnrichment);
+    }};
+
+    EnrichmentSplitterBolt enrichmentSplitterBolt = new EnrichmentSplitterBolt("zookeeperUrl").withEnrichments(enrichments);
+    enrichmentSplitterBolt.setCuratorFramework(client);
+    enrichmentSplitterBolt.setTreeCache(cache);
+    enrichmentSplitterBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(sampleSensorEnrichmentConfigPath));
+    enrichmentSplitterBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
+
+    String key = enrichmentSplitterBolt.getKey(tuple, sampleMessage);
+    Assert.assertTrue(key != null && key.length() == 36);
+    String someKey = "someKey";
+    when(tuple.getStringByField("key")).thenReturn(someKey);
+    key = enrichmentSplitterBolt.getKey(tuple, sampleMessage);
+    Assert.assertEquals(someKey, key);
+    when(tuple.getBinary(0)).thenReturn(sampleMessageString.getBytes());
+    JSONObject generatedMessage = enrichmentSplitterBolt.generateMessage(tuple);
+    removeTimingFields(generatedMessage);
+    Assert.assertEquals(sampleMessage, generatedMessage);
+    String messageFieldName = "messageFieldName";
+    enrichmentSplitterBolt.withMessageFieldName(messageFieldName);
+    when(tuple.getValueByField(messageFieldName)).thenReturn(sampleMessage);
+    generatedMessage = enrichmentSplitterBolt.generateMessage(tuple);
+    Assert.assertEquals(sampleMessage, generatedMessage);
+    Set<String> actualStreamIds = enrichmentSplitterBolt.getStreamIds();
+    Assert.assertEquals(streamIds, actualStreamIds);
+
+    Map<String, JSONObject> actualSplitMessages = enrichmentSplitterBolt.splitMessage(sampleMessage);
+    Assert.assertEquals(3, actualSplitMessages.size());
+    Assert.assertEquals(geoMessage, actualSplitMessages.get("geo"));
+    Assert.assertEquals(hostMessage, actualSplitMessages.get("host"));
+    Assert.assertEquals(hbaseEnrichmentMessage, actualSplitMessages.get("hbaseEnrichment"));
+
+
+  }
+
+  public void removeTimingFields(JSONObject message) {
+    ImmutableSet keys = ImmutableSet.copyOf(message.keySet());
+    for(Object key: keys) {
+      if (key.toString().contains("splitter.begin.ts")) {
+        message.remove(key);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
new file mode 100644
index 0000000..f195f5e
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import backtype.storm.tuple.Values;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.bolt.BaseEnrichmentBoltTest;
+import org.apache.metron.domain.Enrichment;
+import org.apache.metron.domain.SensorEnrichmentConfig;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.utils.ConfigurationsUtils;
+import org.hamcrest.Description;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class GenericEnrichmentBoltTest extends BaseEnrichmentBoltTest {
+
+  protected class EnrichedMessageMatcher extends ArgumentMatcher<Values> {
+
+    private String expectedKey;
+    private JSONObject expectedMessage;
+
+    public EnrichedMessageMatcher(String expectedKey, JSONObject expectedMessage) {
+      this.expectedKey = expectedKey;
+      this.expectedMessage = expectedMessage;
+    }
+
+    @Override
+    public boolean matches(Object o) {
+      Values values = (Values) o;
+      String actualKey = (String) values.get(0);
+      JSONObject actualMessage = (JSONObject) values.get(1);
+      removeTimingFields(actualMessage);
+      return expectedKey.equals(actualKey) && expectedMessage.equals(actualMessage);
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText(String.format("[%s]", expectedMessage));
+    }
+
+  }
+
+  /**
+   {
+   "field1": "value1",
+   "field2": "value2",
+   "source.type": "yaf"
+   }
+   */
+  @Multiline
+  private String originalMessageString;
+
+  /**
+   {
+   "enrichedField1": "enrichedValue1"
+   }
+   */
+  @Multiline
+  private String enrichedField1String;
+
+  /**
+   {
+   "enrichedField2": "enrichedValue2"
+   }
+   */
+  @Multiline
+  private String enrichedField2String;
+
+  /**
+   {
+   "field1.enrichedField1": "enrichedValue1",
+   "field2.enrichedField2": "enrichedValue2",
+   "source.type": "yaf"
+   }
+   */
+  @Multiline
+  private String enrichedMessageString;
+
+  private JSONObject originalMessage;
+  private JSONObject enrichedField1;
+  private JSONObject enrichedField2;
+  private JSONObject enrichedMessage;
+
+  @Before
+  public void parseMessages() throws ParseException {
+    JSONParser parser = new JSONParser();
+    originalMessage = (JSONObject) parser.parse(originalMessageString);
+    enrichedField1 = (JSONObject) parser.parse(enrichedField1String);
+    enrichedField2 = (JSONObject) parser.parse(enrichedField2String);
+    enrichedMessage = (JSONObject) parser.parse(enrichedMessageString);
+  }
+
+  @Mock
+  public EnrichmentAdapter<CacheKey> enrichmentAdapter;
+
+  @Before
+  public void initMocks() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void test() throws IOException {
+    String key = "someKey";
+    String enrichmentType = "enrichmentType";
+    Enrichment<EnrichmentAdapter<CacheKey>> testEnrichment = new Enrichment<>();
+    testEnrichment.setType(enrichmentType);
+    testEnrichment.setAdapter(enrichmentAdapter);
+    GenericEnrichmentBolt genericEnrichmentBolt = new GenericEnrichmentBolt("zookeeperUrl");
+    genericEnrichmentBolt.setCuratorFramework(client);
+    genericEnrichmentBolt.setTreeCache(cache);
+    genericEnrichmentBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(sampleSensorEnrichmentConfigPath));
+    try {
+      genericEnrichmentBolt.prepare(new HashMap(), topologyContext, outputCollector);
+      fail("Should fail if a maxCacheSize property is not set");
+    } catch(IllegalStateException e) {}
+    genericEnrichmentBolt.withMaxCacheSize(100);
+    try {
+      genericEnrichmentBolt.prepare(new HashMap(), topologyContext, outputCollector);
+      fail("Should fail if a maxTimeRetain property is not set");
+    } catch(IllegalStateException e) {}
+    genericEnrichmentBolt.withMaxTimeRetain(10000);
+    try {
+      genericEnrichmentBolt.prepare(new HashMap(), topologyContext, outputCollector);
+      fail("Should fail if an adapter is not set");
+    } catch(IllegalStateException e) {}
+    genericEnrichmentBolt.withEnrichment(testEnrichment);
+    when(enrichmentAdapter.initializeAdapter()).thenReturn(true);
+    genericEnrichmentBolt.prepare(new HashMap(), topologyContext, outputCollector);
+    verify(enrichmentAdapter, times(1)).initializeAdapter();
+    when(enrichmentAdapter.initializeAdapter()).thenReturn(false);
+    try {
+      genericEnrichmentBolt.prepare(new HashMap(), topologyContext, outputCollector);
+      fail("An exception should be thrown if enrichment adapter initialization fails");
+    } catch(IllegalStateException e) {}
+    genericEnrichmentBolt.declareOutputFields(declarer);
+    verify(declarer, times(1)).declareStream(eq(enrichmentType), argThat(new FieldsMatcher("key", "message")));
+    verify(declarer, times(1)).declareStream(eq("error"), argThat(new FieldsMatcher("message")));
+    when(tuple.getStringByField("key")).thenReturn(null);
+    genericEnrichmentBolt.execute(tuple);
+    verify(outputCollector, times(1)).emit(eq("error"), any(Values.class));
+    when(tuple.getStringByField("key")).thenReturn(key);
+    when(tuple.getValueByField("message")).thenReturn(originalMessage);
+    genericEnrichmentBolt.execute(tuple);
+    verify(outputCollector, times(1)).emit(eq(enrichmentType), argThat(new EnrichedMessageMatcher(key, new JSONObject())));
+    reset(enrichmentAdapter);
+
+    SensorEnrichmentConfig sensorEnrichmentConfig = SensorEnrichmentConfig.
+            fromBytes(ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(sampleConfigRoot).get(sensorType));
+    CacheKey cacheKey1 = new CacheKey("field1", "value1", sensorEnrichmentConfig);
+    CacheKey cacheKey2 = new CacheKey("field2", "value2", sensorEnrichmentConfig);
+    when(enrichmentAdapter.enrich(cacheKey1)).thenReturn(enrichedField1);
+    when(enrichmentAdapter.enrich(cacheKey2)).thenReturn(enrichedField2);
+    genericEnrichmentBolt.execute(tuple);
+    verify(enrichmentAdapter, times(1)).logAccess(cacheKey1);
+    verify(enrichmentAdapter, times(1)).logAccess(cacheKey2);
+    verify(outputCollector, times(1)).emit(eq(enrichmentType), argThat(new EnrichedMessageMatcher(key, enrichedMessage)));
+
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
new file mode 100644
index 0000000..a065d2d
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import junit.framework.Assert;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.bolt.BaseEnrichmentBoltTest;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ThreatIntelJoinBoltTest extends BaseEnrichmentBoltTest {
+  /**
+   {
+   "field1": "value1",
+   "enrichedField1": "enrichedValue1",
+   "source.type": "yaf"
+   }
+   */
+  @Multiline
+  private String messageString;
+
+  /**
+   {
+   "field1": "value1",
+   "enrichedField1": "enrichedValue1",
+   "source.type": "yaf",
+   "threatintels.field.end.ts": "timing"
+   }
+   */
+  @Multiline
+  private String messageWithTimingString;
+
+  /**
+   {
+   "field1": "value1",
+   "enrichedField1": "enrichedValue1",
+   "source.type": "yaf",
+   "threatintels.field": "threatIntelValue"
+   }
+   */
+  @Multiline
+  private String alertMessageString;
+
+  private JSONObject message;
+  private JSONObject messageWithTiming;
+  private JSONObject alertMessage;
+
+  @Before
+  public void parseMessages() throws ParseException {
+    JSONParser parser = new JSONParser();
+    message = (JSONObject) parser.parse(messageString);
+    messageWithTiming = (JSONObject) parser.parse(messageWithTimingString);
+    alertMessage = (JSONObject) parser.parse(alertMessageString);
+  }
+  // Checks the field map lookup and that only a real threat intel field marks the joined message as an alert.
+  @Test
+  public void test() throws IOException {
+    ThreatIntelJoinBolt threatIntelJoinBolt = new ThreatIntelJoinBolt("zookeeperUrl");
+    threatIntelJoinBolt.setCuratorFramework(client);
+    threatIntelJoinBolt.setTreeCache(cache);
+    try (FileInputStream configStream = new FileInputStream(sampleSensorEnrichmentConfigPath)) {
+      threatIntelJoinBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, configStream);
+    }
+    threatIntelJoinBolt.withMaxCacheSize(100);
+    threatIntelJoinBolt.withMaxTimeRetain(10000);
+    threatIntelJoinBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
+    Assert.assertNull(threatIntelJoinBolt.getFieldMap("incorrectSourceType")); // unknown sensor type: no field map
+    Map<String, List<String>> fieldMap = threatIntelJoinBolt.getFieldMap(sensorType);
+    Assert.assertTrue(fieldMap.containsKey("hbaseThreatIntel"));
+    Map<String, JSONObject> streamMessageMap = new HashMap<>();
+    streamMessageMap.put("message", message);
+    JSONObject joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap);
+    Assert.assertFalse(joinedMessage.containsKey("is_alert"));
+    streamMessageMap.put("message", messageWithTiming); // only a "threatintels...end.ts" timing key: still no alert
+    joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap);
+    Assert.assertFalse(joinedMessage.containsKey("is_alert"));
+    streamMessageMap.put("message", alertMessage); // carries "threatintels.field": must be flagged
+    joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap);
+    Assert.assertTrue(joinedMessage.containsKey("is_alert") && "true".equals(joinedMessage.get("is_alert")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java
new file mode 100644
index 0000000..1c16f83
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import junit.framework.Assert;
+import org.apache.metron.bolt.BaseEnrichmentBoltTest;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ThreatIntelSplitterBoltTest extends BaseEnrichmentBoltTest {
+  @Test // exercises getFieldMap() and getKeyName() against the sample sensor enrichment config
+  public void test() throws IOException {
+    String threatIntelType = "hbaseThreatIntel";
+    ThreatIntelSplitterBolt threatIntelSplitterBolt = new ThreatIntelSplitterBolt("zookeeperUrl");
+    threatIntelSplitterBolt.setCuratorFramework(client);
+    threatIntelSplitterBolt.setTreeCache(cache);
+    try (FileInputStream configStream = new FileInputStream(sampleSensorEnrichmentConfigPath)) {
+      threatIntelSplitterBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, configStream);
+    } // the test closes the stream itself instead of relying on the callee
+    threatIntelSplitterBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
+    Map<String, List<String>> fieldMap = threatIntelSplitterBolt.getFieldMap(sensorType);
+    Assert.assertTrue(fieldMap.containsKey(threatIntelType));
+    Assert.assertEquals("threatintels.hbaseThreatIntel.field", threatIntelSplitterBolt.getKeyName(threatIntelType, "field"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-MessageParsers/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/pom.xml b/metron-streaming/Metron-MessageParsers/pom.xml
index 2687213..108447a 100644
--- a/metron-streaming/Metron-MessageParsers/pom.xml
+++ b/metron-streaming/Metron-MessageParsers/pom.xml
@@ -93,6 +93,12 @@
 				</exclusion>
 			</exclusions>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.metron</groupId>
+			<artifactId>Metron-TestingUtilities</artifactId>
+			<version>${project.parent.version}</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 	<reporting>
 		<plugins>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/bolt/ParserBoltTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/bolt/ParserBoltTest.java
new file mode 100644
index 0000000..856bc1f
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/bolt/ParserBoltTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import org.apache.metron.domain.Configurations;
+import org.apache.metron.parser.interfaces.MessageFilter;
+import org.apache.metron.parser.interfaces.MessageParser;
+import org.apache.metron.writer.interfaces.MessageWriter;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ParserBoltTest extends BaseBoltTest {
+  @Mock
+  private MessageParser<JSONObject> parser;
+
+  @Mock
+  private MessageWriter<JSONObject> writer;
+
+  @Mock
+  private MessageFilter<JSONObject> filter;
+  // Drives ParserBolt with mocked parser/writer/filter through validation, filtering and writer failure.
+  @Test
+  public void test() throws Exception {
+    String sensorType = "yaf";
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, writer);
+    parserBolt.setCuratorFramework(client);
+    parserBolt.setTreeCache(cache);
+    parserBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
+    verify(parser, times(1)).init();
+    verify(writer, times(1)).init();
+    byte[] sampleBinary = "some binary message".getBytes();
+    JSONParser jsonParser = new JSONParser();
+    JSONObject sampleMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\" }");
+    JSONObject sampleMessage2 = (JSONObject) jsonParser.parse("{ \"field2\":\"value2\" }");
+    List<JSONObject> messages = new ArrayList<>();
+    messages.add(sampleMessage1);
+    messages.add(sampleMessage2);
+    JSONObject finalMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\", \"source.type\":\"" + sensorType + "\" }");
+    JSONObject finalMessage2 = (JSONObject) jsonParser.parse("{ \"field2\":\"value2\", \"source.type\":\"" + sensorType + "\" }");
+    when(tuple.getBinary(0)).thenReturn(sampleBinary);
+    when(parser.parse(sampleBinary)).thenReturn(messages);
+    // Scenario 1: the second message fails validation, so only the first is written and the tuple is acked.
+    when(parser.validate(eq(messages.get(0)))).thenReturn(true);
+    when(parser.validate(eq(messages.get(1)))).thenReturn(false);
+    parserBolt.execute(tuple);
+    verify(writer, times(1)).write(eq(sensorType), any(Configurations.class), eq(tuple), eq(finalMessage1));
+    verify(outputCollector, times(1)).ack(tuple);
+    // Scenario 2: both messages validate, but the filter rejects the first, so only the second is written.
+    when(parser.validate(eq(messages.get(0)))).thenReturn(true);
+    when(parser.validate(eq(messages.get(1)))).thenReturn(true);
+    when(filter.emitTuple(messages.get(0))).thenReturn(false);
+    when(filter.emitTuple(messages.get(1))).thenReturn(true);
+    parserBolt.withMessageFilter(filter);
+    parserBolt.execute(tuple);
+    verify(writer, times(1)).write(eq(sensorType), any(Configurations.class), eq(tuple), eq(finalMessage2));
+    verify(outputCollector, times(2)).ack(tuple);
+    // Scenario 3: a writer failure must be reported through the collector.
+    doThrow(new Exception()).when(writer).write(eq(sensorType), any(Configurations.class), eq(tuple), eq(finalMessage2));
+    parserBolt.execute(tuple);
+    verify(outputCollector, times(1)).reportError(any(Throwable.class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-TestingUtilities/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-TestingUtilities/pom.xml b/metron-streaming/Metron-TestingUtilities/pom.xml
index 1c67e7c..d19c0b2 100644
--- a/metron-streaming/Metron-TestingUtilities/pom.xml
+++ b/metron-streaming/Metron-TestingUtilities/pom.xml
@@ -35,8 +35,11 @@
       <artifactId>junit</artifactId>
       <version>${global_junit_version}</version>
     </dependency>
-
-
+    <dependency>
+      <groupId>com.googlecode.json-simple</groupId>
+      <artifactId>json-simple</artifactId>
+      <version>${global_json_simple_version}</version>
+    </dependency>
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
@@ -64,6 +67,47 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <version>${global_storm_version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>servlet-api</artifactId>
+          <groupId>javax.servlet</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>log4j-over-slf4j</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
+      <version>2.7.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <version>2.7.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>${global_mockito_version}</version>
+    </dependency>
+    <dependency>
+      <groupId>nl.jqno.equalsverifier</groupId>
+      <artifactId>equalsverifier</artifactId>
+      <version>2.0.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.adrianwalker</groupId>
+      <artifactId>multiline-string</artifactId>
+      <version>0.1.2</version>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-TestingUtilities/src/main/java/org/apache/metron/bolt/BaseBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-TestingUtilities/src/main/java/org/apache/metron/bolt/BaseBoltTest.java b/metron-streaming/Metron-TestingUtilities/src/main/java/org/apache/metron/bolt/BaseBoltTest.java
new file mode 100644
index 0000000..cb928ff
--- /dev/null
+++ b/metron-streaming/Metron-TestingUtilities/src/main/java/org/apache/metron/bolt/BaseBoltTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableSet;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.hamcrest.Description;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Shared fixture for bolt unit tests: Mockito mocks plus small assertion helpers. */
+public abstract class BaseBoltTest {
+
+  @Mock
+  protected TopologyContext topologyContext;
+  @Mock
+  protected OutputCollector outputCollector;
+  @Mock
+  protected Tuple tuple;
+  @Mock
+  protected OutputFieldsDeclarer declarer;
+  @Mock
+  protected CuratorFramework client;
+  @Mock
+  protected TreeCache cache;
+
+  /** Initializes the {@code @Mock}-annotated fields of this test instance before each test. */
+  @Before
+  public void initMocks() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  /** Mockito matcher accepting a {@link Fields} whose names equal the expected list, in order. */
+  protected class FieldsMatcher extends ArgumentMatcher<Fields> {
+
+    private final List<String> expectedFields;
+
+    public FieldsMatcher(String... fields) {
+      this.expectedFields = Arrays.asList(fields);
+    }
+
+    /** Returns {@code false}, instead of throwing, for a null or non-{@link Fields} argument. */
+    @Override
+    public boolean matches(Object o) {
+      return o instanceof Fields && expectedFields.equals(((Fields) o).toList());
+    }
+
+    /** Describes the matcher as the expected names, comma-joined inside square brackets. */
+    @Override
+    public void describeTo(Description description) {
+      description.appendText(String.format("[%s]", Joiner.on(",").join(expectedFields)));
+    }
+
+  }
+
+  /** Removes, in place, every entry of {@code message} whose key ends with {@code ".ts"}. */
+  public void removeTimingFields(JSONObject message) {
+    // Loop over a copy of the key set so entries can be removed from the message meanwhile.
+    for (Object key : ImmutableSet.copyOf(message.keySet())) {
+      if (key.toString().endsWith(".ts")) {
+        message.remove(key);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-TestingUtilities/src/main/java/org/apache/metron/bolt/BaseEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-TestingUtilities/src/main/java/org/apache/metron/bolt/BaseEnrichmentBoltTest.java b/metron-streaming/Metron-TestingUtilities/src/main/java/org/apache/metron/bolt/BaseEnrichmentBoltTest.java
new file mode 100644
index 0000000..9ab9b23
--- /dev/null
+++ b/metron-streaming/Metron-TestingUtilities/src/main/java/org/apache/metron/bolt/BaseEnrichmentBoltTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Before;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** Enrichment bolt test fixture; each {@code @Multiline} String is filled from its own Javadoc. */
+public class BaseEnrichmentBoltTest extends BaseBoltTest {
+  public String sampleConfigRoot = "../Metron-Testing/src/main/resources/sample/config/";
+  public String sampleSensorEnrichmentConfigPath = sampleConfigRoot + "sensors/yaf.json";
+  protected Set<String> streamIds = new HashSet<>();
+  protected String key = "someKey";
+  protected String sensorType = "yaf";
+
+  /**
+   * {
+   * "ip_src_addr": "ip1",
+   * "ip_dst_addr": "ip2",
+   * "source.type": "yaf"
+   * }
+   */
+  @Multiline
+  protected String sampleMessageString;
+
+  /**
+   * {
+   * "enrichments.geo.ip_src_addr": "ip1",
+   * "enrichments.geo.ip_dst_addr": "ip2",
+   * "source.type": "yaf"
+   * }
+   */
+  @Multiline
+  protected String geoMessageString;
+
+  /**
+   * {
+   * "enrichments.host.ip_src_addr": "ip1",
+   * "enrichments.host.ip_dst_addr": "ip2",
+   * "source.type": "yaf"
+   * }
+   */
+  @Multiline
+  protected String hostMessageString;
+
+  /**
+   * {
+   * "enrichments.hbaseEnrichment.ip_src_addr": "ip1",
+   * "enrichments.hbaseEnrichment.ip_dst_addr": "ip2",
+   * "source.type": "yaf"
+   * }
+   */
+  @Multiline
+  protected String hbaseEnrichmentMessageString;
+  protected JSONObject sampleMessage;
+  protected JSONObject geoMessage;
+  protected JSONObject hostMessage;
+  protected JSONObject hbaseEnrichmentMessage;
+
+  /** Parses the four message strings into JSON objects, then records the three stream ids. */
+  @Before
+  public void parseBaseMessages() throws ParseException {
+    final JSONParser jsonParser = new JSONParser();
+    sampleMessage = (JSONObject) jsonParser.parse(sampleMessageString);
+    geoMessage = (JSONObject) jsonParser.parse(geoMessageString);
+    hostMessage = (JSONObject) jsonParser.parse(hostMessageString);
+    hbaseEnrichmentMessage = (JSONObject) jsonParser.parse(hbaseEnrichmentMessageString);
+    for (String streamId : new String[] {"geo", "host", "hbaseEnrichment"}) {
+      streamIds.add(streamId);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
index ddf0d9e..ef2143d 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
@@ -239,9 +239,6 @@ bolts:
         constructorArgs:
             - "${kafka.zk}"
         configMethods:
-            -   name: "withEnrichments"
-                args:
-                    - ref: "enrichments"
             -   name: "withMaxCacheSize"
                 args: [10000]
             -   name: "withMaxTimeRetain"
@@ -275,9 +272,6 @@ bolts:
         constructorArgs:
             - "${kafka.zk}"
         configMethods:
-            -   name: "withEnrichments"
-                args:
-                    - ref: "threatIntels"
             -   name: "withMaxCacheSize"
                 args: [10000]
             -   name: "withMaxTimeRetain"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml
index 60cce7d..c39d3e7 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml
@@ -221,9 +221,6 @@ bolts:
         constructorArgs:
             - "${kafka.zk}"
         configMethods:
-            -   name: "withEnrichments"
-                args:
-                    - ref: "enrichments"
             -   name: "withMaxCacheSize"
                 args: [10000]
             -   name: "withMaxTimeRetain"
@@ -257,9 +254,6 @@ bolts:
         constructorArgs:
             - "${kafka.zk}"
         configMethods:
-            -   name: "withEnrichments"
-                args:
-                    - ref: "threatIntels"
             -   name: "withMaxCacheSize"
                 args: [10000]
             -   name: "withMaxTimeRetain"