You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2017/10/12 17:58:10 UTC

[46/50] [abbrv] oodt git commit: [OODT-965] Added configuration change listening feature

[OODT-965] Added configuration change listening feature


Project: http://git-wip-us.apache.org/repos/asf/oodt/repo
Commit: http://git-wip-us.apache.org/repos/asf/oodt/commit/df1db1e4
Tree: http://git-wip-us.apache.org/repos/asf/oodt/tree/df1db1e4
Diff: http://git-wip-us.apache.org/repos/asf/oodt/diff/df1db1e4

Branch: refs/heads/master
Commit: df1db1e44b3289214693f3a0894681400782ba99
Parents: fc6311d
Author: Imesha Sudasingha <im...@gmail.com>
Authored: Sat Aug 19 16:21:10 2017 +0530
Committer: Imesha Sudasingha <im...@gmail.com>
Committed: Thu Oct 12 08:07:14 2017 +0530

----------------------------------------------------------------------
 config/pom.xml                                  |  2 +-
 .../org/apache/oodt/config/ConfigEventType.java | 45 +++++++++++++
 .../oodt/config/ConfigurationListener.java      | 31 +++++++++
 .../oodt/config/ConfigurationManager.java       | 23 +++++++
 .../java/org/apache/oodt/config/Constants.java  |  3 +
 .../DistributedConfigurationManager.java        | 64 +++++++++++++++---
 .../DistributedConfigurationPublisher.java      | 26 +++++++-
 .../oodt/config/distributed/ZNodePaths.java     | 27 ++++++++
 .../oodt/config/distributed/cli/CLIAction.java  | 24 +++++--
 config/src/main/resources/cmd-line-options.xml  | 31 +++++++++
 .../DistributedConfigurationManagerTest.java    | 69 ++++++++++++++++++--
 core/pom.xml                                    |  2 +-
 .../cas/filemgr/system/XmlRpcFileManager.java   | 18 ++++-
 pge/pom.xml                                     |  2 +-
 14 files changed, 343 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/pom.xml
