You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by lm...@apache.org on 2017/12/05 20:07:37 UTC

[1/2] knox git commit: KNOX-1013 - Monitor Ambari for Cluster Topology changes (Phil Zampino via lmccay)

Repository: knox
Updated Branches:
  refs/heads/master 13287d2c1 -> a874f399e


http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java
index 9dca344..626cec0 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.gateway.service.config.remote.RemoteConfigurationRegist
 import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegistryClientService;
 import org.apache.hadoop.gateway.services.registry.impl.DefaultServiceDefinitionRegistry;
 import org.apache.hadoop.gateway.services.metrics.impl.DefaultMetricsService;
+import org.apache.hadoop.gateway.services.topology.impl.DefaultClusterConfigurationMonitorService;
 import org.apache.hadoop.gateway.services.topology.impl.DefaultTopologyService;
 import org.apache.hadoop.gateway.services.hostmap.impl.DefaultHostMapperService;
 import org.apache.hadoop.gateway.services.registry.impl.DefaultServiceRegistryService;
@@ -112,6 +113,11 @@ public class DefaultGatewayServices implements GatewayServices {
     registryClientService.init(config, options);
     services.put(REMOTE_REGISTRY_CLIENT_SERVICE, registryClientService);
 
+    DefaultClusterConfigurationMonitorService ccs = new DefaultClusterConfigurationMonitorService();
+    ccs.setAliasService(alias);
+    ccs.init(config, options);
+    services.put(CLUSTER_CONFIGURATION_MONITOR_SERVICE, ccs);
+
     DefaultTopologyService tops = new DefaultTopologyService();
     tops.setAliasService(alias);
     tops.init(  config, options  );
@@ -144,6 +150,8 @@ public class DefaultGatewayServices implements GatewayServices {
                             (RemoteConfigurationRegistryClientService)services.get(REMOTE_REGISTRY_CLIENT_SERVICE);
     clientService.start();
 
+    (services.get(CLUSTER_CONFIGURATION_MONITOR_SERVICE)).start();
+
     DefaultTopologyService tops = (DefaultTopologyService)services.get(TOPOLOGY_SERVICE);
     tops.start();
 
@@ -156,6 +164,8 @@ public class DefaultGatewayServices implements GatewayServices {
 
     ks.stop();
 
+    (services.get(CLUSTER_CONFIGURATION_MONITOR_SERVICE)).stop();
+
     DefaultAliasService alias = (DefaultAliasService) services.get(ALIAS_SERVICE);
     alias.stop();
 

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultClusterConfigurationMonitorService.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultClusterConfigurationMonitorService.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultClusterConfigurationMonitorService.java
new file mode 100644
index 0000000..342ce11
--- /dev/null
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultClusterConfigurationMonitorService.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.services.topology.impl;
+
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.services.ServiceLifecycleException;
+import org.apache.hadoop.gateway.services.security.AliasService;
+import org.apache.hadoop.gateway.topology.ClusterConfigurationMonitorService;
+import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitor;
+import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+
+public class DefaultClusterConfigurationMonitorService implements ClusterConfigurationMonitorService {
+
+    private AliasService aliasService = null;
+
+    private Map<String, ClusterConfigurationMonitor> monitors = new HashMap<>();
+
+    @Override
+    public void init(GatewayConfig config, Map<String, String> options) throws ServiceLifecycleException {
+        ServiceLoader<ClusterConfigurationMonitorProvider> providers =
+                                                        ServiceLoader.load(ClusterConfigurationMonitorProvider.class);
+        for (ClusterConfigurationMonitorProvider provider : providers) {
+            // Check the gateway configuration to determine if this type of monitor is enabled
+            if (config.isClusterMonitorEnabled(provider.getType())) {
+                ClusterConfigurationMonitor monitor = provider.newInstance(config, aliasService);
+                if (monitor != null) {
+                    monitors.put(provider.getType(), monitor);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void start() {
+        for (ClusterConfigurationMonitor monitor : monitors.values()) {
+            monitor.start();
+        }
+    }
+
+    @Override
+    public void stop() {
+        for (ClusterConfigurationMonitor monitor : monitors.values()) {
+            monitor.stop();
+        }
+    }
+
+    @Override
+    public ClusterConfigurationMonitor getMonitor(String type) {
+        return monitors.get(type);
+    }
+
+    @Override
+    public void addListener(ClusterConfigurationMonitor.ConfigurationChangeListener listener) {
+        for (ClusterConfigurationMonitor monitor : monitors.values()) {
+            monitor.addListener(listener);
+        }
+    }
+
+    public void setAliasService(AliasService aliasService) {
+        this.aliasService = aliasService;
+    }
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultTopologyService.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultTopologyService.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultTopologyService.java
index 5fc3620..aded6cd 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultTopologyService.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultTopologyService.java
@@ -28,6 +28,7 @@ import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
 import org.apache.commons.io.monitor.FileAlterationMonitor;
 import org.apache.commons.io.monitor.FileAlterationObserver;
 import org.apache.hadoop.gateway.GatewayMessages;
+import org.apache.hadoop.gateway.GatewayServer;
 import org.apache.hadoop.gateway.audit.api.Action;
 import org.apache.hadoop.gateway.audit.api.ActionOutcome;
 import org.apache.hadoop.gateway.audit.api.AuditServiceFactory;
@@ -37,15 +38,18 @@ import org.apache.hadoop.gateway.audit.log4j.audit.AuditConstants;
 import org.apache.hadoop.gateway.config.GatewayConfig;
 import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
 import org.apache.hadoop.gateway.service.definition.ServiceDefinition;
+import org.apache.hadoop.gateway.services.GatewayServices;
 import org.apache.hadoop.gateway.services.ServiceLifecycleException;
 import org.apache.hadoop.gateway.services.security.AliasService;
 import org.apache.hadoop.gateway.services.topology.TopologyService;
+import org.apache.hadoop.gateway.topology.ClusterConfigurationMonitorService;
 import org.apache.hadoop.gateway.topology.Topology;
 import org.apache.hadoop.gateway.topology.TopologyEvent;
 import org.apache.hadoop.gateway.topology.TopologyListener;
 import org.apache.hadoop.gateway.topology.TopologyMonitor;
 import org.apache.hadoop.gateway.topology.TopologyProvider;
 import org.apache.hadoop.gateway.topology.builder.TopologyBuilder;
+import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitor;
 import org.apache.hadoop.gateway.topology.monitor.RemoteConfigurationMonitor;
 import org.apache.hadoop.gateway.topology.monitor.RemoteConfigurationMonitorFactory;
 import org.apache.hadoop.gateway.topology.simple.SimpleDescriptorHandler;
@@ -554,7 +558,10 @@ public class DefaultTopologyService
 
   @Override
   public void start() {
-
+    // Register a cluster configuration monitor listener for change notifications
+    ClusterConfigurationMonitorService ccms =
+                  GatewayServer.getGatewayServices().getService(GatewayServices.CLUSTER_CONFIGURATION_MONITOR_SERVICE);
+    ccms.addListener(new TopologyDiscoveryTrigger(this));
   }
 
   @Override
@@ -589,11 +596,17 @@ public class DefaultTopologyService
       // This happens prior to the start-up loading of the topologies.
       String[] descriptorFilenames =  descriptorsDirectory.list();
       if (descriptorFilenames != null) {
-          for (String descriptorFilename : descriptorFilenames) {
-              if (DescriptorsMonitor.isDescriptorFile(descriptorFilename)) {
-                  descriptorsMonitor.onFileChange(new File(descriptorsDirectory, descriptorFilename));
-              }
+        for (String descriptorFilename : descriptorFilenames) {
+          if (DescriptorsMonitor.isDescriptorFile(descriptorFilename)) {
+            // If there isn't a corresponding topology file, or if the descriptor has been modified since the
+            // corresponding topology file was generated, then trigger generation of one
+            File matchingTopologyFile = getExistingFile(topologiesDirectory, FilenameUtils.getBaseName(descriptorFilename));
+            if (matchingTopologyFile == null ||
+                    matchingTopologyFile.lastModified() < (new File(descriptorsDirectory, descriptorFilename)).lastModified()) {
+              descriptorsMonitor.onFileChange(new File(descriptorsDirectory, descriptorFilename));
+            }
           }
+        }
       }
 
       // Initialize the remote configuration monitor, if it has been configured
@@ -604,7 +617,6 @@ public class DefaultTopologyService
     }
   }
 
-
   /**
    * Utility method for listing the files in the specified directory.
    * This method is "nicer" than the File#listFiles() because it will not return null.
@@ -847,4 +859,37 @@ public class DefaultTopologyService
     }
   }
 
+  /**
+   * Listener for Ambari config change events, which will trigger re-generation (including re-discovery) of the
+   * affected topologies.
+   */
+  private static class TopologyDiscoveryTrigger implements ClusterConfigurationMonitor.ConfigurationChangeListener {
+
+    private TopologyService topologyService = null;
+
+    TopologyDiscoveryTrigger(TopologyService topologyService) {
+      this.topologyService = topologyService;
+    }
+
+    @Override
+    public void onConfigurationChange(String source, String clusterName) {
+      log.noticedClusterConfigurationChange(source, clusterName);
+      try {
+        // Identify any descriptors associated with the cluster configuration change
+        for (File descriptor : topologyService.getDescriptors()) {
+          String descriptorContent = FileUtils.readFileToString(descriptor);
+          if (descriptorContent.contains(source)) {
+            if (descriptorContent.contains(clusterName)) {
+              log.triggeringTopologyRegeneration(source, clusterName, descriptor.getAbsolutePath());
+              // 'Touch' the descriptor to trigger re-generation of the associated topology
+              descriptor.setLastModified(System.currentTimeMillis());
+            }
+          }
+        }
+      } catch (Exception e) {
+        log.errorRespondingToConfigChange(source, clusterName, e);
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/simple/SimpleDescriptorHandler.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/simple/SimpleDescriptorHandler.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/simple/SimpleDescriptorHandler.java
index c44710a..6b9df0d 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/simple/SimpleDescriptorHandler.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/simple/SimpleDescriptorHandler.java
@@ -54,6 +54,8 @@ public class SimpleDescriptorHandler {
 
     private static final SimpleDescriptorMessages log = MessagesFactory.get(SimpleDescriptorMessages.class);
 
+    private static Map<String, ServiceDiscovery> discoveryInstances = new HashMap<>();
+
     public static Map<String, File> handle(File desc) throws IOException {
         return handle(desc, NO_GATEWAY_SERVICES);
     }
@@ -89,7 +91,12 @@ public class SimpleDescriptorHandler {
             discoveryType = "AMBARI";
         }
 
-        ServiceDiscovery sd = ServiceDiscoveryFactory.get(discoveryType, gatewayServices);
+        // Use the cached discovery object for the required type, if it has already been loaded
+        ServiceDiscovery sd = discoveryInstances.get(discoveryType);
+        if (sd == null) {
+            sd = ServiceDiscoveryFactory.get(discoveryType, gatewayServices);
+            discoveryInstances.put(discoveryType, sd);
+        }
         ServiceDiscovery.Cluster cluster = sd.discover(sdc, desc.getClusterName());
 
         List<String> validServiceNames = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-spi/src/main/java/org/apache/hadoop/gateway/config/GatewayConfig.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/config/GatewayConfig.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/config/GatewayConfig.java
index 5cfaf36..e45fd11 100644
--- a/gateway-spi/src/main/java/org/apache/hadoop/gateway/config/GatewayConfig.java
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/config/GatewayConfig.java
@@ -313,7 +313,23 @@ public interface GatewayConfig {
    * @return
    */
   boolean isGatewayServerHeaderEnabled();
-
+  
+  /**
+   *
+   * @param type The type of cluster configuration monitor for which the interval should be returned.
+   *
+   * @return The polling interval configuration value, or -1 if it has not been configured.
+   */
+  int getClusterMonitorPollingInterval(String type);
+  
+  /**
+   *
+   * @param type The type of cluster configuration monitor for which the interval should be returned.
+   *
+   * @return The enabled status of the specified type of cluster configuration monitor.
+   */
+  boolean isClusterMonitorEnabled(String type);
+  
   /**
    * @return The list of the names of any remote registry configurations defined herein.
    */

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java
index 2894bbc..222b1f0 100644
--- a/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java
@@ -41,6 +41,8 @@ public interface GatewayServices extends Service, ProviderDeploymentContributor
 
   String REMOTE_REGISTRY_CLIENT_SERVICE = "RemoteConfigRegistryClientService";
 
+  String CLUSTER_CONFIGURATION_MONITOR_SERVICE = "ClusterConfigurationMonitorService";
+
   public abstract Collection<String> getServiceNames();
 
   public abstract <T> T getService( String serviceName );

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/ClusterConfigurationMonitorService.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/ClusterConfigurationMonitorService.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/ClusterConfigurationMonitorService.java
new file mode 100644
index 0000000..961f2e5
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/ClusterConfigurationMonitorService.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.topology;
+
+import org.apache.hadoop.gateway.services.Service;
+import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitor;
+
+/**
+ * Gateway service for managing cluster configuration monitors.
+ */
+public interface ClusterConfigurationMonitorService extends Service {
+
+    /**
+     *
+     * @param type The type of monitor (e.g., Ambari)
+     *
+     * @return The monitor associated with the specified type, or null if there is no such monitor.
+     */
+    ClusterConfigurationMonitor getMonitor(String type);
+
+
+    /**
+     * Register for configuration change notifications from <em>any</em> of the monitors managed by this service.
+     *
+     * @param listener The listener to register.
+     */
+    void addListener(ClusterConfigurationMonitor.ConfigurationChangeListener listener);
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/discovery/ClusterConfigurationMonitor.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/discovery/ClusterConfigurationMonitor.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/discovery/ClusterConfigurationMonitor.java
new file mode 100644
index 0000000..fc3614d
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/discovery/ClusterConfigurationMonitor.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.topology.discovery;
+
+public interface ClusterConfigurationMonitor {
+
+    /**
+     * Start the monitor.
+     */
+    void start();
+
+    /**
+     * Stop the monitor.
+     */
+    void stop();
+
+    /**
+     *
+     * @param interval The polling interval, in seconds
+     */
+    void setPollingInterval(int interval);
+
+    /**
+     * Register for notifications from the monitor.
+     */
+    void addListener(ConfigurationChangeListener listener);
+
+    /**
+     * Monitor listener interface for receiving notifications that a configuration has changed.
+     */
+    interface ConfigurationChangeListener {
+        void onConfigurationChange(String source, String clusterName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/discovery/ClusterConfigurationMonitorProvider.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/discovery/ClusterConfigurationMonitorProvider.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/discovery/ClusterConfigurationMonitorProvider.java
new file mode 100644
index 0000000..a8d5f30
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/discovery/ClusterConfigurationMonitorProvider.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.topology.discovery;
+
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.services.security.AliasService;
+
+public interface ClusterConfigurationMonitorProvider {
+
+    String getType();
+
+    ClusterConfigurationMonitor newInstance(GatewayConfig config, AliasService aliasService);
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-test-release-utils/src/main/java/org/apache/hadoop/gateway/GatewayTestConfig.java
----------------------------------------------------------------------
diff --git a/gateway-test-release-utils/src/main/java/org/apache/hadoop/gateway/GatewayTestConfig.java b/gateway-test-release-utils/src/main/java/org/apache/hadoop/gateway/GatewayTestConfig.java
index f7ea633..e04c581 100644
--- a/gateway-test-release-utils/src/main/java/org/apache/hadoop/gateway/GatewayTestConfig.java
+++ b/gateway-test-release-utils/src/main/java/org/apache/hadoop/gateway/GatewayTestConfig.java
@@ -640,4 +640,14 @@ public class GatewayTestConfig extends Configuration implements GatewayConfig {
   public String getRemoteConfigurationMonitorClientName() {
     return null;
   }
+
+  @Override
+  public int getClusterMonitorPollingInterval(String type) {
+    return 600;
+  }
+
+  @Override
+  public boolean isClusterMonitorEnabled(String type) {
+    return false;
+  }
 }


[2/2] knox git commit: KNOX-1013 - Monitor Ambari for Cluster Topology changes (Phil Zampino via lmccay)

Posted by lm...@apache.org.
KNOX-1013 - Monitor Ambari for Cluster Topology changes (Phil Zampino via lmccay)

Project: http://git-wip-us.apache.org/repos/asf/knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/a874f399
Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/a874f399
Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/a874f399

Branch: refs/heads/master
Commit: a874f399e05835c359ded4ee2c7b822c7baa3231
Parents: 13287d2
Author: Larry McCay <lm...@hortonworks.com>
Authored: Tue Dec 5 15:07:16 2017 -0500
Committer: Larry McCay <lm...@hortonworks.com>
Committed: Tue Dec 5 15:07:32 2017 -0500

----------------------------------------------------------------------
 .../discovery/ambari/AmbariClientCommon.java    | 102 ++++
 .../discovery/ambari/AmbariCluster.java         |   5 +
 ...bariClusterConfigurationMonitorProvider.java |  35 ++
 .../ambari/AmbariConfigurationMonitor.java      | 525 +++++++++++++++++++
 .../ambari/AmbariServiceDiscovery.java          | 228 ++++----
 .../ambari/AmbariServiceDiscoveryMessages.java  |  51 +-
 .../topology/discovery/ambari/RESTInvoker.java  | 136 +++++
 .../ambari/ServiceURLPropertyConfig.java        |   2 +-
 ...iscovery.ClusterConfigurationMonitorProvider |  19 +
 .../ambari/AmbariConfigurationMonitorTest.java  | 319 +++++++++++
 .../ambari/AmbariServiceDiscoveryTest.java      |  28 +-
 gateway-release/home/conf/gateway-site.xml      |  12 +
 .../apache/hadoop/gateway/GatewayMessages.java  |  16 +
 .../gateway/config/impl/GatewayConfigImpl.java  |  15 +
 .../services/DefaultGatewayServices.java        |  10 +
 ...faultClusterConfigurationMonitorService.java |  81 +++
 .../topology/impl/DefaultTopologyService.java   |  57 +-
 .../simple/SimpleDescriptorHandler.java         |   9 +-
 .../hadoop/gateway/config/GatewayConfig.java    |  18 +-
 .../gateway/services/GatewayServices.java       |   2 +
 .../ClusterConfigurationMonitorService.java     |  43 ++
 .../discovery/ClusterConfigurationMonitor.java  |  48 ++
 .../ClusterConfigurationMonitorProvider.java    |  27 +
 .../hadoop/gateway/GatewayTestConfig.java       |  10 +
 24 files changed, 1633 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClientCommon.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClientCommon.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClientCommon.java
new file mode 100644
index 0000000..a2bf4ea
--- /dev/null
+++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClientCommon.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.topology.discovery.ambari;
+
+import net.minidev.json.JSONArray;
+import net.minidev.json.JSONObject;
+import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.hadoop.gateway.services.security.AliasService;
+import org.apache.hadoop.gateway.topology.discovery.ServiceDiscoveryConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class AmbariClientCommon {
+
+    static final String AMBARI_CLUSTERS_URI = "/api/v1/clusters";
+
+    static final String AMBARI_HOSTROLES_URI =
+                                    AMBARI_CLUSTERS_URI + "/%s/services?fields=components/host_components/HostRoles";
+
+    static final String AMBARI_SERVICECONFIGS_URI =
+                                    AMBARI_CLUSTERS_URI + "/%s/configurations/service_config_versions?is_current=true";
+
+    private static final AmbariServiceDiscoveryMessages log = MessagesFactory.get(AmbariServiceDiscoveryMessages.class);
+
+    private RESTInvoker restClient;
+
+
+    AmbariClientCommon(AliasService aliasService) {
+        this(new RESTInvoker(aliasService));
+    }
+
+
+    AmbariClientCommon(RESTInvoker restInvoker) {
+        this.restClient = restInvoker;
+    }
+
+
+
+    Map<String, Map<String, AmbariCluster.ServiceConfiguration>> getActiveServiceConfigurations(String clusterName,
+                                                                                                ServiceDiscoveryConfig config) {
+        return getActiveServiceConfigurations(config.getAddress(),
+                                              clusterName,
+                                              config.getUser(),
+                                              config.getPasswordAlias());
+    }
+
+
+    Map<String, Map<String, AmbariCluster.ServiceConfiguration>> getActiveServiceConfigurations(String discoveryAddress,
+                                                                                                String clusterName,
+                                                                                                String discoveryUser,
+                                                                                                String discoveryPwdAlias) {
+        Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigurations = new HashMap<>();
+
+        String serviceConfigsURL = String.format("%s" + AMBARI_SERVICECONFIGS_URI, discoveryAddress, clusterName);
+
+        JSONObject serviceConfigsJSON = restClient.invoke(serviceConfigsURL, discoveryUser, discoveryPwdAlias);
+        if (serviceConfigsJSON != null) {
+            // Process the service configurations
+            JSONArray serviceConfigs = (JSONArray) serviceConfigsJSON.get("items");
+            for (Object serviceConfig : serviceConfigs) {
+                String serviceName = (String) ((JSONObject) serviceConfig).get("service_name");
+                JSONArray configurations = (JSONArray) ((JSONObject) serviceConfig).get("configurations");
+                for (Object configuration : configurations) {
+                    String configType = (String) ((JSONObject) configuration).get("type");
+                    String configVersion = String.valueOf(((JSONObject) configuration).get("version"));
+
+                    Map<String, String> configProps = new HashMap<>();
+                    JSONObject configProperties = (JSONObject) ((JSONObject) configuration).get("properties");
+                    for (String propertyName : configProperties.keySet()) {
+                        configProps.put(propertyName, String.valueOf(((JSONObject) configProperties).get(propertyName)));
+                    }
+                    if (!serviceConfigurations.containsKey(serviceName)) {
+                        serviceConfigurations.put(serviceName, new HashMap<>());
+                    }
+                    serviceConfigurations.get(serviceName).put(configType,
+                                                               new AmbariCluster.ServiceConfiguration(configType,
+                                                                                                      configVersion,
+                                                                                                      configProps));
+                }
+            }
+        }
+
+        return serviceConfigurations;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariCluster.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariCluster.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariCluster.java
index c841d9c..1d308cc 100644
--- a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariCluster.java
+++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariCluster.java
@@ -63,6 +63,11 @@ class AmbariCluster implements ServiceDiscovery.Cluster {
     }
 
 
+    Map<String, Map<String, ServiceConfiguration>> getServiceConfigurations() {
+        return serviceConfigurations;
+    }
+
+
     Map<String, AmbariComponent> getComponents() {
         return components;
     }

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java
new file mode 100644
index 0000000..3b31124
--- /dev/null
+++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.topology.discovery.ambari;
+
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.services.security.AliasService;
+import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitor;
+import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider;
+
+public class AmbariClusterConfigurationMonitorProvider implements ClusterConfigurationMonitorProvider {
+
+    @Override
+    public String getType() {
+        return AmbariConfigurationMonitor.getType();
+    }
+
+    @Override
+    public ClusterConfigurationMonitor newInstance(GatewayConfig config, AliasService aliasService) {
+        return new AmbariConfigurationMonitor(config, aliasService);
+    }
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java
new file mode 100644
index 0000000..e4b5e43
--- /dev/null
+++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java
@@ -0,0 +1,525 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.topology.discovery.ambari;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.hadoop.gateway.services.security.AliasService;
+import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitor;
+import org.apache.hadoop.gateway.topology.discovery.ServiceDiscoveryConfig;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+
+class AmbariConfigurationMonitor implements ClusterConfigurationMonitor {
+
+    private static final String TYPE = "Ambari";
+
+    private static final String CLUSTERS_DATA_DIR_NAME = "clusters";
+
+    private static final String PERSISTED_FILE_COMMENT = "Generated File. Do Not Edit!";
+
+    private static final String PROP_CLUSTER_PREFIX = "cluster.";
+    private static final String PROP_CLUSTER_SOURCE = PROP_CLUSTER_PREFIX + "source";
+    private static final String PROP_CLUSTER_NAME   = PROP_CLUSTER_PREFIX + "name";
+    private static final String PROP_CLUSTER_USER   = PROP_CLUSTER_PREFIX + "user";
+    private static final String PROP_CLUSTER_ALIAS  = PROP_CLUSTER_PREFIX + "pwd.alias";
+
+    static final String INTERVAL_PROPERTY_NAME = "org.apache.hadoop.gateway.topology.discovery.ambari.monitor.interval";
+
+
+    private static final AmbariServiceDiscoveryMessages log = MessagesFactory.get(AmbariServiceDiscoveryMessages.class);
+
+    // Ambari address
+    //    clusterName -> ServiceDiscoveryConfig
+    //
+    Map<String, Map<String, ServiceDiscoveryConfig>> clusterMonitorConfigurations = new HashMap<>();
+
+    // Ambari address
+    //    clusterName
+    //        configType -> version
+    //
+    Map<String, Map<String, Map<String, String>>> ambariClusterConfigVersions = new HashMap<>();
+
+    ReadWriteLock configVersionsLock = new ReentrantReadWriteLock();
+
+    private List<ConfigurationChangeListener> changeListeners = new ArrayList<>();
+
+    private AmbariClientCommon ambariClient;
+
+    PollingConfigAnalyzer internalMonitor;
+
+    GatewayConfig gatewayConfig = null;
+
+    static String getType() {
+        return TYPE;
+    }
+
+    AmbariConfigurationMonitor(GatewayConfig config, AliasService aliasService) {
+        this.gatewayConfig   = config;
+        this.ambariClient    = new AmbariClientCommon(aliasService);
+        this.internalMonitor = new PollingConfigAnalyzer(this);
+
+        // Override the default polling interval if it has been configured
+        int interval = config.getClusterMonitorPollingInterval(getType());
+        if (interval > 0) {
+            setPollingInterval(interval);
+        }
+
+        init();
+    }
+
+    @Override
+    public void setPollingInterval(int interval) {
+        internalMonitor.setInterval(interval);
+    }
+
+    private void init() {
+        loadDiscoveryConfiguration();
+        loadClusterVersionData();
+    }
+
+    /**
+     * Load any previously-persisted service discovery configurations.
+     * This is necessary for checking previously-deployed topologies.
+     */
+    private void loadDiscoveryConfiguration() {
+        File persistenceDir = getPersistenceDir();
+        if (persistenceDir != null) {
+            Collection<File> persistedConfigs = FileUtils.listFiles(persistenceDir, new String[]{"conf"}, false);
+            for (File persisted : persistedConfigs) {
+                Properties props = new Properties();
+                try {
+                    props.load(new FileInputStream(persisted));
+
+                    addDiscoveryConfig(props.getProperty(PROP_CLUSTER_NAME), new ServiceDiscoveryConfig() {
+                                                            public String getAddress() {
+                                                                return props.getProperty(PROP_CLUSTER_SOURCE);
+                                                            }
+
+                                                            public String getUser() {
+                                                                return props.getProperty(PROP_CLUSTER_USER);
+                                                            }
+
+                                                            public String getPasswordAlias() {
+                                                                return props.getProperty(PROP_CLUSTER_ALIAS);
+                                                            }
+                                                        });
+                } catch (IOException e) {
+                    log.failedToLoadClusterMonitorServiceDiscoveryConfig(getType(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Load any previously-persisted cluster configuration version records, so the monitor will check
+     * previously-deployed topologies against the current cluster configuration.
+     */
+    private void loadClusterVersionData() {
+        File persistenceDir = getPersistenceDir();
+        if (persistenceDir != null) {
+            Collection<File> persistedConfigs = FileUtils.listFiles(getPersistenceDir(), new String[]{"ver"}, false);
+            for (File persisted : persistedConfigs) {
+                Properties props = new Properties();
+                try {
+                    props.load(new FileInputStream(persisted));
+
+                    String source = props.getProperty(PROP_CLUSTER_SOURCE);
+                    String clusterName = props.getProperty(PROP_CLUSTER_NAME);
+
+                    Map<String, String> configVersions = new HashMap<>();
+                    for (String name : props.stringPropertyNames()) {
+                        if (!name.startsWith(PROP_CLUSTER_PREFIX)) { // Ignore implementation-specific properties
+                            configVersions.put(name, props.getProperty(name));
+                        }
+                    }
+
+                    // Map the config versions to the cluster name
+                    addClusterConfigVersions(source, clusterName, configVersions);
+
+                } catch (IOException e) {
+                    log.failedToLoadClusterMonitorConfigVersions(getType(), e);
+                }
+            }
+        }
+    }
+
+    private void persistDiscoveryConfiguration(String clusterName, ServiceDiscoveryConfig sdc) {
+        File persistenceDir = getPersistenceDir();
+        if (persistenceDir != null) {
+
+            Properties props = new Properties();
+            props.setProperty(PROP_CLUSTER_NAME, clusterName);
+            props.setProperty(PROP_CLUSTER_SOURCE, sdc.getAddress());
+
+            String username = sdc.getUser();
+            if (username != null) {
+                props.setProperty(PROP_CLUSTER_USER, username);
+            }
+            String pwdAlias = sdc.getPasswordAlias();
+            if (pwdAlias != null) {
+                props.setProperty(PROP_CLUSTER_ALIAS, pwdAlias);
+            }
+
+            persist(props, getDiscoveryConfigPersistenceFile(sdc.getAddress(), clusterName));
+        }
+    }
+
+    private void persistClusterVersionData(String address, String clusterName, Map<String, String> configVersions) {
+        File persistenceDir = getPersistenceDir();
+        if (persistenceDir != null) {
+            Properties props = new Properties();
+            props.setProperty(PROP_CLUSTER_NAME, clusterName);
+            props.setProperty(PROP_CLUSTER_SOURCE, address);
+            for (String name : configVersions.keySet()) {
+                props.setProperty(name, configVersions.get(name));
+            }
+
+            persist(props, getConfigVersionsPersistenceFile(address, clusterName));
+        }
+    }
+
+    private void persist(Properties props, File dest) {
+        try {
+            props.store(new FileOutputStream(dest), PERSISTED_FILE_COMMENT);
+        } catch (Exception e) {
+            log.failedToPersistClusterMonitorData(getType(), dest.getAbsolutePath(), e);
+        }
+    }
+
+    private File getPersistenceDir() {
+        File persistenceDir = null;
+
+        File dataDir = new File(gatewayConfig.getGatewayDataDir());
+        if (dataDir.exists()) {
+            File clustersDir = new File(dataDir, CLUSTERS_DATA_DIR_NAME);
+            if (!clustersDir.exists()) {
+                clustersDir.mkdirs();
+            }
+            persistenceDir = clustersDir;
+        }
+
+        return persistenceDir;
+    }
+
+    private File getDiscoveryConfigPersistenceFile(String address, String clusterName) {
+        return getPersistenceFile(address, clusterName, "conf");
+    }
+
+    private File getConfigVersionsPersistenceFile(String address, String clusterName) {
+        return getPersistenceFile(address, clusterName, "ver");
+    }
+
+    private File getPersistenceFile(String address, String clusterName, String ext) {
+        String fileName = address.replace(":", "_").replace("/", "_") + "-" + clusterName + "." + ext;
+        return new File(getPersistenceDir(), fileName);
+    }
+
+    /**
+     * Add cluster configuration details to the monitor's in-memory record.
+     *
+     * @param address        An Ambari instance address.
+     * @param clusterName    The name of a cluster associated with the Ambari instance.
+     * @param configVersions A Map of configuration types and their corresponding versions.
+     */
+    private void addClusterConfigVersions(String address, String clusterName, Map<String, String> configVersions) {
+        configVersionsLock.writeLock().lock();
+        try {
+            ambariClusterConfigVersions.computeIfAbsent(address, k -> new HashMap<>())
+                                       .put(clusterName, configVersions);
+        } finally {
+            configVersionsLock.writeLock().unlock();
+        }
+    }
+
+    public void start() {
+        (new Thread(internalMonitor, "AmbariConfigurationMonitor")).start();
+    }
+
+    public void stop() {
+        internalMonitor.stop();
+    }
+
+    @Override
+    public void addListener(ConfigurationChangeListener listener) {
+        changeListeners.add(listener);
+    }
+
+    /**
+     * Add discovery configuration details for the specified cluster, so the monitor knows how to connect to check for
+     * changes.
+     *
+     * @param clusterName The name of the cluster.
+     * @param config      The associated service discovery configuration.
+     */
+    void addDiscoveryConfig(String clusterName, ServiceDiscoveryConfig config) {
+        clusterMonitorConfigurations.computeIfAbsent(config.getAddress(), k -> new HashMap<>()).put(clusterName, config);
+    }
+
+
+    /**
+     * Get the service discovery configuration associated with the specified Ambari instance and cluster.
+     *
+     * @param address     An Ambari instance address.
+     * @param clusterName The name of a cluster associated with the Ambari instance.
+     *
+     * @return The associated ServiceDiscoveryConfig object.
+     */
+    ServiceDiscoveryConfig getDiscoveryConfig(String address, String clusterName) {
+        ServiceDiscoveryConfig config = null;
+        if (clusterMonitorConfigurations.containsKey(address)) {
+            config = clusterMonitorConfigurations.get(address).get(clusterName);
+        }
+        return config;
+    }
+
+
+    /**
+     * Add cluster configuration data to the monitor, which it will use when determining if configuration has changed.
+     *
+     * @param cluster         An AmbariCluster object.
+     * @param discoveryConfig The discovery configuration associated with the cluster.
+     */
+    void addClusterConfigVersions(AmbariCluster cluster, ServiceDiscoveryConfig discoveryConfig) {
+
+        String clusterName = cluster.getName();
+
+        // Register the cluster discovery configuration for the monitor connections
+        persistDiscoveryConfiguration(clusterName, discoveryConfig);
+        addDiscoveryConfig(clusterName, discoveryConfig);
+
+        // Build the set of configuration versions
+        Map<String, String> configVersions = new HashMap<>();
+        Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigs = cluster.getServiceConfigurations();
+        for (String serviceName : serviceConfigs.keySet()) {
+            Map<String, AmbariCluster.ServiceConfiguration> configTypeVersionMap = serviceConfigs.get(serviceName);
+            for (AmbariCluster.ServiceConfiguration config : configTypeVersionMap.values()) {
+                String configType = config.getType();
+                String version = config.getVersion();
+                configVersions.put(configType, version);
+            }
+        }
+
+        persistClusterVersionData(discoveryConfig.getAddress(), clusterName, configVersions);
+        addClusterConfigVersions(discoveryConfig.getAddress(), clusterName, configVersions);
+    }
+
+
+    /**
+     * Remove the configuration record for the specified Ambari instance and cluster name.
+     *
+     * @param address     An Ambari instance address.
+     * @param clusterName The name of a cluster associated with the Ambari instance.
+     *
+     * @return The removed data; A Map of configuration types and their corresponding versions.
+     */
+    Map<String, String> removeClusterConfigVersions(String address, String clusterName) {
+        Map<String, String> result = new HashMap<>();
+
+        configVersionsLock.writeLock().lock();
+        try {
+            if (ambariClusterConfigVersions.containsKey(address)) {
+                result.putAll(ambariClusterConfigVersions.get(address).remove(clusterName));
+            }
+        } finally {
+            configVersionsLock.writeLock().unlock();
+        }
+
+        // Delete the associated persisted record
+        File persisted = getConfigVersionsPersistenceFile(address, clusterName);
+        if (persisted.exists()) {
+            persisted.delete();
+        }
+
+        return result;
+    }
+
+    /**
+     * Get the cluster configuration details for the specified cluster and Ambari instance.
+     *
+     * @param address     An Ambari instance address.
+     * @param clusterName The name of a cluster associated with the Ambari instance.
+     *
+     * @return A Map of configuration types and their corresponding versions.
+     */
+    Map<String, String> getClusterConfigVersions(String address, String clusterName) {
+        Map<String, String> result = new HashMap<>();
+
+        configVersionsLock.readLock().lock();
+        try {
+            if (ambariClusterConfigVersions.containsKey(address)) {
+                result.putAll(ambariClusterConfigVersions.get(address).get(clusterName));
+            }
+        } finally {
+            configVersionsLock.readLock().unlock();
+        }
+
+        return result;
+    }
+
+
+    /**
+     * Get all the clusters the monitor knows about.
+     *
+     * @return A Map of Ambari instance addresses to associated cluster names.
+     */
+    Map<String, List<String>> getClusterNames() {
+        Map<String, List<String>> result = new HashMap<>();
+
+        configVersionsLock.readLock().lock();
+        try {
+            for (String address : ambariClusterConfigVersions.keySet()) {
+                List<String> clusterNames = new ArrayList<>();
+                clusterNames.addAll(ambariClusterConfigVersions.get(address).keySet());
+                result.put(address, clusterNames);
+            }
+        } finally {
+            configVersionsLock.readLock().unlock();
+        }
+
+        return result;
+
+    }
+
+
+    /**
+     * Notify registered change listeners.
+     *
+     * @param source      The address of the Ambari instance from which the cluster details were determined.
+     * @param clusterName The name of the cluster whose configuration details have changed.
+     */
+    void notifyChangeListeners(String source, String clusterName) {
+        for (ConfigurationChangeListener listener : changeListeners) {
+            listener.onConfigurationChange(source, clusterName);
+        }
+    }
+
+
+    /**
+     * Request the current active configuration version info from Ambari.
+     *
+     * @param address     The Ambari instance address.
+     * @param clusterName The name of the cluster for which the details are desired.
+     *
+     * @return A Map of service configuration types and their corresponding versions.
+     */
+    Map<String, String> getUpdatedConfigVersions(String address, String clusterName) {
+        Map<String, String> configVersions = new HashMap<>();
+
+        Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigs =
+                    ambariClient.getActiveServiceConfigurations(clusterName, getDiscoveryConfig(address, clusterName));
+
+        for (Map<String, AmbariCluster.ServiceConfiguration> serviceConfig : serviceConfigs.values()) {
+            for (AmbariCluster.ServiceConfiguration config : serviceConfig.values()) {
+                configVersions.put(config.getType(), config.getVersion());
+            }
+        }
+
+        return configVersions;
+    }
+
+
+    /**
+     * The thread that polls Ambari for configuration details for clusters associated with discovered topologies,
+     * compares them with the current recorded values, and notifies any listeners when differences are discovered.
+     */
+    static final class PollingConfigAnalyzer implements Runnable {
+
+        private static final int DEFAULT_POLLING_INTERVAL = 60;
+
+        // Polling interval in seconds
+        private int interval = DEFAULT_POLLING_INTERVAL;
+
+        private AmbariConfigurationMonitor delegate;
+
+        private boolean isActive = false;
+
+        PollingConfigAnalyzer(AmbariConfigurationMonitor delegate) {
+            this.delegate = delegate;
+            this.interval = Integer.getInteger(INTERVAL_PROPERTY_NAME, PollingConfigAnalyzer.DEFAULT_POLLING_INTERVAL);
+        }
+
+        void setInterval(int interval) {
+            this.interval = interval;
+        }
+
+
+        void stop() {
+            isActive = false;
+        }
+
+        @Override
+        public void run() {
+            isActive = true;
+
+            log.startedAmbariConfigMonitor(interval);
+
+            while (isActive) {
+                for (Map.Entry<String, List<String>> entry : delegate.getClusterNames().entrySet()) {
+                    String address = entry.getKey();
+                    for (String clusterName : entry.getValue()) {
+                        Map<String, String> configVersions = delegate.getClusterConfigVersions(address, clusterName);
+                        if (configVersions != null && !configVersions.isEmpty()) {
+                            Map<String, String> updatedVersions = delegate.getUpdatedConfigVersions(address, clusterName);
+                            if (updatedVersions != null && !updatedVersions.isEmpty()) {
+                                boolean configHasChanged = false;
+
+                                // If the config sets don't match in size, then something has changed
+                                if (updatedVersions.size() != configVersions.size()) {
+                                    configHasChanged = true;
+                                } else {
+                                    // Perform the comparison of all the config versions
+                                    for (Map.Entry<String, String> configVersion : configVersions.entrySet()) {
+                                        if (!updatedVersions.get(configVersion.getKey()).equals(configVersion.getValue())) {
+                                            configHasChanged = true;
+                                            break;
+                                        }
+                                    }
+                                }
+
+                                // If a change has occurred, notify the listeners
+                                if (configHasChanged) {
+                                    delegate.notifyChangeListeners(address, clusterName);
+                                }
+                            }
+                        }
+                    }
+                }
+
+                try {
+                    Thread.sleep(interval * 1000);
+                } catch (InterruptedException e) {
+                    // Ignore
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscovery.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscovery.java
index b7f9f53..765a928 100644
--- a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscovery.java
+++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscovery.java
@@ -16,7 +16,7 @@
  */
 package org.apache.hadoop.gateway.topology.discovery.ambari;
 
-import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -25,38 +25,32 @@ import java.util.Properties;
 
 import net.minidev.json.JSONArray;
 import net.minidev.json.JSONObject;
-import net.minidev.json.JSONValue;
-import org.apache.hadoop.gateway.config.ConfigurationException;
 import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.hadoop.gateway.services.GatewayServices;
 import org.apache.hadoop.gateway.services.security.AliasService;
-import org.apache.hadoop.gateway.services.security.AliasServiceException;
+import org.apache.hadoop.gateway.topology.ClusterConfigurationMonitorService;
+import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitor;
 import org.apache.hadoop.gateway.topology.discovery.GatewayService;
 import org.apache.hadoop.gateway.topology.discovery.ServiceDiscovery;
 import org.apache.hadoop.gateway.topology.discovery.ServiceDiscoveryConfig;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpStatus;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.message.BasicHeader;
-import org.apache.http.util.EntityUtils;
 
 
 class AmbariServiceDiscovery implements ServiceDiscovery {
 
     static final String TYPE = "AMBARI";
 
-    static final String AMBARI_CLUSTERS_URI = "/api/v1/clusters";
+    static final String AMBARI_CLUSTERS_URI = AmbariClientCommon.AMBARI_CLUSTERS_URI;
 
-    static final String AMBARI_HOSTROLES_URI =
-                                       AMBARI_CLUSTERS_URI + "/%s/services?fields=components/host_components/HostRoles";
+    static final String AMBARI_HOSTROLES_URI = AmbariClientCommon.AMBARI_HOSTROLES_URI;
 
-    static final String AMBARI_SERVICECONFIGS_URI =
-            AMBARI_CLUSTERS_URI + "/%s/configurations/service_config_versions?is_current=true";
+    static final String AMBARI_SERVICECONFIGS_URI = AmbariClientCommon.AMBARI_SERVICECONFIGS_URI;
 
     private static final String COMPONENT_CONFIG_MAPPING_FILE =
                                                         "ambari-service-discovery-component-config-mapping.properties";
 
+    private static final String GATEWAY_SERVICES_ACCESSOR_CLASS  = "org.apache.hadoop.gateway.GatewayServer";
+    private static final String GATEWAY_SERVICES_ACCESSOR_METHOD = "getGatewayServices";
+
     private static final AmbariServiceDiscoveryMessages log = MessagesFactory.get(AmbariServiceDiscoveryMessages.class);
 
     // Map of component names to service configuration types
@@ -69,21 +63,76 @@ class AmbariServiceDiscovery implements ServiceDiscovery {
                 componentServiceConfigs.put(componentName, configMapping.getProperty(componentName));
             }
         } catch (Exception e) {
-            log.failedToLoadServiceDiscoveryConfiguration(COMPONENT_CONFIG_MAPPING_FILE, e);
+            log.failedToLoadServiceDiscoveryURLDefConfiguration(COMPONENT_CONFIG_MAPPING_FILE, e);
         }
     }
 
-    private static final String DEFAULT_USER_ALIAS = "ambari.discovery.user";
-    private static final String DEFAULT_PWD_ALIAS  = "ambari.discovery.password";
-
     @GatewayService
     private AliasService aliasService;
 
-    private CloseableHttpClient httpClient = null;
+    private RESTInvoker restClient;
+    private AmbariClientCommon ambariClient;
 
+    // This is used to update the monitor when new cluster configuration details are discovered.
+    private AmbariConfigurationMonitor configChangeMonitor;
+
+    private boolean isInitialized = false;
 
     AmbariServiceDiscovery() {
-        httpClient = org.apache.http.impl.client.HttpClients.createDefault();
+    }
+
+
+    AmbariServiceDiscovery(RESTInvoker restClient) {
+        this.restClient = restClient;
+    }
+
+
+    /**
+     * Initialization must be subsequent to construction because the AliasService member isn't assigned until after
+     * construction time. This is called internally prior to discovery invocations to make sure the clients have been
+     * initialized.
+     */
+    private void init() {
+        if (!isInitialized) {
+            if (this.restClient == null) {
+                this.restClient = new RESTInvoker(aliasService);
+            }
+            this.ambariClient = new AmbariClientCommon(restClient);
+            this.configChangeMonitor = getConfigurationChangeMonitor();
+
+            isInitialized = true;
+        }
+    }
+
+
+    /**
+     * Get the Ambari configuration change monitor from the associated gateway service.
+     */
+    private AmbariConfigurationMonitor getConfigurationChangeMonitor() {
+        AmbariConfigurationMonitor ambariMonitor = null;
+        try {
+            Class clazz = Class.forName(GATEWAY_SERVICES_ACCESSOR_CLASS);
+            if (clazz != null) {
+                Method m = clazz.getDeclaredMethod(GATEWAY_SERVICES_ACCESSOR_METHOD);
+                if (m != null) {
+                    Object obj = m.invoke(null);
+                    if (GatewayServices.class.isAssignableFrom(obj.getClass())) {
+                        ClusterConfigurationMonitorService clusterMonitorService =
+                              ((GatewayServices) obj).getService(GatewayServices.CLUSTER_CONFIGURATION_MONITOR_SERVICE);
+                        ClusterConfigurationMonitor monitor =
+                                                 clusterMonitorService.getMonitor(AmbariConfigurationMonitor.getType());
+                        if (monitor != null) {
+                            if (AmbariConfigurationMonitor.class.isAssignableFrom(monitor.getClass())) {
+                                ambariMonitor = (AmbariConfigurationMonitor) monitor;
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.errorAccessingConfigurationChangeMonitor(e);
+        }
+        return ambariMonitor;
     }
 
 
@@ -95,14 +144,16 @@ class AmbariServiceDiscovery implements ServiceDiscovery {
 
     @Override
     public Map<String, Cluster> discover(ServiceDiscoveryConfig config) {
-        Map<String, Cluster> clusters = new HashMap<String, Cluster>();
+        Map<String, Cluster> clusters = new HashMap<>();
+
+        init();
 
         String discoveryAddress = config.getAddress();
 
         // Invoke Ambari REST API to discover the available clusters
         String clustersDiscoveryURL = String.format("%s" + AMBARI_CLUSTERS_URI, discoveryAddress);
 
-        JSONObject json = invokeREST(clustersDiscoveryURL, config.getUser(), config.getPasswordAlias());
+        JSONObject json = restClient.invoke(clustersDiscoveryURL, config.getUser(), config.getPasswordAlias());
 
         // Parse the cluster names from the response, and perform the cluster discovery
         JSONArray clusterItems = (JSONArray) json.get("items");
@@ -126,13 +177,15 @@ class AmbariServiceDiscovery implements ServiceDiscovery {
 
         Map<String, String> serviceComponents = new HashMap<>();
 
+        init();
+
         String discoveryAddress = config.getAddress();
         String discoveryUser = config.getUser();
         String discoveryPwdAlias = config.getPasswordAlias();
 
         Map<String, List<String>> componentHostNames = new HashMap<>();
         String hostRolesURL = String.format("%s" + AMBARI_HOSTROLES_URI, discoveryAddress, clusterName);
-        JSONObject hostRolesJSON = invokeREST(hostRolesURL, discoveryUser, discoveryPwdAlias);
+        JSONObject hostRolesJSON = restClient.invoke(hostRolesURL, discoveryUser, discoveryPwdAlias);
         if (hostRolesJSON != null) {
             // Process the host roles JSON
             JSONArray items = (JSONArray) hostRolesJSON.get("items");
@@ -158,7 +211,7 @@ class AmbariServiceDiscovery implements ServiceDiscovery {
                         if (hostName != null) {
                             log.discoveredServiceHost(serviceName, hostName);
                             if (!componentHostNames.containsKey(componentName)) {
-                                componentHostNames.put(componentName, new ArrayList<String>());
+                                componentHostNames.put(componentName, new ArrayList<>());
                             }
                             componentHostNames.get(componentName).add(hostName);
                         }
@@ -167,31 +220,15 @@ class AmbariServiceDiscovery implements ServiceDiscovery {
             }
         }
 
+        // Service configurations
         Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigurations =
-                                                 new HashMap<String, Map<String, AmbariCluster.ServiceConfiguration>>();
-        String serviceConfigsURL = String.format("%s" + AMBARI_SERVICECONFIGS_URI, discoveryAddress, clusterName);
-        JSONObject serviceConfigsJSON = invokeREST(serviceConfigsURL, discoveryUser, discoveryPwdAlias);
-        if (serviceConfigsJSON != null) {
-            // Process the service configurations
-            JSONArray serviceConfigs = (JSONArray) serviceConfigsJSON.get("items");
-            for (Object serviceConfig : serviceConfigs) {
-                String serviceName = (String) ((JSONObject) serviceConfig).get("service_name");
-                JSONArray configurations = (JSONArray) ((JSONObject) serviceConfig).get("configurations");
-                for (Object configuration : configurations) {
-                    String configType = (String) ((JSONObject) configuration).get("type");
-                    String configVersion = String.valueOf(((JSONObject) configuration).get("version"));
-
-                    Map<String, String> configProps = new HashMap<String, String>();
-                    JSONObject configProperties = (JSONObject) ((JSONObject) configuration).get("properties");
-                    for (String propertyName : configProperties.keySet()) {
-                        configProps.put(propertyName, String.valueOf(((JSONObject) configProperties).get(propertyName)));
-                    }
-                    if (!serviceConfigurations.containsKey(serviceName)) {
-                        serviceConfigurations.put(serviceName, new HashMap<String, AmbariCluster.ServiceConfiguration>());
-                    }
-                    serviceConfigurations.get(serviceName).put(configType, new AmbariCluster.ServiceConfiguration(configType, configVersion, configProps));
-                    cluster.addServiceConfiguration(serviceName, configType, new AmbariCluster.ServiceConfiguration(configType, configVersion, configProps));
-                }
+                                                        ambariClient.getActiveServiceConfigurations(discoveryAddress,
+                                                                                                    clusterName,
+                                                                                                    discoveryUser,
+                                                                                                    discoveryPwdAlias);
+        for (String serviceName : serviceConfigurations.keySet()) {
+            for (Map.Entry<String, AmbariCluster.ServiceConfiguration> serviceConfig : serviceConfigurations.get(serviceName).entrySet()) {
+                cluster.addServiceConfiguration(serviceName, serviceConfig.getKey(), serviceConfig.getValue());
             }
         }
 
@@ -214,93 +251,12 @@ class AmbariServiceDiscovery implements ServiceDiscovery {
             }
         }
 
-        return cluster;
-    }
-
-
-    protected JSONObject invokeREST(String url, String username, String passwordAlias) {
-        JSONObject result = null;
-
-        CloseableHttpResponse response = null;
-        try {
-            HttpGet request = new HttpGet(url);
-
-            // If no configured username, then use default username alias
-            String password = null;
-            if (username == null) {
-                if (aliasService != null) {
-                    try {
-                        char[] defaultUser = aliasService.getPasswordFromAliasForGateway(DEFAULT_USER_ALIAS);
-                        if (defaultUser != null) {
-                            username = new String(defaultUser);
-                        }
-                    } catch (AliasServiceException e) {
-                        log.aliasServiceUserError(DEFAULT_USER_ALIAS, e.getLocalizedMessage());
-                    }
-                }
-
-                // If username is still null
-                if (username == null) {
-                    log.aliasServiceUserNotFound();
-                    throw new ConfigurationException("No username is configured for Ambari service discovery.");
-                }
-            }
-
-            if (aliasService != null) {
-                // If no password alias is configured, then try the default alias
-                if (passwordAlias == null) {
-                    passwordAlias = DEFAULT_PWD_ALIAS;
-                }
-
-                try {
-                    char[] pwd = aliasService.getPasswordFromAliasForGateway(passwordAlias);
-                    if (pwd != null) {
-                        password = new String(pwd);
-                    }
-
-                } catch (AliasServiceException e) {
-                    log.aliasServicePasswordError(passwordAlias, e.getLocalizedMessage());
-                }
-            }
-
-            // If the password could not be determined
-            if (password == null) {
-                log.aliasServicePasswordNotFound();
-                throw new ConfigurationException("No password is configured for Ambari service discovery.");
-            }
-
-            // Add an auth header if credentials are available
-            String encodedCreds =
-                    org.apache.commons.codec.binary.Base64.encodeBase64String((username + ":" + password).getBytes());
-            request.addHeader(new BasicHeader("Authorization", "Basic " + encodedCreds));
-
-            response = httpClient.execute(request);
-
-            if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) {
-                HttpEntity entity = response.getEntity();
-                if (entity != null) {
-                    result = (JSONObject) JSONValue.parse((EntityUtils.toString(entity)));
-                    log.debugJSON(result.toJSONString());
-                } else {
-                    log.noJSON(url);
-                }
-            } else {
-                log.unexpectedRestResponseStatusCode(url, response.getStatusLine().getStatusCode());
-            }
-
-        } catch (IOException e) {
-            log.restInvocationError(url, e);
-        } finally {
-            if(response != null) {
-                try {
-                    response.close();
-                } catch (IOException e) {
-                    // Ignore
-                }
-            }
+        if (configChangeMonitor != null) {
+            // Notify the cluster config monitor about these cluster configuration details
+            configChangeMonitor.addClusterConfigVersions(cluster, config);
         }
-        return result;
-    }
 
+        return cluster;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryMessages.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryMessages.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryMessages.java
index 0661224..51bbe0e 100644
--- a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryMessages.java
+++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryMessages.java
@@ -25,24 +25,44 @@ import org.apache.hadoop.gateway.i18n.messages.StackTrace;
 public interface AmbariServiceDiscoveryMessages {
 
     @Message(level = MessageLevel.ERROR,
-            text = "Failed to load service discovery configuration: {1}")
-    void failedToLoadServiceDiscoveryConfiguration(@StackTrace(level = MessageLevel.ERROR) Exception e);
+             text = "Failed to persist data for cluster configuration monitor {0} {1}: {2}")
+    void failedToPersistClusterMonitorData(final String monitor,
+                                           final String filename,
+                                           @StackTrace(level = MessageLevel.DEBUG) Exception e);
 
     @Message(level = MessageLevel.ERROR,
-             text = "Failed to load service discovery configuration {0}: {1}")
-    void failedToLoadServiceDiscoveryConfiguration(final String configuration,
-                               @StackTrace(level = MessageLevel.ERROR) Exception e);
+             text = "Failed to load persisted service discovery configuration for cluster monitor {0} : {1}")
+    void failedToLoadClusterMonitorServiceDiscoveryConfig(final String monitor,
+                                                          @StackTrace(level = MessageLevel.DEBUG) Exception e);
+
+    @Message(level = MessageLevel.ERROR,
+            text = "Failed to load persisted cluster configuration version data for cluster monitor {0} : {1}")
+    void failedToLoadClusterMonitorConfigVersions(final String monitor,
+                                                  @StackTrace(level = MessageLevel.DEBUG) Exception e);
+
+    @Message(level = MessageLevel.ERROR,
+             text = "Unable to access the Ambari Configuration Change Monitor: {0}")
+    void errorAccessingConfigurationChangeMonitor(@StackTrace(level = MessageLevel.DEBUG) Exception e);
+
+    @Message(level = MessageLevel.ERROR,
+             text = "Failed to load service discovery URL definition configuration: {1}")
+    void failedToLoadServiceDiscoveryURLDefConfiguration(@StackTrace(level = MessageLevel.DEBUG) Exception e);
+
+    @Message(level = MessageLevel.ERROR,
+             text = "Failed to load service discovery URL definition configuration {0}: {1}")
+    void failedToLoadServiceDiscoveryURLDefConfiguration(final String configuration,
+                                                         @StackTrace(level = MessageLevel.ERROR) Exception e);
 
     @Message(level = MessageLevel.ERROR,
              text = "Encountered an error during cluster {0} discovery: {1}")
     void clusterDiscoveryError(final String clusterName,
-                               @StackTrace(level = MessageLevel.ERROR) Exception e);
+                               @StackTrace(level = MessageLevel.DEBUG) Exception e);
 
 
     @Message(level = MessageLevel.DEBUG,
              text = "REST invocation {0} failed: {1}")
     void restInvocationError(final String url,
-                             @StackTrace(level = MessageLevel.ERROR) Exception e);
+                             @StackTrace(level = MessageLevel.DEBUG) Exception e);
 
 
     @Message(level = MessageLevel.ERROR,
@@ -75,20 +95,23 @@ public interface AmbariServiceDiscoveryMessages {
     void noJSON(final String url);
 
 
-    @Message(level = MessageLevel.DEBUG,
+    @Message(level = MessageLevel.TRACE,
              text = "REST invocation result: {0}")
     void debugJSON(final String json);
 
+
     @Message(level = MessageLevel.DEBUG,
-            text = "Loaded component configuration mappings: {0}")
+             text = "Loaded component configuration mappings: {0}")
     void loadedComponentConfigMappings(final String mappings);
 
+
     @Message(level = MessageLevel.ERROR,
              text = "Failed to load component configuration property mappings {0}: {1}")
     void failedToLoadComponentConfigMappings(final String mappings,
-                                             @StackTrace(level = MessageLevel.ERROR) Exception e);
+                                             @StackTrace(level = MessageLevel.DEBUG) Exception e);
 
-    @Message(level = MessageLevel.DEBUG,
+
+    @Message(level = MessageLevel.TRACE,
              text = "Discovered: Service: {0}, Host: {1}")
     void discoveredServiceHost(final String serviceName, final String hostName);
 
@@ -114,8 +137,12 @@ public interface AmbariServiceDiscoveryMessages {
 
 
     @Message(level = MessageLevel.DEBUG,
-            text = "Determined the service URL mapping property {0} value: {1}")
+             text = "Determined the service URL mapping property {0} value: {1}")
     void determinedPropertyValue(final String propertyName, final String propertyValue);
 
 
+    @Message(level = MessageLevel.INFO,
+             text = "Started Ambari cluster configuration monitor (checking every {0} seconds)")
+    void startedAmbariConfigMonitor(final long pollingInterval);
+
 }

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/RESTInvoker.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/RESTInvoker.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/RESTInvoker.java
new file mode 100644
index 0000000..6a6fad8
--- /dev/null
+++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/RESTInvoker.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.topology.discovery.ambari;
+
+import net.minidev.json.JSONObject;
+import net.minidev.json.JSONValue;
+import org.apache.hadoop.gateway.config.ConfigurationException;
+import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.hadoop.gateway.services.security.AliasService;
+import org.apache.hadoop.gateway.services.security.AliasServiceException;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+
+class RESTInvoker {
+
+    private static final String DEFAULT_USER_ALIAS = "ambari.discovery.user";
+    private static final String DEFAULT_PWD_ALIAS  = "ambari.discovery.password";
+
+    private static final AmbariServiceDiscoveryMessages log = MessagesFactory.get(AmbariServiceDiscoveryMessages.class);
+
+    private AliasService aliasService = null;
+
+    private CloseableHttpClient httpClient = org.apache.http.impl.client.HttpClients.createDefault();
+
+
+    RESTInvoker(AliasService aliasService) {
+        this.aliasService = aliasService;
+    }
+
+
+    JSONObject invoke(String url, String username, String passwordAlias) {
+        JSONObject result = null;
+
+        CloseableHttpResponse response = null;
+        try {
+            HttpGet request = new HttpGet(url);
+
+            // If no configured username, then use default username alias
+            String password = null;
+            if (username == null) {
+                if (aliasService != null) {
+                    try {
+                        char[] defaultUser = aliasService.getPasswordFromAliasForGateway(DEFAULT_USER_ALIAS);
+                        if (defaultUser != null) {
+                            username = new String(defaultUser);
+                        }
+                    } catch (AliasServiceException e) {
+                        log.aliasServiceUserError(DEFAULT_USER_ALIAS, e.getLocalizedMessage());
+                    }
+                }
+
+                // If username is still null
+                if (username == null) {
+                    log.aliasServiceUserNotFound();
+                    throw new ConfigurationException("No username is configured for Ambari service discovery.");
+                }
+            }
+
+            if (aliasService != null) {
+                // If no password alias is configured, then try the default alias
+                if (passwordAlias == null) {
+                    passwordAlias = DEFAULT_PWD_ALIAS;
+                }
+
+                try {
+                    char[] pwd = aliasService.getPasswordFromAliasForGateway(passwordAlias);
+                    if (pwd != null) {
+                        password = new String(pwd);
+                    }
+
+                } catch (AliasServiceException e) {
+                    log.aliasServicePasswordError(passwordAlias, e.getLocalizedMessage());
+                }
+            }
+
+            // If the password could not be determined
+            if (password == null) {
+                log.aliasServicePasswordNotFound();
+                throw new ConfigurationException("No password is configured for Ambari service discovery.");
+            }
+
+            // Add an auth header if credentials are available
+            String encodedCreds =
+                    org.apache.commons.codec.binary.Base64.encodeBase64String((username + ":" + password).getBytes());
+            request.addHeader(new BasicHeader("Authorization", "Basic " + encodedCreds));
+
+            response = httpClient.execute(request);
+
+            if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) {
+                HttpEntity entity = response.getEntity();
+                if (entity != null) {
+                    result = (JSONObject) JSONValue.parse((EntityUtils.toString(entity)));
+                    log.debugJSON(result.toJSONString());
+                } else {
+                    log.noJSON(url);
+                }
+            } else {
+                log.unexpectedRestResponseStatusCode(url, response.getStatusLine().getStatusCode());
+            }
+
+        } catch (IOException e) {
+            log.restInvocationError(url, e);
+        } finally {
+            if(response != null) {
+                try {
+                    response.close();
+                } catch (IOException e) {
+                    // Ignore
+                }
+            }
+        }
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/ServiceURLPropertyConfig.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/ServiceURLPropertyConfig.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/ServiceURLPropertyConfig.java
index 3330cc3..deb5bb3 100644
--- a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/ServiceURLPropertyConfig.java
+++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/ServiceURLPropertyConfig.java
@@ -110,7 +110,7 @@ class ServiceURLPropertyConfig {
                 }
             }
         } catch (Exception e) {
-            log.failedToLoadServiceDiscoveryConfiguration(e);
+            log.failedToLoadServiceDiscoveryURLDefConfiguration(e);
         } finally {
             try {
                 source.close();

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider b/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider
new file mode 100644
index 0000000..d9b2b05
--- /dev/null
+++ b/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider
@@ -0,0 +1,19 @@
+##########################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##########################################################################
+
+org.apache.hadoop.gateway.topology.discovery.ambari.AmbariClusterConfigurationMonitorProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java b/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java
new file mode 100644
index 0000000..2d8b276
--- /dev/null
+++ b/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.topology.discovery.ambari;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.topology.discovery.ServiceDiscoveryConfig;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class AmbariConfigurationMonitorTest {
+
+    private File dataDir = null;
+
+    @Before
+    public void setup() throws Exception {
+        File targetDir = new File( System.getProperty("user.dir"), "target");
+        File tempDir = new File(targetDir, this.getClass().getName() + "__data__" + UUID.randomUUID());
+        FileUtils.forceMkdir(tempDir);
+        dataDir = tempDir;
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        dataDir.delete();
+    }
+
+    @Test
+    public void testPollingMonitor() throws Exception {
+        final String addr1 = "http://host1:8080";
+        final String addr2 = "http://host2:8080";
+        final String cluster1Name = "Cluster_One";
+        final String cluster2Name = "Cluster_Two";
+
+
+        GatewayConfig config = EasyMock.createNiceMock(GatewayConfig.class);
+        EasyMock.expect(config.getGatewayDataDir()).andReturn(dataDir.getAbsolutePath()).anyTimes();
+        EasyMock.expect(config.getClusterMonitorPollingInterval(AmbariConfigurationMonitor.getType()))
+                .andReturn(10)
+                .anyTimes();
+        EasyMock.replay(config);
+
+        // Create the monitor
+        TestableAmbariConfigurationMonitor monitor = new TestableAmbariConfigurationMonitor(config);
+
+        // Clear the system property now that the monitor has been initialized
+        System.clearProperty(AmbariConfigurationMonitor.INTERVAL_PROPERTY_NAME);
+
+
+        // Sequence of config changes for testing monitoring for updates
+        Map<String, Map<String, List<List<AmbariCluster.ServiceConfiguration>>>> updateConfigurations = new HashMap<>();
+
+        updateConfigurations.put(addr1, new HashMap<>());
+        updateConfigurations.get(addr1).put(cluster1Name, Arrays.asList(Arrays.asList(createTestServiceConfig("zoo.cfg", "3"),
+                                                                                      createTestServiceConfig("hive-site", "2")),
+                                                                        Arrays.asList(createTestServiceConfig("zoo.cfg", "3"),
+                                                                                      createTestServiceConfig("hive-site", "3")),
+                                                                        Arrays.asList(createTestServiceConfig("zoo.cfg", "2"),
+                                                                                      createTestServiceConfig("hive-site", "1"))));
+
+        updateConfigurations.put(addr2, new HashMap<>());
+        updateConfigurations.get(addr2).put(cluster2Name, Arrays.asList(Arrays.asList(createTestServiceConfig("zoo.cfg", "1"),
+                                                                                      createTestServiceConfig("hive-site", "1")),
+                                                                        Collections.singletonList(createTestServiceConfig("zoo.cfg", "1")),
+                                                                        Arrays.asList(createTestServiceConfig("zoo.cfg", "1"),
+                                                                                      createTestServiceConfig("hive-site", "2"))));
+
+        updateConfigurations.get(addr2).put(cluster1Name, Arrays.asList(Arrays.asList(createTestServiceConfig("zoo.cfg", "2"),
+                                                                                      createTestServiceConfig("hive-site", "4")),
+                                                                        Arrays.asList(createTestServiceConfig("zoo.cfg", "3"),
+                                                                                      createTestServiceConfig("hive-site", "4"),
+                                                                                      createTestServiceConfig("yarn-site", "1")),
+                                                                        Arrays.asList(createTestServiceConfig("zoo.cfg", "1"),
+                                                                                      createTestServiceConfig("hive-site", "2"))));
+
+        Map<String, Map<String, Integer>> configChangeIndex = new HashMap<>();
+        configChangeIndex.put(addr1, new HashMap<>());
+        configChangeIndex.get(addr1).put(cluster1Name, 0);
+        configChangeIndex.get(addr1).put(cluster2Name, 0);
+        configChangeIndex.put(addr2, new HashMap<>());
+        configChangeIndex.get(addr2).put(cluster2Name, 0);
+
+        // Setup the initial test update data
+        // Cluster 1 data change
+        monitor.addTestConfigVersion(addr1, cluster1Name, "zoo.cfg", "2");
+        monitor.addTestConfigVersion(addr1, cluster1Name, "hive-site", "1");
+
+        // Cluster 2 NO data change
+        monitor.addTestConfigVersion(addr2, cluster1Name, "zoo.cfg", "1");
+        monitor.addTestConfigVersion(addr2, cluster1Name, "hive-site", "1");
+
+        // Cluster 3 data change
+        monitor.addTestConfigVersion(addr2, cluster2Name, "zoo.cfg", "1");
+        monitor.addTestConfigVersion(addr2, cluster2Name, "hive-site", "2");
+
+        Map<String, Map<String, AmbariCluster.ServiceConfiguration>> initialAmbariClusterConfigs = new HashMap<>();
+
+        Map<String, AmbariCluster.ServiceConfiguration> cluster1Configs = new HashMap<>();
+        AmbariCluster.ServiceConfiguration zooCfg = createTestServiceConfig("zoo.cfg", "1");
+        cluster1Configs.put("ZOOKEEPER", zooCfg);
+
+        AmbariCluster.ServiceConfiguration hiveSite = createTestServiceConfig("hive-site", "1");
+        cluster1Configs.put("Hive", hiveSite);
+
+        initialAmbariClusterConfigs.put(cluster1Name, cluster1Configs);
+        AmbariCluster cluster1 = createTestCluster(cluster1Name, initialAmbariClusterConfigs);
+
+        // Tell the monitor about the cluster configurations
+        monitor.addClusterConfigVersions(cluster1, createTestDiscoveryConfig(addr1));
+
+        monitor.addClusterConfigVersions(createTestCluster(cluster2Name, initialAmbariClusterConfigs),
+                                         createTestDiscoveryConfig(addr2));
+
+        monitor.addClusterConfigVersions(createTestCluster(cluster1Name, initialAmbariClusterConfigs),
+                                         createTestDiscoveryConfig(addr2));
+
+        final Map<String, Map<String, Integer>> changeNotifications = new HashMap<>();
+        monitor.addListener((src, cname) -> {
+//            System.out.println("Cluster config changed: " + cname + " @ " + src);
+            // Record the notification
+            Integer notificationCount  = changeNotifications.computeIfAbsent(src, s -> new HashMap<>())
+                                                            .computeIfAbsent(cname, c -> Integer.valueOf(0));
+            changeNotifications.get(src).put(cname, (notificationCount+=1));
+
+            // Update the config version
+            int changeIndex = configChangeIndex.get(src).get(cname);
+            if (changeIndex < updateConfigurations.get(src).get(cname).size()) {
+                List<AmbariCluster.ServiceConfiguration> changes = updateConfigurations.get(src).get(cname).get(changeIndex);
+
+//                System.out.println("Applying config update " + changeIndex + " to " + cname + " @ " + src + " ...");
+                for (AmbariCluster.ServiceConfiguration change : changes) {
+                    monitor.updateConfigState(src, cname, change.getType(), change.getVersion());
+//                    System.out.println("    Updated " + change.getType() + " to version " + change.getVersion());
+                }
+
+                // Increment the change index
+                configChangeIndex.get(src).replace(cname, changeIndex + 1);
+
+//                System.out.println("Monitor config updated for " + cname + " @ " + src + " : " + changeIndex );
+            }
+        });
+
+        try {
+            monitor.start();
+
+            long expiration = System.currentTimeMillis() + (1000 * 30);
+            while (!areChangeUpdatesExhausted(updateConfigurations, configChangeIndex)
+                                                                        && (System.currentTimeMillis() < expiration)) {
+                try {
+                    Thread.sleep(5);
+                } catch (InterruptedException e) {
+                    //
+                }
+            }
+
+        } finally {
+            monitor.stop();
+        }
+
+        assertNotNull("Expected changes to have been reported for source 1.",
+                      changeNotifications.get(addr1));
+
+        assertEquals("Expected changes to have been reported.",
+                     3, changeNotifications.get(addr1).get(cluster1Name).intValue());
+
+        assertNotNull("Expected changes to have been reported for source 2.",
+                      changeNotifications.get(addr2));
+
+        assertEquals("Expected changes to have been reported.",
+                     3, changeNotifications.get(addr2).get(cluster2Name).intValue());
+
+        assertNull("Expected changes to have been reported.",
+                   changeNotifications.get(addr2).get(cluster1Name));
+    }
+
+
+    private static boolean areChangeUpdatesExhausted(Map<String, Map<String, List<List<AmbariCluster.ServiceConfiguration>>>> updates,
+                                              Map<String, Map<String, Integer>> configChangeIndeces) {
+        boolean isExhausted = true;
+
+        for (String address : updates.keySet()) {
+            Map<String, List<List<AmbariCluster.ServiceConfiguration>>> clusterConfigs = updates.get(address);
+            for (String clusterName : clusterConfigs.keySet()) {
+                Integer configChangeCount = clusterConfigs.get(clusterName).size();
+                if (configChangeIndeces.get(address).containsKey(clusterName)) {
+                    if (configChangeIndeces.get(address).get(clusterName) < configChangeCount) {
+                        isExhausted = false;
+                        break;
+                    }
+                }
+            }
+        }
+
+        return isExhausted;
+    }
+
+    /**
+     *
+     * @param name           The cluster name
+     * @param serviceConfigs A map of service configurations (keyed by service name)
+     *
+     * @return
+     */
+    private AmbariCluster createTestCluster(String name,
+                                            Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigs) {
+        AmbariCluster c = EasyMock.createNiceMock(AmbariCluster.class);
+        EasyMock.expect(c.getName()).andReturn(name).anyTimes();
+        EasyMock.expect(c.getServiceConfigurations()).andReturn(serviceConfigs).anyTimes();
+        EasyMock.replay(c);
+        return c;
+    }
+
+    private AmbariCluster.ServiceConfiguration createTestServiceConfig(String name, String version) {
+        AmbariCluster.ServiceConfiguration sc = EasyMock.createNiceMock(AmbariCluster.ServiceConfiguration.class);
+        EasyMock.expect(sc.getType()).andReturn(name).anyTimes();
+        EasyMock.expect(sc.getVersion()).andReturn(version).anyTimes();
+        EasyMock.replay(sc);
+        return sc;
+    }
+
+    private ServiceDiscoveryConfig createTestDiscoveryConfig(String address) {
+        return createTestDiscoveryConfig(address, null, null);
+    }
+
+    private ServiceDiscoveryConfig createTestDiscoveryConfig(String address, String username, String pwdAlias) {
+        ServiceDiscoveryConfig sdc = EasyMock.createNiceMock(ServiceDiscoveryConfig.class);
+        EasyMock.expect(sdc.getAddress()).andReturn(address).anyTimes();
+        EasyMock.expect(sdc.getUser()).andReturn(username).anyTimes();
+        EasyMock.expect(sdc.getPasswordAlias()).andReturn(pwdAlias).anyTimes();
+        EasyMock.replay(sdc);
+        return sdc;
+    }
+
+    /**
+     * AmbariConfigurationMonitor extension that replaces the collection of updated configuration data with a static
+     * mechanism rather than the REST invocation mechanism.
+     */
+    private static final class TestableAmbariConfigurationMonitor extends AmbariConfigurationMonitor {
+
+        Map<String, Map<String, Map<String, String>>> configVersionData = new HashMap<>();
+
+        TestableAmbariConfigurationMonitor(GatewayConfig config) {
+            super(config, null);
+        }
+
+        void addTestConfigVersion(String address, String clusterName, String configType, String configVersion) {
+            configVersionData.computeIfAbsent(address, a -> new HashMap<>())
+                             .computeIfAbsent(clusterName, cl -> new HashMap<>())
+                             .put(configType, configVersion);
+        }
+
+        void addTestConfigVersions(String address, String clusterName, Map<String, String> configVersions) {
+            configVersionData.computeIfAbsent(address, a -> new HashMap<>())
+                             .computeIfAbsent(clusterName, cl -> new HashMap<>())
+                             .putAll(configVersions);
+        }
+
+        void updateTestConfigVersion(String address, String clusterName, String configType, String updatedVersions) {
+            configVersionData.computeIfAbsent(address, a -> new HashMap<>())
+                             .computeIfAbsent(clusterName, cl -> new HashMap<>())
+                             .replace(configType, updatedVersions);
+        }
+
+        void updateTestConfigVersions(String address, String clusterName, Map<String, String> updatedVersions) {
+            configVersionData.computeIfAbsent(address, a -> new HashMap<>())
+                             .computeIfAbsent(clusterName, cl -> new HashMap<>())
+                             .replaceAll((k,v) -> updatedVersions.get(k));
+        }
+
+        void updateConfigState(String address, String clusterName, String configType, String configVersion) {
+            configVersionsLock.writeLock().lock();
+            try {
+                if (ambariClusterConfigVersions.containsKey(address)) {
+                    ambariClusterConfigVersions.get(address).get(clusterName).replace(configType, configVersion);
+                }
+            } finally {
+                configVersionsLock.writeLock().unlock();
+            }
+        }
+
+        @Override
+        Map<String, String> getUpdatedConfigVersions(String address, String clusterName) {
+            Map<String, Map<String, String>> clusterConfigVersions = configVersionData.get(address);
+            if (clusterConfigVersions != null) {
+                return clusterConfigVersions.get(clusterName);
+            }
+            return null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryTest.java b/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryTest.java
index f7f0553..d4dad95 100644
--- a/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryTest.java
+++ b/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryTest.java
@@ -119,26 +119,38 @@ public class AmbariServiceDiscoveryTest {
      */
     private static final class TestAmbariServiceDiscovery extends AmbariServiceDiscovery {
 
+        final static String CLUSTER_PLACEHOLDER = TestRESTInvoker.CLUSTER_PLACEHOLDER;
+
+        TestAmbariServiceDiscovery(String clusterName) {
+            super(new TestRESTInvoker(clusterName));
+        }
+
+    }
+
+    private static final class TestRESTInvoker extends RESTInvoker {
+
         final static String CLUSTER_PLACEHOLDER = "CLUSTER_NAME";
 
         private Map<String, JSONObject> cannedResponses = new HashMap<>();
 
-        TestAmbariServiceDiscovery(String clusterName) {
+        TestRESTInvoker(String clusterName) {
+            super(null);
+
             cannedResponses.put(AmbariServiceDiscovery.AMBARI_CLUSTERS_URI,
-                                (JSONObject) JSONValue.parse(CLUSTERS_JSON_TEMPLATE.replaceAll(CLUSTER_PLACEHOLDER,
-                                                                                               clusterName)));
+                    (JSONObject) JSONValue.parse(CLUSTERS_JSON_TEMPLATE.replaceAll(CLUSTER_PLACEHOLDER,
+                            clusterName)));
 
             cannedResponses.put(String.format(AmbariServiceDiscovery.AMBARI_HOSTROLES_URI, clusterName),
-                                (JSONObject) JSONValue.parse(HOSTROLES_JSON_TEMPLATE.replaceAll(CLUSTER_PLACEHOLDER,
-                                                                                                clusterName)));
+                    (JSONObject) JSONValue.parse(HOSTROLES_JSON_TEMPLATE.replaceAll(CLUSTER_PLACEHOLDER,
+                            clusterName)));
 
             cannedResponses.put(String.format(AmbariServiceDiscovery.AMBARI_SERVICECONFIGS_URI, clusterName),
-                                (JSONObject) JSONValue.parse(SERVICECONFIGS_JSON_TEMPLATE.replaceAll(CLUSTER_PLACEHOLDER,
-                                                                                                     clusterName)));
+                    (JSONObject) JSONValue.parse(SERVICECONFIGS_JSON_TEMPLATE.replaceAll(CLUSTER_PLACEHOLDER,
+                            clusterName)));
         }
 
         @Override
-        protected JSONObject invokeREST(String url, String username, String passwordAlias) {
+        JSONObject invoke(String url, String username, String passwordAlias) {
             return cannedResponses.get(url.substring(url.indexOf("/api")));
         }
     }

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-release/home/conf/gateway-site.xml
----------------------------------------------------------------------
diff --git a/gateway-release/home/conf/gateway-site.xml b/gateway-release/home/conf/gateway-site.xml
index e06db72..fec5e87 100644
--- a/gateway-release/home/conf/gateway-site.xml
+++ b/gateway-release/home/conf/gateway-site.xml
@@ -73,4 +73,16 @@ limitations under the License.
         <description>Enable/Disable cookie scoping feature.</description>
     </property>
 
+    <property>
+        <name>gateway.cluster.config.monitor.ambari.enabled</name>
+        <value>false</value>
+        <description>Enable/disable Ambari cluster configuration monitoring.</description>
+    </property>
+
+    <property>
+        <name>gateway.cluster.config.monitor.ambari.interval</name>
+        <value>60</value>
+        <description>The interval (in seconds) for polling Ambari for cluster configuration changes.</description>
+    </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java
index ab0ab39..92b02ea 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java
@@ -596,4 +596,20 @@ public interface GatewayMessages {
             text = "Correcting the suspect permissions for the remote configuration registry entry \"{0}\"." )
   void correctingSuspectWritableRemoteConfigurationEntry(String entryPath);
 
+  @Message(level = MessageLevel.INFO,
+           text = "A cluster configuration change was noticed for {1} @ {0}")
+  void noticedClusterConfigurationChange(final String source, final String clusterName);
+
+
+  @Message(level = MessageLevel.INFO,
+           text = "Triggering topology regeneration for descriptor {2} because of change to the {1} @ {0} configuration.")
+  void triggeringTopologyRegeneration(final String source, final String clusterName, final String affected);
+
+
+  @Message(level = MessageLevel.ERROR,
+           text = "Encountered an error while responding to {1} @ {0} configuration change: {2}")
+  void errorRespondingToConfigChange(final String source,
+                                     final String clusterName,
+                                     @StackTrace(level = MessageLevel.DEBUG) Exception e);
+
 }

http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java
index f6bb9b0..95ae1f2 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java
@@ -170,6 +170,11 @@ public class GatewayConfigImpl extends Configuration implements GatewayConfig {
   public static final String MIME_TYPES_TO_COMPRESS = GATEWAY_CONFIG_FILE_PREFIX
       + ".gzip.compress.mime.types";
 
+  public static final String CLUSTER_CONFIG_MONITOR_PREFIX = GATEWAY_CONFIG_FILE_PREFIX + ".cluster.config.monitor.";
+  public static final String CLUSTER_CONFIG_MONITOR_INTERVAL_SUFFIX = ".interval";
+  public static final String CLUSTER_CONFIG_MONITOR_ENABLED_SUFFIX = ".enabled";
+
+
   // These config property names are not inline with the convention of using the
   // GATEWAY_CONFIG_FILE_PREFIX as is done by those above. These are left for
   // backward compatibility. 
@@ -942,6 +947,16 @@ public class GatewayConfigImpl extends Configuration implements GatewayConfig {
   }
 
   @Override
+  public int getClusterMonitorPollingInterval(String type) {
+    return getInt(CLUSTER_CONFIG_MONITOR_PREFIX + type.toLowerCase() + CLUSTER_CONFIG_MONITOR_INTERVAL_SUFFIX, -1);
+  }
+  
+  @Override
+  public boolean isClusterMonitorEnabled(String type) {
+    return getBoolean(CLUSTER_CONFIG_MONITOR_PREFIX + type.toLowerCase() + CLUSTER_CONFIG_MONITOR_ENABLED_SUFFIX, true);
+  }
+
+  @Override
   public List<String> getRemoteRegistryConfigurationNames() {
     List<String> result = new ArrayList<>();