You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2016/04/12 02:05:25 UTC

incubator-metron git commit: METRON-101 Unit Tests for configuration-related classes (merrimanr via nickwallen) closes apache/incubator-metron#71

Repository: incubator-metron
Updated Branches:
  refs/heads/master 9ec1ef20b -> 8d5fb1bbc


METRON-101 Unit Tests for configuration-related classes (merrimanr via nickwallen) closes apache/incubator-metron#71


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/8d5fb1bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/8d5fb1bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/8d5fb1bb

Branch: refs/heads/master
Commit: 8d5fb1bbcd05a66c6c33475341b5216459b90a73
Parents: 9ec1ef2
Author: merrimanr <me...@gmail.com>
Authored: Mon Apr 11 20:00:15 2016 -0400
Committer: Nick Allen <ni...@nickallen.org>
Committed: Mon Apr 11 20:00:15 2016 -0400

----------------------------------------------------------------------
 metron-streaming/Metron-Common/pom.xml          |  24 ++-
 .../org/apache/metron/bolt/ConfiguredBolt.java  |  24 ++-
 .../apache/metron/domain/Configurations.java    |  36 ++++-
 .../metron/domain/SensorEnrichmentConfig.java   |  13 +-
 .../metron/utils/ConfigurationsUtils.java       |  59 +++++---
 .../org/apache/metron/bolt/BaseBoltTest.java    |  48 ++++++
 .../apache/metron/bolt/ConfiguredBoltTest.java  | 149 +++++++++++++++++++
 .../metron/domain/ConfigurationsTest.java       |  39 +++++
 .../domain/SensorEnrichmentConfigTest.java      |  42 ++++++
 .../metron/utils/ConfigurationsUtilsTest.java   |  95 ++++++++++++
 .../enrichment/bolt/GenericEnrichmentBolt.java  |   3 +-
 .../SolrEnrichmentIntegrationTest.java          |   2 +-
 12 files changed, 484 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/pom.xml b/metron-streaming/Metron-Common/pom.xml
index 22460d0..80b6a17 100644
--- a/metron-streaming/Metron-Common/pom.xml
+++ b/metron-streaming/Metron-Common/pom.xml
@@ -63,12 +63,6 @@
             </exclusions>
         </dependency>
         <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>log4j-over-slf4j</artifactId>
-            <version>${global_slf4j_version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.9.2</artifactId>
             <version>${global_kafka_version}</version>
@@ -182,6 +176,24 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <version>2.7.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>${global_mockito_version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>nl.jqno.equalsverifier</groupId>
+            <artifactId>equalsverifier</artifactId>
+            <version>2.0.2</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <reporting>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
index 3d22fe9..40a1f08 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
@@ -40,7 +40,6 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
   private static final Logger LOG = Logger.getLogger(ConfiguredBolt.class);
 
   private String zookeeperUrl;
-  private long timeout = Constants.DEFAULT_CONFIGURED_BOLT_TIMEOUT;
 
   protected final Configurations configurations = new Configurations();
   private CuratorFramework client;
@@ -50,21 +49,15 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
     this.zookeeperUrl = zookeeperUrl;
   }
 
-  protected void reloadCallback() {
-  }
-
-  public ConfiguredBolt withTimeout(long timeout) {
-    this.timeout = timeout;
-    return this;
+  protected void reloadCallback(String name, Configurations.Type type) {
   }
 
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
-    client = CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
-    client.start();
     try {
-      ConfigurationsUtils.updateConfigsFromZookeeper(configurations, client);
+      RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+      client = CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
+      client.start();
       cache = new TreeCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT);
       TreeCacheListener listener = new TreeCacheListener() {
         @Override
@@ -73,7 +66,6 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
             String path = event.getData().getPath();
             byte[] data = event.getData().getData();
             updateConfig(path, data);
-            reloadCallback();
           }
         }
       };
@@ -84,7 +76,6 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
       catch(Exception e) {
         LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
       }
-
       cache.start();
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
@@ -93,15 +84,20 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
   }
 
   public void updateConfig(String path, byte[] data) throws IOException {
-    if (data.length != 0 && path != null) {
+    if (data.length != 0) {
       String name = path.substring(path.lastIndexOf("/") + 1);
+      Configurations.Type type;
       if (path.startsWith(Constants.ZOOKEEPER_SENSOR_ROOT)) {
         configurations.updateSensorEnrichmentConfig(name, data);
+        type = Configurations.Type.SENSOR;
       } else if (Constants.ZOOKEEPER_GLOBAL_ROOT.equals(path)) {
         configurations.updateGlobalConfig(data);
+        type = Configurations.Type.GLOBAL;
       } else {
         configurations.updateConfig(name, data);
+        type = Configurations.Type.OTHER;
       }
+      reloadCallback(name, type);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
index d93cc5f..63e0f95 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
@@ -33,6 +33,10 @@ public class Configurations implements Serializable {
 
   private static final Logger LOG = Logger.getLogger(Configurations.class);
 
+  public enum Type {
+    GLOBAL, SENSOR, OTHER
+  }
+
   public static final String GLOBAL_CONFIG_NAME = "global";
 
   private ConcurrentMap<String, Object> configurations = new ConcurrentHashMap<>();
@@ -47,7 +51,8 @@ public class Configurations implements Serializable {
   }
 
   public void updateGlobalConfig(InputStream io) throws IOException {
-    Map<String, Object> globalConfig = JSONUtils.INSTANCE.load(io, new TypeReference<Map<String, Object>>() {});
+    Map<String, Object> globalConfig = JSONUtils.INSTANCE.load(io, new TypeReference<Map<String, Object>>() {
+    });
     updateGlobalConfig(globalConfig);
   }
 
@@ -77,16 +82,31 @@ public class Configurations implements Serializable {
     return (Map<String, Object>) configurations.get(name);
   }
 
-  public void updateConfig(String name, byte[] data) {
-    try {
-      Map<String, Object> config = JSONUtils.INSTANCE.load(new ByteArrayInputStream(data), new TypeReference<Map<String, Object>>(){});
-      updateConfig(name, config);
-    } catch (IOException e) {
-      throw new IllegalStateException(e);
-    }
+  public void updateConfig(String name, byte[] data) throws IOException {
+    if (data == null) throw new IllegalStateException("config data cannot be null");
+    Map<String, Object> config = JSONUtils.INSTANCE.load(new ByteArrayInputStream(data), new TypeReference<Map<String, Object>>() {});
+    updateConfig(name, config);
   }
 
   public void updateConfig(String name, Map<String, Object> config) {
     configurations.put(name, config);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    Configurations that = (Configurations) o;
+    return configurations.equals(that.configurations);
+  }
+
+  @Override
+  public int hashCode() {
+    return configurations.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return configurations.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
index e37c6d0..ea345ca 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
@@ -83,9 +83,7 @@ public class SensorEnrichmentConfig {
   public static SensorEnrichmentConfig fromBytes(byte[] config) throws IOException {
     return JSONUtils.INSTANCE.load(new String(config), SensorEnrichmentConfig.class);
   }
-  public String toJSON(boolean pretty) throws JsonProcessingException {
-    return JSONUtils.INSTANCE.toJSON(this, pretty);
-  }
+
   public String toJSON() throws JsonProcessingException {
     return JSONUtils.INSTANCE.toJSON(this, true);
   }
@@ -110,6 +108,15 @@ public class SensorEnrichmentConfig {
   }
 
   @Override
+  public String toString() {
+    return "{index=" + index + ", batchSize=" + batchSize +
+            ", enrichmentFieldMap=" + enrichmentFieldMap +
+            ", threatIntelFieldMap=" + threatIntelFieldMap +
+            ", fieldToEnrichmentTypeMap=" + fieldToEnrichmentTypeMap +
+            ", fieldToThreatIntelTypeMap=" + fieldToThreatIntelTypeMap + "}";
+  }
+
+  @Override
   public int hashCode() {
     int result = getIndex() != null ? getIndex().hashCode() : 0;
     result = 31 * result + (getEnrichmentFieldMap() != null ? getEnrichmentFieldMap().hashCode() : 0);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java
index 45e93a1..62259df 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java
@@ -49,38 +49,59 @@ public class ConfigurationsUtils {
     return CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
   }
 
-  public static void writeToZookeeperFromFile(String path, String filePath, String zookeeperUrl) throws Exception {
-    writeToZookeeper(path, Files.readAllBytes(Paths.get(filePath)), zookeeperUrl);
+  public static void writeGlobalConfigToZookeeper(Map<String, Object> globalConfig, String zookeeperUrl) throws Exception {
+    writeGlobalConfigToZookeeper(JSONUtils.INSTANCE.toJSON(globalConfig), zookeeperUrl);
   }
 
-  public static void writeToZookeeperFromFile(String path, String filePath, CuratorFramework client) throws Exception {
-    writeToZookeeper(path, Files.readAllBytes(Paths.get(filePath)), client);
+  public static void writeGlobalConfigToZookeeper(byte[] globalConfig, String zookeeperUrl) throws Exception {
+    CuratorFramework client = getClient(zookeeperUrl);
+    client.start();
+    try {
+      writeGlobalConfigToZookeeper(globalConfig, client);
+    }
+    finally {
+      client.close();
+    }
   }
-  public static void writerGlobalConfigToZookeeper(byte[] configData, String zookeeperUrl) throws Exception {
-    writeToZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, configData, zookeeperUrl);
+
+  public static void writeGlobalConfigToZookeeper(byte[] globalConfig, CuratorFramework client) throws Exception {
+    writeToZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, globalConfig, client);
   }
 
-  public static void writerGlobalConfigToZookeeper(byte[] configData, CuratorFramework client) throws Exception {
-    writeToZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, configData, client);
+  public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig, String zookeeperUrl) throws Exception {
+    writeSensorEnrichmentConfigToZookeeper(sensorType, JSONUtils.INSTANCE.toJSON(sensorEnrichmentConfig), zookeeperUrl);
   }
+
   public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, String zookeeperUrl) throws Exception {
-    writeToZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, configData, zookeeperUrl);
+    CuratorFramework client = getClient(zookeeperUrl);
+    client.start();
+    try {
+      writeSensorEnrichmentConfigToZookeeper(sensorType, configData, client);
+    }
+    finally {
+      client.close();
+    }
   }
 
   public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, CuratorFramework client) throws Exception {
     writeToZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, configData, client);
   }
-  public static void writeToZookeeper(String path, byte[] configData, String zookeeperUrl) throws Exception {
+
+  public static void writeConfigToZookeeper(String name, Map<String, Object> config, String zookeeperUrl) throws Exception {
+    writeConfigToZookeeper(name, JSONUtils.INSTANCE.toJSON(config), zookeeperUrl);
+  }
+
+  public static void writeConfigToZookeeper(String name, byte[] config, String zookeeperUrl) throws Exception {
     CuratorFramework client = getClient(zookeeperUrl);
     client.start();
     try {
-      writeToZookeeper(path, configData, client);
+      writeToZookeeper(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + name, config, client);
     }
     finally {
       client.close();
     }
-
   }
+
   public static void writeToZookeeper(String path, byte[] configData, CuratorFramework client) throws Exception {
     try {
       client.setData().forPath(path, configData);
@@ -98,19 +119,23 @@ public class ConfigurationsUtils {
   }
 
   public static byte[] readGlobalConfigBytesFromZookeeper(CuratorFramework client) throws Exception {
-    return readConfigBytesFromZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, client);
+    return readFromZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, client);
   }
 
   public static byte[] readSensorEnrichmentConfigBytesFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
-    return readConfigBytesFromZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, client);
+    return readFromZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, client);
+  }
+
+  public static byte[] readConfigBytesFromZookeeper(String name, CuratorFramework client) throws Exception {
+    return readFromZookeeper(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + name, client);
   }
 
-  public static byte[] readConfigBytesFromZookeeper(String path, CuratorFramework client) throws Exception {
+  public static byte[] readFromZookeeper(String path, CuratorFramework client) throws Exception {
     return client.getData().forPath(path);
   }
 
   public static void uploadConfigsToZookeeper(String rootFilePath, String zookeeperUrl) throws Exception {
-    ConfigurationsUtils.writerGlobalConfigToZookeeper(readGlobalConfigFromFile(rootFilePath), zookeeperUrl);
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(readGlobalConfigFromFile(rootFilePath), zookeeperUrl);
     Map<String, byte[]> sensorEnrichmentConfigs = readSensorEnrichmentConfigsFromFile(rootFilePath);
     for(String sensorType: sensorEnrichmentConfigs.keySet()) {
       ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
@@ -160,7 +185,7 @@ public class ConfigurationsUtils {
       options.addOption(o);
     }
     {
-      Option o = new Option("p", "config_files", true, "Path to the source config files.  Must be named like \"$source\"-config.json");
+      Option o = new Option("p", "config_files", true, "Path to the source config files.  Must be named like \"$source\".json");
       o.setArgName("DIR_NAME");
       o.setRequired(false);
       options.addOption(o);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BaseBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BaseBoltTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BaseBoltTest.java
new file mode 100644
index 0000000..ada3854
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BaseBoltTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.metron.Constants;
+import org.apache.metron.utils.ConfigurationsUtils;
+import org.junit.Before;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class BaseBoltTest {
+
+  public String sampleConfigRoot = "../Metron-Testing/src/main/resources/sample/config/";
+  protected String zookeeperUrl;
+  protected Set<String> allConfigurationTypes = new HashSet<>();
+
+  @Before
+  public void setupConfiguration() throws Exception {
+    TestingServer testZkServer = new TestingServer(true);
+    this.zookeeperUrl = testZkServer.getConnectString();
+    byte[] globalConfig = ConfigurationsUtils.readGlobalConfigFromFile(sampleConfigRoot);
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig, zookeeperUrl);
+    allConfigurationTypes.add(Constants.GLOBAL_CONFIG_NAME);
+    Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(sampleConfigRoot);
+    for (String sensorType : sensorEnrichmentConfigs.keySet()) {
+      ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
+      allConfigurationTypes.add(sensorType);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/ConfiguredBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/ConfiguredBoltTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/ConfiguredBoltTest.java
new file mode 100644
index 0000000..6c538f0
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/ConfiguredBoltTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.Constants;
+import org.apache.metron.domain.Configurations;
+import org.apache.metron.domain.SensorEnrichmentConfig;
+import org.apache.metron.utils.ConfigurationsUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+
+public class ConfiguredBoltTest extends BaseBoltTest {
+
+  private static Set<String> configsUpdated = new HashSet<>();
+
+  public static class StandAloneConfiguredBolt extends ConfiguredBolt {
+
+    public StandAloneConfiguredBolt(String zookeeperUrl) {
+      super(zookeeperUrl);
+    }
+
+    @Override
+    public void execute(Tuple input) {
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    }
+
+    @Override
+    protected void reloadCallback(String name, Configurations.Type type) {
+      configsUpdated.add(name);
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    Configurations sampleConfigurations = new Configurations();
+    TopologyContext topologyContext = mock(TopologyContext.class);
+    OutputCollector outputCollector = mock(OutputCollector.class);
+    try {
+      StandAloneConfiguredBolt configuredBolt = new StandAloneConfiguredBolt(null);
+      configuredBolt.prepare(new HashMap(), topologyContext, outputCollector);
+      Assert.fail("A valid zookeeper url must be supplied");
+    } catch (RuntimeException e){}
+
+    configsUpdated = new HashSet<>();
+    sampleConfigurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(sampleConfigRoot));
+    Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(sampleConfigRoot);
+    for (String sensorType : sensorEnrichmentConfigs.keySet()) {
+      sampleConfigurations.updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfigs.get(sensorType));
+    }
+
+    StandAloneConfiguredBolt configuredBolt = new StandAloneConfiguredBolt(zookeeperUrl);
+    configuredBolt.prepare(new HashMap(), topologyContext, outputCollector);
+    waitForConfigUpdate(allConfigurationTypes);
+    Assert.assertEquals(sampleConfigurations, configuredBolt.configurations);
+
+    configsUpdated = new HashSet<>();
+    Map<String, Object> sampleGlobalConfig = sampleConfigurations.getGlobalConfig();
+    sampleGlobalConfig.put("newGlobalField", "newGlobalValue");
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
+    waitForConfigUpdate(Constants.GLOBAL_CONFIG_NAME);
+    Assert.assertEquals("Add global config field", sampleConfigurations.getGlobalConfig(), configuredBolt.configurations.getGlobalConfig());
+
+    configsUpdated = new HashSet<>();
+    sampleGlobalConfig.remove("newGlobalField");
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
+    waitForConfigUpdate(Constants.GLOBAL_CONFIG_NAME);
+    Assert.assertEquals("Remove global config field", sampleConfigurations, configuredBolt.configurations);
+
+    configsUpdated = new HashSet<>();
+    String sensorType = "testSensorConfig";
+    SensorEnrichmentConfig testSensorConfig = new SensorEnrichmentConfig();
+    testSensorConfig.setBatchSize(50);
+    testSensorConfig.setIndex("test");
+    Map<String, List<String>> enrichmentFieldMap = new HashMap<>();
+    enrichmentFieldMap.put("enrichmentTest", new ArrayList<String>() {{
+      add("enrichmentField");
+    }});
+    testSensorConfig.setEnrichmentFieldMap(enrichmentFieldMap);
+    Map<String, List<String>> threatIntelFieldMap = new HashMap<>();
+    threatIntelFieldMap.put("threatIntelTest", new ArrayList<String>() {{
+      add("threatIntelField");
+    }});
+    testSensorConfig.setThreatIntelFieldMap(threatIntelFieldMap);
+    sampleConfigurations.updateSensorEnrichmentConfig(sensorType, testSensorConfig);
+    ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, testSensorConfig, zookeeperUrl);
+    waitForConfigUpdate(sensorType);
+    Assert.assertEquals("Add new sensor config", sampleConfigurations, configuredBolt.configurations);
+
+    configsUpdated = new HashSet<>();
+    String someConfigType = "someConfig";
+    Map<String, Object> someConfig = new HashMap<>();
+    someConfig.put("someField", "someValue");
+    sampleConfigurations.updateConfig(someConfigType, someConfig);
+    ConfigurationsUtils.writeConfigToZookeeper(someConfigType, someConfig, zookeeperUrl);
+    waitForConfigUpdate(someConfigType);
+    Assert.assertEquals("Add new misc config", sampleConfigurations, configuredBolt.configurations);
+    configuredBolt.cleanup();
+  }
+
+  private void waitForConfigUpdate(final String expectedConfigUpdate) {
+    waitForConfigUpdate(new HashSet<String>() {{ add(expectedConfigUpdate); }});
+  }
+
+  private void waitForConfigUpdate(Set<String> expectedConfigUpdates) {
+    int count = 0;
+    while (!configsUpdated.equals(expectedConfigUpdates)) {
+      if (count++ > 5) {
+        Assert.fail("ConfiguredBolt was not updated in time");
+        return;
+      }
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/ConfigurationsTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/ConfigurationsTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/ConfigurationsTest.java
new file mode 100644
index 0000000..5d883b6
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/ConfigurationsTest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.domain;
+
+import junit.framework.Assert;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import nl.jqno.equalsverifier.Warning;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class ConfigurationsTest {
+
+  @Test
+  public void test() throws IOException {
+    EqualsVerifier.forClass(Configurations.class).suppress(Warning.NONFINAL_FIELDS, Warning.NULL_FIELDS).usingGetClass().verify();
+    Configurations configurations = new Configurations();
+    try {
+      configurations.updateConfig("someConfig", (byte[]) null);
+      Assert.fail("Updating a config with null should throw an IllegalStateException");
+    } catch(IllegalStateException e) {}
+    Assert.assertTrue(configurations.toString() != null && configurations.toString().length() > 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/SensorEnrichmentConfigTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/SensorEnrichmentConfigTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/SensorEnrichmentConfigTest.java
new file mode 100644
index 0000000..ed2b854
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/SensorEnrichmentConfigTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.domain;
+
+import junit.framework.Assert;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import nl.jqno.equalsverifier.Warning;
+import org.apache.metron.utils.ConfigurationsUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class SensorEnrichmentConfigTest {
+
+  public String sampleConfigRoot = "../Metron-Testing/src/main/resources/sample/config/";
+
+  @Test
+  public void test() throws IOException {
+    EqualsVerifier.forClass(SensorEnrichmentConfig.class).suppress(Warning.NONFINAL_FIELDS).usingGetClass().verify();
+    Map<String, byte[]> testSensorConfigMap = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(sampleConfigRoot);
+    byte[] sensorConfigBytes = testSensorConfigMap.get("yaf");
+    SensorEnrichmentConfig sensorEnrichmentConfig = SensorEnrichmentConfig.fromBytes(sensorConfigBytes);
+    Assert.assertNotNull(sensorEnrichmentConfig);
+    Assert.assertTrue(sensorEnrichmentConfig.toString() != null && sensorEnrichmentConfig.toString().length() > 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/src/test/java/org/apache/metron/utils/ConfigurationsUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/utils/ConfigurationsUtilsTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/utils/ConfigurationsUtilsTest.java
new file mode 100644
index 0000000..3214f0e
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/utils/ConfigurationsUtilsTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.utils;
+
+import junit.framework.Assert;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ConfigurationsUtilsTest {
+
+  public String sampleConfigRoot = "../Metron-Testing/src/main/resources/sample/config/";
+
+  private TestingServer testZkServer;
+  private String zookeeperUrl;
+  private CuratorFramework client;
+  private byte[] testGlobalConfig;
+  private Map<String, byte[]> testSensorConfigMap;
+
+  @Before
+  public void setup() throws Exception {
+    testZkServer = new TestingServer(true);
+    zookeeperUrl = testZkServer.getConnectString();
+    client = ConfigurationsUtils.getClient(zookeeperUrl);
+    client.start();
+    testGlobalConfig = ConfigurationsUtils.readGlobalConfigFromFile(sampleConfigRoot);
+    testSensorConfigMap = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(sampleConfigRoot);
+  }
+
+  @Test
+  public void test() throws Exception {
+    Assert.assertTrue(testGlobalConfig.length > 0);
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(testGlobalConfig, zookeeperUrl);
+    byte[] readGlobalConfigBytes = ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(client);
+    Assert.assertTrue(Arrays.equals(testGlobalConfig, readGlobalConfigBytes));
+
+    Assert.assertTrue(testSensorConfigMap.size() > 0);
+    String testSensorType = "yaf";
+    byte[] testSensorConfigBytes = testSensorConfigMap.get(testSensorType);
+    ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(testSensorType, testSensorConfigBytes, zookeeperUrl);
+    byte[] readSensorConfigBytes = ConfigurationsUtils.readSensorEnrichmentConfigBytesFromZookeeper(testSensorType, client);
+    Assert.assertTrue(Arrays.equals(testSensorConfigBytes, readSensorConfigBytes));
+
+    String name = "testConfig";
+    Map<String, Object> testConfig = new HashMap<>();
+    testConfig.put("stringField", "value");
+    testConfig.put("intField", 1);
+    testConfig.put("doubleField", 1.1);
+    ConfigurationsUtils.writeConfigToZookeeper(name, testConfig, zookeeperUrl);
+    byte[] readConfigBytes = ConfigurationsUtils.readConfigBytesFromZookeeper(name, client);
+    Assert.assertTrue(Arrays.equals(JSONUtils.INSTANCE.toJSON(testConfig), readConfigBytes));
+
+  }
+
+  @Test
+  public void testCmdLine() throws Exception {
+    String[] args = {"-z", zookeeperUrl, "-p", sampleConfigRoot};
+    ConfigurationsUtils.main(args);
+    byte[] readGlobalConfigBytes = ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(client);
+    Assert.assertTrue(Arrays.equals(testGlobalConfig, readGlobalConfigBytes));
+    for(String sensorType: testSensorConfigMap.keySet()) {
+      byte[] readSensorConfigBytes = ConfigurationsUtils.readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client);
+      Assert.assertTrue(Arrays.equals(testSensorConfigMap.get(sensorType), readSensorConfigBytes));
+    }
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    client.close();
+    testZkServer.close();
+    testZkServer.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 7b76c57..180c630 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -29,6 +29,7 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import org.apache.metron.Constants;
 import org.apache.metron.bolt.ConfiguredBolt;
+import org.apache.metron.domain.Configurations;
 import org.apache.metron.domain.Enrichment;
 import org.apache.metron.domain.SensorEnrichmentConfig;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
@@ -114,7 +115,7 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
     return this;
   }
   @Override
-  protected void reloadCallback() {
+  protected void reloadCallback(String name, Configurations.Type type) {
     if(invalidateCacheOnReload) {
       if (cache != null) {
         cache.invalidateAll();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/SolrEnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/SolrEnrichmentIntegrationTest.java b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/SolrEnrichmentIntegrationTest.java
index afeb56b..ebe445f 100644
--- a/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/SolrEnrichmentIntegrationTest.java
+++ b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/SolrEnrichmentIntegrationTest.java
@@ -53,7 +53,7 @@ public class SolrEnrichmentIntegrationTest extends EnrichmentIntegrationTest {
                   Configurations configurations = SampleUtil.getSampleConfigs();
                   Map<String, Object> globalConfig = configurations.getGlobalConfig();
                   globalConfig.put("solr.zookeeper", solrComponent.getZookeeperUrl());
-                  ConfigurationsUtils.writerGlobalConfigToZookeeper(JSONUtils.INSTANCE.toJSON(globalConfig), testZookeeperUrl);
+                  ConfigurationsUtils.writeGlobalConfigToZookeeper(JSONUtils.INSTANCE.toJSON(globalConfig), testZookeeperUrl);
                 } catch (Exception e) {
                   e.printStackTrace();
                 }