----------------------------------------------------------------------
diff --git a/config/pom.xml b/config/pom.xml
index 3f26499..e34ff44 100644
--- a/config/pom.xml
+++ b/config/pom.xml
@@ -26,7 +26,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
+            <artifactId>curator-recipes</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.curator</groupId>

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/main/java/org/apache/oodt/config/ConfigEventType.java
----------------------------------------------------------------------
diff --git a/config/src/main/java/org/apache/oodt/config/ConfigEventType.java b/config/src/main/java/org/apache/oodt/config/ConfigEventType.java
new file mode 100644
index 0000000..8a465b0
--- /dev/null
+++ b/config/src/main/java/org/apache/oodt/config/ConfigEventType.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.config;
+
+public enum ConfigEventType {
+    PUBLISH("publish"),
+    CLEAR("clear");
+
+    private String name;
+
+    ConfigEventType(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+
+    public static ConfigEventType parse(String string) {
+        switch (string) {
+            case "publish":
+                return PUBLISH;
+            case "clear":
+                return CLEAR;
+            default:
+                return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/main/java/org/apache/oodt/config/ConfigurationListener.java
----------------------------------------------------------------------
diff --git a/config/src/main/java/org/apache/oodt/config/ConfigurationListener.java b/config/src/main/java/org/apache/oodt/config/ConfigurationListener.java
new file mode 100644
index 0000000..a5abcab
--- /dev/null
+++ b/config/src/main/java/org/apache/oodt/config/ConfigurationListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.config;
+
+/**
+ * The interface which should be implemented in order to listen for configuration changes.
+ *
+ * @author Imesha Sudasingha
+ */
+public interface ConfigurationListener {
+
+    /**
+     * This method is invoked when there has been any change in configuration of the interested component
+     */
+    void configurationChanged(ConfigEventType type);
+}

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/main/java/org/apache/oodt/config/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/config/src/main/java/org/apache/oodt/config/ConfigurationManager.java b/config/src/main/java/org/apache/oodt/config/ConfigurationManager.java
index 53aacef..9158fad 100644
--- a/config/src/main/java/org/apache/oodt/config/ConfigurationManager.java
+++ b/config/src/main/java/org/apache/oodt/config/ConfigurationManager.java
@@ -17,7 +17,9 @@
 
 package org.apache.oodt.config;
 
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * The abstract class to define functions of the configuration managers.
@@ -28,6 +30,7 @@ public abstract class ConfigurationManager {
 
     protected Component component;
     protected String project;
+    private Set<ConfigurationListener> configurationListeners = new HashSet<>(1);
 
     public ConfigurationManager(Component component) {
         this(component, Constants.DEFAULT_PROJECT);
@@ -38,6 +41,12 @@ public abstract class ConfigurationManager {
         this.project = project;
     }
 
+    /**
+     * Loads configuration required for {@link #component}. If distributed configuration management is enabled, this
+     * will download configuration from zookeeper. Else, this will load properties files specified.
+     *
+     * @throws Exception
+     */
     public abstract void loadConfiguration() throws Exception;
 
     /**
@@ -47,6 +56,20 @@ public abstract class ConfigurationManager {
      */
     public abstract void clearConfiguration();
 
+    public synchronized void addConfigurationListener(ConfigurationListener listener) {
+        configurationListeners.add(listener);
+    }
+
+    public synchronized void removeConfigurationListener(ConfigurationListener listener) {
+        configurationListeners.remove(listener);
+    }
+
+    protected synchronized void notifyConfigurationChange(ConfigEventType type) {
+        for (ConfigurationListener listener : configurationListeners) {
+            listener.configurationChanged(type);
+        }
+    }
+
     public Component getComponent() {
         return component;
     }

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/main/java/org/apache/oodt/config/Constants.java
----------------------------------------------------------------------
diff --git a/config/src/main/java/org/apache/oodt/config/Constants.java b/config/src/main/java/org/apache/oodt/config/Constants.java
index 9cd217f..3824afb 100644
--- a/config/src/main/java/org/apache/oodt/config/Constants.java
+++ b/config/src/main/java/org/apache/oodt/config/Constants.java
@@ -93,5 +93,8 @@ public class Constants {
 
         /** Where other configuration files will be stored */
         public static final String CONFIGURATION_PATH_NAME = "configuration";
+
+        /** Path to be watched for configuration changes */
+        public static final String NOTIFICATIONS_PATH = "notifications";
     }
 }

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationManager.java
----------------------------------------------------------------------
diff --git a/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationManager.java b/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationManager.java
index ae8912a..6dfb24b 100644
--- a/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationManager.java
+++ b/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationManager.java
@@ -19,7 +19,10 @@ package org.apache.oodt.config.distributed;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
 import org.apache.oodt.config.Component;
+import org.apache.oodt.config.ConfigEventType;
 import org.apache.oodt.config.ConfigurationManager;
 import org.apache.oodt.config.Constants;
 import org.apache.oodt.config.Constants.Properties;
@@ -38,6 +41,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.oodt.config.Constants.Properties.ZK_CONNECT_STRING;
 import static org.apache.oodt.config.Constants.Properties.ZK_PROPERTIES_FILE;
+import static org.apache.oodt.config.Constants.Properties.ZK_STARTUP_TIMEOUT;
 import static org.apache.oodt.config.distributed.utils.ConfigUtils.getOODTProjectName;
 
 /**
@@ -57,12 +61,41 @@ public class DistributedConfigurationManager extends ConfigurationManager {
 
     private List<String> savedFiles = new ArrayList<>();
 
+    /** {@link NodeCache} to watch for configuration change notifications */
+    private NodeCache nodeCache;
+    private NodeCacheListener nodeCacheListener = new NodeCacheListener() {
+        @Override
+        public void nodeChanged() throws Exception {
+            byte[] data = client.getData().forPath(zNodePaths.getNotificationsZNodePath());
+            if (data == null) {
+                return;
+            }
+
+            String event = new String(data);
+            ConfigEventType type = ConfigEventType.parse(event);
+            if (type != null) {
+                logger.info("Configuration changed event of type: '{}' received", type);
+                switch (type) {
+                    case PUBLISH:
+                        loadConfiguration();
+                        break;
+                    case CLEAR:
+                        clearConfiguration();
+                        break;
+                }
+
+                notifyConfigurationChange(type);
+            }
+        }
+    };
+
     public DistributedConfigurationManager(Component component) {
         super(component, getOODTProjectName());
         this.zNodePaths = new ZNodePaths(this.project, this.component.getName());
 
-        if (System.getProperty(ZK_PROPERTIES_FILE) == null && System.getProperty(Constants.Properties.ZK_CONNECT_STRING) == null) {
-            throw new IllegalArgumentException("Zookeeper requires system properties " + ZK_PROPERTIES_FILE + " or " + ZK_CONNECT_STRING + " to be set");
+        if (System.getProperty(ZK_PROPERTIES_FILE) == null && System.getProperty(ZK_CONNECT_STRING) == null) {
+            throw new IllegalArgumentException("Zookeeper requires system properties " + ZK_PROPERTIES_FILE + " or " +
+                    ZK_CONNECT_STRING + " to be set");
         }
 
         if (System.getProperty(ZK_PROPERTIES_FILE) != null) {
@@ -73,11 +106,11 @@ public class DistributedConfigurationManager extends ConfigurationManager {
             }
         }
 
-        if (System.getProperty(Constants.Properties.ZK_CONNECT_STRING) == null) {
+        if (System.getProperty(ZK_CONNECT_STRING) == null) {
             throw new IllegalArgumentException("Zookeeper requires a proper connect string to connect to zookeeper ensemble");
         }
 
-        connectString = System.getProperty(Constants.Properties.ZK_CONNECT_STRING);
+        connectString = System.getProperty(ZK_CONNECT_STRING);
         logger.info("Using zookeeper connect string : {}", connectString);
         startZookeeper();
     }
@@ -90,7 +123,8 @@ public class DistributedConfigurationManager extends ConfigurationManager {
         client = CuratorUtils.newCuratorFrameworkClient(connectString, logger);
         client.start();
         logger.info("Curator framework start operation invoked");
-        int startupTimeOutMs = Integer.parseInt(System.getProperty(Properties.ZK_STARTUP_TIMEOUT, "30000"));
+
+        int startupTimeOutMs = Integer.parseInt(System.getProperty(ZK_STARTUP_TIMEOUT, "30000"));
         try {
             logger.info("Waiting to connect to zookeeper, startupTimeout : {}", startupTimeOutMs);
             client.blockUntilConnected(startupTimeOutMs, TimeUnit.MILLISECONDS);
@@ -103,10 +137,21 @@ public class DistributedConfigurationManager extends ConfigurationManager {
         }
 
         logger.info("CuratorFramework client started successfully");
+
+        nodeCache = new NodeCache(client, zNodePaths.getNotificationsZNodePath());
+        nodeCache.getListenable().addListener(nodeCacheListener);
+        try {
+            logger.debug("Starting NodeCache to watch for configuration changes");
+            nodeCache.start(true);
+        } catch (Exception e) {
+            logger.error("Error occurred when start listening for configuration changes", e);
+            throw new IllegalStateException("Unable to start listening for configuration changes", e);
+        }
+        logger.info("NodeCache for watching configuration changes started successfully");
     }
 
     @Override
-    public void loadConfiguration() throws Exception {
+    public synchronized void loadConfiguration() throws Exception {
         logger.debug("Loading properties for : {}", component);
         loadProperties();
         logger.info("Properties loaded for : {}", component);
@@ -170,9 +215,8 @@ public class DistributedConfigurationManager extends ConfigurationManager {
     private void saveFile(String path, byte[] data) throws IOException {
         String localFilePath = ConfigUtils.fixForComponentHome(component, path);
         File localFile = new File(localFilePath);
-        if (localFile.exists()) {
-            logger.warn("Deleting already existing file at {} before writing new content", localFilePath);
-            localFile.delete();
+        if (localFile.exists() && localFile.delete()) {
+            logger.warn("Deleted already existing file at {} before writing new content", localFilePath);
         }
 
         logger.debug("Storing configuration in file: {}", localFilePath);
@@ -183,7 +227,7 @@ public class DistributedConfigurationManager extends ConfigurationManager {
 
     /** {@inheritDoc} */
     @Override
-    public void clearConfiguration() {
+    public synchronized void clearConfiguration() {
         for (String path : savedFiles) {
             logger.debug("Removing saved file {}", path);
             File file = new File(path);

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationPublisher.java
----------------------------------------------------------------------
diff --git a/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationPublisher.java b/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationPublisher.java
index 6229e96..9d3dd4a 100644
--- a/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationPublisher.java
+++ b/config/src/main/java/org/apache/oodt/config/distributed/DistributedConfigurationPublisher.java
@@ -20,6 +20,7 @@ package org.apache.oodt.config.distributed;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.oodt.config.Component;
+import org.apache.oodt.config.ConfigEventType;
 import org.apache.oodt.config.Constants;
 import org.apache.oodt.config.distributed.utils.CuratorUtils;
 import org.apache.zookeeper.data.Stat;
@@ -83,6 +84,14 @@ public class DistributedConfigurationPublisher {
         logger.info("Using zookeeper connect string : {}", connectString);
 
         startZookeeper();
+
+        try {
+            logger.debug("Creating ZNode paths");
+            zNodePaths.createZNodes(client);
+        } catch (Exception e) {
+            logger.error("Error occurred when creating initial ZNode paths", e);
+            throw new IllegalStateException("Unable to create ZNode paths", e);
+        }
     }
 
     /**
@@ -145,7 +154,8 @@ public class DistributedConfigurationPublisher {
      */
     public boolean verifyPublishedConfiguration() {
         try {
-            return verifyPublishedConfiguration(propertiesFiles, true) && verifyPublishedConfiguration(configFiles, false);
+            return verifyPublishedConfiguration(propertiesFiles, true) &&
+                    verifyPublishedConfiguration(configFiles, false);
         } catch (Exception e) {
             logger.error("Error occurred when checking published config", e);
             return false;
@@ -165,6 +175,17 @@ public class DistributedConfigurationPublisher {
         logger.info("Configuration cleared!");
     }
 
+    /**
+     * Notifies the watching {@link org.apache.oodt.config.ConfigurationManager}s about the configuration change
+     *
+     * @param type {@link ConfigEventType}
+     * @throws Exception
+     */
+    public void notifyConfigEvent(ConfigEventType type) throws Exception {
+        logger.info("Notifying event: '{}' to configuration managers of {}", type, component);
+        client.setData().forPath(zNodePaths.getNotificationsZNodePath(), type.toString().getBytes());
+    }
+
     private void publishConfiguration(Map<String, String> fileMapping, boolean isProperties) throws Exception {
         for (Map.Entry<String, String> entry : fileMapping.entrySet()) {
             String filePath = entry.getKey();
@@ -173,7 +194,8 @@ public class DistributedConfigurationPublisher {
 
             String content = getFileContent(filePath);
 
-            String zNodePath = isProperties ? zNodePaths.getPropertiesZNodePath(relativeZNodePath) : zNodePaths.getConfigurationZNodePath(relativeZNodePath);
+            String zNodePath = isProperties ? zNodePaths.getPropertiesZNodePath(relativeZNodePath) :
+                    zNodePaths.getConfigurationZNodePath(relativeZNodePath);
             if (client.checkExists().forPath(zNodePath) != null) {
                 byte[] bytes = client.getData().forPath(zNodePath);
                 String existingData = new String(bytes);

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/main/java/org/apache/oodt/config/distributed/ZNodePaths.java
----------------------------------------------------------------------
diff --git a/config/src/main/java/org/apache/oodt/config/distributed/ZNodePaths.java b/config/src/main/java/org/apache/oodt/config/distributed/ZNodePaths.java
index cf3ca00..6324ac1 100644
--- a/config/src/main/java/org/apache/oodt/config/distributed/ZNodePaths.java
+++ b/config/src/main/java/org/apache/oodt/config/distributed/ZNodePaths.java
@@ -17,9 +17,14 @@
 
 package org.apache.oodt.config.distributed;
 
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.oodt.config.distributed.utils.CuratorUtils;
+import org.apache.zookeeper.CreateMode;
+
 import static org.apache.oodt.config.Constants.DEFAULT_PROJECT;
 import static org.apache.oodt.config.Constants.ZPaths.COMPONENTS_PATH_NAME;
 import static org.apache.oodt.config.Constants.ZPaths.CONFIGURATION_PATH_NAME;
+import static org.apache.oodt.config.Constants.ZPaths.NOTIFICATIONS_PATH;
 import static org.apache.oodt.config.Constants.ZPaths.PROJECTS_PATH_NAME;
 import static org.apache.oodt.config.Constants.ZPaths.PROPERTIES_PATH_NAME;
 import static org.apache.oodt.config.Constants.ZPaths.SEPARATOR;
@@ -43,6 +48,9 @@ public class ZNodePaths {
     private String configurationZNodePath;
     private String configurationZNodeRoot;
 
+    /** ZNode to be watched for configuration changes. /projects/${project}/components/${component}/notifications */
+    private String notificationsZNodePath;
+
     /**
      * Creates the ZNode path structure accordingly to the <pre>componentName</pre> and <pre>propertiesFileNames</pre>
      * given.
@@ -70,6 +78,21 @@ public class ZNodePaths {
 
         configurationZNodePath = componentZNodeRoot + CONFIGURATION_PATH_NAME;
         configurationZNodeRoot = configurationZNodePath + SEPARATOR;
+
+        notificationsZNodePath = componentZNodeRoot + NOTIFICATIONS_PATH;
+    }
+
+    /**
+     * Creates the initial ZNode structure in zookeeper. Supposed to be called by the {@link
+     * DistributedConfigurationPublisher}.
+     *
+     * @param client {@link CuratorFramework} instance
+     * @throws Exception
+     */
+    public void createZNodes(CuratorFramework client) throws Exception {
+        CuratorUtils.createZNodeIfNotExists(client, propertiesZNodePath, CreateMode.PERSISTENT, new byte[1]);
+        CuratorUtils.createZNodeIfNotExists(client, configurationZNodePath, CreateMode.PERSISTENT, new byte[1]);
+        CuratorUtils.createZNodeIfNotExists(client, notificationsZNodePath, CreateMode.PERSISTENT, new byte[1]);
     }
 
     public String getComponentZNodePath() {
@@ -99,4 +122,8 @@ public class ZNodePaths {
     public String getLocalPropertiesFilePath(String zNodePath) {
         return zNodePath.substring(propertiesZNodeRoot.length());
     }
+
+    public String getNotificationsZNodePath() {
+        return notificationsZNodePath;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/main/java/org/apache/oodt/config/distributed/cli/CLIAction.java
----------------------------------------------------------------------
diff --git a/config/src/main/java/org/apache/oodt/config/distributed/cli/CLIAction.java b/config/src/main/java/org/apache/oodt/config/distributed/cli/CLIAction.java
index 1ab1b30..50e567a 100644
--- a/config/src/main/java/org/apache/oodt/config/distributed/cli/CLIAction.java
+++ b/config/src/main/java/org/apache/oodt/config/distributed/cli/CLIAction.java
@@ -19,6 +19,7 @@ package org.apache.oodt.config.distributed.cli;
 
 import org.apache.oodt.cas.cli.action.CmdLineAction;
 import org.apache.oodt.cas.cli.exception.CmdLineActionException;
+import org.apache.oodt.config.ConfigEventType;
 import org.apache.oodt.config.distributed.DistributedConfigurationPublisher;
 import org.springframework.beans.BeansException;
 import org.springframework.context.ApplicationContext;
@@ -36,12 +37,9 @@ import static org.apache.oodt.config.Constants.Properties.ZK_CONNECT_STRING;
  */
 public class CLIAction extends CmdLineAction {
 
-    public enum Action {
-        PUBLISH, VERIFY, CLEAR
-    }
-
     private String connectString;
     private String configFile = DEFAULT_CONFIG_PUBLISHER_XML;
+    private boolean notify = false;
 
     private Action action;
 
@@ -60,12 +58,18 @@ public class CLIAction extends CmdLineAction {
                 switch (action) {
                     case PUBLISH:
                         publish(publisher);
+                        if (notify) {
+                            publisher.notifyConfigEvent(ConfigEventType.PUBLISH);
+                        }
                         break;
                     case VERIFY:
                         verify(publisher);
                         break;
                     case CLEAR:
                         clear(publisher);
+                        if (notify) {
+                            publisher.notifyConfigEvent(ConfigEventType.CLEAR);
+                        }
                         break;
                 }
                 publisher.destroy();
@@ -123,4 +127,16 @@ public class CLIAction extends CmdLineAction {
     public void setConfigFile(String configFile) {
         this.configFile = configFile;
     }
+
+    public boolean isNotify() {
+        return notify;
+    }
+
+    public void setNotify(boolean notify) {
+        this.notify = notify;
+    }
+
+    public enum Action {
+        PUBLISH, VERIFY, CLEAR
+    }
 }

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/main/resources/cmd-line-options.xml
----------------------------------------------------------------------
diff --git a/config/src/main/resources/cmd-line-options.xml b/config/src/main/resources/cmd-line-options.xml
index 698ad32..3aa506b 100644
--- a/config/src/main/resources/cmd-line-options.xml
+++ b/config/src/main/resources/cmd-line-options.xml
@@ -82,4 +82,35 @@
             </bean>
         </property>
     </bean>
+
+    <bean id="notifyConfigChange" class="org.apache.oodt.cas.cli.option.AdvancedCmdLineOption">
+        <property name="shortOption" value="n"/>
+        <property name="longOption" value="notify"/>
+        <property name="description" value="Notify the configuration managers' about the configuration change done"/>
+        <property name="hasArgs" value="false"/>
+        <property name="requirementRules">
+            <list>
+                <bean class="org.apache.oodt.cas.cli.option.require.ActionDependencyRule"
+                      p:actionName="publish" p:relation="OPTIONAL"/>
+                <bean class="org.apache.oodt.cas.cli.option.require.ActionDependencyRule"
+                      p:actionName="clear" p:relation="OPTIONAL"/>
+                <bean class="org.apache.oodt.cas.cli.option.require.ActionDependencyRule"
+                      p:actionName="verify" p:relation="OPTIONAL"/>
+            </list>
+        </property>
+        <property name="handler">
+            <bean class="org.apache.oodt.cas.cli.option.handler.ApplyToActionHandler">
+                <property name="applyToActions">
+                    <list>
+                        <bean class="org.apache.oodt.cas.cli.option.handler.ApplyToAction"
+                              p:actionName="publish" p:methodName="setNotify"/>
+                        <bean class="org.apache.oodt.cas.cli.option.handler.ApplyToAction"
+                              p:actionName="verify" p:methodName="setNotify"/>
+                        <bean class="org.apache.oodt.cas.cli.option.handler.ApplyToAction"
+                              p:actionName="clear" p:methodName="setNotify"/>
+                    </list>
+                </property>
+            </bean>
+        </property>
+    </bean>
 </beans>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/config/src/test/java/org/apache/oodt/config/distributed/DistributedConfigurationManagerTest.java
----------------------------------------------------------------------
diff --git a/config/src/test/java/org/apache/oodt/config/distributed/DistributedConfigurationManagerTest.java b/config/src/test/java/org/apache/oodt/config/distributed/DistributedConfigurationManagerTest.java
index 0bf2dde..b553643 100644
--- a/config/src/test/java/org/apache/oodt/config/distributed/DistributedConfigurationManagerTest.java
+++ b/config/src/test/java/org/apache/oodt/config/distributed/DistributedConfigurationManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.oodt.config.distributed;
 
+import org.apache.oodt.config.Component;
 import org.apache.oodt.config.ConfigurationManager;
 import org.apache.oodt.config.distributed.cli.ConfigPublisher;
 import org.apache.oodt.config.distributed.utils.ConfigUtils;
@@ -32,6 +33,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -50,6 +52,7 @@ public class DistributedConfigurationManagerTest extends AbstractDistributedConf
     private static final String CONFIG_PUBLISHER_XML = "config-publisher.xml";
 
     private List<DistributedConfigurationPublisher> publishers;
+    private Map<Component, ConfigurationManager> configurationManagers;
 
     @Before
     public void setUpTest() throws Exception {
@@ -65,21 +68,24 @@ public class DistributedConfigurationManagerTest extends AbstractDistributedConf
         ApplicationContext applicationContext = new ClassPathXmlApplicationContext(CONFIG_PUBLISHER_XML);
         Map distributedConfigurationPublishers = applicationContext.getBeansOfType(DistributedConfigurationPublisher.class);
 
-        publishers = new ArrayList<>(distributedConfigurationPublishers.values().size());
+        publishers = new ArrayList<>();
+        configurationManagers = new HashMap<>();
         for (Object bean : distributedConfigurationPublishers.values()) {
             DistributedConfigurationPublisher publisher = (DistributedConfigurationPublisher) bean;
 
+            System.setProperty(OODT_PROJECT, publisher.getProject());
             System.setProperty(publisher.getComponent().getHome(), ".");
             publishers.add(publisher);
+            configurationManagers.put(publisher.getComponent(), new DistributedConfigurationManager(publisher.getComponent()));
+            System.clearProperty(OODT_PROJECT);
         }
     }
 
     @Test
     public void loadConfigurationTest() throws Exception {
         for (DistributedConfigurationPublisher publisher : publishers) {
-            System.setProperty(OODT_PROJECT, publisher.getProject());
 
-            ConfigurationManager configurationManager = new DistributedConfigurationManager(publisher.getComponent());
+            ConfigurationManager configurationManager = configurationManagers.get(publisher.getComponent());
             configurationManager.loadConfiguration();
 
             // Checking for configuration files
@@ -117,8 +123,63 @@ public class DistributedConfigurationManagerTest extends AbstractDistributedConf
                 File file = new File(localFile);
                 Assert.assertFalse(file.exists());
             }
+        }
+    }
 
-            System.clearProperty(OODT_PROJECT);
+    @Test
+    public void notifyConfigurationChangeTest() throws Exception {
+        // First publish config. Then check if config has downloaded locally.
+        ConfigPublisher.main(new String[]{
+                "-connectString", zookeeper.getConnectString(),
+                "-config", CONFIG_PUBLISHER_XML,
+                "-notify",
+                "-a", "publish"
+        });
+        Thread.sleep(5000);
+        checkFileExistence(true);
+
+        // Now clear config. Then check if config has deleted locally.
+        ConfigPublisher.main(new String[]{
+                "-connectString", zookeeper.getConnectString(),
+                "-config", CONFIG_PUBLISHER_XML,
+                "-notify",
+                "-a", "clear"
+        });
+        Thread.sleep(5000);
+        checkFileExistence(false);
+
+        // First publish config. Then check if config has downloaded locally.
+        ConfigPublisher.main(new String[]{
+                "-connectString", zookeeper.getConnectString(),
+                "-config", CONFIG_PUBLISHER_XML,
+                "-notify",
+                "-a", "publish"
+        });
+        Thread.sleep(5000);
+        checkFileExistence(true);
+    }
+
+    private void checkFileExistence(boolean exists) {
+        for (DistributedConfigurationPublisher publisher : publishers) {
+            for (String fileName : publisher.getPropertiesFiles().values()) {
+                fileName = ConfigUtils.fixForComponentHome(publisher.getComponent(), fileName);
+                File file = new File(fileName);
+                if (exists) {
+                    Assert.assertTrue(file.exists());
+                } else {
+                    Assert.assertFalse(file.exists());
+                }
+            }
+
+            for (String fileName : publisher.getConfigFiles().values()) {
+                fileName = ConfigUtils.fixForComponentHome(publisher.getComponent(), fileName);
+                File file = new File(fileName);
+                if (exists) {
+                    Assert.assertTrue(file.exists());
+                } else {
+                    Assert.assertFalse(file.exists());
+                }
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 5e84d83..f6e0fc3 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -244,7 +244,7 @@ the License.
       </dependency>
       <dependency>
         <groupId>org.apache.curator</groupId>
-        <artifactId>curator-framework</artifactId>
+        <artifactId>curator-recipes</artifactId>
         <version>3.3.0</version>
       </dependency>
       <dependency>

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/XmlRpcFileManager.java
----------------------------------------------------------------------
diff --git a/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/XmlRpcFileManager.java b/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/XmlRpcFileManager.java
index 4ac48bb..37d32d2 100644
--- a/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/XmlRpcFileManager.java
+++ b/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/XmlRpcFileManager.java
@@ -54,6 +54,8 @@ import org.apache.oodt.cas.metadata.Metadata;
 import org.apache.oodt.cas.metadata.exceptions.MetExtractionException;
 import org.apache.oodt.commons.date.DateUtils;
 import org.apache.oodt.config.Component;
+import org.apache.oodt.config.ConfigEventType;
+import org.apache.oodt.config.ConfigurationListener;
 import org.apache.oodt.config.ConfigurationManager;
 import org.apache.oodt.config.ConfigurationManagerFactory;
 import org.apache.xmlrpc.WebServer;
@@ -112,6 +114,18 @@ public class XmlRpcFileManager {
 
   /** Configuration Manager instance which will handle the configuration aspect in distributed/standalone manner */
   private ConfigurationManager configurationManager;
+  private ConfigurationListener configurationListener = new ConfigurationListener() {
+    @Override
+    public void configurationChanged(ConfigEventType type) {
+      switch (type) {
+        case PUBLISH:
+          refreshConfigAndPolicy();
+          break;
+        case CLEAR:
+          // TODO: 8/19/17 What should we do if the config has been cleared?
+      }
+    }
+  };
 
   /**
    * <p> Creates a new XmlRpcFileManager with the given metadata store factory, and the given data store factory, on the
@@ -135,8 +149,9 @@ public class XmlRpcFileManager {
     }
 
     configurationManager = ConfigurationManagerFactory.getConfigurationManager(Component.FILE_MANAGER, propertiesFiles);
-
+    configurationManager.addConfigurationListener(configurationListener);
     this.loadConfiguration();
+
     LOG.log(Level.INFO, "File Manager started by " + System.getProperty("user.name", "unknown"));
   }
 
@@ -1224,6 +1239,7 @@ public class XmlRpcFileManager {
   }
 
   public boolean shutdown() {
+    configurationManager.removeConfigurationListener(configurationListener);
     configurationManager.clearConfiguration();
     if (this.webServer != null) {
       this.webServer.shutdown();

http://git-wip-us.apache.org/repos/asf/oodt/blob/df1db1e4/pge/pom.xml
----------------------------------------------------------------------
diff --git a/pge/pom.xml b/pge/pom.xml
index 6fd4d72..72a33d9 100644
--- a/pge/pom.xml
+++ b/pge/pom.xml
@@ -27,7 +27,7 @@ the License.
   <artifactId>cas-pge</artifactId>
   <name>CAS PGE Adaptor Framework</name>
   <description>Allows data processing jobs not written in conformance with the
-        PCS PGE interface to be run within the PCS.</description>
+        PCS PGE (Production Generation Executive) interface to be run within the PCS.</description>
   <!-- All dependencies should be listed in core/pom.xml and be ordered alphabetically by package and artifact.
      Once the dependency is in the core pom, it can then be used in other modules without the version tags.
      For example, within core/pom.xml: