You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2017/10/20 21:20:19 UTC

[1/3] metron git commit: METRON-1241: Enable the REST API to use a cache for the zookeeper config similar to the Bolts closes apache/incubator-metron#795

Repository: metron
Updated Branches:
  refs/heads/master aee018476 -> cc111ec98


http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 49f111d..3316b32 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -17,10 +17,12 @@
  */
 package org.apache.metron.parsers.bolt;
 
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.*;
 
 import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
 import org.apache.metron.test.error.MetronErrorJSONMatcher;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -115,13 +117,35 @@ public class ParserBoltTest extends BaseBoltTest {
     }
   }
 
+  private static ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+    return new ConfigurationsUpdater<ParserConfigurations>(null, null) {
+      @Override
+      public void update(CuratorFramework client, String path, byte[] data) throws IOException { }
+
+      @Override
+      public void delete(CuratorFramework client, String path, byte[] data) throws IOException { }
+
+      @Override
+      public ConfigurationType getType() {
+        return ConfigurationType.PARSER;
+      }
+
+      @Override
+      public void update(String name, byte[] data) throws IOException { }
+
+      @Override
+      public void delete(String name) { }
+
+      @Override
+      public Class<ParserConfigurations> getConfigurationClass() {
+        return ParserConfigurations.class;
+      }
 
-  @Test
-  public void testEmpty() throws Exception {
-    String sensorType = "yaf";
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
       @Override
-      protected ParserConfigurations defaultConfigurations() {
+      public void forceUpdate(CuratorFramework client) { }
+
+      @Override
+      public ParserConfigurations defaultConfigurations() {
         return new ParserConfigurations() {
           @Override
           public SensorParserConfig getSensorParserConfig(String sensorType) {
@@ -135,11 +159,22 @@ public class ParserBoltTest extends BaseBoltTest {
           }
         };
       }
+    };
+  }
+
 
+  @Test
+  public void testEmpty() throws Exception {
+    String sensorType = "yaf";
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
+      @Override
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater();
+      }
     };
 
     parserBolt.setCuratorFramework(client);
-    parserBolt.setTreeCache(cache);
+    parserBolt.setZKCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
     verify(writer, times(1)).init();
@@ -165,28 +200,15 @@ public class ParserBoltTest extends BaseBoltTest {
     String sensorType = "yaf";
     ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
       @Override
-      protected ParserConfigurations defaultConfigurations() {
-        return new ParserConfigurations() {
-          @Override
-          public SensorParserConfig getSensorParserConfig(String sensorType) {
-            return new SensorParserConfig() {
-              @Override
-              public Map<String, Object> getParserConfig() {
-                return new HashMap<String, Object>() {{
-                }};
-              }
-
-
-            };
-          }
-        };
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater();
       }
     };
 
     buildGlobalConfig(parserBolt);
 
     parserBolt.setCuratorFramework(client);
-    parserBolt.setTreeCache(cache);
+    parserBolt.setZKCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     byte[] sampleBinary = "some binary message".getBytes();
 
@@ -218,23 +240,13 @@ public class ParserBoltTest extends BaseBoltTest {
     String sensorType = "yaf";
     ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
       @Override
-      protected ParserConfigurations defaultConfigurations() {
-        return new ParserConfigurations() {
-          @Override
-          public SensorParserConfig getSensorParserConfig(String sensorType) {
-            return new SensorParserConfig() {
-              @Override
-              public Map<String, Object> getParserConfig() {
-                return new HashMap<String, Object>() {{
-                }};
-              }
-            };
-          }
-        };
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater();
       }
+
     };
     parserBolt.setCuratorFramework(client);
-    parserBolt.setTreeCache(cache);
+    parserBolt.setZKCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
     verify(writer, times(1)).init();
@@ -274,24 +286,13 @@ public void testImplicitBatchOfOne() throws Exception {
 
   ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
     @Override
-    protected ParserConfigurations defaultConfigurations() {
-      return new ParserConfigurations() {
-        @Override
-        public SensorParserConfig getSensorParserConfig(String sensorType) {
-          return new SensorParserConfig() {
-            @Override
-            public Map<String, Object> getParserConfig() {
-              return new HashMap<String, Object>() {{
-              }};
-            }
-          };
-        }
-      };
+    protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+      return ParserBoltTest.createUpdater();
     }
   };
 
   parserBolt.setCuratorFramework(client);
-  parserBolt.setTreeCache(cache);
+  parserBolt.setZKCache(cache);
   parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
   verify(parser, times(1)).init();
   verify(batchWriter, times(1)).init(any(), any(), any());
@@ -334,10 +335,14 @@ public void testImplicitBatchOfOne() throws Exception {
           throw new RuntimeException(e);
         }
       }
+      @Override
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater();
+      }
     };
 
     parserBolt.setCuratorFramework(client);
-    parserBolt.setTreeCache(cache);
+    parserBolt.setZKCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
     verify(batchWriter, times(1)).init(any(), any(), any());
@@ -371,10 +376,15 @@ public void testImplicitBatchOfOne() throws Exception {
           throw new RuntimeException(e);
         }
       }
+
+      @Override
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater();
+      }
     };
 
     parserBolt.setCuratorFramework(client);
-    parserBolt.setTreeCache(cache);
+    parserBolt.setZKCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
     verify(batchWriter, times(1)).init(any(), any(), any());
@@ -440,10 +450,15 @@ public void testImplicitBatchOfOne() throws Exception {
           throw new RuntimeException(e);
         }
       }
+
+      @Override
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater();
+      }
     };
 
     parserBolt.setCuratorFramework(client);
-    parserBolt.setTreeCache(cache);
+    parserBolt.setZKCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     when(t1.getBinary(0)).thenReturn(new byte[] {});
     parserBolt.execute(t1);
@@ -459,25 +474,13 @@ public void testImplicitBatchOfOne() throws Exception {
 
     ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
       @Override
-      protected ParserConfigurations defaultConfigurations() {
-        return new ParserConfigurations() {
-          @Override
-          public SensorParserConfig getSensorParserConfig(String sensorType) {
-            return new SensorParserConfig() {
-              @Override
-              public Map<String, Object> getParserConfig() {
-                return new HashMap<String, Object>() {{
-                  put(IndexingConfigurations.BATCH_SIZE_CONF, "1");
-                }};
-              }
-            };
-          }
-        };
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater();
       }
     };
 
     parserBolt.setCuratorFramework(client);
-    parserBolt.setTreeCache(cache);
+    parserBolt.setZKCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
     verify(batchWriter, times(1)).init(any(), any(), any());
@@ -498,25 +501,13 @@ public void testImplicitBatchOfOne() throws Exception {
 
     ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
       @Override
-      protected ParserConfigurations defaultConfigurations() {
-        return new ParserConfigurations() {
-          @Override
-          public SensorParserConfig getSensorParserConfig(String sensorType) {
-            return new SensorParserConfig() {
-              @Override
-              public Map<String, Object> getParserConfig() {
-                return new HashMap<String, Object>() {{
-                  put(IndexingConfigurations.BATCH_SIZE_CONF, 5);
-                }};
-              }
-            };
-          }
-        };
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater();
       }
     };
 
     parserBolt.setCuratorFramework(client);
-    parserBolt.setTreeCache(cache);
+    parserBolt.setZKCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
     verify(batchWriter, times(1)).init(any(), any(), any());
@@ -541,31 +532,20 @@ public void testImplicitBatchOfOne() throws Exception {
 
 
   }
+
   @Test
   public void testBatchOfFiveWithError() throws Exception {
 
     String sensorType = "yaf";
     ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
       @Override
-      protected ParserConfigurations defaultConfigurations() {
-        return new ParserConfigurations() {
-          @Override
-          public SensorParserConfig getSensorParserConfig(String sensorType) {
-            return new SensorParserConfig() {
-              @Override
-              public Map<String, Object> getParserConfig() {
-                return new HashMap<String, Object>() {{
-                  put(IndexingConfigurations.BATCH_SIZE_CONF, 5);
-                }};
-              }
-            };
-          }
-        };
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater();
       }
     };
 
     parserBolt.setCuratorFramework(client);
-    parserBolt.setTreeCache(cache);
+    parserBolt.setZKCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
     verify(batchWriter, times(1)).init(any(), any(), any());

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-test-utilities/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/pom.xml b/metron-platform/metron-test-utilities/pom.xml
index 2502760..38f8a38 100644
--- a/metron-platform/metron-test-utilities/pom.xml
+++ b/metron-platform/metron-test-utilities/pom.xml
@@ -28,6 +28,11 @@
   </properties>
   <dependencies>
     <dependency>
+      <groupId>org.apache.metron</groupId>
+      <artifactId>metron-zookeeper</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
       <version>1.2.17</version>
@@ -127,11 +132,7 @@
         </exclusion>
       </exclusions>
     </dependency>
-    <dependency>
-      <groupId>org.apache.curator</groupId>
-      <artifactId>curator-recipes</artifactId>
-      <version>${global_curator_version}</version>
-    </dependency>
+
     <dependency>
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-test</artifactId>

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java
index 75f999a..ac64b6a 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.test.bolt;
 
+import org.apache.metron.zookeeper.ZKCache;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -54,7 +55,7 @@ public abstract class BaseBoltTest {
   protected CuratorFramework client;
 
   @Mock
-  protected TreeCache cache;
+  protected ZKCache cache;
 
   @Before
   public void initMocks() {

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-zookeeper/pom.xml b/metron-platform/metron-zookeeper/pom.xml
new file mode 100644
index 0000000..e02cafd
--- /dev/null
+++ b/metron-platform/metron-zookeeper/pom.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- 
+  Licensed to the Apache Software 
+  Foundation (ASF) under one or more contributor license agreements. See the 
+  NOTICE file distributed with this work for additional information regarding 
+  copyright ownership. The ASF licenses this file to You under the Apache License, 
+  Version 2.0 (the "License"); you may not use this file except in compliance 
+  with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
+  Unless required by applicable law or agreed to in writing, software distributed 
+  under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+  OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+  the specific language governing permissions and limitations under the License. 
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.metron</groupId>
+    <artifactId>metron-platform</artifactId>
+    <version>0.4.1</version>
+  </parent>
+  <artifactId>metron-zookeeper</artifactId>
+  <name>metron-zookeeper</name>
+  <url>https://metron.apache.org/</url>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-client</artifactId>
+      <version>${global_curator_version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
+      <version>${global_curator_version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${global_guava_version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/SimpleEventListener.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/SimpleEventListener.java b/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/SimpleEventListener.java
new file mode 100644
index 0000000..1078f1e
--- /dev/null
+++ b/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/SimpleEventListener.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.zookeeper;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+
+/**
+ * This is a simple convenience implementation of a TreeCacheListener.
+ * It allows multiple callbacks to be called with one listener.
+ */
+public class SimpleEventListener implements TreeCacheListener {
+
+  private static final Logger LOG =  LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The callback interface.  This is to be implemented for all callbacks bound to a SimpleEventListener
+   */
+  public interface Callback {
+    /**
+     * Called upon an event.
+     * @param client The zookeeper client
+     * @param path The zookeeper path changed; null when the event carries no node data
+     * @param data The data that changed; null when the event carries no node data
+     * @throws IOException If the callback is unable to process the event
+     */
+    void apply(CuratorFramework client, String path, byte[] data) throws IOException;
+  }
+
+  /**
+   * Builder to create a SimpleEventListener
+   */
+  public static class Builder {
+    private EnumMap<TreeCacheEvent.Type, List<Callback>> callbacks = new EnumMap<>(TreeCacheEvent.Type.class);
+
+    /**
+     * Add a callback bound to one or more TreeCacheEvent.Type.
+     * @param callback The callback to be called when an event of each of types happens
+     * @param types The zookeeper event types to bind to
+     * @return The Builder
+     */
+    public Builder with(Callback callback, TreeCacheEvent.Type... types) {
+      return with(ImmutableList.of(callback), types);
+    }
+
+    /**
+     * Add callbacks bound to one or more TreeCacheEvent.Type.
+     * @param callback The iterable of callbacks to be called when an event of each of types happens
+     * @param types The zookeeper event types to bind to
+     * @return The Builder
+     */
+    public Builder with(Iterable<? extends Callback> callback, TreeCacheEvent.Type... types) {
+      for(TreeCacheEvent.Type t : types) {
+        // Callbacks accumulate per event type, in registration order.
+        Iterables.addAll(callbacks.computeIfAbsent(t, k -> new ArrayList<>()), callback);
+      }
+      return this;
+    }
+
+    /**
+     * Create the listener.
+     * @return The SimpleEventListener
+     */
+    public SimpleEventListener build() {
+      return new SimpleEventListener(callbacks);
+    }
+  }
+
+  EnumMap<TreeCacheEvent.Type, List<Callback>> callbacks;
+
+  private SimpleEventListener(EnumMap<TreeCacheEvent.Type, List<Callback>> callbacks) {
+    this.callbacks = callbacks;
+  }
+
+  /** Dispatch the event to every callback registered for its type, in registration order. */
+  @Override
+  public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+    if(event == null) {
+      return; // a null event has no type to dispatch on
+    }
+    String path = null;
+    byte[] data = null;
+    if(event.getData() != null) {
+      path = event.getData().getPath();
+      data = event.getData().getData();
+    }
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Type: {}, Path: {}, Data: {}", event.getType(), (path == null?"":path) , (data == null?"":new String(data, java.nio.charset.StandardCharsets.UTF_8)));
+    }
+    List<Callback> callback = callbacks.get(event.getType());
+    if(callback != null) {
+      for(Callback cb : callback) {
+        cb.apply(client, path, data);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/ZKCache.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/ZKCache.java b/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/ZKCache.java
new file mode 100644
index 0000000..58a6329
--- /dev/null
+++ b/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/ZKCache.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.zookeeper;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A zookeeper cache that composes the Curator TreeCache.  This is the basic point of
+ * abstraction to interact with metron configuration in Zookeeper.
+ */
+public class ZKCache implements AutoCloseable{
+  private static final Logger LOG =  LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final int DEFAULT_CLIENT_SLEEP_MS = 1000;
+  public static final int DEFAULT_MAX_RETRIES = 3;
+
+  /**
+   * Build a ZKCache instance.
+   */
+  public static class Builder {
+    private Optional<CuratorFramework> client = Optional.empty();
+    private boolean ownClient = false;
+    private List<TreeCacheListener> listener = new ArrayList<>();
+    private String zkRoot;
+
+    /**
+     * Specify your own client.  If you specify this, closing will not close your Client.
+     * If a client is not passed in, then one is created and will be closed when the ZKCache
+     * closes.
+     * @param client The CuratorFramework client.
+     * @return The Builder
+     */
+    public Builder withClient(CuratorFramework client) {
+      this.client = Optional.ofNullable(client);
+      ownClient = false;
+      return this;
+    }
+
+    /**
+     * Specify your own zookeeper URL.  If you pass this in, the ZKCache will own the client
+     * and it will be closed when the ZKCache is closed.
+     *
+     * @param zookeeperUrl The zookeeper quorum
+     * @return The Builder
+     */
+    public Builder withClient(String zookeeperUrl) {
+      this.client = Optional.ofNullable(createClient(zookeeperUrl, Optional.empty()));
+      ownClient = true;
+      return this;
+    }
+
+    /**
+     * Specify your own zookeeper URL.  If you pass this in, the ZKCache will own the client
+     * and it will be closed when the ZKCache is closed.
+     *
+     * @param zookeeperUrl The zookeeper quorum
+     * @param retryPolicy The RetryPolicy to use
+     * @return The Builder
+     */
+    public Builder withClient(String zookeeperUrl, RetryPolicy retryPolicy) {
+      this.client = Optional.ofNullable(createClient(zookeeperUrl, Optional.ofNullable(retryPolicy)));
+      ownClient = true;
+      return this;
+    }
+
+    /**
+     * Specify the treecache listener, which will be called when changes happen to the zookeeper root.
+     *
+     * @param listener The callback which is called when changes happen in zookeeper.
+     * @return The Builder
+     */
+    public Builder withListener(TreeCacheListener listener) {
+      this.listener.add(listener);
+      return this;
+    }
+
+    /**
+     * Specify the root in zookeeper to monitor.
+     * @param zkRoot The root path in zookeeper
+     * @return The Builder
+     */
+    public Builder withRoot(String zkRoot) {
+      this.zkRoot = zkRoot;
+      return this;
+    }
+
+    /**
+     * Create the ZKCache object based on the config passed in the Builder.
+     * @return The ZKCache
+     */
+    public ZKCache build() {
+      if(!client.isPresent()) {
+        throw new IllegalArgumentException("Zookeeper client must be specified.");
+      }
+      if(listener.isEmpty()) {
+        LOG.warn("Zookeeper listener is null or empty, which is very likely an error.");
+      }
+      if(zkRoot == null) {
+        throw new IllegalArgumentException("Zookeeper root must not be null.");
+      }
+      return new ZKCache(client.get(), listener, zkRoot, ownClient);
+    }
+  }
+
+  private final CuratorFramework client;
+  private final List<TreeCacheListener> listeners;
+  private TreeCache cache;
+  private final String zkRoot;
+  private final boolean ownClient;
+
+  private ZKCache(CuratorFramework client, List<TreeCacheListener> listeners, String zkRoot, boolean ownClient) {
+    this.client = client;
+    this.listeners = new ArrayList<>(listeners); // copy: later Builder.withListener calls must not leak in
+    this.ownClient = ownClient;
+    if(zkRoot == null) {
+      throw new IllegalArgumentException("Zookeeper root must not be null.");
+    }
+    this.zkRoot = zkRoot;
+  }
+
+  /**
+   * Return the client used.
+   * NOTE: DO NOT CLOSE THIS CLIENT OUT OF BAND.
+   * @return The Curator Client
+   */
+  public CuratorFramework getClient() {
+    return client;
+  }
+
+  /**
+   * Start the cache.  Calls after the first successful cache creation are no-ops.
+   * @throws Exception If unable to be started.
+   */
+  public void start() throws Exception {
+    if(cache == null) {
+      if(ownClient) {
+        client.start();
+      }
+      TreeCache.Builder builder = TreeCache.newBuilder(client, zkRoot);
+      builder.setCacheData(true);
+      cache = builder.build();
+      for(TreeCacheListener l : listeners) {
+        cache.getListenable().addListener(l);
+      }
+      cache.start();
+    }
+  }
+
+  /**
+   * Close the cache, which closes the client if it's owned by the ZKCache.  Safe if start() was never called.
+   */
+  @Override
+  public void close() {
+    try {
+      if(cache != null) {
+        cache.close();
+      }
+    } finally {
+      // An owned client is released even when closing the cache fails.
+      if(ownClient) {
+        client.close();
+      }
+    }
+  }
+
+  /** Create a client for the quorum; an empty retryPolicy falls back to exponential backoff with the defaults. */
+  public static CuratorFramework createClient(String zookeeperUrl, Optional<RetryPolicy> retryPolicy) {
+    return CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy.orElseGet(() -> new ExponentialBackoffRetry(DEFAULT_CLIENT_SLEEP_MS, DEFAULT_MAX_RETRIES)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml
index 93ced81..3e72d78 100644
--- a/metron-platform/pom.xml
+++ b/metron-platform/pom.xml
@@ -60,6 +60,7 @@
 		<module>metron-elasticsearch</module>
 		<module>metron-storm-kafka</module>
 		<module>metron-storm-kafka-override</module>
+		<module>metron-zookeeper</module>
 	</modules>
 	<dependencies>
 		<dependency>

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-stellar/stellar-common/src/main/scripts/stellar
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/scripts/stellar b/metron-stellar/stellar-common/src/main/scripts/stellar
index a93d09e..2f1cdbe 100644
--- a/metron-stellar/stellar-common/src/main/scripts/stellar
+++ b/metron-stellar/stellar-common/src/main/scripts/stellar
@@ -33,4 +33,4 @@ export METRON_VERSION=${project.version}
 export METRON_HOME=/usr/metron/$METRON_VERSION
 export STELLAR_LIB=$(find $METRON_HOME/lib/ -name metron-parsers*.jar)
 export MANAGEMENT_LIB=$(find $METRON_HOME/lib/ -name metron-management*.jar)
-java $JVMFLAGS -cp "$HBASE_CONFIGS:${CONTRIB:-$METRON_HOME/contrib}:$STELLAR_LIB:$MANAGEMENT_LIB" org.apache.metron.stellar.common.shell.StellarShell "$@"
+java $JVMFLAGS -cp "${CONTRIB:-$METRON_HOME/contrib}:$STELLAR_LIB:$MANAGEMENT_LIB:$HBASE_CONFIGS" org.apache.metron.stellar.common.shell.StellarShell "$@"


[2/3] metron git commit: METRON-1241: Enable the REST API to use a cache for the zookeeper config similar to the Bolts closes apache/incubator-metron#795

Posted by ce...@apache.org.
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
index 6c518b1..ecf8a1b 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
@@ -36,11 +36,19 @@ import org.slf4j.LoggerFactory;
 public class Configurations implements Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private List<FieldValidator> validations = new ArrayList<>();
-  protected ConcurrentMap<String, Object> configurations = new ConcurrentHashMap<>();
+  protected Map<String, Object> configurations = new ConcurrentHashMap<>();
+
+  public Map<String, Object> getConfigurations() {
+    return configurations;
+  }
 
   @SuppressWarnings("unchecked")
   public Map<String, Object> getGlobalConfig() {
-    return (Map<String, Object>) configurations.getOrDefault(ConfigurationType.GLOBAL.getTypeName(), new HashMap());
+    return getGlobalConfig(true);
+  }
+
+  public Map<String, Object> getGlobalConfig(boolean emptyMapOnNonExistent) {
+    return (Map<String, Object>) getConfigurations().getOrDefault(ConfigurationType.GLOBAL.getTypeName(), emptyMapOnNonExistent?new HashMap():null);
   }
 
   public List<FieldValidator> getFieldValidations() {
@@ -59,10 +67,15 @@ public class Configurations implements Serializable {
   }
 
   public void updateGlobalConfig(Map<String, Object> globalConfig) {
-    configurations.put(ConfigurationType.GLOBAL.getTypeName(), globalConfig);
-    validations = FieldValidator.readValidations(getGlobalConfig());
+    if(globalConfig != null) {
+      getConfigurations().put(ConfigurationType.GLOBAL.getTypeName(), globalConfig);
+      validations = FieldValidator.readValidations(getGlobalConfig());
+    }
   }
 
+  public void deleteGlobalConfig() {
+    getConfigurations().remove(ConfigurationType.GLOBAL.getTypeName());
+  }
 
   @Override
   public boolean equals(Object o) {
@@ -72,14 +85,14 @@ public class Configurations implements Serializable {
     Configurations that = (Configurations) o;
 
     if (validations != null ? !validations.equals(that.validations) : that.validations != null) return false;
-    return configurations != null ? configurations.equals(that.configurations) : that.configurations == null;
+    return getConfigurations() != null ? getConfigurations().equals(that.getConfigurations()) : that.getConfigurations() == null;
 
   }
 
   @Override
   public int hashCode() {
     int result = validations != null ? validations.hashCode() : 0;
-    result = 31 * result + (configurations != null ? configurations.hashCode() : 0);
+    result = 31 * result + (getConfigurations() != null ? getConfigurations().hashCode() : 0);
     return result;
   }
 
@@ -87,7 +100,7 @@ public class Configurations implements Serializable {
   public String toString() {
     return "Configurations{" +
             "validations=" + validations +
-            ", configurations=" + configurations +
+            ", configurations=" + getConfigurations()+
             '}';
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
index fcc8050..ae21152 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
@@ -28,11 +28,16 @@ import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.lang.invoke.MethodHandles;
 import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
 import org.apache.commons.io.FilenameUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
@@ -44,8 +49,11 @@ import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ConfigurationsUtils {
+  protected static final Logger LOG =  LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   public static CuratorFramework getClient(String zookeeperUrl) {
     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
@@ -175,28 +183,56 @@ public class ConfigurationsUtils {
     configurations.updateGlobalConfig(readGlobalConfigBytesFromZookeeper(client));
   }
 
-  public static void updateParserConfigsFromZookeeper(ParserConfigurations configurations, CuratorFramework client) throws Exception {
-    updateConfigsFromZookeeper(configurations, client);
-    List<String> sensorTypes = client.getChildren().forPath(PARSER.getZookeeperRoot());
+
+  private interface Callback {
+    void apply(String sensorType) throws Exception;
+  }
+
+  private static void updateConfigsFromZookeeper( Configurations configurations
+                                                , ConfigurationType type
+                                                , Callback callback
+                                                , CuratorFramework client
+                                                )  throws Exception
+  {
+    Exception globalUpdateException = null; // rethrown only after the per-sensor configs were processed
+    try {
+      updateConfigsFromZookeeper(configurations, client);
+    }
+    catch(Exception e) {
+      LOG.warn("Unable to update global config when updating " + type.getTypeName() + " configs: " + e.getMessage(), e);
+      globalUpdateException = e;
+    }
+    List<String> sensorTypes = client.getChildren().forPath(type.getZookeeperRoot());
     for(String sensorType: sensorTypes) {
-      configurations.updateSensorParserConfig(sensorType, readSensorParserConfigBytesFromZookeeper(sensorType, client));
+      callback.apply(sensorType);
     }
+    if(globalUpdateException != null) {
+      throw globalUpdateException;
+    }
+  }
+
+  public static void updateParserConfigsFromZookeeper(ParserConfigurations configurations, CuratorFramework client) throws Exception {
+    updateConfigsFromZookeeper( configurations
+                              , PARSER
+                              , sensorType -> configurations.updateSensorParserConfig(sensorType, readSensorParserConfigBytesFromZookeeper(sensorType, client))
+                              , client
+                              );
   }
 
   public static void updateSensorIndexingConfigsFromZookeeper(IndexingConfigurations configurations, CuratorFramework client) throws Exception {
-    updateConfigsFromZookeeper(configurations, client);
-    List<String> sensorTypes = client.getChildren().forPath(INDEXING.getZookeeperRoot());
-    for(String sensorType: sensorTypes) {
-      configurations.updateSensorIndexingConfig(sensorType, readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client));
-    }
+    updateConfigsFromZookeeper( configurations
+                              , INDEXING
+                              , sensorType -> configurations.updateSensorIndexingConfig(sensorType, readSensorIndexingConfigBytesFromZookeeper(sensorType, client))
+                              , client
+                              );
   }
 
   public static void updateEnrichmentConfigsFromZookeeper(EnrichmentConfigurations configurations, CuratorFramework client) throws Exception {
-    updateConfigsFromZookeeper(configurations, client);
-    List<String> sensorTypes = client.getChildren().forPath(ENRICHMENT.getZookeeperRoot());
-    for(String sensorType: sensorTypes) {
-      configurations.updateSensorEnrichmentConfig(sensorType, readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client));
-    }
+    updateConfigsFromZookeeper( configurations
+                              , ENRICHMENT
+                              , sensorType -> configurations.updateSensorEnrichmentConfig(sensorType, readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client))
+                              , client
+                              );
   }
 
   public static SensorEnrichmentConfig readSensorEnrichmentConfigFromZookeeper(String sensorType, CuratorFramework client) throws Exception {

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java
index a28b15c..dfd7a65 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java
@@ -23,27 +23,43 @@ import org.apache.metron.common.utils.JSONUtils;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
 
 public class EnrichmentConfigurations extends Configurations {
 
-    public SensorEnrichmentConfig getSensorEnrichmentConfig(String sensorType) {
-        return (SensorEnrichmentConfig) configurations.get(getKey(sensorType));
-    }
+  public SensorEnrichmentConfig getSensorEnrichmentConfig(String sensorType) {
+    return (SensorEnrichmentConfig) getConfigurations().get(getKey(sensorType));
+  }
 
-    public void updateSensorEnrichmentConfig(String sensorType, byte[] data) throws IOException {
-        updateSensorEnrichmentConfig(sensorType, new ByteArrayInputStream(data));
-    }
+  public void updateSensorEnrichmentConfig(String sensorType, byte[] data) throws IOException {
+    updateSensorEnrichmentConfig(sensorType, new ByteArrayInputStream(data));
+  }
 
-    public void updateSensorEnrichmentConfig(String sensorType, InputStream io) throws IOException {
-        SensorEnrichmentConfig sensorEnrichmentConfig = JSONUtils.INSTANCE.load(io, SensorEnrichmentConfig.class);
-        updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfig);
-    }
+  public void updateSensorEnrichmentConfig(String sensorType, InputStream io) throws IOException {
+    SensorEnrichmentConfig sensorEnrichmentConfig = JSONUtils.INSTANCE.load(io, SensorEnrichmentConfig.class);
+    updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfig);
+  }
 
-    public void updateSensorEnrichmentConfig(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig) {
-        configurations.put(getKey(sensorType), sensorEnrichmentConfig);
-    }
+  public void updateSensorEnrichmentConfig(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig) {
+    getConfigurations().put(getKey(sensorType), sensorEnrichmentConfig);
+  }
+
+  public void delete(String sensorType) {
+    getConfigurations().remove(getKey(sensorType));
+  }
 
-    private String getKey(String sensorType) {
-        return ConfigurationType.ENRICHMENT.getTypeName() + "." + sensorType;
+  public List<String> getTypes() { // sensor types that currently have an enrichment config stored
+    List<String> ret = new ArrayList<>();
+    for(String keyedSensor : getConfigurations().keySet()) {
+      if(keyedSensor.startsWith(getKey(""))) { // getKey("") is "<typeName>."; matching the dot skips look-alike keys
+        ret.add(keyedSensor.substring(getKey("").length()));
+      }
     }
+    return ret;
+  }
+
+  public static String getKey(String sensorType) {
+    return ConfigurationType.ENRICHMENT.getTypeName() + "." + sensorType;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
index 3bd7645..003b6df 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
@@ -33,8 +33,36 @@ public class IndexingConfigurations extends Configurations {
   public static final String INDEX_CONF = "index";
   public static final String OUTPUT_PATH_FUNCTION_CONF = "outputPathFunction";
 
+  public Map<String, Object> getSensorIndexingConfig(String sensorType, boolean emptyMapOnNonExistent) {
+    Map<String, Object> ret = (Map<String, Object>) getConfigurations().get(getKey(sensorType));
+    if(ret == null) {
+      return emptyMapOnNonExistent?new HashMap<>():null;
+    }
+    else {
+      return ret;
+    }
+  }
+
+  public Map<String, Object> getSensorIndexingConfig(String sensorType) {
+    return getSensorIndexingConfig(sensorType, true);
+  }
+
+  public List<String> getTypes() { // sensor types that currently have an indexing config stored
+    List<String> ret = new ArrayList<>();
+    for(String keyedSensor : getConfigurations().keySet()) {
+      if(keyedSensor.startsWith(getKey(""))) { // getKey("") is "<typeName>."; matching the dot skips look-alike keys
+        ret.add(keyedSensor.substring(getKey("").length()));
+      }
+    }
+    return ret;
+  }
+
+  public void delete(String sensorType) {
+    getConfigurations().remove(getKey(sensorType));
+  }
+
   public Map<String, Object> getSensorIndexingConfig(String sensorType, String writerName) {
-    Map<String, Object> ret = (Map<String, Object>) configurations.get(getKey(sensorType));
+    Map<String, Object> ret = (Map<String, Object>) getConfigurations().get(getKey(sensorType));
     if(ret == null) {
       return new HashMap();
     }
@@ -55,15 +83,15 @@ public class IndexingConfigurations extends Configurations {
   }
 
   public void updateSensorIndexingConfig(String sensorType, Map<String, Object> sensorIndexingConfig) {
-    configurations.put(getKey(sensorType), sensorIndexingConfig);
+    getConfigurations().put(getKey(sensorType), sensorIndexingConfig);
   }
 
-  private String getKey(String sensorType) {
+  public static String getKey(String sensorType) {
     return ConfigurationType.INDEXING.getTypeName() + "." + sensorType;
   }
 
   public boolean isDefault(String sensorName, String writerName) {
-    Map<String, Object> ret = (Map<String, Object>) configurations.get(getKey(sensorName));
+    Map<String, Object> ret = (Map<String, Object>) getConfigurations().get(getKey(sensorName));
     if(ret == null) {
       return true;
     }
@@ -100,7 +128,7 @@ public class IndexingConfigurations extends Configurations {
     String keyPrefixString = getKey("");
     int prefixStringLength = keyPrefixString.length();
     List<Integer> configuredBatchTimeouts = new ArrayList<>();
-    for (String sensorKeyString : configurations.keySet()) {
+    for (String sensorKeyString : getConfigurations().keySet()) {
       if (sensorKeyString.startsWith(keyPrefixString)) {
         String configuredSensorName = sensorKeyString.substring(prefixStringLength);
         configuredBatchTimeouts.add(getBatchTimeout(configuredSensorName, writerName));

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
index 0ec0ed4..72af833 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
@@ -22,11 +22,13 @@ import org.apache.metron.common.utils.JSONUtils;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
 
 public class ParserConfigurations extends Configurations {
 
   public SensorParserConfig getSensorParserConfig(String sensorType) {
-    return (SensorParserConfig) configurations.get(getKey(sensorType));
+    return (SensorParserConfig) getConfigurations().get(getKey(sensorType));
   }
 
   public void updateSensorParserConfig(String sensorType, byte[] data) throws IOException {
@@ -40,10 +42,24 @@ public class ParserConfigurations extends Configurations {
 
   public void updateSensorParserConfig(String sensorType, SensorParserConfig sensorParserConfig) {
     sensorParserConfig.init();
-    configurations.put(getKey(sensorType), sensorParserConfig);
+    getConfigurations().put(getKey(sensorType), sensorParserConfig);
   }
 
-  private String getKey(String sensorType) {
+  public List<String> getTypes() { // sensor types that currently have a parser config stored
+    List<String> ret = new ArrayList<>();
+    for(String keyedSensor : getConfigurations().keySet()) {
+      if(keyedSensor.startsWith(getKey(""))) { // getKey("") is "<typeName>."; matching the dot skips look-alike keys
+        ret.add(keyedSensor.substring(getKey("").length()));
+      }
+    }
+    return ret;
+  }
+
+  public void delete(String sensorType) {
+    getConfigurations().remove(getKey(sensorType));
+  }
+
+  public static String getKey(String sensorType) {
     return ConfigurationType.PARSER.getTypeName() + "." + sensorType;
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java
index 9a42426..55642a9 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java
@@ -80,6 +80,14 @@ public class ProfileResult {
   }
 
   @Override
+  public String toString() {
+    return "ProfileResult{" +
+            "profileExpressions=" + profileExpressions +
+            ", triageExpressions=" + triageExpressions +
+            '}';
+  }
+
+  @Override
   public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
index 1bca716..82af223 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
@@ -43,6 +43,13 @@ public class ProfileResultExpressions {
   }
 
   @Override
+  public String toString() {
+    return "ProfileResultExpressions{" +
+            "expression='" + expression + '\'' +
+            '}';
+  }
+
+  @Override
   public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
index da74f64..fbe1706 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
@@ -64,4 +64,27 @@ public class ProfileTriageExpressions {
   public Map<String, String> getExpressions() {
     return expressions;
   }
+
+  @Override
+  public String toString() {
+    return "ProfileTriageExpressions{" +
+            "expressions=" + expressions +
+            '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    ProfileTriageExpressions that = (ProfileTriageExpressions) o;
+
+    return getExpressions() != null ? getExpressions().equals(that.getExpressions()) : that.getExpressions() == null;
+
+  }
+
+  @Override
+  public int hashCode() {
+    return getExpressions() != null ? getExpressions().hashCode() : 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
index cd651bd..e7c081a 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
@@ -40,6 +40,13 @@ public class ProfilerConfig implements Serializable {
   }
 
   @Override
+  public String toString() {
+    return "ProfilerConfig{" +
+            "profiles=" + profiles +
+            '}';
+  }
+
+  @Override
   public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java
index e001d74..f50d770 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java
@@ -31,7 +31,7 @@ import java.io.InputStream;
 public class ProfilerConfigurations extends Configurations {
 
   public ProfilerConfig getProfilerConfig() {
-    return (ProfilerConfig) configurations.get(getKey());
+    return (ProfilerConfig) getConfigurations().get(getKey());
   }
 
   public void updateProfilerConfig(byte[] data) throws IOException {
@@ -44,10 +44,15 @@ public class ProfilerConfigurations extends Configurations {
   }
 
   public void updateProfilerConfig(ProfilerConfig config) {
-    configurations.put(getKey(), config);
+    getConfigurations().put(getKey(), config);
   }
 
-  private String getKey() {
+  public static String getKey() {
     return ConfigurationType.PROFILER.getTypeName();
   }
+
+  public void delete() { // removes the stored profiler config, if any
+    getConfigurations().remove(getKey()); // go through the accessor like every other method of this class
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ConfigurationsCache.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ConfigurationsCache.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ConfigurationsCache.java
new file mode 100644
index 0000000..462d754
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ConfigurationsCache.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.zookeeper;
+
+import org.apache.metron.common.configuration.Configurations;
+
+/**
+ * A cache for multiple Configurations object.  This cache is generally kept in
+ * sync with zookeeper changes.
+ */
+public interface ConfigurationsCache extends AutoCloseable{
+  /**
+   * Return the Configurations object given the specific type of Configurations object.
+   * @param configClass The Configurations class to return
+   * @param <T> The type of Configurations class.
+   * @return The Configurations object
+   */
+  <T extends Configurations> T get(Class<T> configClass);
+
+  /**
+   * Reset the cache and reload from zookeeper.
+   */
+  void reset();
+
+  /**
+   * Start the cache.
+   */
+  void start();
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ZKConfigurationsCache.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ZKConfigurationsCache.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ZKConfigurationsCache.java
new file mode 100644
index 0000000..42967b2
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ZKConfigurationsCache.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.zookeeper;
+
+import com.google.common.collect.Iterables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.*;
+import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
+import org.apache.metron.zookeeper.SimpleEventListener;
+import org.apache.metron.zookeeper.ZKCache;
+import org.apache.metron.common.zookeeper.configurations.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+public class ZKConfigurationsCache implements ConfigurationsCache {
+  private static final Logger LOG =  LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+
+
+  private interface UpdaterSupplier {
+    ConfigurationsUpdater<? extends Configurations> create(Map<Class<? extends Configurations>, Configurations> configs,Reloadable reloadCallback);
+  }
+
+  public enum ConfiguredTypes implements UpdaterSupplier {
+    ENRICHMENT((c,r) -> new EnrichmentUpdater( r, createSupplier(EnrichmentConfigurations.class, c)))
+    ,PARSER((c,r) -> new ParserUpdater( r, createSupplier(ParserConfigurations.class, c)))
+    ,INDEXING((c,r) -> new IndexingUpdater( r, createSupplier(IndexingConfigurations.class, c)))
+    ,PROFILER((c,r) -> new ProfilerUpdater( r, createSupplier(ProfilerConfigurations.class, c)))
+    ;
+    UpdaterSupplier updaterSupplier;
+    ConfiguredTypes(UpdaterSupplier supplier) {
+      this.updaterSupplier = supplier;
+    }
+
+    @Override
+    public ConfigurationsUpdater<? extends Configurations>
+    create(Map<Class<? extends Configurations>, Configurations> configs, Reloadable reloadCallback) {
+      return updaterSupplier.create(configs, reloadCallback);
+    }
+  }
+
+  private List<ConfigurationsUpdater< ? extends Configurations>> updaters;
+  private final Map<Class<? extends Configurations>, Configurations> configs;
+  private ZKCache cache;
+  private CuratorFramework client;
+  ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  public ZKConfigurationsCache(CuratorFramework client, Reloadable reloadable, ConfiguredTypes... types) {
+    updaters = new ArrayList<>();
+    configs = new HashMap<>();
+    this.client = client;
+    for(ConfiguredTypes t : types) {
+      ConfigurationsUpdater<? extends Configurations> updater = t.create(configs, reloadable);
+      configs.put(updater.getConfigurationClass(), updater.defaultConfigurations());
+      updaters.add(updater);
+    }
+  }
+
+  public ZKConfigurationsCache(CuratorFramework client, Reloadable reloadable) {
+    this(client, reloadable, ConfiguredTypes.values());
+  }
+
+  public ZKConfigurationsCache(CuratorFramework client, ConfiguredTypes... types) {
+    this(client, (name, type) -> {}, types);
+  }
+
+  public ZKConfigurationsCache(CuratorFramework client) {
+    this(client, ConfiguredTypes.values());
+  }
+
+  private static <T extends Configurations> Supplier<T> createSupplier(Class<T> clazz, Map<Class<? extends Configurations>, Configurations> configs) {
+    return () -> clazz.cast(configs.get(clazz));
+  }
+
+  @Override
+  public void start() {
+    initializeCache(client);
+  }
+
+  @Override
+  public void close() {
+    Lock writeLock = lock.writeLock();
+    writeLock.lock(); // acquired before the try so finally never unlocks a lock that was not taken
+    try {
+      if (cache != null) {
+        cache.close();
+      }
+    }
+    finally{
+      writeLock.unlock();
+    }
+  }
+
+  public void reset() {
+    Lock writeLock = lock.writeLock();
+    writeLock.lock(); // acquired before the try so finally never unlocks a lock that was not taken
+    try {
+      close(); // takes the write lock again; allowed because the lock is a ReentrantReadWriteLock
+      initializeCache(client);
+    }
+    finally{
+      writeLock.unlock();
+    }
+  }
+
+  private void initializeCache(CuratorFramework client) {
+    Lock writeLock = lock.writeLock();
+    writeLock.lock(); // acquired before the try so finally never unlocks a lock that was not taken
+    try {
+      SimpleEventListener listener = new SimpleEventListener.Builder()
+              .with(Iterables.transform(updaters, u -> u::update)
+                      , TreeCacheEvent.Type.NODE_ADDED
+                      , TreeCacheEvent.Type.NODE_UPDATED
+              )
+              .with(Iterables.transform(updaters, u -> u::delete)
+                      , TreeCacheEvent.Type.NODE_REMOVED
+              )
+              .build();
+      cache = new ZKCache.Builder()
+              .withClient(client)
+              .withListener(listener)
+              .withRoot(Constants.ZOOKEEPER_TOPOLOGY_ROOT)
+              .build();
+
+      for (ConfigurationsUpdater<? extends Configurations> updater : updaters) {
+        updater.forceUpdate(client);
+      }
+      cache.start();
+    } catch (Exception e) {
+      LOG.error("Unable to initialize zookeeper cache: " + e.getMessage(), e);
+      throw new IllegalStateException("Unable to initialize zookeeper cache: " + e.getMessage(), e);
+    }
+    finally {
+      writeLock.unlock();
+    }
+  }
+
+
+  public <T extends Configurations> T get(Class<T> configClass){
+    Lock readLock = lock.readLock();
+    readLock.lock(); // acquired before the try so finally never unlocks a lock that was not taken
+    try {
+      return configClass.cast(configs.get(configClass));
+    }
+    finally{
+      readLock.unlock();
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ConfigurationsUpdater.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ConfigurationsUpdater.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ConfigurationsUpdater.java
new file mode 100644
index 0000000..9ff2bee
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ConfigurationsUpdater.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.zookeeper.configurations;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.Configurations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.function.Supplier;
+
+/**
+ * Handles update for an underlying Configurations object.  This is the base abstract implementation.
+ * You will find system-specific implementations (e.g. IndexingUpdater, ParserUpdater, etc.) which
+ * correspond to the various components of our system which accept configuration from zookeeper.
+ *
+ * @param <T> the Type of Configuration
+ */
+public abstract class ConfigurationsUpdater<T extends Configurations> implements Serializable {
+  protected static final Logger LOG =  LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private Reloadable reloadable;
+  private Supplier<T> configSupplier;
+
+  /**
+   * Construct a ConfigurationsUpdater
+   * @param reloadable A callback which gets called whenever a reload happens
+   * @param configSupplier A Supplier which creates the Configurations object.
+   */
+  public ConfigurationsUpdater(Reloadable reloadable
+                              , Supplier<T> configSupplier
+  )
+  {
+    this.reloadable = reloadable;
+    this.configSupplier = configSupplier;
+  }
+
+  /**
+   * Update callback, this is called whenever a path is updated in zookeeper which we are monitoring.
+   * A null or empty payload is ignored.  A path under this updater's own zookeeper root is handed to
+   * {@link #update(String, byte[])}; the global config path updates the global config instead.
+   * Any other path is ignored.
+   *
+   * @param client The CuratorFramework
+   * @param path The zookeeper path
+   * @param data The change.
+   * @throws IOException When update is impossible.
+   */
+  public void update(CuratorFramework client, String path, byte[] data) throws IOException {
+    // Check for null as well as empty: data.length used to be dereferenced before any null check.
+    if (data == null || data.length == 0) {
+      return;
+    }
+    String name = path.substring(path.lastIndexOf("/") + 1);
+    if (path.startsWith(getType().getZookeeperRoot())) {
+      LOG.debug("Updating the {} config: {} -> {}", getType().name(), name, new String(data));
+      update(name, data);
+      reloadCallback(name, getType());
+    } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
+      LOG.debug("Updating the global config: {}", new String(data));
+      getConfigurations().updateGlobalConfig(data);
+      reloadCallback(name, ConfigurationType.GLOBAL);
+    }
+  }
+
+  /**
+   * Delete callback, this is called whenever a path is deleted in zookeeper which we are monitoring.
+   * A path under this updater's own zookeeper root is handed to {@link #delete(String)}; the global
+   * config path deletes the global config instead.  Any other path is ignored.
+   *
+   * @param client The CuratorFramework
+   * @param path The zookeeper path
+   * @param data The change.
+   * @throws IOException When update is impossible.
+   */
+  public void delete(CuratorFramework client, String path, byte[] data) throws IOException {
+    String name = path.substring(path.lastIndexOf("/") + 1);
+    if (path.startsWith(getType().getZookeeperRoot())) {
+      LOG.debug("Deleting {} {} config from internal cache", getType().name(), name);
+      delete(name);
+      reloadCallback(name, getType());
+    } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
+      LOG.debug("Deleting global config from internal cache");
+      getConfigurations().deleteGlobalConfig();
+      reloadCallback(name, ConfigurationType.GLOBAL);
+    }
+  }
+
+  /**
+   * The ConfigurationsType that we're monitoring.
+   * @return The ConfigurationsType enum
+   */
+  public abstract ConfigurationType getType();
+
+  /**
+   * The simple update.  This differs from the full update elsewhere in that
+   * this is ONLY called on updates to path to the zookeeper nodes which correspond
+   * to your configurations type (rather than all configurations type).
+   * @param name The last segment of the zookeeper path
+   * @param data The data updated
+   * @throws IOException when update is unable to happen
+   */
+  public abstract void update(String name, byte[] data) throws IOException;
+
+  /**
+   * The simple delete.  This differs from the full delete elsewhere in that
+   * this is ONLY called on deletes to path to the zookeeper nodes which correspond
+   * to your configurations type (rather than all configurations type).
+   * @param name The last segment of the zookeeper path
+   */
+  public abstract void delete(String name);
+
+  /**
+   * The concrete Configurations class handled by this updater.
+   * @return The Class for the Configurations type.
+   */
+  public abstract Class<T> getConfigurationClass();
+
+  /**
+   * This pulls the configuration from zookeeper and updates the cache.  It represents the initial state.
+   * Force update is called when the zookeeper cache is initialized to ensure that the caches are updated.
+   * @param client The CuratorFramework used to read from zookeeper
+   */
+  public abstract void forceUpdate(CuratorFramework client);
+
+  /**
+   * Create an empty Configurations object of type T.
+   * @return A new, empty Configurations object of type T
+   */
+  public abstract T defaultConfigurations();
+
+  /**
+   * Forward a change notification to the Reloadable given at construction.
+   * @param name The last segment of the zookeeper path that changed
+   * @param type The configuration type that changed
+   */
+  protected void reloadCallback(String name, ConfigurationType type) {
+    reloadable.reloadCallback(name, type);
+  }
+
+  /**
+   * @return The Configurations object currently provided by the supplier given at construction
+   */
+  protected T getConfigurations() {
+    return configSupplier.get();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/EnrichmentUpdater.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/EnrichmentUpdater.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/EnrichmentUpdater.java
new file mode 100644
index 0000000..29de525
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/EnrichmentUpdater.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.zookeeper.configurations;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+public class EnrichmentUpdater extends ConfigurationsUpdater<EnrichmentConfigurations>{
+
+  public EnrichmentUpdater(Reloadable reloadable, Supplier<EnrichmentConfigurations> configSupplier) {
+    super(reloadable, configSupplier);
+  }
+
+  @Override
+  public Class<EnrichmentConfigurations> getConfigurationClass() {
+    return EnrichmentConfigurations.class;
+  }
+
+  @Override
+  public void forceUpdate(CuratorFramework client) {
+    try {
+      ConfigurationsUtils.updateEnrichmentConfigsFromZookeeper(getConfigurations(), client);
+    }
+    catch (KeeperException.NoNodeException nne) {
+      LOG.warn("No current enrichment configs in zookeeper, but the cache should load lazily...");
+    }
+    catch (Exception e) {
+      LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...", e);
+    }
+  }
+
+  @Override
+  public EnrichmentConfigurations defaultConfigurations() {
+    return new EnrichmentConfigurations();
+  }
+
+  @Override
+  public ConfigurationType getType() {
+    return ConfigurationType.ENRICHMENT;
+  }
+
+  @Override
+  public void delete(String name) {
+    getConfigurations().delete(name);
+  }
+
+  @Override
+  public void update(String name, byte[] data) throws IOException {
+    getConfigurations().updateSensorEnrichmentConfig(name, data);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/IndexingUpdater.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/IndexingUpdater.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/IndexingUpdater.java
new file mode 100644
index 0000000..8017930
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/IndexingUpdater.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.zookeeper.configurations;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+public class IndexingUpdater extends ConfigurationsUpdater<IndexingConfigurations> {
+  public IndexingUpdater(Reloadable reloadable, Supplier<IndexingConfigurations> configSupplier) {
+    super(reloadable, configSupplier);
+  }
+
+  @Override
+  public Class<IndexingConfigurations> getConfigurationClass() {
+    return IndexingConfigurations.class;
+  }
+
+  @Override
+  public void forceUpdate(CuratorFramework client) {
+    try {
+      ConfigurationsUtils.updateSensorIndexingConfigsFromZookeeper(getConfigurations(), client);
+    }
+    catch (KeeperException.NoNodeException nne) {
+      LOG.warn("No current indexing configs in zookeeper, but the cache should load lazily...");
+    }
+    catch (Exception e) {
+      LOG.warn("Unable to load indexing configs from zookeeper, but the cache should load lazily...", e);
+    }
+  }
+
+  @Override
+  public IndexingConfigurations defaultConfigurations() {
+    return new IndexingConfigurations();
+  }
+
+  @Override
+  public ConfigurationType getType() {
+    return ConfigurationType.INDEXING;
+  }
+
+  @Override
+  public void delete(String name) {
+    getConfigurations().delete(name);
+  }
+
+  @Override
+  public void update(String name, byte[] data) throws IOException {
+    getConfigurations().updateSensorIndexingConfig(name, data);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ParserUpdater.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ParserUpdater.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ParserUpdater.java
new file mode 100644
index 0000000..c91844e
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ParserUpdater.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.zookeeper.configurations;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+public class ParserUpdater extends ConfigurationsUpdater<ParserConfigurations> {
+  public ParserUpdater(Reloadable reloadable, Supplier<ParserConfigurations> configSupplier) {
+    super(reloadable, configSupplier);
+  }
+
+  @Override
+  public Class<ParserConfigurations> getConfigurationClass() {
+    return ParserConfigurations.class;
+  }
+
+  @Override
+  public void forceUpdate(CuratorFramework client) {
+    try {
+      ConfigurationsUtils.updateParserConfigsFromZookeeper(getConfigurations(), client);
+    }
+    catch (KeeperException.NoNodeException nne) {
+      LOG.warn("No current parser configs in zookeeper, but the cache should load lazily...");
+    }
+    catch (Exception e) {
+      LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...", e);
+    }
+  }
+
+  @Override
+  public ParserConfigurations defaultConfigurations() {
+    return new ParserConfigurations();
+  }
+
+  @Override
+  public ConfigurationType getType() {
+    return ConfigurationType.PARSER;
+  }
+
+  @Override
+  public void delete(String name) {
+    getConfigurations().delete(name);
+  }
+
+  @Override
+  public void update(String name, byte[] data) throws IOException {
+    getConfigurations().updateSensorParserConfig(name, data);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java
new file mode 100644
index 0000000..68c5203
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.zookeeper.configurations;
+
+import static org.apache.metron.common.configuration.ConfigurationType.PROFILER;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+public class ProfilerUpdater extends ConfigurationsUpdater<ProfilerConfigurations> {
+  /**
+   * @param reloadable Callback handed to the base updater
+   * @param configSupplier Supplies the ProfilerConfigurations this updater works on
+   */
+  public ProfilerUpdater(Reloadable reloadable, Supplier<ProfilerConfigurations> configSupplier) {
+    super(reloadable, configSupplier);
+  }
+
+  @Override
+  public ConfigurationType getType() {
+    return ConfigurationType.PROFILER;
+  }
+
+  @Override
+  public Class<ProfilerConfigurations> getConfigurationClass() {
+    return ProfilerConfigurations.class;
+  }
+
+  @Override
+  public ProfilerConfigurations defaultConfigurations() {
+    return new ProfilerConfigurations();
+  }
+
+  /**
+   * Read the bytes stored at the profiler zookeeper root and parse them as a ProfilerConfig.
+   */
+  private ProfilerConfig fetchProfilerConfig(CuratorFramework client) throws Exception {
+    byte[] payload = client.getData().forPath(PROFILER.getZookeeperRoot());
+    ByteArrayInputStream json = new ByteArrayInputStream(payload);
+    return JSONUtils.INSTANCE.load(json, ProfilerConfig.class);
+  }
+
+  /**
+   * Runs two independent loads: first ConfigurationsUtils.updateConfigsFromZookeeper, then the
+   * profiler config itself.  Each load logs its own exceptions at WARN and does not rethrow,
+   * so a failure in the first does not prevent the second.
+   */
+  @Override
+  public void forceUpdate(CuratorFramework client) {
+    try {
+      ProfilerConfigurations target = getConfigurations();
+      ConfigurationsUtils.updateConfigsFromZookeeper(target, client);
+    }
+    catch (KeeperException.NoNodeException missingNode) {
+      LOG.warn("No current global configs in zookeeper, but the cache should load lazily...");
+    }
+    catch(Exception failure) {
+      LOG.warn("Unable to load global configs from zookeeper, but the cache should load lazily...", failure);
+    }
+    try {
+      ProfilerConfig loaded = fetchProfilerConfig(client);
+      if(loaded != null) {
+        getConfigurations().updateProfilerConfig(loaded);
+      }
+    }
+    catch (KeeperException.NoNodeException missingNode) {
+      LOG.warn("No current profiler configs in zookeeper, but the cache should load lazily...");
+    }
+    catch (Exception failure) {
+      LOG.warn("Unable to load profiler configs from zookeeper, but the cache should load lazily...", failure);
+    }
+  }
+
+  @Override
+  public void update(String name, byte[] data) throws IOException {
+    // name is not used here; the call below takes only the data
+    ProfilerConfigurations target = getConfigurations();
+    target.updateProfilerConfig(data);
+  }
+
+  @Override
+  public void delete(String name) {
+    // name is not used here; the call below takes no arguments
+    ProfilerConfigurations target = getConfigurations();
+    target.delete();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/Reloadable.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/Reloadable.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/Reloadable.java
new file mode 100644
index 0000000..308c74e
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/Reloadable.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.zookeeper.configurations;
+
+import org.apache.metron.common.configuration.ConfigurationType;
+
+import java.io.Serializable;
+
+/**
+ * Serializable callback for configuration change notifications.  ConfigurationsUpdater invokes it
+ * after it has applied an update or a delete.
+ */
+public interface Reloadable extends Serializable {
+  /**
+   * Notification that a configuration has changed.
+   *
+   * @param name Identifies what changed; ConfigurationsUpdater passes the last segment of the zookeeper path
+   * @param type The type of configuration that changed
+   */
+  void reloadCallback(String name, ConfigurationType type);
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/scripts/stellar
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/scripts/stellar b/metron-platform/metron-common/src/main/scripts/stellar
index 56c2d4d..3f53c49 100644
--- a/metron-platform/metron-common/src/main/scripts/stellar
+++ b/metron-platform/metron-common/src/main/scripts/stellar
@@ -33,4 +33,4 @@ export METRON_VERSION=${project.version}
 export METRON_HOME=/usr/metron/$METRON_VERSION
 export STELLAR_LIB=$(find $METRON_HOME/lib/ -name metron-parsers*.jar)
 export MANAGEMENT_LIB=$(find $METRON_HOME/lib/ -name metron-management*.jar)
-java $JVMFLAGS -cp "$HBASE_CONFIGS:${CONTRIB:-$METRON_HOME/contrib/*}:$STELLAR_LIB:$MANAGEMENT_LIB" org.apache.metron.stellar.common.shell.StellarShell "$@"
+java $JVMFLAGS -cp "${CONTRIB:-$METRON_HOME/contrib/*}:$STELLAR_LIB:$MANAGEMENT_LIB:$HBASE_CONFIGS" org.apache.metron.stellar.common.shell.StellarShell "$@"

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
new file mode 100644
index 0000000..64bf986
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.zookeeper;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.commons.io.IOUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.TestConstants;
+import org.apache.metron.common.configuration.*;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.integration.components.ZKServerComponent;
+import org.junit.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.Map;
+
+import static org.apache.metron.integration.utils.TestUtils.assertEventually;
+
+public class ZKConfigurationsCacheIntegrationTest {
+  private CuratorFramework client;
+
+  /**
+   {
+  "profiles": [
+    {
+      "profile": "example2",
+      "foreach": "ip_src_addr",
+      "onlyif": "protocol == 'HTTP'",
+      "init": {
+        "total_bytes": 0.0
+      },
+      "update": {
+        "total_bytes": "total_bytes + bytes_in"
+      },
+      "result": "total_bytes",
+      "expires": 30
+    }
+  ]
+}
+   */
+  @Multiline
+  public static String profilerConfig;
+  /**
+   {
+    "hdfs" : {
+      "index": "yaf",
+      "batchSize": 1,
+      "enabled" : true
+    },
+    "elasticsearch" : {
+    "index": "yaf",
+    "batchSize": 25,
+    "batchTimeout": 7,
+    "enabled" : false
+    },
+    "solr" : {
+    "index": "yaf",
+    "batchSize": 5,
+    "enabled" : false
+    }
+  }
+   */
+  @Multiline
+  public static String testIndexingConfig;
+
+  /**
+   {
+    "enrichment": {
+      "fieldMap": { }
+    ,"fieldToTypeMap": { }
+   },
+    "threatIntel": {
+      "fieldMap": { },
+      "fieldToTypeMap": { },
+      "triageConfig" : { }
+     }
+   }
+   */
+  @Multiline
+  public static String testEnrichmentConfig;
+
+  /**
+  {
+  "parserClassName":"org.apache.metron.parsers.bro.BasicBroParser",
+  "sensorTopic":"brop",
+  "parserConfig": {}
+  }
+   */
+  @Multiline
+  public static String testParserConfig;
+
+  /**
+   *{
+  "es.clustername": "metron",
+  "es.ip": "localhost",
+  "es.port": 9300,
+  "es.date.format": "yyyy.MM.dd.HH"
+   }
+   */
+  @Multiline
+  public static String globalConfig;
+
+  public static File profilerDir = new File("../../metron-analytics/metron-profiler/src/test/config/zookeeper");
+  public ConfigurationsCache cache;
+
+  public ZKServerComponent zkComponent;
+
+  /**
+   * Starts an in-process zookeeper, a client and the cache under test, then writes one parser,
+   * indexing, enrichment, profiler and global config to zookeeper.  The cache is started before
+   * the writes are made.
+   */
+  @Before
+  public void setup() throws Exception {
+    zkComponent = new ZKServerComponent();
+    zkComponent.start();
+    client = ConfigurationsUtils.getClient(zkComponent.getConnectionString());
+    client.start();
+    cache = new ZKConfigurationsCache(client);
+    cache.start();
+    {
+      //parser
+      byte[] config = readConfig(new File(TestConstants.PARSER_CONFIGS_PATH + "/parsers/bro.json"));
+      ConfigurationsUtils.writeSensorParserConfigToZookeeper("bro", config, client);
+    }
+    {
+      //indexing
+      byte[] config = readConfig(new File(TestConstants.SAMPLE_CONFIG_PATH + "/indexing/test.json"));
+      ConfigurationsUtils.writeSensorIndexingConfigToZookeeper("test", config, client);
+    }
+    {
+      //enrichments (this block used to be duplicated verbatim; a single write is enough)
+      byte[] config = readConfig(new File(TestConstants.SAMPLE_CONFIG_PATH + "/enrichments/test.json"));
+      ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper("test", config, client);
+    }
+    {
+      //profiler
+      byte[] config = readConfig(new File(profilerDir, "/readme-example-1/profiler.json"));
+      ConfigurationsUtils.writeProfilerConfigToZookeeper( config, client);
+    }
+    {
+      //global config
+      byte[] config = readConfig(new File(TestConstants.SAMPLE_CONFIG_PATH + "/global.json"));
+      ConfigurationsUtils.writeGlobalConfigToZookeeper(config, client);
+    }
+  }
+
+  /**
+   * Reads a whole config file into memory, closing the stream afterwards.
+   */
+  private static byte[] readConfig(File configFile) throws Exception {
+    try (FileInputStream in = new FileInputStream(configFile)) {
+      return IOUtils.toByteArray(in);
+    }
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if(cache != null) {
+      cache.close();
+    }
+    if(client != null) {
+      client.close();
+    }
+    if(zkComponent != null) {
+      zkComponent.stop();
+    }
+  }
+
+
+  @Test
+  public void validateDelete() throws Exception {
+    client.delete().forPath(ConfigurationType.GLOBAL.getZookeeperRoot());
+    client.delete().forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/test");
+    client.delete().forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/test");
+    client.delete().forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/bro");
+    client.delete().forPath(ConfigurationType.PROFILER.getZookeeperRoot() );
+    //global
+    {
+      IndexingConfigurations config = cache.get( IndexingConfigurations.class);
+      assertEventually(() -> Assert.assertNull(config.getGlobalConfig(false)));
+    }
+    //indexing
+    {
+      IndexingConfigurations config = cache.get( IndexingConfigurations.class);
+      assertEventually(() -> Assert.assertNull(config.getSensorIndexingConfig("test", false)));
+      assertEventually(() -> Assert.assertNull(config.getGlobalConfig(false)));
+    }
+    //enrichment
+    {
+      EnrichmentConfigurations config = cache.get( EnrichmentConfigurations.class);
+      assertEventually(() -> Assert.assertNull(config.getSensorEnrichmentConfig("test")));
+      assertEventually(()-> Assert.assertNull(config.getGlobalConfig(false)));
+    }
+    //parser
+    {
+      ParserConfigurations config = cache.get( ParserConfigurations.class);
+      assertEventually(() -> Assert.assertNull(config.getSensorParserConfig("bro")));
+      assertEventually(() -> Assert.assertNull(config.getGlobalConfig(false)));
+    }
+    //profiler
+    {
+      ProfilerConfigurations config = cache.get( ProfilerConfigurations.class);
+      assertEventually(() -> Assert.assertNull(config.getProfilerConfig()));
+      assertEventually(() -> Assert.assertNull(config.getGlobalConfig(false)));
+    }
+  }
+
+  /**
+   * Overwrites the configs written by setup() with the inline test configs and waits for the
+   * cached configurations to equal the parsed form of what was written.
+   */
+  @Test
+  public void validateUpdate() throws Exception {
+    ConfigurationsUtils.writeSensorIndexingConfigToZookeeper("test", testIndexingConfig.getBytes(), client);
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig.getBytes(), client);
+    ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper("test", testEnrichmentConfig.getBytes(), client);
+    ConfigurationsUtils.writeSensorParserConfigToZookeeper("bro", testParserConfig.getBytes(), client);
+    ConfigurationsUtils.writeProfilerConfigToZookeeper( profilerConfig.getBytes(), client);
+
+    //indexing
+    Map<String, Object> wantIndexing = JSONUtils.INSTANCE.load(testIndexingConfig, new TypeReference<Map<String, Object>>() {});
+    IndexingConfigurations indexing = cache.get( IndexingConfigurations.class);
+    assertEventually(() -> Assert.assertEquals(wantIndexing, indexing.getSensorIndexingConfig("test")));
+
+    //enrichment
+    SensorEnrichmentConfig wantEnrichment = JSONUtils.INSTANCE.load(testEnrichmentConfig, SensorEnrichmentConfig.class);
+    Map<String, Object> wantGlobal = JSONUtils.INSTANCE.load(globalConfig, new TypeReference<Map<String, Object>>() {});
+    EnrichmentConfigurations enrichment = cache.get( EnrichmentConfigurations.class);
+    assertEventually(() -> Assert.assertEquals(wantEnrichment, enrichment.getSensorEnrichmentConfig("test")));
+    assertEventually(() -> Assert.assertEquals(wantGlobal, enrichment.getGlobalConfig()));
+
+    //parsers
+    SensorParserConfig wantParser = JSONUtils.INSTANCE.load(testParserConfig, SensorParserConfig.class);
+    ParserConfigurations parser = cache.get( ParserConfigurations.class);
+    assertEventually(() -> Assert.assertEquals(wantParser, parser.getSensorParserConfig("bro")));
+
+    //profiler
+    ProfilerConfig wantProfiler = JSONUtils.INSTANCE.load(profilerConfig, ProfilerConfig.class);
+    ProfilerConfigurations profiler = cache.get( ProfilerConfigurations.class);
+    assertEventually(() -> Assert.assertEquals(wantProfiler, profiler.getProfilerConfig()));
+  }
+
+  @Test
+  public void validateBaseWrite() throws Exception {
+    File globalConfigFile = new File(TestConstants.SAMPLE_CONFIG_PATH + "/global.json");
+    Map<String, Object> expectedGlobalConfig = JSONUtils.INSTANCE.load(globalConfigFile, new TypeReference<Map<String, Object>>() { });
+    //indexing
+    {
+      File inFile = new File(TestConstants.SAMPLE_CONFIG_PATH + "/indexing/test.json");
+      Map<String, Object> expectedConfig = JSONUtils.INSTANCE.load(inFile, new TypeReference<Map<String, Object>>() {
+      });
+      IndexingConfigurations config = cache.get( IndexingConfigurations.class);
+      assertEventually(() -> Assert.assertEquals(expectedConfig, config.getSensorIndexingConfig("test")));
+      assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, config.getGlobalConfig()));
+      assertEventually(() -> Assert.assertNull(config.getSensorIndexingConfig("notthere", false)));
+    }
+    //enrichment
+    {
+      File inFile = new File(TestConstants.SAMPLE_CONFIG_PATH + "/enrichments/test.json");
+      SensorEnrichmentConfig expectedConfig = JSONUtils.INSTANCE.load(inFile, SensorEnrichmentConfig.class);
+      EnrichmentConfigurations config = cache.get( EnrichmentConfigurations.class);
+      assertEventually(() -> Assert.assertEquals(expectedConfig, config.getSensorEnrichmentConfig("test")));
+      assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, config.getGlobalConfig()));
+      assertEventually(() -> Assert.assertNull(config.getSensorEnrichmentConfig("notthere")));
+    }
+    //parsers
+    {
+      File inFile = new File(TestConstants.PARSER_CONFIGS_PATH + "/parsers/bro.json");
+      SensorParserConfig expectedConfig = JSONUtils.INSTANCE.load(inFile, SensorParserConfig.class);
+      ParserConfigurations config = cache.get( ParserConfigurations.class);
+      assertEventually(() -> Assert.assertEquals(expectedConfig, config.getSensorParserConfig("bro")));
+      assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, config.getGlobalConfig()));
+      assertEventually(() -> Assert.assertNull(config.getSensorParserConfig("notthere")));
+    }
+    //profiler
+    {
+      File inFile = new File(profilerDir, "/readme-example-1/profiler.json");
+      ProfilerConfig expectedConfig = JSONUtils.INSTANCE.load(inFile, ProfilerConfig.class);
+      ProfilerConfigurations config = cache.get( ProfilerConfigurations.class);
+      assertEventually(() -> Assert.assertEquals(expectedConfig, config.getProfilerConfig()));
+      assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, config.getGlobalConfig()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
index 5f6f22f..308638e 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
@@ -123,7 +123,7 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
             .withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
             .withMessageGetterField("message");
     bulkMessageWriterBolt.setCuratorFramework(client);
-    bulkMessageWriterBolt.setTreeCache(cache);
+    bulkMessageWriterBolt.setZKCache(cache);
     bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType,
             new FileInputStream(sampleSensorIndexingConfigPath));
     bulkMessageWriterBolt.declareOutputFields(declarer);
@@ -182,7 +182,7 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
             .withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
             .withMessageGetterField("message").withBatchTimeoutDivisor(3);
     bulkMessageWriterBolt.setCuratorFramework(client);
-    bulkMessageWriterBolt.setTreeCache(cache);
+    bulkMessageWriterBolt.setZKCache(cache);
     bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType,
             new FileInputStream(sampleSensorIndexingConfigPath));
     bulkMessageWriterBolt.declareOutputFields(declarer);
@@ -226,7 +226,7 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
             .withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
             .withMessageGetterField("message");
     bulkMessageWriterBolt.setCuratorFramework(client);
-    bulkMessageWriterBolt.setTreeCache(cache);
+    bulkMessageWriterBolt.setZKCache(cache);
     bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType
             , new FileInputStream(sampleSensorIndexingConfigPath));
     bulkMessageWriterBolt.declareOutputFields(declarer);

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
index 77dd4cf..52135e3 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
@@ -73,7 +73,7 @@ public class EnrichmentJoinBoltTest extends BaseEnrichmentBoltTest {
   public void test() throws IOException {
     EnrichmentJoinBolt enrichmentJoinBolt = new EnrichmentJoinBolt("zookeeperUrl");
     enrichmentJoinBolt.setCuratorFramework(client);
-    enrichmentJoinBolt.setTreeCache(cache);
+    enrichmentJoinBolt.setZKCache(cache);
     enrichmentJoinBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(sampleSensorEnrichmentConfigPath));
     enrichmentJoinBolt.withMaxCacheSize(100);
     enrichmentJoinBolt.withMaxTimeRetain(10000);

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java
index c79eb10..cbe7ed6 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java
@@ -58,7 +58,7 @@ public class EnrichmentSplitterBoltTest extends BaseEnrichmentBoltTest {
 
     EnrichmentSplitterBolt enrichmentSplitterBolt = new EnrichmentSplitterBolt("zookeeperUrl").withEnrichments(enrichments);
     enrichmentSplitterBolt.setCuratorFramework(client);
-    enrichmentSplitterBolt.setTreeCache(cache);
+    enrichmentSplitterBolt.setZKCache(cache);
     enrichmentSplitterBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(sampleSensorEnrichmentConfigPath));
     enrichmentSplitterBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
 

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
index 90322fe..d7b54dd 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
@@ -161,7 +161,7 @@ public class GenericEnrichmentBoltTest extends BaseEnrichmentBoltTest {
       }
     };
     genericEnrichmentBolt.setCuratorFramework(client);
-    genericEnrichmentBolt.setTreeCache(cache);
+    genericEnrichmentBolt.setZKCache(cache);
     genericEnrichmentBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(sampleSensorEnrichmentConfigPath));
 
     HashMap<String, Object> globalConfig = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
index e03dc71..1bb1083 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
@@ -99,7 +99,7 @@ public class JoinBoltTest extends BaseEnrichmentBoltTest {
     }
     joinBolt = new StandAloneJoinBolt("zookeeperUrl");
     joinBolt.setCuratorFramework(client);
-    joinBolt.setTreeCache(cache);
+    joinBolt.setZKCache(cache);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java
index 41d34de..57dae13 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java
@@ -88,7 +88,7 @@ public class SplitBoltTest extends BaseEnrichmentBoltTest {
   public void test() {
     StandAloneSplitBolt splitBolt = spy(new StandAloneSplitBolt("zookeeperUrl"));
     splitBolt.setCuratorFramework(client);
-    splitBolt.setTreeCache(cache);
+    splitBolt.setZKCache(cache);
     doCallRealMethod().when(splitBolt).reloadCallback(anyString(), any(ConfigurationType.class));
     splitBolt.prepare(new HashMap(), topologyContext, outputCollector);
     splitBolt.declareOutputFields(declarer);

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
index 04617ff..62b2570 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
@@ -157,7 +157,7 @@ public class ThreatIntelJoinBoltTest extends BaseEnrichmentBoltTest {
 
     ThreatIntelJoinBolt threatIntelJoinBolt = new ThreatIntelJoinBolt("zookeeperUrl");
     threatIntelJoinBolt.setCuratorFramework(client);
-    threatIntelJoinBolt.setTreeCache(cache);
+    threatIntelJoinBolt.setZKCache(cache);
 
     SensorEnrichmentConfig enrichmentConfig = JSONUtils.INSTANCE.load(
             new FileInputStream(sampleSensorEnrichmentConfigPath), SensorEnrichmentConfig.class);

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java
index 4feba2e..f7869e5 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java
@@ -33,7 +33,7 @@ public class ThreatIntelSplitterBoltTest extends BaseEnrichmentBoltTest {
     String threatIntelType = "hbaseThreatIntel";
     ThreatIntelSplitterBolt threatIntelSplitterBolt = new ThreatIntelSplitterBolt("zookeeperUrl");
     threatIntelSplitterBolt.setCuratorFramework(client);
-    threatIntelSplitterBolt.setTreeCache(cache);
+    threatIntelSplitterBolt.setZKCache(cache);
     threatIntelSplitterBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(sampleSensorEnrichmentConfigPath));
     threatIntelSplitterBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
     Map<String, Object> fieldMap = threatIntelSplitterBolt.getFieldMap(sensorType);

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java
index e67d3c9..9577a43 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java
@@ -31,6 +31,28 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class TestUtils {
+  public static long MAX_ASSERT_WAIT_MS = 30000L;
+  public interface Assertion {
+    void apply() throws Exception;
+  }
+  public static void assertEventually(Assertion assertion) throws Exception {
+    assertEventually(assertion, MAX_ASSERT_WAIT_MS);
+  }
+  private static void assertEventually(Assertion assertion
+                             , long msToWait
+                             ) throws Exception {
+    long delta = msToWait/10;
+    for(int i = 0;i < 10;++i) {
+      try{
+        assertion.apply();
+        return;
+      }
+      catch(AssertionError t) {
+      }
+      Thread.sleep(delta);
+    }
+    assertion.apply();
+  }
 
   public static List<byte[]> readSampleData(String samplePath) throws IOException {
     BufferedReader br = new BufferedReader(new FileReader(samplePath));


[3/3] metron git commit: METRON-1241: Enable the REST API to use a cache for the zookeeper config similar to the Bolts closes apache/incubator-metron#795

Posted by ce...@apache.org.
METRON-1241: Enable the REST API to use a cache for the zookeeper config similar to the Bolts closes apache/incubator-metron#795


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/cc111ec9
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/cc111ec9
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/cc111ec9

Branch: refs/heads/master
Commit: cc111ec984a78db43c4df222851f59280ff5eff9
Parents: aee0184
Author: cstella <ce...@gmail.com>
Authored: Fri Oct 20 17:20:06 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Fri Oct 20 17:20:06 2017 -0400

----------------------------------------------------------------------
 .../profiler/bolt/ProfileBuilderBoltTest.java   |   2 +-
 .../profiler/bolt/ProfileSplitterBoltTest.java  |   2 +-
 .../metron/rest/config/ZookeeperConfig.java     |  11 +
 .../service/impl/GlobalConfigServiceImpl.java   |  33 ++-
 .../impl/SensorEnrichmentConfigServiceImpl.java |  43 ++-
 .../impl/SensorIndexingConfigServiceImpl.java   |  40 +--
 .../impl/SensorParserConfigServiceImpl.java     |  38 ++-
 .../apache/metron/rest/config/TestConfig.java   |  11 +
 .../GlobalConfigControllerIntegrationTest.java  |   6 +-
 ...richmentConfigControllerIntegrationTest.java |   6 +-
 ...IndexingConfigControllerIntegrationTest.java |   6 +-
 ...orParserConfigControllerIntegrationTest.java |  19 +-
 .../StormControllerIntegrationTest.java         |  12 +
 .../impl/GlobalConfigServiceImplTest.java       |  30 +-
 .../SensorEnrichmentConfigServiceImplTest.java  |  99 +++----
 .../SensorIndexingConfigServiceImplTest.java    | 100 +++----
 .../impl/SensorParserConfigServiceImplTest.java | 105 +++----
 metron-platform/metron-common/pom.xml           |  10 +-
 .../metron/common/bolt/ConfiguredBolt.java      |  54 ++--
 .../common/bolt/ConfiguredEnrichmentBolt.java   |  30 +-
 .../common/bolt/ConfiguredIndexingBolt.java     |  28 +-
 .../common/bolt/ConfiguredParserBolt.java       |  30 +-
 .../common/bolt/ConfiguredProfilerBolt.java     |  47 +--
 .../common/configuration/Configurations.java    |  27 +-
 .../configuration/ConfigurationsUtils.java      |  64 +++-
 .../configuration/EnrichmentConfigurations.java |  46 ++-
 .../configuration/IndexingConfigurations.java   |  38 ++-
 .../configuration/ParserConfigurations.java     |  22 +-
 .../configuration/profiler/ProfileResult.java   |   8 +
 .../profiler/ProfileResultExpressions.java      |   7 +
 .../profiler/ProfileTriageExpressions.java      |  23 ++
 .../configuration/profiler/ProfilerConfig.java  |   7 +
 .../profiler/ProfilerConfigurations.java        |  11 +-
 .../common/zookeeper/ConfigurationsCache.java   |  44 +++
 .../common/zookeeper/ZKConfigurationsCache.java | 179 +++++++++++
 .../configurations/ConfigurationsUpdater.java   | 152 ++++++++++
 .../configurations/EnrichmentUpdater.java       |  78 +++++
 .../configurations/IndexingUpdater.java         |  74 +++++
 .../zookeeper/configurations/ParserUpdater.java |  74 +++++
 .../configurations/ProfilerUpdater.java         |  96 ++++++
 .../zookeeper/configurations/Reloadable.java    |  27 ++
 .../metron-common/src/main/scripts/stellar      |   2 +-
 .../ZKConfigurationsCacheIntegrationTest.java   | 296 +++++++++++++++++++
 .../bolt/BulkMessageWriterBoltTest.java         |   6 +-
 .../enrichment/bolt/EnrichmentJoinBoltTest.java |   2 +-
 .../bolt/EnrichmentSplitterBoltTest.java        |   2 +-
 .../bolt/GenericEnrichmentBoltTest.java         |   2 +-
 .../metron/enrichment/bolt/JoinBoltTest.java    |   2 +-
 .../metron/enrichment/bolt/SplitBoltTest.java   |   2 +-
 .../bolt/ThreatIntelJoinBoltTest.java           |   2 +-
 .../bolt/ThreatIntelSplitterBoltTest.java       |   2 +-
 .../metron/integration/utils/TestUtils.java     |  22 ++
 .../metron/parsers/bolt/ParserBoltTest.java     | 176 +++++------
 metron-platform/metron-test-utilities/pom.xml   |  11 +-
 .../apache/metron/test/bolt/BaseBoltTest.java   |   3 +-
 metron-platform/metron-zookeeper/pom.xml        |  48 +++
 .../metron/zookeeper/SimpleEventListener.java   | 123 ++++++++
 .../org/apache/metron/zookeeper/ZKCache.java    | 196 ++++++++++++
 metron-platform/pom.xml                         |   1 +
 .../stellar-common/src/main/scripts/stellar     |   2 +-
 60 files changed, 2027 insertions(+), 612 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
index 62be86e..21d61ab 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
@@ -147,7 +147,7 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
 
     ProfileBuilderBolt bolt = new ProfileBuilderBolt("zookeeperURL");
     bolt.setCuratorFramework(client);
-    bolt.setTreeCache(cache);
+    bolt.setZKCache(cache);
     bolt.withPeriodDuration(10, TimeUnit.MINUTES);
     bolt.withProfileTimeToLive(30, TimeUnit.MINUTES);
 

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
index d51401f..beab8d5 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
@@ -140,7 +140,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
 
     ProfileSplitterBolt bolt = new ProfileSplitterBolt("zookeeperURL");
     bolt.setCuratorFramework(client);
-    bolt.setTreeCache(cache);
+    bolt.setZKCache(cache);
     bolt.getConfigurations().updateProfilerConfig(profilerConfig.getBytes("UTF-8"));
     bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
 

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java
index 1f72afb..6f4656e 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java
@@ -24,6 +24,8 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
 import org.apache.metron.rest.MetronRestConstants;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -37,6 +39,15 @@ import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
 public class ZookeeperConfig {
 
   @Bean(initMethod = "start", destroyMethod="close")
+  public ConfigurationsCache cache(CuratorFramework client) {
+    return new ZKConfigurationsCache( client
+                                    , ZKConfigurationsCache.ConfiguredTypes.ENRICHMENT
+                                    , ZKConfigurationsCache.ConfiguredTypes.PARSER
+                                    , ZKConfigurationsCache.ConfiguredTypes.INDEXING
+                                    );
+  }
+
+  @Bean(initMethod = "start", destroyMethod="close")
   public CuratorFramework client(Environment environment) {
     int sleepTime = Integer.parseInt(environment.getProperty(MetronRestConstants.CURATOR_SLEEP_TIME));
     int maxRetries = Integer.parseInt(environment.getProperty(MetronRestConstants.CURATOR_MAX_RETRIES));

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
index e80380b..ed67994 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
@@ -17,27 +17,34 @@
  */
 package org.apache.metron.rest.service.impl;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
-import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.service.GlobalConfigService;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
 import org.apache.zookeeper.KeeperException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.io.ByteArrayInputStream;
 import java.util.Map;
 
 @Service
 public class GlobalConfigServiceImpl implements GlobalConfigService {
     private CuratorFramework client;
 
+    private ConfigurationsCache cache;
+
     @Autowired
-    public GlobalConfigServiceImpl(CuratorFramework client) {
+    public GlobalConfigServiceImpl(CuratorFramework client, ConfigurationsCache cache) {
       this.client = client;
+      this.cache = cache;
+    }
+
+    public void setCache(ConfigurationsCache cache) {
+      this.cache = cache;
     }
 
     @Override
@@ -52,16 +59,14 @@ public class GlobalConfigServiceImpl implements GlobalConfigService {
 
     @Override
     public Map<String, Object> get() throws RestException {
-        Map<String, Object> globalConfig;
-        try {
-            byte[] globalConfigBytes = ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(client);
-            globalConfig = JSONUtils.INSTANCE.load(new ByteArrayInputStream(globalConfigBytes), new TypeReference<Map<String, Object>>(){});
-        } catch (KeeperException.NoNodeException e) {
-            return null;
-        } catch (Exception e) {
-          throw new RestException(e);
-        }
-        return globalConfig;
+      Map<String, Object> globalConfig;
+      try {
+        EnrichmentConfigurations configs = cache.get( EnrichmentConfigurations.class);
+        globalConfig = configs.getGlobalConfig(false);
+      } catch (Exception e) {
+        throw new RestException(e.getMessage(), e);
+      }
+      return globalConfig;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
index d4438a4..293b113 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
@@ -22,9 +22,12 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.metron.common.aggregator.Aggregators;
 import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.service.SensorEnrichmentConfigService;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
 import org.apache.zookeeper.KeeperException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -43,10 +46,13 @@ public class SensorEnrichmentConfigServiceImpl implements SensorEnrichmentConfig
 
     private CuratorFramework client;
 
+    private ConfigurationsCache cache;
+
     @Autowired
-    public SensorEnrichmentConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client) {
+    public SensorEnrichmentConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client, ConfigurationsCache cache) {
       this.objectMapper = objectMapper;
       this.client = client;
+      this.cache = cache;
     }
 
     @Override
@@ -61,38 +67,27 @@ public class SensorEnrichmentConfigServiceImpl implements SensorEnrichmentConfig
 
     @Override
     public SensorEnrichmentConfig findOne(String name) throws RestException {
-        SensorEnrichmentConfig sensorEnrichmentConfig;
-        try {
-            sensorEnrichmentConfig = ConfigurationsUtils.readSensorEnrichmentConfigFromZookeeper(name, client);
-        } catch (KeeperException.NoNodeException e) {
-          return null;
-        } catch (Exception e) {
-          throw new RestException(e);
-        }
-      return sensorEnrichmentConfig;
+      EnrichmentConfigurations configs = cache.get( EnrichmentConfigurations.class);
+      return configs.getSensorEnrichmentConfig(name);
     }
 
     @Override
     public Map<String, SensorEnrichmentConfig> getAll() throws RestException {
-        Map<String, SensorEnrichmentConfig> sensorEnrichmentConfigs = new HashMap<>();
-        List<String> sensorNames = getAllTypes();
-        for (String name : sensorNames) {
-            sensorEnrichmentConfigs.put(name, findOne(name));
+      Map<String, SensorEnrichmentConfig> sensorEnrichmentConfigs = new HashMap<>();
+      List<String> sensorNames = getAllTypes();
+      for (String name : sensorNames) {
+        SensorEnrichmentConfig config = findOne(name);
+        if(config != null) {
+          sensorEnrichmentConfigs.put(name, config);
         }
-        return sensorEnrichmentConfigs;
+      }
+      return sensorEnrichmentConfigs;
     }
 
     @Override
     public List<String> getAllTypes() throws RestException {
-        List<String> types;
-        try {
-            types = client.getChildren().forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot());
-        } catch (KeeperException.NoNodeException e) {
-            types = new ArrayList<>();
-        } catch (Exception e) {
-          throw new RestException(e);
-        }
-      return types;
+      EnrichmentConfigurations configs = cache.get( EnrichmentConfigurations.class);
+      return configs.getTypes();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java
index 9f984e0..5c73b26 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java
@@ -17,20 +17,19 @@
  */
 package org.apache.metron.rest.service.impl;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
-import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.service.SensorIndexingConfigService;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
 import org.apache.zookeeper.KeeperException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.io.ByteArrayInputStream;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,10 +41,13 @@ public class SensorIndexingConfigServiceImpl implements SensorIndexingConfigServ
 
   private CuratorFramework client;
 
+  private ConfigurationsCache cache;
+
   @Autowired
-  public SensorIndexingConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client) {
+  public SensorIndexingConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client, ConfigurationsCache cache) {
     this.objectMapper = objectMapper;
     this.client = client;
+    this.cache = cache;
   }
 
   @Override
@@ -60,16 +62,8 @@ public class SensorIndexingConfigServiceImpl implements SensorIndexingConfigServ
 
   @Override
   public Map<String, Object> findOne(String name) throws RestException {
-    Map<String, Object> sensorIndexingConfig;
-    try {
-      byte[] sensorIndexingConfigBytes = ConfigurationsUtils.readSensorIndexingConfigBytesFromZookeeper(name, client);
-      sensorIndexingConfig = JSONUtils.INSTANCE.load(new ByteArrayInputStream(sensorIndexingConfigBytes), new TypeReference<Map<String, Object>>(){});
-    } catch (KeeperException.NoNodeException e) {
-      return null;
-    } catch (Exception e) {
-      throw new RestException(e);
-    }
-    return sensorIndexingConfig;
+    IndexingConfigurations configs = cache.get( IndexingConfigurations.class);
+    return configs.getSensorIndexingConfig(name, false);
   }
 
   @Override
@@ -77,22 +71,18 @@ public class SensorIndexingConfigServiceImpl implements SensorIndexingConfigServ
     Map<String, Map<String, Object>> sensorIndexingConfigs = new HashMap<>();
     List<String> sensorNames = getAllTypes();
     for (String name : sensorNames) {
-      sensorIndexingConfigs.put(name, findOne(name));
+      Map<String, Object> config = findOne(name);
+      if(config != null) {
+        sensorIndexingConfigs.put(name, config);
+      }
     }
     return sensorIndexingConfigs;
   }
 
   @Override
   public List<String> getAllTypes() throws RestException {
-    List<String> types;
-    try {
-        types = client.getChildren().forPath(ConfigurationType.INDEXING.getZookeeperRoot());
-    } catch (KeeperException.NoNodeException e) {
-        types = new ArrayList<>();
-    } catch (Exception e) {
-      throw new RestException(e);
-    }
-    return types;
+    IndexingConfigurations configs = cache.get( IndexingConfigurations.class);
+    return configs.getTypes();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
index f99b41c..7e70344 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
@@ -29,13 +29,16 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.hadoop.fs.Path;
 import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.rest.MetronRestConstants;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.model.ParseMessageRequest;
 import org.apache.metron.rest.service.GrokService;
 import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
 import org.apache.zookeeper.KeeperException;
 import org.json.simple.JSONObject;
 import org.reflections.Reflections;
@@ -49,17 +52,21 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
 
   private CuratorFramework client;
 
+  private ConfigurationsCache cache;
+
   private GrokService grokService;
 
+  private Map<String, String> availableParsers;
+
   @Autowired
   public SensorParserConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client,
-      GrokService grokService) {
+      GrokService grokService, ConfigurationsCache cache) {
     this.objectMapper = objectMapper;
     this.client = client;
     this.grokService = grokService;
+    this.cache = cache;
   }
 
-  private Map<String, String> availableParsers;
 
   @Override
   public SensorParserConfig save(SensorParserConfig sensorParserConfig) throws RestException {
@@ -74,15 +81,8 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
 
   @Override
   public SensorParserConfig findOne(String name) throws RestException {
-    SensorParserConfig sensorParserConfig;
-    try {
-      sensorParserConfig = ConfigurationsUtils.readSensorParserConfigFromZookeeper(name, client);
-    } catch (KeeperException.NoNodeException e) {
-      return null;
-    } catch (Exception e) {
-      throw new RestException(e);
-    }
-    return sensorParserConfig;
+    ParserConfigurations configs = cache.get( ParserConfigurations.class);
+    return configs.getSensorParserConfig(name);
   }
 
   @Override
@@ -90,7 +90,10 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
     List<SensorParserConfig> sensorParserConfigs = new ArrayList<>();
     List<String> sensorNames = getAllTypes();
     for (String name : sensorNames) {
-      sensorParserConfigs.add(findOne(name));
+      SensorParserConfig config = findOne(name);
+      if(config != null) {
+        sensorParserConfigs.add(config);
+      }
     }
     return sensorParserConfigs;
   }
@@ -109,15 +112,8 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
 
   @Override
   public List<String> getAllTypes() throws RestException {
-    List<String> types;
-    try {
-      types = client.getChildren().forPath(ConfigurationType.PARSER.getZookeeperRoot());
-    } catch (KeeperException.NoNodeException e) {
-      types = new ArrayList<>();
-    } catch (Exception e) {
-      throw new RestException(e);
-    }
-    return types;
+    ParserConfigurations configs = cache.get( ParserConfigurations.class);
+    return configs.getTypes();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
index ea64fbe..1150189 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
@@ -36,6 +36,8 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
 import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.UnableToStartException;
@@ -75,6 +77,15 @@ public class TestConfig {
     return new KafkaComponent().withTopologyProperties(zkProperties);
   }
 
+  @Bean(initMethod = "start", destroyMethod="close")
+  public ConfigurationsCache cache(CuratorFramework client) {
+    return new ZKConfigurationsCache( client
+                                    , ZKConfigurationsCache.ConfiguredTypes.ENRICHMENT
+                                    , ZKConfigurationsCache.ConfiguredTypes.PARSER
+                                    , ZKConfigurationsCache.ConfiguredTypes.INDEXING
+                                    );
+  }
+
   @Bean(destroyMethod = "stop")
   public ComponentRunner componentRunner(ZKServerComponent zkServerComponent, KafkaComponent kafkaWithZKComponent) {
     ComponentRunner runner = new ComponentRunner.Builder()

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java
index f4e18ea..abb75b1 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java
@@ -31,6 +31,7 @@ import org.springframework.test.web.servlet.MockMvc;
 import org.springframework.test.web.servlet.setup.MockMvcBuilders;
 import org.springframework.web.context.WebApplicationContext;
 
+import static org.apache.metron.integration.utils.TestUtils.assertEventually;
 import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
 import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
 import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
@@ -97,9 +98,10 @@ public class GlobalConfigControllerIntegrationTest {
                 .andExpect(status().isCreated())
                 .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")));
 
-        this.mockMvc.perform(post(globalConfigUrl).with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(globalJson))
+        assertEventually(() -> this.mockMvc.perform(post(globalConfigUrl).with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(globalJson))
                 .andExpect(status().isOk())
-                .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")));
+                .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+        );
 
         this.mockMvc.perform(get(globalConfigUrl).with(httpBasic(user,password)))
                 .andExpect(status().isOk());

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java
index dd4eff7..15a2370 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java
@@ -31,6 +31,7 @@ import org.springframework.test.web.servlet.MockMvc;
 import org.springframework.test.web.servlet.setup.MockMvcBuilders;
 import org.springframework.web.context.WebApplicationContext;
 
+import static org.apache.metron.integration.utils.TestUtils.assertEventually;
 import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
 import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
 import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
@@ -167,7 +168,7 @@ public class SensorEnrichmentConfigControllerIntegrationTest {
             .andExpect(jsonPath("$.threatIntel.triageConfig.riskLevelRules[0].score").value(10))
             .andExpect(jsonPath("$.threatIntel.triageConfig.aggregator").value("MAX"));
 
-    this.mockMvc.perform(post(sensorEnrichmentConfigUrl + "/broTest").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
+    assertEventually(() -> this.mockMvc.perform(post(sensorEnrichmentConfigUrl + "/broTest").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
             .andExpect(status().isOk())
             .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
             .andExpect(jsonPath("$.enrichment.fieldMap.geo[0]").value("ip_dst_addr"))
@@ -183,7 +184,8 @@ public class SensorEnrichmentConfigControllerIntegrationTest {
             .andExpect(jsonPath("$.threatIntel.fieldToTypeMap.ip_dst_addr[0]").value("malicious_ip"))
             .andExpect(jsonPath("$.threatIntel.triageConfig.riskLevelRules[0].rule").value("ip_src_addr == '10.122.196.204' or ip_dst_addr == '10.122.196.204'"))
             .andExpect(jsonPath("$.threatIntel.triageConfig.riskLevelRules[0].score").value(10))
-            .andExpect(jsonPath("$.threatIntel.triageConfig.aggregator").value("MAX"));
+            .andExpect(jsonPath("$.threatIntel.triageConfig.aggregator").value("MAX") )
+    );
 
     this.mockMvc.perform(get(sensorEnrichmentConfigUrl + "/broTest").with(httpBasic(user,password)))
             .andExpect(status().isOk())

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java
index cebcde6..674c55a 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java
@@ -31,6 +31,7 @@ import org.springframework.test.web.servlet.MockMvc;
 import org.springframework.test.web.servlet.setup.MockMvcBuilders;
 import org.springframework.web.context.WebApplicationContext;
 
+import static org.apache.metron.integration.utils.TestUtils.assertEventually;
 import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
 import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
 import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
@@ -103,11 +104,12 @@ public class SensorIndexingConfigControllerIntegrationTest {
             .andExpect(jsonPath("$.index").value("broTest"))
             .andExpect(jsonPath("$.batchSize").value(1));
 
-    this.mockMvc.perform(post(sensorIndexingConfigUrl + "/broTest").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
+    assertEventually(() -> this.mockMvc.perform(post(sensorIndexingConfigUrl + "/broTest").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
             .andExpect(status().isOk())
             .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
             .andExpect(jsonPath("$.index").value("broTest"))
-            .andExpect(jsonPath("$.batchSize").value(1));
+            .andExpect(jsonPath("$.batchSize").value(1))
+    );
 
     this.mockMvc.perform(get(sensorIndexingConfigUrl + "/broTest").with(httpBasic(user,password)))
             .andExpect(status().isOk())

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java
index 6e2d788..d8aea72 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java
@@ -38,7 +38,9 @@ import org.springframework.web.context.WebApplicationContext;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.metron.integration.utils.TestUtils.assertEventually;
 import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
 import static org.hamcrest.Matchers.hasSize;
 import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
@@ -198,16 +200,16 @@ public class SensorParserConfigControllerIntegrationTest {
     this.sensorParserConfigService.delete("broTest");
     this.sensorParserConfigService.delete("squidTest");
     Method[] method = SensorParserConfig.class.getMethods();
-    int numFields = 0;
+    final AtomicInteger numFields = new AtomicInteger(0);
     for(Method m : method) {
       if(m.getName().startsWith("set")) {
-        numFields++;
+        numFields.set(numFields.get() + 1);
       }
     }
     this.mockMvc.perform(post(sensorParserConfigUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(squidJson))
             .andExpect(status().isCreated())
             .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-            .andExpect(jsonPath("$.*", hasSize(numFields)))
+            .andExpect(jsonPath("$.*", hasSize(numFields.get())))
             .andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.GrokParser"))
             .andExpect(jsonPath("$.sensorTopic").value("squidTest"))
             .andExpect(jsonPath("$.parserConfig.grokPath").value("target/patterns/squidTest"))
@@ -219,10 +221,10 @@ public class SensorParserConfigControllerIntegrationTest {
             .andExpect(jsonPath("$.fieldTransformations[0].config.full_hostname").value("URL_TO_HOST(url)"))
             .andExpect(jsonPath("$.fieldTransformations[0].config.domain_without_subdomains").value("DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"));
 
-    this.mockMvc.perform(get(sensorParserConfigUrl + "/squidTest").with(httpBasic(user,password)))
+    assertEventually(() -> this.mockMvc.perform(get(sensorParserConfigUrl + "/squidTest").with(httpBasic(user,password)))
             .andExpect(status().isOk())
             .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-            .andExpect(jsonPath("$.*", hasSize(numFields)))
+            .andExpect(jsonPath("$.*", hasSize(numFields.get())))
             .andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.GrokParser"))
             .andExpect(jsonPath("$.sensorTopic").value("squidTest"))
             .andExpect(jsonPath("$.parserConfig.grokPath").value("target/patterns/squidTest"))
@@ -232,7 +234,8 @@ public class SensorParserConfigControllerIntegrationTest {
             .andExpect(jsonPath("$.fieldTransformations[0].output[0]").value("full_hostname"))
             .andExpect(jsonPath("$.fieldTransformations[0].output[1]").value("domain_without_subdomains"))
             .andExpect(jsonPath("$.fieldTransformations[0].config.full_hostname").value("URL_TO_HOST(url)"))
-            .andExpect(jsonPath("$.fieldTransformations[0].config.domain_without_subdomains").value("DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"));
+            .andExpect(jsonPath("$.fieldTransformations[0].config.domain_without_subdomains").value("DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"))
+    );
 
     this.mockMvc.perform(get(sensorParserConfigUrl).with(httpBasic(user,password)))
             .andExpect(status().isOk())
@@ -251,7 +254,7 @@ public class SensorParserConfigControllerIntegrationTest {
     this.mockMvc.perform(post(sensorParserConfigUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
             .andExpect(status().isCreated())
             .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-            .andExpect(jsonPath("$.*", hasSize(numFields)))
+            .andExpect(jsonPath("$.*", hasSize(numFields.get())))
             .andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.bro.BasicBroParser"))
             .andExpect(jsonPath("$.sensorTopic").value("broTest"))
             .andExpect(jsonPath("$.readMetadata").value("true"))
@@ -261,7 +264,7 @@ public class SensorParserConfigControllerIntegrationTest {
     this.mockMvc.perform(post(sensorParserConfigUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
             .andExpect(status().isOk())
             .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-            .andExpect(jsonPath("$.*", hasSize(numFields)))
+            .andExpect(jsonPath("$.*", hasSize(numFields.get())))
             .andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.bro.BasicBroParser"))
             .andExpect(jsonPath("$.sensorTopic").value("broTest"))
             .andExpect(jsonPath("$.readMetadata").value("true"))

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
index 5c6dd12..e3518ca 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
@@ -18,9 +18,11 @@
 package org.apache.metron.rest.controller;
 
 import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.integration.utils.TestUtils;
 import org.apache.metron.rest.model.TopologyStatusCode;
 import org.apache.metron.rest.service.GlobalConfigService;
 import org.apache.metron.rest.service.SensorParserConfigService;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -169,6 +171,11 @@ public class StormControllerIntegrationTest {
             .andExpect(jsonPath("$.message").value(TopologyStatusCode.GLOBAL_CONFIG_MISSING.name()));
 
     globalConfigService.save(globalConfig);
+    {
+      final Map<String, Object> expectedGlobalConfig = globalConfig;
+      // We must wait for the saved config to find its way into the configurations cache.
+      TestUtils.assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, globalConfigService.get()));
+    }
 
     this.mockMvc.perform(get(stormUrl + "/parser/start/broTest").with(httpBasic(user,password)))
             .andExpect(status().isOk())
@@ -179,6 +186,11 @@ public class StormControllerIntegrationTest {
     sensorParserConfig.setParserClassName("org.apache.metron.parsers.bro.BasicBroParser");
     sensorParserConfig.setSensorTopic("broTest");
     sensorParserConfigService.save(sensorParserConfig);
+    {
+      final Map<String, Object> expectedGlobalConfig = globalConfig;
+      // We must wait for the config to find its way into the configurations cache.
+      TestUtils.assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, globalConfigService.get()));
+    }
 
     this.mockMvc.perform(get(stormUrl + "/parser/start/broTest").with(httpBasic(user,password)))
             .andExpect(status().isOk())

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java
index 824fb4b..85a66b3 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java
@@ -28,11 +28,15 @@ import static org.mockito.Mockito.when;
 
 import java.util.HashMap;
 import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.DeleteBuilder;
 import org.apache.curator.framework.api.GetDataBuilder;
 import org.apache.curator.framework.api.SetDataBuilder;
 import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.service.GlobalConfigService;
 import org.apache.zookeeper.KeeperException;
@@ -49,11 +53,13 @@ public class GlobalConfigServiceImplTest {
 
   CuratorFramework curatorFramework;
   GlobalConfigService globalConfigService;
+  ConfigurationsCache cache;
 
   @Before
   public void setUp() throws Exception {
     curatorFramework = mock(CuratorFramework.class);
-    globalConfigService = new GlobalConfigServiceImpl(curatorFramework);
+    cache = mock(ConfigurationsCache.class);
+    globalConfigService = new GlobalConfigServiceImpl(curatorFramework, cache);
   }
 
 
@@ -98,25 +104,19 @@ public class GlobalConfigServiceImplTest {
       put("k", "v");
     }};
 
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenReturn(config.getBytes());
-
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
+    EnrichmentConfigurations configs = new EnrichmentConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(ConfigurationType.GLOBAL.getTypeName(), configMap);
+      }
+    };
+    when(cache.get( eq(EnrichmentConfigurations.class)))
+            .thenReturn(configs);
 
     assertEquals(configMap, globalConfigService.get());
   }
 
   @Test
-  public void getShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class);
-
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
-    assertNull(globalConfigService.get());
-  }
-
-  @Test
   public void getShouldWrapNonNoNodeExceptionInRestException() throws Exception {
     exception.expect(RestException.class);
 

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java
index c26a210..0a78f4a 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java
@@ -18,6 +18,7 @@
 package org.apache.metron.rest.service.impl;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.DeleteBuilder;
@@ -25,9 +26,11 @@ import org.apache.curator.framework.api.GetChildrenBuilder;
 import org.apache.curator.framework.api.GetDataBuilder;
 import org.apache.curator.framework.api.SetDataBuilder;
 import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.enrichment.EnrichmentConfig;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
 import org.apache.metron.common.configuration.enrichment.threatintel.ThreatIntelConfig;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.service.SensorEnrichmentConfigService;
 import org.apache.zookeeper.KeeperException;
@@ -40,6 +43,7 @@ import org.junit.rules.ExpectedException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -79,11 +83,14 @@ public class SensorEnrichmentConfigServiceImplTest {
   @Multiline
   public static String broJson;
 
+  ConfigurationsCache cache;
+
   @Before
   public void setUp() throws Exception {
     objectMapper = mock(ObjectMapper.class);
     curatorFramework = mock(CuratorFramework.class);
-    sensorEnrichmentConfigService = new SensorEnrichmentConfigServiceImpl(objectMapper, curatorFramework);
+    cache = mock(ConfigurationsCache.class);
+    sensorEnrichmentConfigService = new SensorEnrichmentConfigServiceImpl(objectMapper, curatorFramework, cache);
   }
 
 
@@ -125,84 +132,54 @@ public class SensorEnrichmentConfigServiceImplTest {
   public void findOneShouldProperlyReturnSensorEnrichmentConfig() throws Exception {
     final SensorEnrichmentConfig sensorEnrichmentConfig = getTestSensorEnrichmentConfig();
 
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes());
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
+    EnrichmentConfigurations configs = new EnrichmentConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(EnrichmentConfigurations.getKey("bro"), sensorEnrichmentConfig);
+      }
+    };
+    when(cache.get(eq(EnrichmentConfigurations.class)))
+            .thenReturn(configs);
 
+    //We only have bro, so we should expect it to be returned
     assertEquals(getTestSensorEnrichmentConfig(), sensorEnrichmentConfigService.findOne("bro"));
-  }
-
-  @Test
-  public void findOneShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/bro")).thenThrow(KeeperException.NoNodeException.class);
-
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
-    assertNull(sensorEnrichmentConfigService.findOne("bro"));
-  }
-
-  @Test
-  public void findOneShouldWrapNonNoNodeExceptionInRestException() throws Exception {
-    exception.expect(RestException.class);
-
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/bro")).thenThrow(Exception.class);
-
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
-    sensorEnrichmentConfigService.findOne("bro");
+    //and blah should be a miss.
+    assertNull(sensorEnrichmentConfigService.findOne("blah"));
   }
 
   @Test
   public void getAllTypesShouldProperlyReturnTypes() throws Exception {
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot()))
-            .thenReturn(new ArrayList() {{
-              add("bro");
-              add("squid");
-            }});
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
+
+    EnrichmentConfigurations configs = new EnrichmentConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(EnrichmentConfigurations.getKey("bro"), new HashMap<>()
+                              ,EnrichmentConfigurations.getKey("squid"), new HashMap<>()
+                              );
+      }
+    };
+    when(cache.get(eq(EnrichmentConfigurations.class)))
+            .thenReturn(configs);
 
     assertEquals(new ArrayList() {{
       add("bro");
       add("squid");
     }}, sensorEnrichmentConfigService.getAllTypes());
-  }
 
-  @Test
-  public void getAllTypesShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class);
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
-    assertEquals(new ArrayList<>(), sensorEnrichmentConfigService.getAllTypes());
   }
 
-  @Test
-  public void getAllTypesShouldWrapNonNoNodeExceptionInRestException() throws Exception {
-    exception.expect(RestException.class);
-
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot())).thenThrow(Exception.class);
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
-    sensorEnrichmentConfigService.getAllTypes();
-  }
 
   @Test
   public void getAllShouldProperlyReturnSensorEnrichmentConfigs() throws Exception {
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot()))
-            .thenReturn(new ArrayList() {{
-              add("bro");
-            }});
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
     final SensorEnrichmentConfig sensorEnrichmentConfig = getTestSensorEnrichmentConfig();
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes());
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
+    EnrichmentConfigurations configs = new EnrichmentConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(EnrichmentConfigurations.getKey("bro"), sensorEnrichmentConfig);
+      }
+    };
+    when(cache.get( eq(EnrichmentConfigurations.class)))
+            .thenReturn(configs);
 
     assertEquals(new HashMap() {{ put("bro", sensorEnrichmentConfig);}}, sensorEnrichmentConfigService.getAll());
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java
index 43ca0f7..9641a52 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java
@@ -17,7 +17,10 @@
  */
 package org.apache.metron.rest.service.impl;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.DeleteBuilder;
@@ -25,6 +28,9 @@ import org.apache.curator.framework.api.GetChildrenBuilder;
 import org.apache.curator.framework.api.GetDataBuilder;
 import org.apache.curator.framework.api.SetDataBuilder;
 import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.service.SensorIndexingConfigService;
 import org.apache.zookeeper.KeeperException;
@@ -36,6 +42,7 @@ import org.junit.rules.ExpectedException;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -55,6 +62,7 @@ public class SensorIndexingConfigServiceImplTest {
   ObjectMapper objectMapper;
   CuratorFramework curatorFramework;
   SensorIndexingConfigService sensorIndexingConfigService;
+  ConfigurationsCache cache;
 
   /**
    {
@@ -72,7 +80,8 @@ public class SensorIndexingConfigServiceImplTest {
   public void setUp() throws Exception {
     objectMapper = mock(ObjectMapper.class);
     curatorFramework = mock(CuratorFramework.class);
-    sensorIndexingConfigService = new SensorIndexingConfigServiceImpl(objectMapper, curatorFramework);
+    cache = mock(ConfigurationsCache.class);
+    sensorIndexingConfigService = new SensorIndexingConfigServiceImpl(objectMapper, curatorFramework, cache);
   }
 
 
@@ -114,44 +123,36 @@ public class SensorIndexingConfigServiceImplTest {
   public void findOneShouldProperlyReturnSensorEnrichmentConfig() throws Exception {
     final Map<String, Object> sensorIndexingConfig = getTestSensorIndexingConfig();
 
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes());
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
+    IndexingConfigurations configs = new IndexingConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(IndexingConfigurations.getKey("bro"), sensorIndexingConfig);
+      }
+    };
+    when(cache.get( eq(IndexingConfigurations.class)))
+            .thenReturn(configs);
 
+    //We only have bro, so we should expect it to be returned
     assertEquals(getTestSensorIndexingConfig(), sensorIndexingConfigService.findOne("bro"));
+    //and blah should be a miss.
+    assertNull(sensorIndexingConfigService.findOne("blah"));
   }
 
-  @Test
-  public void findOneShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/bro")).thenThrow(KeeperException.NoNodeException.class);
-
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
-    assertNull(sensorIndexingConfigService.findOne("bro"));
-  }
-
-  @Test
-  public void findOneShouldWrapNonNoNodeExceptionInRestException() throws Exception {
-    exception.expect(RestException.class);
 
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/bro")).thenThrow(Exception.class);
 
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
-    sensorIndexingConfigService.findOne("bro");
-  }
 
   @Test
   public void getAllTypesShouldProperlyReturnTypes() throws Exception {
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot()))
-            .thenReturn(new ArrayList() {{
-              add("bro");
-              add("squid");
-            }});
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
+    IndexingConfigurations configs = new IndexingConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(IndexingConfigurations.getKey("bro"), new HashMap<>()
+                              ,IndexingConfigurations.getKey("squid"), new HashMap<>()
+                              );
+      }
+    };
+    when(cache.get(eq(IndexingConfigurations.class)))
+            .thenReturn(configs);
 
     assertEquals(new ArrayList() {{
       add("bro");
@@ -159,39 +160,18 @@ public class SensorIndexingConfigServiceImplTest {
     }}, sensorIndexingConfigService.getAllTypes());
   }
 
-  @Test
-  public void getAllTypesShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class);
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
-    assertEquals(new ArrayList<>(), sensorIndexingConfigService.getAllTypes());
-  }
 
   @Test
-  public void getAllTypesShouldWrapNonNoNodeExceptionInRestException() throws Exception {
-    exception.expect(RestException.class);
-
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot())).thenThrow(Exception.class);
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
-    sensorIndexingConfigService.getAllTypes();
-  }
-
-  @Test
-  public void getAllShouldProperlyReturnSensorEnrichmentConfigs() throws Exception {
+  public void getAllShouldProperlyReturnIndexingConfigs() throws Exception {
     final Map<String, Object> sensorIndexingConfig = getTestSensorIndexingConfig();
-
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot()))
-            .thenReturn(new ArrayList() {{
-              add("bro");
-            }});
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes());
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
+    IndexingConfigurations configs = new IndexingConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(IndexingConfigurations.getKey("bro"), sensorIndexingConfig );
+      }
+    };
+    when(cache.get(eq(IndexingConfigurations.class)))
+            .thenReturn(configs);
 
     assertEquals(new HashMap() {{ put("bro", sensorIndexingConfig);}}, sensorIndexingConfigService.getAll());
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java
index c96a796..7998c21 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java
@@ -18,6 +18,7 @@
 package org.apache.metron.rest.service.impl;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
 import oi.thekraken.grok.api.Grok;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.curator.framework.CuratorFramework;
@@ -27,7 +28,9 @@ import org.apache.curator.framework.api.GetDataBuilder;
 import org.apache.curator.framework.api.SetDataBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.model.ParseMessageRequest;
 import org.apache.metron.rest.service.GrokService;
@@ -95,6 +98,8 @@ public class SensorParserConfigServiceImplTest {
 
   private String user = "user1";
 
+  ConfigurationsCache cache;
+
   @Before
   public void setUp() throws Exception {
     objectMapper = mock(ObjectMapper.class);
@@ -105,7 +110,8 @@ public class SensorParserConfigServiceImplTest {
     SecurityContextHolder.getContext().setAuthentication(authentication);
     when(environment.getProperty(GROK_TEMP_PATH_SPRING_PROPERTY)).thenReturn("./target");
     grokService = new GrokServiceImpl(environment, mock(Grok.class), new HdfsServiceImpl(new Configuration()));
-    sensorParserConfigService = new SensorParserConfigServiceImpl(objectMapper, curatorFramework, grokService);
+    cache = mock(ConfigurationsCache.class);
+    sensorParserConfigService = new SensorParserConfigServiceImpl(objectMapper, curatorFramework, grokService, cache);
   }
 
 
@@ -144,47 +150,36 @@ public class SensorParserConfigServiceImplTest {
   }
 
   @Test
-  public void findOneShouldProperlyReturnSensorEnrichmentConfig() throws Exception {
+  public void findOneShouldProperlyReturnSensorParserConfig() throws Exception {
     final SensorParserConfig sensorParserConfig = getTestBroSensorParserConfig();
 
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes());
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
+    ParserConfigurations configs = new ParserConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(ParserConfigurations.getKey("bro"), sensorParserConfig);
+      }
+    };
+    when(cache.get(eq(ParserConfigurations.class)))
+            .thenReturn(configs);
 
+    //We only have bro, so we should expect it to be returned
     assertEquals(getTestBroSensorParserConfig(), sensorParserConfigService.findOne("bro"));
-  }
-
-  @Test
-  public void findOneShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/bro")).thenThrow(KeeperException.NoNodeException.class);
-
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
-    assertNull(sensorParserConfigService.findOne("bro"));
-  }
-
-  @Test
-  public void findOneShouldWrapNonNoNodeExceptionInRestException() throws Exception {
-    exception.expect(RestException.class);
-
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/bro")).thenThrow(Exception.class);
-
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
-    sensorParserConfigService.findOne("bro");
+    //and blah should be a miss.
+    assertNull(sensorParserConfigService.findOne("blah"));
   }
 
   @Test
   public void getAllTypesShouldProperlyReturnTypes() throws Exception {
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot()))
-            .thenReturn(new ArrayList() {{
-              add("bro");
-              add("squid");
-            }});
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
+    ParserConfigurations configs = new ParserConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(ParserConfigurations.getKey("bro"), new HashMap<>()
+                              ,ParserConfigurations.getKey("squid"), new HashMap<>()
+                              );
+      }
+    };
+    when(cache.get( eq(ParserConfigurations.class)))
+            .thenReturn(configs);
 
     assertEquals(new ArrayList() {{
       add("bro");
@@ -193,41 +188,19 @@ public class SensorParserConfigServiceImplTest {
   }
 
   @Test
-  public void getAllTypesShouldReturnEmptyListWhenNoNodeExceptionIsThrown() throws Exception {
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class);
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
-    assertEquals(new ArrayList<>(), sensorParserConfigService.getAllTypes());
-  }
-
-  @Test
-  public void getAllTypesShouldWrapNonNoNodeExceptionInRestException() throws Exception {
-    exception.expect(RestException.class);
-
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot())).thenThrow(Exception.class);
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
-    sensorParserConfigService.getAllTypes();
-  }
-
-  @Test
   public void getAllShouldProperlyReturnSensorParserConfigs() throws Exception {
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot()))
-            .thenReturn(new ArrayList() {{
-              add("bro");
-              add("squid");
-            }});
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
     final SensorParserConfig broSensorParserConfig = getTestBroSensorParserConfig();
     final SensorParserConfig squidSensorParserConfig = getTestSquidSensorParserConfig();
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes());
-    when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/squid")).thenReturn(squidJson.getBytes());
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
+    ParserConfigurations configs = new ParserConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(ParserConfigurations.getKey("bro"), broSensorParserConfig
+                              ,ParserConfigurations.getKey("squid"), squidSensorParserConfig
+                              );
+      }
+    };
+    when(cache.get( eq(ParserConfigurations.class)))
+            .thenReturn(configs);
 
     assertEquals(new ArrayList() {{
       add(getTestBroSensorParserConfig());

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
index 3054881..8734d63 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -54,6 +54,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>metron-zookeeper</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>metron-integration-test</artifactId>
             <version>${project.parent.version}</version>
             <scope>test</scope>
@@ -289,11 +294,6 @@
             <version>${global_jackson_version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-recipes</artifactId>
-            <version>${global_curator_version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>flux-core</artifactId>
             <version>${global_flux_version}</version>

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
index a97091a..6f15746 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
@@ -17,54 +17,58 @@
  */
 package org.apache.metron.common.bolt;
 
-import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Map;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.cache.TreeCache;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.zookeeper.SimpleEventListener;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
+import org.apache.metron.common.zookeeper.configurations.Reloadable;
+import org.apache.metron.zookeeper.ZKCache;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.base.BaseRichBolt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class ConfiguredBolt<CONFIG_T extends Configurations> extends BaseRichBolt {
+public abstract class ConfiguredBolt<CONFIG_T extends Configurations> extends BaseRichBolt implements Reloadable {
 
   private static final Logger LOG =  LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private String zookeeperUrl;
 
   protected CuratorFramework client;
-  protected TreeCache cache;
-  private final CONFIG_T configurations = defaultConfigurations();
+  protected ZKCache cache;
+  private final CONFIG_T configurations;
   public ConfiguredBolt(String zookeeperUrl) {
     this.zookeeperUrl = zookeeperUrl;
+    this.configurations = createUpdater().defaultConfigurations();
   }
 
   public void setCuratorFramework(CuratorFramework client) {
     this.client = client;
   }
 
-  public void setTreeCache(TreeCache cache) {
+  public void setZKCache(ZKCache cache) {
     this.cache = cache;
   }
 
+  @Override
   public void reloadCallback(String name, ConfigurationType type) {
   }
+
   public CONFIG_T getConfigurations() {
     return configurations;
   }
-  protected abstract CONFIG_T defaultConfigurations();
 
+  protected abstract ConfigurationsUpdater<CONFIG_T> createUpdater();
 
 
   @Override
@@ -85,30 +89,30 @@ public abstract class ConfiguredBolt<CONFIG_T extends Configurations> extends Ba
       //zookeeper.
       ConfigurationsUtils.setupStellarStatically(client);
       if (cache == null) {
-        cache = new TreeCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT);
-        TreeCacheListener listener = new TreeCacheListener() {
-          @Override
-          public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
-            if (event.getType().equals(TreeCacheEvent.Type.NODE_ADDED) || event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) {
-              String path = event.getData().getPath();
-              byte[] data = event.getData().getData();
-              updateConfig(path, data);
-            }
-          }
-        };
-        cache.getListenable().addListener(listener);
-        loadConfig();
+        ConfigurationsUpdater<CONFIG_T> updater = createUpdater();
+        SimpleEventListener listener = new SimpleEventListener.Builder()
+                                                              .with( updater::update
+                                                                   , TreeCacheEvent.Type.NODE_ADDED
+                                                                   , TreeCacheEvent.Type.NODE_UPDATED
+                                                                   )
+                                                              .with( updater::delete
+                                                                   , TreeCacheEvent.Type.NODE_REMOVED
+                                                                   )
+                                                              .build();
+        cache = new ZKCache.Builder()
+                           .withClient(client)
+                           .withListener(listener)
+                           .withRoot(Constants.ZOOKEEPER_TOPOLOGY_ROOT)
+                           .build();
+        updater.forceUpdate(client);
+        cache.start();
       }
-      cache.start();
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
       throw new RuntimeException(e);
     }
   }
 
-  abstract public void loadConfig();
-  abstract public void updateConfig(String path, byte[] data) throws IOException;
-
   @Override
   public void cleanup() {
     cache.close();

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
index 9c3ee97..54fd7e8 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
@@ -22,6 +22,8 @@ import java.lang.invoke.MethodHandles;
 import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
+import org.apache.metron.common.zookeeper.configurations.EnrichmentUpdater;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,31 +37,7 @@ public abstract class ConfiguredEnrichmentBolt extends ConfiguredBolt<Enrichment
   }
 
   @Override
-  protected EnrichmentConfigurations defaultConfigurations() {
-    return new EnrichmentConfigurations();
-  }
-
-  @Override
-  public void loadConfig() {
-    try {
-
-      ConfigurationsUtils.updateEnrichmentConfigsFromZookeeper(getConfigurations(), client);
-    } catch (Exception e) {
-      LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
-    }
-  }
-
-  @Override
-  public void updateConfig(String path, byte[] data) throws IOException {
-    if (data.length != 0) {
-      String name = path.substring(path.lastIndexOf("/") + 1);
-      if (path.startsWith(ConfigurationType.ENRICHMENT.getZookeeperRoot())) {
-        getConfigurations().updateSensorEnrichmentConfig(name, data);
-        reloadCallback(name, ConfigurationType.ENRICHMENT);
-      } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
-        getConfigurations().updateGlobalConfig(data);
-        reloadCallback(name, ConfigurationType.GLOBAL);
-      }
-    }
+  protected ConfigurationsUpdater<EnrichmentConfigurations> createUpdater() {
+    return new EnrichmentUpdater(this, this::getConfigurations);
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
index cddcada..09300e4 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
@@ -22,6 +22,8 @@ import java.lang.invoke.MethodHandles;
 import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
+import org.apache.metron.common.zookeeper.configurations.IndexingUpdater;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,30 +35,8 @@ public abstract class ConfiguredIndexingBolt extends ConfiguredBolt<IndexingConf
   }
 
   @Override
-  protected IndexingConfigurations defaultConfigurations() {
-    return new IndexingConfigurations();
+  protected ConfigurationsUpdater<IndexingConfigurations> createUpdater() {
+    return new IndexingUpdater(this, this::getConfigurations);
   }
 
-  @Override
-  public void loadConfig() {
-    try {
-      ConfigurationsUtils.updateSensorIndexingConfigsFromZookeeper(getConfigurations(), client);
-    } catch (Exception e) {
-      LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
-    }
-  }
-
-  @Override
-  public void updateConfig(String path, byte[] data) throws IOException {
-    if (data.length != 0) {
-      String name = path.substring(path.lastIndexOf("/") + 1);
-      if (path.startsWith(ConfigurationType.INDEXING.getZookeeperRoot())) {
-        getConfigurations().updateSensorIndexingConfig(name, data);
-        reloadCallback(name, ConfigurationType.INDEXING);
-      } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
-        getConfigurations().updateGlobalConfig(data);
-        reloadCallback(name, ConfigurationType.GLOBAL);
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
index 99313fa..2f13658 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
@@ -23,6 +23,8 @@ import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
+import org.apache.metron.common.zookeeper.configurations.ParserUpdater;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,34 +43,14 @@ public abstract class ConfiguredParserBolt extends ConfiguredBolt<ParserConfigur
     return getConfigurations().getSensorParserConfig(sensorType);
   }
 
-  @Override
-  protected ParserConfigurations defaultConfigurations() {
-    return new ParserConfigurations();
-  }
-
   public String getSensorType() {
     return sensorType;
   }
-  @Override
-  public void loadConfig() {
-    try {
-      ConfigurationsUtils.updateParserConfigsFromZookeeper(getConfigurations(), client);
-    } catch (Exception e) {
-      LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
-    }
-  }
+
 
   @Override
-  public void updateConfig(String path, byte[] data) throws IOException {
-    if (data.length != 0) {
-      String name = path.substring(path.lastIndexOf("/") + 1);
-      if (path.startsWith(ConfigurationType.PARSER.getZookeeperRoot())) {
-        getConfigurations().updateSensorParserConfig(name, data);
-        reloadCallback(name, ConfigurationType.PARSER);
-      } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
-        getConfigurations().updateGlobalConfig(data);
-        reloadCallback(name, ConfigurationType.GLOBAL);
-      }
-    }
+  protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+    return new ParserUpdater(this, this::getConfigurations);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
index 22ff3a9..90575d0 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
@@ -17,16 +17,12 @@
  */
 package org.apache.metron.common.bolt;
 
-import static org.apache.metron.common.configuration.ConfigurationType.PROFILER;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.profiler.ProfilerConfig;
 import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
-import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
+import org.apache.metron.common.zookeeper.configurations.ProfilerUpdater;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,43 +42,8 @@ public abstract class ConfiguredProfilerBolt extends ConfiguredBolt<ProfilerConf
   }
 
   @Override
-  protected ProfilerConfigurations defaultConfigurations() {
-    return new ProfilerConfigurations();
+  protected ConfigurationsUpdater<ProfilerConfigurations> createUpdater() {
+    return new ProfilerUpdater(this, this::getConfigurations);
   }
 
-  @Override
-  public void loadConfig() {
-    try {
-      ProfilerConfig config = readFromZookeeper(client);
-      if(config != null) {
-        getConfigurations().updateProfilerConfig(config);
-      }
-
-    } catch (Exception e) {
-      LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
-    }
-  }
-
-  private ProfilerConfig readFromZookeeper(CuratorFramework client) throws Exception {
-    byte[] raw = client.getData().forPath(PROFILER.getZookeeperRoot());
-    return JSONUtils.INSTANCE.load(new ByteArrayInputStream(raw), ProfilerConfig.class);
-  }
-
-  @Override
-  public void updateConfig(String path, byte[] data) throws IOException {
-    if (data.length != 0) {
-      String name = path.substring(path.lastIndexOf("/") + 1);
-
-      // update the profiler configuration from zookeeper
-      if (path.startsWith(ConfigurationType.PROFILER.getZookeeperRoot())) {
-        getConfigurations().updateProfilerConfig(data);
-        reloadCallback(name, ConfigurationType.PROFILER);
-
-      // update the global configuration from zookeeper
-      } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
-        getConfigurations().updateGlobalConfig(data);
-        reloadCallback(name, ConfigurationType.GLOBAL);
-      }
-    }
-  }
 }