You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by lm...@apache.org on 2017/12/05 20:07:37 UTC
[1/2] knox git commit: KNOX-1013 - Monitor Ambari for Cluster
Topology changes (Phil Zampino via lmccay)
Repository: knox
Updated Branches:
refs/heads/master 13287d2c1 -> a874f399e
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java
index 9dca344..626cec0 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.gateway.service.config.remote.RemoteConfigurationRegist
import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegistryClientService;
import org.apache.hadoop.gateway.services.registry.impl.DefaultServiceDefinitionRegistry;
import org.apache.hadoop.gateway.services.metrics.impl.DefaultMetricsService;
+import org.apache.hadoop.gateway.services.topology.impl.DefaultClusterConfigurationMonitorService;
import org.apache.hadoop.gateway.services.topology.impl.DefaultTopologyService;
import org.apache.hadoop.gateway.services.hostmap.impl.DefaultHostMapperService;
import org.apache.hadoop.gateway.services.registry.impl.DefaultServiceRegistryService;
@@ -112,6 +113,11 @@ public class DefaultGatewayServices implements GatewayServices {
registryClientService.init(config, options);
services.put(REMOTE_REGISTRY_CLIENT_SERVICE, registryClientService);
+ DefaultClusterConfigurationMonitorService ccs = new DefaultClusterConfigurationMonitorService();
+ ccs.setAliasService(alias);
+ ccs.init(config, options);
+ services.put(CLUSTER_CONFIGURATION_MONITOR_SERVICE, ccs);
+
DefaultTopologyService tops = new DefaultTopologyService();
tops.setAliasService(alias);
tops.init( config, options );
@@ -144,6 +150,8 @@ public class DefaultGatewayServices implements GatewayServices {
(RemoteConfigurationRegistryClientService)services.get(REMOTE_REGISTRY_CLIENT_SERVICE);
clientService.start();
+ (services.get(CLUSTER_CONFIGURATION_MONITOR_SERVICE)).start();
+
DefaultTopologyService tops = (DefaultTopologyService)services.get(TOPOLOGY_SERVICE);
tops.start();
@@ -156,6 +164,8 @@ public class DefaultGatewayServices implements GatewayServices {
ks.stop();
+ (services.get(CLUSTER_CONFIGURATION_MONITOR_SERVICE)).stop();
+
DefaultAliasService alias = (DefaultAliasService) services.get(ALIAS_SERVICE);
alias.stop();
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultClusterConfigurationMonitorService.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultClusterConfigurationMonitorService.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultClusterConfigurationMonitorService.java
new file mode 100644
index 0000000..342ce11
--- /dev/null
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultClusterConfigurationMonitorService.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.services.topology.impl;
+
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.services.ServiceLifecycleException;
+import org.apache.hadoop.gateway.services.security.AliasService;
+import org.apache.hadoop.gateway.topology.ClusterConfigurationMonitorService;
+import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitor;
+import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+
+public class DefaultClusterConfigurationMonitorService implements ClusterConfigurationMonitorService {
+
+ private AliasService aliasService = null;
+
+ private Map<String, ClusterConfigurationMonitor> monitors = new HashMap<>();
+
+ @Override
+ public void init(GatewayConfig config, Map<String, String> options) throws ServiceLifecycleException {
+ ServiceLoader<ClusterConfigurationMonitorProvider> providers =
+ ServiceLoader.load(ClusterConfigurationMonitorProvider.class);
+ for (ClusterConfigurationMonitorProvider provider : providers) {
+ // Check the gateway configuration to determine if this type of monitor is enabled
+ if (config.isClusterMonitorEnabled(provider.getType())) {
+ ClusterConfigurationMonitor monitor = provider.newInstance(config, aliasService);
+ if (monitor != null) {
+ monitors.put(provider.getType(), monitor);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void start() {
+ for (ClusterConfigurationMonitor monitor : monitors.values()) {
+ monitor.start();
+ }
+ }
+
+ @Override
+ public void stop() {
+ for (ClusterConfigurationMonitor monitor : monitors.values()) {
+ monitor.stop();
+ }
+ }
+
+ @Override
+ public ClusterConfigurationMonitor getMonitor(String type) {
+ return monitors.get(type);
+ }
+
+ @Override
+ public void addListener(ClusterConfigurationMonitor.ConfigurationChangeListener listener) {
+ for (ClusterConfigurationMonitor monitor : monitors.values()) {
+ monitor.addListener(listener);
+ }
+ }
+
+ public void setAliasService(AliasService aliasService) {
+ this.aliasService = aliasService;
+ }
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultTopologyService.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultTopologyService.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultTopologyService.java
index 5fc3620..aded6cd 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultTopologyService.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultTopologyService.java
@@ -28,6 +28,7 @@ import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.apache.hadoop.gateway.GatewayMessages;
+import org.apache.hadoop.gateway.GatewayServer;
import org.apache.hadoop.gateway.audit.api.Action;
import org.apache.hadoop.gateway.audit.api.ActionOutcome;
import org.apache.hadoop.gateway.audit.api.AuditServiceFactory;
@@ -37,15 +38,18 @@ import org.apache.hadoop.gateway.audit.log4j.audit.AuditConstants;
import org.apache.hadoop.gateway.config.GatewayConfig;
import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
import org.apache.hadoop.gateway.service.definition.ServiceDefinition;
+import org.apache.hadoop.gateway.services.GatewayServices;
import org.apache.hadoop.gateway.services.ServiceLifecycleException;
import org.apache.hadoop.gateway.services.security.AliasService;
import org.apache.hadoop.gateway.services.topology.TopologyService;
+import org.apache.hadoop.gateway.topology.ClusterConfigurationMonitorService;
import org.apache.hadoop.gateway.topology.Topology;
import org.apache.hadoop.gateway.topology.TopologyEvent;
import org.apache.hadoop.gateway.topology.TopologyListener;
import org.apache.hadoop.gateway.topology.TopologyMonitor;
import org.apache.hadoop.gateway.topology.TopologyProvider;
import org.apache.hadoop.gateway.topology.builder.TopologyBuilder;
+import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitor;
import org.apache.hadoop.gateway.topology.monitor.RemoteConfigurationMonitor;
import org.apache.hadoop.gateway.topology.monitor.RemoteConfigurationMonitorFactory;
import org.apache.hadoop.gateway.topology.simple.SimpleDescriptorHandler;
@@ -554,7 +558,10 @@ public class DefaultTopologyService
@Override
public void start() {
-
+ // Register a cluster configuration monitor listener for change notifications
+ ClusterConfigurationMonitorService ccms =
+ GatewayServer.getGatewayServices().getService(GatewayServices.CLUSTER_CONFIGURATION_MONITOR_SERVICE);
+ ccms.addListener(new TopologyDiscoveryTrigger(this));
}
@Override
@@ -589,11 +596,17 @@ public class DefaultTopologyService
// This happens prior to the start-up loading of the topologies.
String[] descriptorFilenames = descriptorsDirectory.list();
if (descriptorFilenames != null) {
- for (String descriptorFilename : descriptorFilenames) {
- if (DescriptorsMonitor.isDescriptorFile(descriptorFilename)) {
- descriptorsMonitor.onFileChange(new File(descriptorsDirectory, descriptorFilename));
- }
+ for (String descriptorFilename : descriptorFilenames) {
+ if (DescriptorsMonitor.isDescriptorFile(descriptorFilename)) {
+ // If there isn't a corresponding topology file, or if the descriptor has been modified since the
+ // corresponding topology file was generated, then trigger generation of one
+ File matchingTopologyFile = getExistingFile(topologiesDirectory, FilenameUtils.getBaseName(descriptorFilename));
+ if (matchingTopologyFile == null ||
+ matchingTopologyFile.lastModified() < (new File(descriptorsDirectory, descriptorFilename)).lastModified()) {
+ descriptorsMonitor.onFileChange(new File(descriptorsDirectory, descriptorFilename));
+ }
}
+ }
}
// Initialize the remote configuration monitor, if it has been configured
@@ -604,7 +617,6 @@ public class DefaultTopologyService
}
}
-
/**
* Utility method for listing the files in the specified directory.
* This method is "nicer" than the File#listFiles() because it will not return null.
@@ -847,4 +859,37 @@ public class DefaultTopologyService
}
}
+ /**
+ * Listener for Ambari config change events, which will trigger re-generation (including re-discovery) of the
+ * affected topologies.
+ */
+ private static class TopologyDiscoveryTrigger implements ClusterConfigurationMonitor.ConfigurationChangeListener {
+
+ private TopologyService topologyService = null;
+
+ TopologyDiscoveryTrigger(TopologyService topologyService) {
+ this.topologyService = topologyService;
+ }
+
+ @Override
+ public void onConfigurationChange(String source, String clusterName) {
+ log.noticedClusterConfigurationChange(source, clusterName);
+ try {
+ // Identify any descriptors associated with the cluster configuration change
+ for (File descriptor : topologyService.getDescriptors()) {
+ String descriptorContent = FileUtils.readFileToString(descriptor);
+ if (descriptorContent.contains(source)) {
+ if (descriptorContent.contains(clusterName)) {
+ log.triggeringTopologyRegeneration(source, clusterName, descriptor.getAbsolutePath());
+ // 'Touch' the descriptor to trigger re-generation of the associated topology
+ descriptor.setLastModified(System.currentTimeMillis());
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.errorRespondingToConfigChange(source, clusterName, e);
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/simple/SimpleDescriptorHandler.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/simple/SimpleDescriptorHandler.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/simple/SimpleDescriptorHandler.java
index c44710a..6b9df0d 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/simple/SimpleDescriptorHandler.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/simple/SimpleDescriptorHandler.java
@@ -54,6 +54,8 @@ public class SimpleDescriptorHandler {
private static final SimpleDescriptorMessages log = MessagesFactory.get(SimpleDescriptorMessages.class);
+ private static Map<String, ServiceDiscovery> discoveryInstances = new HashMap<>();
+
public static Map<String, File> handle(File desc) throws IOException {
return handle(desc, NO_GATEWAY_SERVICES);
}
@@ -89,7 +91,12 @@ public class SimpleDescriptorHandler {
discoveryType = "AMBARI";
}
- ServiceDiscovery sd = ServiceDiscoveryFactory.get(discoveryType, gatewayServices);
+ // Use the cached discovery object for the required type, if it has already been loaded
+ ServiceDiscovery sd = discoveryInstances.get(discoveryType);
+ if (sd == null) {
+ sd = ServiceDiscoveryFactory.get(discoveryType, gatewayServices);
+ discoveryInstances.put(discoveryType, sd);
+ }
ServiceDiscovery.Cluster cluster = sd.discover(sdc, desc.getClusterName());
List<String> validServiceNames = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-spi/src/main/java/org/apache/hadoop/gateway/config/GatewayConfig.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/config/GatewayConfig.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/config/GatewayConfig.java
index 5cfaf36..e45fd11 100644
--- a/gateway-spi/src/main/java/org/apache/hadoop/gateway/config/GatewayConfig.java
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/config/GatewayConfig.java
@@ -313,7 +313,23 @@ public interface GatewayConfig {
* @return
*/
boolean isGatewayServerHeaderEnabled();
-
+
+ /**
+ *
+ * @param type The type of cluster configuration monitor for which the interval should be returned.
+ *
+ * @return The polling interval configuration value, or -1 if it has not been configured.
+ */
+ int getClusterMonitorPollingInterval(String type);
+
+ /**
+ *
+ * @param type The type of cluster configuration monitor for which the interval should be returned.
+ *
+ * @return The enabled status of the specified type of cluster configuration monitor.
+ */
+ boolean isClusterMonitorEnabled(String type);
+
/**
* @return The list of the names of any remote registry configurations defined herein.
*/
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java
index 2894bbc..222b1f0 100644
--- a/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java
@@ -41,6 +41,8 @@ public interface GatewayServices extends Service, ProviderDeploymentContributor
String REMOTE_REGISTRY_CLIENT_SERVICE = "RemoteConfigRegistryClientService";
+ String CLUSTER_CONFIGURATION_MONITOR_SERVICE = "ClusterConfigurationMonitorService";
+
public abstract Collection<String> getServiceNames();
public abstract <T> T getService( String serviceName );
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/ClusterConfigurationMonitorService.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/ClusterConfigurationMonitorService.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/ClusterConfigurationMonitorService.java
new file mode 100644
index 0000000..961f2e5
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/ClusterConfigurationMonitorService.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.topology;
+
+import org.apache.hadoop.gateway.services.Service;
+import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitor;
+
+/**
+ * Gateway service for managing cluster configuration monitors.
+ */
+public interface ClusterConfigurationMonitorService extends Service {
+
+ /**
+ *
+ * @param type The type of monitor (e.g., Ambari)
+ *
+ * @return The monitor associated with the specified type, or null if there is no such monitor.
+ */
+ ClusterConfigurationMonitor getMonitor(String type);
+
+
+ /**
+ * Register for configuration change notifications from <em>any</em> of the monitors managed by this service.
+ *
+ * @param listener The listener to register.
+ */
+ void addListener(ClusterConfigurationMonitor.ConfigurationChangeListener listener);
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/discovery/ClusterConfigurationMonitor.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/discovery/ClusterConfigurationMonitor.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/discovery/ClusterConfigurationMonitor.java
new file mode 100644
index 0000000..fc3614d
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/discovery/ClusterConfigurationMonitor.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.topology.discovery;
+
+public interface ClusterConfigurationMonitor {
+
+ /**
+ * Start the monitor.
+ */
+ void start();
+
+ /**
+ * Stop the monitor.
+ */
+ void stop();
+
+ /**
+ *
+ * @param interval The polling interval, in seconds
+ */
+ void setPollingInterval(int interval);
+
+ /**
+ * Register for notifications from the monitor.
+ */
+ void addListener(ConfigurationChangeListener listener);
+
+ /**
+ * Monitor listener interface for receiving notifications that a configuration has changed.
+ */
+ interface ConfigurationChangeListener {
+ void onConfigurationChange(String source, String clusterName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/discovery/ClusterConfigurationMonitorProvider.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/discovery/ClusterConfigurationMonitorProvider.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/discovery/ClusterConfigurationMonitorProvider.java
new file mode 100644
index 0000000..a8d5f30
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/discovery/ClusterConfigurationMonitorProvider.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.topology.discovery;
+
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.services.security.AliasService;
+
+public interface ClusterConfigurationMonitorProvider {
+
+ String getType();
+
+ ClusterConfigurationMonitor newInstance(GatewayConfig config, AliasService aliasService);
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-test-release-utils/src/main/java/org/apache/hadoop/gateway/GatewayTestConfig.java
----------------------------------------------------------------------
diff --git a/gateway-test-release-utils/src/main/java/org/apache/hadoop/gateway/GatewayTestConfig.java b/gateway-test-release-utils/src/main/java/org/apache/hadoop/gateway/GatewayTestConfig.java
index f7ea633..e04c581 100644
--- a/gateway-test-release-utils/src/main/java/org/apache/hadoop/gateway/GatewayTestConfig.java
+++ b/gateway-test-release-utils/src/main/java/org/apache/hadoop/gateway/GatewayTestConfig.java
@@ -640,4 +640,14 @@ public class GatewayTestConfig extends Configuration implements GatewayConfig {
public String getRemoteConfigurationMonitorClientName() {
return null;
}
+
+ @Override
+ public int getClusterMonitorPollingInterval(String type) {
+ return 600;
+ }
+
+ @Override
+ public boolean isClusterMonitorEnabled(String type) {
+ return false;
+ }
}
[2/2] knox git commit: KNOX-1013 - Monitor Ambari for Cluster
Topology changes (Phil Zampino via lmccay)
Posted by lm...@apache.org.
KNOX-1013 - Monitor Ambari for Cluster Topology changes (Phil Zampino via lmccay)
Project: http://git-wip-us.apache.org/repos/asf/knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/a874f399
Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/a874f399
Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/a874f399
Branch: refs/heads/master
Commit: a874f399e05835c359ded4ee2c7b822c7baa3231
Parents: 13287d2
Author: Larry McCay <lm...@hortonworks.com>
Authored: Tue Dec 5 15:07:16 2017 -0500
Committer: Larry McCay <lm...@hortonworks.com>
Committed: Tue Dec 5 15:07:32 2017 -0500
----------------------------------------------------------------------
.../discovery/ambari/AmbariClientCommon.java | 102 ++++
.../discovery/ambari/AmbariCluster.java | 5 +
...bariClusterConfigurationMonitorProvider.java | 35 ++
.../ambari/AmbariConfigurationMonitor.java | 525 +++++++++++++++++++
.../ambari/AmbariServiceDiscovery.java | 228 ++++----
.../ambari/AmbariServiceDiscoveryMessages.java | 51 +-
.../topology/discovery/ambari/RESTInvoker.java | 136 +++++
.../ambari/ServiceURLPropertyConfig.java | 2 +-
...iscovery.ClusterConfigurationMonitorProvider | 19 +
.../ambari/AmbariConfigurationMonitorTest.java | 319 +++++++++++
.../ambari/AmbariServiceDiscoveryTest.java | 28 +-
gateway-release/home/conf/gateway-site.xml | 12 +
.../apache/hadoop/gateway/GatewayMessages.java | 16 +
.../gateway/config/impl/GatewayConfigImpl.java | 15 +
.../services/DefaultGatewayServices.java | 10 +
...faultClusterConfigurationMonitorService.java | 81 +++
.../topology/impl/DefaultTopologyService.java | 57 +-
.../simple/SimpleDescriptorHandler.java | 9 +-
.../hadoop/gateway/config/GatewayConfig.java | 18 +-
.../gateway/services/GatewayServices.java | 2 +
.../ClusterConfigurationMonitorService.java | 43 ++
.../discovery/ClusterConfigurationMonitor.java | 48 ++
.../ClusterConfigurationMonitorProvider.java | 27 +
.../hadoop/gateway/GatewayTestConfig.java | 10 +
24 files changed, 1633 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClientCommon.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClientCommon.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClientCommon.java
new file mode 100644
index 0000000..a2bf4ea
--- /dev/null
+++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClientCommon.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.topology.discovery.ambari;
+
+import net.minidev.json.JSONArray;
+import net.minidev.json.JSONObject;
+import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.hadoop.gateway.services.security.AliasService;
+import org.apache.hadoop.gateway.topology.discovery.ServiceDiscoveryConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class AmbariClientCommon {
+
+ static final String AMBARI_CLUSTERS_URI = "/api/v1/clusters";
+
+ static final String AMBARI_HOSTROLES_URI =
+ AMBARI_CLUSTERS_URI + "/%s/services?fields=components/host_components/HostRoles";
+
+ static final String AMBARI_SERVICECONFIGS_URI =
+ AMBARI_CLUSTERS_URI + "/%s/configurations/service_config_versions?is_current=true";
+
+ private static final AmbariServiceDiscoveryMessages log = MessagesFactory.get(AmbariServiceDiscoveryMessages.class);
+
+ private RESTInvoker restClient;
+
+
+ AmbariClientCommon(AliasService aliasService) {
+ this(new RESTInvoker(aliasService));
+ }
+
+
+ AmbariClientCommon(RESTInvoker restInvoker) {
+ this.restClient = restInvoker;
+ }
+
+
+
+ Map<String, Map<String, AmbariCluster.ServiceConfiguration>> getActiveServiceConfigurations(String clusterName,
+ ServiceDiscoveryConfig config) {
+ return getActiveServiceConfigurations(config.getAddress(),
+ clusterName,
+ config.getUser(),
+ config.getPasswordAlias());
+ }
+
+
+ Map<String, Map<String, AmbariCluster.ServiceConfiguration>> getActiveServiceConfigurations(String discoveryAddress,
+ String clusterName,
+ String discoveryUser,
+ String discoveryPwdAlias) {
+ Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigurations = new HashMap<>();
+
+ String serviceConfigsURL = String.format("%s" + AMBARI_SERVICECONFIGS_URI, discoveryAddress, clusterName);
+
+ JSONObject serviceConfigsJSON = restClient.invoke(serviceConfigsURL, discoveryUser, discoveryPwdAlias);
+ if (serviceConfigsJSON != null) {
+ // Process the service configurations
+ JSONArray serviceConfigs = (JSONArray) serviceConfigsJSON.get("items");
+ for (Object serviceConfig : serviceConfigs) {
+ String serviceName = (String) ((JSONObject) serviceConfig).get("service_name");
+ JSONArray configurations = (JSONArray) ((JSONObject) serviceConfig).get("configurations");
+ for (Object configuration : configurations) {
+ String configType = (String) ((JSONObject) configuration).get("type");
+ String configVersion = String.valueOf(((JSONObject) configuration).get("version"));
+
+ Map<String, String> configProps = new HashMap<>();
+ JSONObject configProperties = (JSONObject) ((JSONObject) configuration).get("properties");
+ for (String propertyName : configProperties.keySet()) {
+ configProps.put(propertyName, String.valueOf(((JSONObject) configProperties).get(propertyName)));
+ }
+ if (!serviceConfigurations.containsKey(serviceName)) {
+ serviceConfigurations.put(serviceName, new HashMap<>());
+ }
+ serviceConfigurations.get(serviceName).put(configType,
+ new AmbariCluster.ServiceConfiguration(configType,
+ configVersion,
+ configProps));
+ }
+ }
+ }
+
+ return serviceConfigurations;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariCluster.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariCluster.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariCluster.java
index c841d9c..1d308cc 100644
--- a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariCluster.java
+++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariCluster.java
@@ -63,6 +63,11 @@ class AmbariCluster implements ServiceDiscovery.Cluster {
}
+ Map<String, Map<String, ServiceConfiguration>> getServiceConfigurations() {
+ return serviceConfigurations;
+ }
+
+
Map<String, AmbariComponent> getComponents() {
return components;
}
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java
new file mode 100644
index 0000000..3b31124
--- /dev/null
+++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.topology.discovery.ambari;
+
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.services.security.AliasService;
+import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitor;
+import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider;
+
+public class AmbariClusterConfigurationMonitorProvider implements ClusterConfigurationMonitorProvider {
+
+ @Override
+ public String getType() {
+ return AmbariConfigurationMonitor.getType();
+ }
+
+ @Override
+ public ClusterConfigurationMonitor newInstance(GatewayConfig config, AliasService aliasService) {
+ return new AmbariConfigurationMonitor(config, aliasService);
+ }
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java
new file mode 100644
index 0000000..e4b5e43
--- /dev/null
+++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java
@@ -0,0 +1,525 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.topology.discovery.ambari;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.hadoop.gateway.services.security.AliasService;
+import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitor;
+import org.apache.hadoop.gateway.topology.discovery.ServiceDiscoveryConfig;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+
+class AmbariConfigurationMonitor implements ClusterConfigurationMonitor {
+
+ private static final String TYPE = "Ambari";
+
+ private static final String CLUSTERS_DATA_DIR_NAME = "clusters";
+
+ private static final String PERSISTED_FILE_COMMENT = "Generated File. Do Not Edit!";
+
+ private static final String PROP_CLUSTER_PREFIX = "cluster.";
+ private static final String PROP_CLUSTER_SOURCE = PROP_CLUSTER_PREFIX + "source";
+ private static final String PROP_CLUSTER_NAME = PROP_CLUSTER_PREFIX + "name";
+ private static final String PROP_CLUSTER_USER = PROP_CLUSTER_PREFIX + "user";
+ private static final String PROP_CLUSTER_ALIAS = PROP_CLUSTER_PREFIX + "pwd.alias";
+
+ static final String INTERVAL_PROPERTY_NAME = "org.apache.hadoop.gateway.topology.discovery.ambari.monitor.interval";
+
+
+ private static final AmbariServiceDiscoveryMessages log = MessagesFactory.get(AmbariServiceDiscoveryMessages.class);
+
+ // Ambari address
+ // clusterName -> ServiceDiscoveryConfig
+ //
+ Map<String, Map<String, ServiceDiscoveryConfig>> clusterMonitorConfigurations = new HashMap<>();
+
+ // Ambari address
+ // clusterName
+ // configType -> version
+ //
+ Map<String, Map<String, Map<String, String>>> ambariClusterConfigVersions = new HashMap<>();
+
+ ReadWriteLock configVersionsLock = new ReentrantReadWriteLock();
+
+ private List<ConfigurationChangeListener> changeListeners = new ArrayList<>();
+
+ private AmbariClientCommon ambariClient;
+
+ PollingConfigAnalyzer internalMonitor;
+
+ GatewayConfig gatewayConfig = null;
+
+ static String getType() {
+ return TYPE;
+ }
+
+ AmbariConfigurationMonitor(GatewayConfig config, AliasService aliasService) {
+ this.gatewayConfig = config;
+ this.ambariClient = new AmbariClientCommon(aliasService);
+ this.internalMonitor = new PollingConfigAnalyzer(this);
+
+ // Override the default polling interval if it has been configured
+ int interval = config.getClusterMonitorPollingInterval(getType());
+ if (interval > 0) {
+ setPollingInterval(interval);
+ }
+
+ init();
+ }
+
+ @Override
+ public void setPollingInterval(int interval) {
+ internalMonitor.setInterval(interval);
+ }
+
+ private void init() {
+ loadDiscoveryConfiguration();
+ loadClusterVersionData();
+ }
+
+ /**
+ * Load any previously-persisted service discovery configurations.
+ * This is necessary for checking previously-deployed topologies.
+ */
+ private void loadDiscoveryConfiguration() {
+ File persistenceDir = getPersistenceDir();
+ if (persistenceDir != null) {
+ Collection<File> persistedConfigs = FileUtils.listFiles(persistenceDir, new String[]{"conf"}, false);
+ for (File persisted : persistedConfigs) {
+ Properties props = new Properties();
+ try {
+ props.load(new FileInputStream(persisted));
+
+ addDiscoveryConfig(props.getProperty(PROP_CLUSTER_NAME), new ServiceDiscoveryConfig() {
+ public String getAddress() {
+ return props.getProperty(PROP_CLUSTER_SOURCE);
+ }
+
+ public String getUser() {
+ return props.getProperty(PROP_CLUSTER_USER);
+ }
+
+ public String getPasswordAlias() {
+ return props.getProperty(PROP_CLUSTER_ALIAS);
+ }
+ });
+ } catch (IOException e) {
+ log.failedToLoadClusterMonitorServiceDiscoveryConfig(getType(), e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Load any previously-persisted cluster configuration version records, so the monitor will check
+ * previously-deployed topologies against the current cluster configuration.
+ */
+ private void loadClusterVersionData() {
+ File persistenceDir = getPersistenceDir();
+ if (persistenceDir != null) {
+ Collection<File> persistedConfigs = FileUtils.listFiles(getPersistenceDir(), new String[]{"ver"}, false);
+ for (File persisted : persistedConfigs) {
+ Properties props = new Properties();
+ try {
+ props.load(new FileInputStream(persisted));
+
+ String source = props.getProperty(PROP_CLUSTER_SOURCE);
+ String clusterName = props.getProperty(PROP_CLUSTER_NAME);
+
+ Map<String, String> configVersions = new HashMap<>();
+ for (String name : props.stringPropertyNames()) {
+ if (!name.startsWith(PROP_CLUSTER_PREFIX)) { // Ignore implementation-specific properties
+ configVersions.put(name, props.getProperty(name));
+ }
+ }
+
+ // Map the config versions to the cluster name
+ addClusterConfigVersions(source, clusterName, configVersions);
+
+ } catch (IOException e) {
+ log.failedToLoadClusterMonitorConfigVersions(getType(), e);
+ }
+ }
+ }
+ }
+
+ private void persistDiscoveryConfiguration(String clusterName, ServiceDiscoveryConfig sdc) {
+ File persistenceDir = getPersistenceDir();
+ if (persistenceDir != null) {
+
+ Properties props = new Properties();
+ props.setProperty(PROP_CLUSTER_NAME, clusterName);
+ props.setProperty(PROP_CLUSTER_SOURCE, sdc.getAddress());
+
+ String username = sdc.getUser();
+ if (username != null) {
+ props.setProperty(PROP_CLUSTER_USER, username);
+ }
+ String pwdAlias = sdc.getPasswordAlias();
+ if (pwdAlias != null) {
+ props.setProperty(PROP_CLUSTER_ALIAS, pwdAlias);
+ }
+
+ persist(props, getDiscoveryConfigPersistenceFile(sdc.getAddress(), clusterName));
+ }
+ }
+
+ private void persistClusterVersionData(String address, String clusterName, Map<String, String> configVersions) {
+ File persistenceDir = getPersistenceDir();
+ if (persistenceDir != null) {
+ Properties props = new Properties();
+ props.setProperty(PROP_CLUSTER_NAME, clusterName);
+ props.setProperty(PROP_CLUSTER_SOURCE, address);
+ for (String name : configVersions.keySet()) {
+ props.setProperty(name, configVersions.get(name));
+ }
+
+ persist(props, getConfigVersionsPersistenceFile(address, clusterName));
+ }
+ }
+
+ private void persist(Properties props, File dest) {
+ try {
+ props.store(new FileOutputStream(dest), PERSISTED_FILE_COMMENT);
+ } catch (Exception e) {
+ log.failedToPersistClusterMonitorData(getType(), dest.getAbsolutePath(), e);
+ }
+ }
+
+ private File getPersistenceDir() {
+ File persistenceDir = null;
+
+ File dataDir = new File(gatewayConfig.getGatewayDataDir());
+ if (dataDir.exists()) {
+ File clustersDir = new File(dataDir, CLUSTERS_DATA_DIR_NAME);
+ if (!clustersDir.exists()) {
+ clustersDir.mkdirs();
+ }
+ persistenceDir = clustersDir;
+ }
+
+ return persistenceDir;
+ }
+
+ private File getDiscoveryConfigPersistenceFile(String address, String clusterName) {
+ return getPersistenceFile(address, clusterName, "conf");
+ }
+
+ private File getConfigVersionsPersistenceFile(String address, String clusterName) {
+ return getPersistenceFile(address, clusterName, "ver");
+ }
+
+ private File getPersistenceFile(String address, String clusterName, String ext) {
+ String fileName = address.replace(":", "_").replace("/", "_") + "-" + clusterName + "." + ext;
+ return new File(getPersistenceDir(), fileName);
+ }
+
+ /**
+ * Add cluster configuration details to the monitor's in-memory record.
+ *
+ * @param address An Ambari instance address.
+ * @param clusterName The name of a cluster associated with the Ambari instance.
+ * @param configVersions A Map of configuration types and their corresponding versions.
+ */
+ private void addClusterConfigVersions(String address, String clusterName, Map<String, String> configVersions) {
+ configVersionsLock.writeLock().lock();
+ try {
+ ambariClusterConfigVersions.computeIfAbsent(address, k -> new HashMap<>())
+ .put(clusterName, configVersions);
+ } finally {
+ configVersionsLock.writeLock().unlock();
+ }
+ }
+
+ public void start() {
+ (new Thread(internalMonitor, "AmbariConfigurationMonitor")).start();
+ }
+
+ public void stop() {
+ internalMonitor.stop();
+ }
+
+ @Override
+ public void addListener(ConfigurationChangeListener listener) {
+ changeListeners.add(listener);
+ }
+
+ /**
+ * Add discovery configuration details for the specified cluster, so the monitor knows how to connect to check for
+ * changes.
+ *
+ * @param clusterName The name of the cluster.
+ * @param config The associated service discovery configuration.
+ */
+ void addDiscoveryConfig(String clusterName, ServiceDiscoveryConfig config) {
+ clusterMonitorConfigurations.computeIfAbsent(config.getAddress(), k -> new HashMap<>()).put(clusterName, config);
+ }
+
+
+ /**
+ * Get the service discovery configuration associated with the specified Ambari instance and cluster.
+ *
+ * @param address An Ambari instance address.
+ * @param clusterName The name of a cluster associated with the Ambari instance.
+ *
+ * @return The associated ServiceDiscoveryConfig object.
+ */
+ ServiceDiscoveryConfig getDiscoveryConfig(String address, String clusterName) {
+ ServiceDiscoveryConfig config = null;
+ if (clusterMonitorConfigurations.containsKey(address)) {
+ config = clusterMonitorConfigurations.get(address).get(clusterName);
+ }
+ return config;
+ }
+
+
+ /**
+ * Add cluster configuration data to the monitor, which it will use when determining if configuration has changed.
+ *
+ * @param cluster An AmbariCluster object.
+ * @param discoveryConfig The discovery configuration associated with the cluster.
+ */
+ void addClusterConfigVersions(AmbariCluster cluster, ServiceDiscoveryConfig discoveryConfig) {
+
+ String clusterName = cluster.getName();
+
+ // Register the cluster discovery configuration for the monitor connections
+ persistDiscoveryConfiguration(clusterName, discoveryConfig);
+ addDiscoveryConfig(clusterName, discoveryConfig);
+
+ // Build the set of configuration versions
+ Map<String, String> configVersions = new HashMap<>();
+ Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigs = cluster.getServiceConfigurations();
+ for (String serviceName : serviceConfigs.keySet()) {
+ Map<String, AmbariCluster.ServiceConfiguration> configTypeVersionMap = serviceConfigs.get(serviceName);
+ for (AmbariCluster.ServiceConfiguration config : configTypeVersionMap.values()) {
+ String configType = config.getType();
+ String version = config.getVersion();
+ configVersions.put(configType, version);
+ }
+ }
+
+ persistClusterVersionData(discoveryConfig.getAddress(), clusterName, configVersions);
+ addClusterConfigVersions(discoveryConfig.getAddress(), clusterName, configVersions);
+ }
+
+
+ /**
+ * Remove the configuration record for the specified Ambari instance and cluster name.
+ *
+ * @param address An Ambari instance address.
+ * @param clusterName The name of a cluster associated with the Ambari instance.
+ *
+ * @return The removed data; A Map of configuration types and their corresponding versions.
+ */
+ Map<String, String> removeClusterConfigVersions(String address, String clusterName) {
+ Map<String, String> result = new HashMap<>();
+
+ configVersionsLock.writeLock().lock();
+ try {
+ if (ambariClusterConfigVersions.containsKey(address)) {
+ result.putAll(ambariClusterConfigVersions.get(address).remove(clusterName));
+ }
+ } finally {
+ configVersionsLock.writeLock().unlock();
+ }
+
+ // Delete the associated persisted record
+ File persisted = getConfigVersionsPersistenceFile(address, clusterName);
+ if (persisted.exists()) {
+ persisted.delete();
+ }
+
+ return result;
+ }
+
+ /**
+ * Get the cluster configuration details for the specified cluster and Ambari instance.
+ *
+ * @param address An Ambari instance address.
+ * @param clusterName The name of a cluster associated with the Ambari instance.
+ *
+ * @return A Map of configuration types and their corresponding versions.
+ */
+ Map<String, String> getClusterConfigVersions(String address, String clusterName) {
+ Map<String, String> result = new HashMap<>();
+
+ configVersionsLock.readLock().lock();
+ try {
+ if (ambariClusterConfigVersions.containsKey(address)) {
+ result.putAll(ambariClusterConfigVersions.get(address).get(clusterName));
+ }
+ } finally {
+ configVersionsLock.readLock().unlock();
+ }
+
+ return result;
+ }
+
+
+ /**
+ * Get all the clusters the monitor knows about.
+ *
+ * @return A Map of Ambari instance addresses to associated cluster names.
+ */
+ Map<String, List<String>> getClusterNames() {
+ Map<String, List<String>> result = new HashMap<>();
+
+ configVersionsLock.readLock().lock();
+ try {
+ for (String address : ambariClusterConfigVersions.keySet()) {
+ List<String> clusterNames = new ArrayList<>();
+ clusterNames.addAll(ambariClusterConfigVersions.get(address).keySet());
+ result.put(address, clusterNames);
+ }
+ } finally {
+ configVersionsLock.readLock().unlock();
+ }
+
+ return result;
+
+ }
+
+
+ /**
+ * Notify registered change listeners.
+ *
+ * @param source The address of the Ambari instance from which the cluster details were determined.
+ * @param clusterName The name of the cluster whose configuration details have changed.
+ */
+ void notifyChangeListeners(String source, String clusterName) {
+ for (ConfigurationChangeListener listener : changeListeners) {
+ listener.onConfigurationChange(source, clusterName);
+ }
+ }
+
+
+ /**
+ * Request the current active configuration version info from Ambari.
+ *
+ * @param address The Ambari instance address.
+ * @param clusterName The name of the cluster for which the details are desired.
+ *
+ * @return A Map of service configuration types and their corresponding versions.
+ */
+ Map<String, String> getUpdatedConfigVersions(String address, String clusterName) {
+ Map<String, String> configVersions = new HashMap<>();
+
+ Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigs =
+ ambariClient.getActiveServiceConfigurations(clusterName, getDiscoveryConfig(address, clusterName));
+
+ for (Map<String, AmbariCluster.ServiceConfiguration> serviceConfig : serviceConfigs.values()) {
+ for (AmbariCluster.ServiceConfiguration config : serviceConfig.values()) {
+ configVersions.put(config.getType(), config.getVersion());
+ }
+ }
+
+ return configVersions;
+ }
+
+
+ /**
+ * The thread that polls Ambari for configuration details for clusters associated with discovered topologies,
+ * compares them with the current recorded values, and notifies any listeners when differences are discovered.
+ */
+ static final class PollingConfigAnalyzer implements Runnable {
+
+ private static final int DEFAULT_POLLING_INTERVAL = 60;
+
+ // Polling interval in seconds
+ private int interval = DEFAULT_POLLING_INTERVAL;
+
+ private AmbariConfigurationMonitor delegate;
+
+ private boolean isActive = false;
+
+ PollingConfigAnalyzer(AmbariConfigurationMonitor delegate) {
+ this.delegate = delegate;
+ this.interval = Integer.getInteger(INTERVAL_PROPERTY_NAME, PollingConfigAnalyzer.DEFAULT_POLLING_INTERVAL);
+ }
+
+ void setInterval(int interval) {
+ this.interval = interval;
+ }
+
+
+ void stop() {
+ isActive = false;
+ }
+
+ @Override
+ public void run() {
+ isActive = true;
+
+ log.startedAmbariConfigMonitor(interval);
+
+ while (isActive) {
+ for (Map.Entry<String, List<String>> entry : delegate.getClusterNames().entrySet()) {
+ String address = entry.getKey();
+ for (String clusterName : entry.getValue()) {
+ Map<String, String> configVersions = delegate.getClusterConfigVersions(address, clusterName);
+ if (configVersions != null && !configVersions.isEmpty()) {
+ Map<String, String> updatedVersions = delegate.getUpdatedConfigVersions(address, clusterName);
+ if (updatedVersions != null && !updatedVersions.isEmpty()) {
+ boolean configHasChanged = false;
+
+ // If the config sets don't match in size, then something has changed
+ if (updatedVersions.size() != configVersions.size()) {
+ configHasChanged = true;
+ } else {
+ // Perform the comparison of all the config versions
+ for (Map.Entry<String, String> configVersion : configVersions.entrySet()) {
+ if (!updatedVersions.get(configVersion.getKey()).equals(configVersion.getValue())) {
+ configHasChanged = true;
+ break;
+ }
+ }
+ }
+
+ // If a change has occurred, notify the listeners
+ if (configHasChanged) {
+ delegate.notifyChangeListeners(address, clusterName);
+ }
+ }
+ }
+ }
+ }
+
+ try {
+ Thread.sleep(interval * 1000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscovery.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscovery.java
index b7f9f53..765a928 100644
--- a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscovery.java
+++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscovery.java
@@ -16,7 +16,7 @@
*/
package org.apache.hadoop.gateway.topology.discovery.ambari;
-import java.io.IOException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -25,38 +25,32 @@ import java.util.Properties;
import net.minidev.json.JSONArray;
import net.minidev.json.JSONObject;
-import net.minidev.json.JSONValue;
-import org.apache.hadoop.gateway.config.ConfigurationException;
import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.hadoop.gateway.services.GatewayServices;
import org.apache.hadoop.gateway.services.security.AliasService;
-import org.apache.hadoop.gateway.services.security.AliasServiceException;
+import org.apache.hadoop.gateway.topology.ClusterConfigurationMonitorService;
+import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitor;
import org.apache.hadoop.gateway.topology.discovery.GatewayService;
import org.apache.hadoop.gateway.topology.discovery.ServiceDiscovery;
import org.apache.hadoop.gateway.topology.discovery.ServiceDiscoveryConfig;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpStatus;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.message.BasicHeader;
-import org.apache.http.util.EntityUtils;
class AmbariServiceDiscovery implements ServiceDiscovery {
static final String TYPE = "AMBARI";
- static final String AMBARI_CLUSTERS_URI = "/api/v1/clusters";
+ static final String AMBARI_CLUSTERS_URI = AmbariClientCommon.AMBARI_CLUSTERS_URI;
- static final String AMBARI_HOSTROLES_URI =
- AMBARI_CLUSTERS_URI + "/%s/services?fields=components/host_components/HostRoles";
+ static final String AMBARI_HOSTROLES_URI = AmbariClientCommon.AMBARI_HOSTROLES_URI;
- static final String AMBARI_SERVICECONFIGS_URI =
- AMBARI_CLUSTERS_URI + "/%s/configurations/service_config_versions?is_current=true";
+ static final String AMBARI_SERVICECONFIGS_URI = AmbariClientCommon.AMBARI_SERVICECONFIGS_URI;
private static final String COMPONENT_CONFIG_MAPPING_FILE =
"ambari-service-discovery-component-config-mapping.properties";
+ private static final String GATEWAY_SERVICES_ACCESSOR_CLASS = "org.apache.hadoop.gateway.GatewayServer";
+ private static final String GATEWAY_SERVICES_ACCESSOR_METHOD = "getGatewayServices";
+
private static final AmbariServiceDiscoveryMessages log = MessagesFactory.get(AmbariServiceDiscoveryMessages.class);
// Map of component names to service configuration types
@@ -69,21 +63,76 @@ class AmbariServiceDiscovery implements ServiceDiscovery {
componentServiceConfigs.put(componentName, configMapping.getProperty(componentName));
}
} catch (Exception e) {
- log.failedToLoadServiceDiscoveryConfiguration(COMPONENT_CONFIG_MAPPING_FILE, e);
+ log.failedToLoadServiceDiscoveryURLDefConfiguration(COMPONENT_CONFIG_MAPPING_FILE, e);
}
}
- private static final String DEFAULT_USER_ALIAS = "ambari.discovery.user";
- private static final String DEFAULT_PWD_ALIAS = "ambari.discovery.password";
-
@GatewayService
private AliasService aliasService;
- private CloseableHttpClient httpClient = null;
+ private RESTInvoker restClient;
+ private AmbariClientCommon ambariClient;
+ // This is used to update the monitor when new cluster configuration details are discovered.
+ private AmbariConfigurationMonitor configChangeMonitor;
+
+ private boolean isInitialized = false;
AmbariServiceDiscovery() {
- httpClient = org.apache.http.impl.client.HttpClients.createDefault();
+ }
+
+
+ AmbariServiceDiscovery(RESTInvoker restClient) {
+ this.restClient = restClient;
+ }
+
+
+ /**
+ * Initialization must be subsequent to construction because the AliasService member isn't assigned until after
+ * construction time. This is called internally prior to discovery invocations to make sure the clients have been
+ * initialized.
+ */
+ private void init() {
+ if (!isInitialized) {
+ if (this.restClient == null) {
+ this.restClient = new RESTInvoker(aliasService);
+ }
+ this.ambariClient = new AmbariClientCommon(restClient);
+ this.configChangeMonitor = getConfigurationChangeMonitor();
+
+ isInitialized = true;
+ }
+ }
+
+
+ /**
+ * Get the Ambari configuration change monitor from the associated gateway service.
+ */
+ private AmbariConfigurationMonitor getConfigurationChangeMonitor() {
+ AmbariConfigurationMonitor ambariMonitor = null;
+ try {
+ Class clazz = Class.forName(GATEWAY_SERVICES_ACCESSOR_CLASS);
+ if (clazz != null) {
+ Method m = clazz.getDeclaredMethod(GATEWAY_SERVICES_ACCESSOR_METHOD);
+ if (m != null) {
+ Object obj = m.invoke(null);
+ if (GatewayServices.class.isAssignableFrom(obj.getClass())) {
+ ClusterConfigurationMonitorService clusterMonitorService =
+ ((GatewayServices) obj).getService(GatewayServices.CLUSTER_CONFIGURATION_MONITOR_SERVICE);
+ ClusterConfigurationMonitor monitor =
+ clusterMonitorService.getMonitor(AmbariConfigurationMonitor.getType());
+ if (monitor != null) {
+ if (AmbariConfigurationMonitor.class.isAssignableFrom(monitor.getClass())) {
+ ambariMonitor = (AmbariConfigurationMonitor) monitor;
+ }
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.errorAccessingConfigurationChangeMonitor(e);
+ }
+ return ambariMonitor;
}
@@ -95,14 +144,16 @@ class AmbariServiceDiscovery implements ServiceDiscovery {
@Override
public Map<String, Cluster> discover(ServiceDiscoveryConfig config) {
- Map<String, Cluster> clusters = new HashMap<String, Cluster>();
+ Map<String, Cluster> clusters = new HashMap<>();
+
+ init();
String discoveryAddress = config.getAddress();
// Invoke Ambari REST API to discover the available clusters
String clustersDiscoveryURL = String.format("%s" + AMBARI_CLUSTERS_URI, discoveryAddress);
- JSONObject json = invokeREST(clustersDiscoveryURL, config.getUser(), config.getPasswordAlias());
+ JSONObject json = restClient.invoke(clustersDiscoveryURL, config.getUser(), config.getPasswordAlias());
// Parse the cluster names from the response, and perform the cluster discovery
JSONArray clusterItems = (JSONArray) json.get("items");
@@ -126,13 +177,15 @@ class AmbariServiceDiscovery implements ServiceDiscovery {
Map<String, String> serviceComponents = new HashMap<>();
+ init();
+
String discoveryAddress = config.getAddress();
String discoveryUser = config.getUser();
String discoveryPwdAlias = config.getPasswordAlias();
Map<String, List<String>> componentHostNames = new HashMap<>();
String hostRolesURL = String.format("%s" + AMBARI_HOSTROLES_URI, discoveryAddress, clusterName);
- JSONObject hostRolesJSON = invokeREST(hostRolesURL, discoveryUser, discoveryPwdAlias);
+ JSONObject hostRolesJSON = restClient.invoke(hostRolesURL, discoveryUser, discoveryPwdAlias);
if (hostRolesJSON != null) {
// Process the host roles JSON
JSONArray items = (JSONArray) hostRolesJSON.get("items");
@@ -158,7 +211,7 @@ class AmbariServiceDiscovery implements ServiceDiscovery {
if (hostName != null) {
log.discoveredServiceHost(serviceName, hostName);
if (!componentHostNames.containsKey(componentName)) {
- componentHostNames.put(componentName, new ArrayList<String>());
+ componentHostNames.put(componentName, new ArrayList<>());
}
componentHostNames.get(componentName).add(hostName);
}
@@ -167,31 +220,15 @@ class AmbariServiceDiscovery implements ServiceDiscovery {
}
}
+ // Service configurations
Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigurations =
- new HashMap<String, Map<String, AmbariCluster.ServiceConfiguration>>();
- String serviceConfigsURL = String.format("%s" + AMBARI_SERVICECONFIGS_URI, discoveryAddress, clusterName);
- JSONObject serviceConfigsJSON = invokeREST(serviceConfigsURL, discoveryUser, discoveryPwdAlias);
- if (serviceConfigsJSON != null) {
- // Process the service configurations
- JSONArray serviceConfigs = (JSONArray) serviceConfigsJSON.get("items");
- for (Object serviceConfig : serviceConfigs) {
- String serviceName = (String) ((JSONObject) serviceConfig).get("service_name");
- JSONArray configurations = (JSONArray) ((JSONObject) serviceConfig).get("configurations");
- for (Object configuration : configurations) {
- String configType = (String) ((JSONObject) configuration).get("type");
- String configVersion = String.valueOf(((JSONObject) configuration).get("version"));
-
- Map<String, String> configProps = new HashMap<String, String>();
- JSONObject configProperties = (JSONObject) ((JSONObject) configuration).get("properties");
- for (String propertyName : configProperties.keySet()) {
- configProps.put(propertyName, String.valueOf(((JSONObject) configProperties).get(propertyName)));
- }
- if (!serviceConfigurations.containsKey(serviceName)) {
- serviceConfigurations.put(serviceName, new HashMap<String, AmbariCluster.ServiceConfiguration>());
- }
- serviceConfigurations.get(serviceName).put(configType, new AmbariCluster.ServiceConfiguration(configType, configVersion, configProps));
- cluster.addServiceConfiguration(serviceName, configType, new AmbariCluster.ServiceConfiguration(configType, configVersion, configProps));
- }
+ ambariClient.getActiveServiceConfigurations(discoveryAddress,
+ clusterName,
+ discoveryUser,
+ discoveryPwdAlias);
+ for (String serviceName : serviceConfigurations.keySet()) {
+ for (Map.Entry<String, AmbariCluster.ServiceConfiguration> serviceConfig : serviceConfigurations.get(serviceName).entrySet()) {
+ cluster.addServiceConfiguration(serviceName, serviceConfig.getKey(), serviceConfig.getValue());
}
}
@@ -214,93 +251,12 @@ class AmbariServiceDiscovery implements ServiceDiscovery {
}
}
- return cluster;
- }
-
-
- protected JSONObject invokeREST(String url, String username, String passwordAlias) {
- JSONObject result = null;
-
- CloseableHttpResponse response = null;
- try {
- HttpGet request = new HttpGet(url);
-
- // If no configured username, then use default username alias
- String password = null;
- if (username == null) {
- if (aliasService != null) {
- try {
- char[] defaultUser = aliasService.getPasswordFromAliasForGateway(DEFAULT_USER_ALIAS);
- if (defaultUser != null) {
- username = new String(defaultUser);
- }
- } catch (AliasServiceException e) {
- log.aliasServiceUserError(DEFAULT_USER_ALIAS, e.getLocalizedMessage());
- }
- }
-
- // If username is still null
- if (username == null) {
- log.aliasServiceUserNotFound();
- throw new ConfigurationException("No username is configured for Ambari service discovery.");
- }
- }
-
- if (aliasService != null) {
- // If no password alias is configured, then try the default alias
- if (passwordAlias == null) {
- passwordAlias = DEFAULT_PWD_ALIAS;
- }
-
- try {
- char[] pwd = aliasService.getPasswordFromAliasForGateway(passwordAlias);
- if (pwd != null) {
- password = new String(pwd);
- }
-
- } catch (AliasServiceException e) {
- log.aliasServicePasswordError(passwordAlias, e.getLocalizedMessage());
- }
- }
-
- // If the password could not be determined
- if (password == null) {
- log.aliasServicePasswordNotFound();
- throw new ConfigurationException("No password is configured for Ambari service discovery.");
- }
-
- // Add an auth header if credentials are available
- String encodedCreds =
- org.apache.commons.codec.binary.Base64.encodeBase64String((username + ":" + password).getBytes());
- request.addHeader(new BasicHeader("Authorization", "Basic " + encodedCreds));
-
- response = httpClient.execute(request);
-
- if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) {
- HttpEntity entity = response.getEntity();
- if (entity != null) {
- result = (JSONObject) JSONValue.parse((EntityUtils.toString(entity)));
- log.debugJSON(result.toJSONString());
- } else {
- log.noJSON(url);
- }
- } else {
- log.unexpectedRestResponseStatusCode(url, response.getStatusLine().getStatusCode());
- }
-
- } catch (IOException e) {
- log.restInvocationError(url, e);
- } finally {
- if(response != null) {
- try {
- response.close();
- } catch (IOException e) {
- // Ignore
- }
- }
+ if (configChangeMonitor != null) {
+ // Notify the cluster config monitor about these cluster configuration details
+ configChangeMonitor.addClusterConfigVersions(cluster, config);
}
- return result;
- }
+ return cluster;
+ }
}
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryMessages.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryMessages.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryMessages.java
index 0661224..51bbe0e 100644
--- a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryMessages.java
+++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryMessages.java
@@ -25,24 +25,44 @@ import org.apache.hadoop.gateway.i18n.messages.StackTrace;
public interface AmbariServiceDiscoveryMessages {
@Message(level = MessageLevel.ERROR,
- text = "Failed to load service discovery configuration: {1}")
- void failedToLoadServiceDiscoveryConfiguration(@StackTrace(level = MessageLevel.ERROR) Exception e);
+ text = "Failed to persist data for cluster configuration monitor {0} {1}: {2}")
+ void failedToPersistClusterMonitorData(final String monitor,
+ final String filename,
+ @StackTrace(level = MessageLevel.DEBUG) Exception e);
@Message(level = MessageLevel.ERROR,
- text = "Failed to load service discovery configuration {0}: {1}")
- void failedToLoadServiceDiscoveryConfiguration(final String configuration,
- @StackTrace(level = MessageLevel.ERROR) Exception e);
+ text = "Failed to load persisted service discovery configuration for cluster monitor {0} : {1}")
+ void failedToLoadClusterMonitorServiceDiscoveryConfig(final String monitor,
+ @StackTrace(level = MessageLevel.DEBUG) Exception e);
+
+ @Message(level = MessageLevel.ERROR,
+ text = "Failed to load persisted cluster configuration version data for cluster monitor {0} : {1}")
+ void failedToLoadClusterMonitorConfigVersions(final String monitor,
+ @StackTrace(level = MessageLevel.DEBUG) Exception e);
+
+ @Message(level = MessageLevel.ERROR,
+ text = "Unable to access the Ambari Configuration Change Monitor: {0}")
+ void errorAccessingConfigurationChangeMonitor(@StackTrace(level = MessageLevel.DEBUG) Exception e);
+
+ @Message(level = MessageLevel.ERROR,
+ text = "Failed to load service discovery URL definition configuration: {1}")
+ void failedToLoadServiceDiscoveryURLDefConfiguration(@StackTrace(level = MessageLevel.DEBUG) Exception e);
+
+ @Message(level = MessageLevel.ERROR,
+ text = "Failed to load service discovery URL definition configuration {0}: {1}")
+ void failedToLoadServiceDiscoveryURLDefConfiguration(final String configuration,
+ @StackTrace(level = MessageLevel.ERROR) Exception e);
@Message(level = MessageLevel.ERROR,
text = "Encountered an error during cluster {0} discovery: {1}")
void clusterDiscoveryError(final String clusterName,
- @StackTrace(level = MessageLevel.ERROR) Exception e);
+ @StackTrace(level = MessageLevel.DEBUG) Exception e);
@Message(level = MessageLevel.DEBUG,
text = "REST invocation {0} failed: {1}")
void restInvocationError(final String url,
- @StackTrace(level = MessageLevel.ERROR) Exception e);
+ @StackTrace(level = MessageLevel.DEBUG) Exception e);
@Message(level = MessageLevel.ERROR,
@@ -75,20 +95,23 @@ public interface AmbariServiceDiscoveryMessages {
void noJSON(final String url);
- @Message(level = MessageLevel.DEBUG,
+ @Message(level = MessageLevel.TRACE,
text = "REST invocation result: {0}")
void debugJSON(final String json);
+
@Message(level = MessageLevel.DEBUG,
- text = "Loaded component configuration mappings: {0}")
+ text = "Loaded component configuration mappings: {0}")
void loadedComponentConfigMappings(final String mappings);
+
@Message(level = MessageLevel.ERROR,
text = "Failed to load component configuration property mappings {0}: {1}")
void failedToLoadComponentConfigMappings(final String mappings,
- @StackTrace(level = MessageLevel.ERROR) Exception e);
+ @StackTrace(level = MessageLevel.DEBUG) Exception e);
- @Message(level = MessageLevel.DEBUG,
+
+ @Message(level = MessageLevel.TRACE,
text = "Discovered: Service: {0}, Host: {1}")
void discoveredServiceHost(final String serviceName, final String hostName);
@@ -114,8 +137,12 @@ public interface AmbariServiceDiscoveryMessages {
@Message(level = MessageLevel.DEBUG,
- text = "Determined the service URL mapping property {0} value: {1}")
+ text = "Determined the service URL mapping property {0} value: {1}")
void determinedPropertyValue(final String propertyName, final String propertyValue);
+ @Message(level = MessageLevel.INFO,
+ text = "Started Ambari cluster configuration monitor (checking every {0} seconds)")
+ void startedAmbariConfigMonitor(final long pollingInterval);
+
}
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/RESTInvoker.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/RESTInvoker.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/RESTInvoker.java
new file mode 100644
index 0000000..6a6fad8
--- /dev/null
+++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/RESTInvoker.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.topology.discovery.ambari;
+
+import net.minidev.json.JSONObject;
+import net.minidev.json.JSONValue;
+import org.apache.hadoop.gateway.config.ConfigurationException;
+import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.hadoop.gateway.services.security.AliasService;
+import org.apache.hadoop.gateway.services.security.AliasServiceException;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+
+class RESTInvoker {
+
+ private static final String DEFAULT_USER_ALIAS = "ambari.discovery.user";
+ private static final String DEFAULT_PWD_ALIAS = "ambari.discovery.password";
+
+ private static final AmbariServiceDiscoveryMessages log = MessagesFactory.get(AmbariServiceDiscoveryMessages.class);
+
+ private AliasService aliasService = null;
+
+ private CloseableHttpClient httpClient = org.apache.http.impl.client.HttpClients.createDefault();
+
+
+ RESTInvoker(AliasService aliasService) {
+ this.aliasService = aliasService;
+ }
+
+
+ JSONObject invoke(String url, String username, String passwordAlias) {
+ JSONObject result = null;
+
+ CloseableHttpResponse response = null;
+ try {
+ HttpGet request = new HttpGet(url);
+
+ // If no configured username, then use default username alias
+ String password = null;
+ if (username == null) {
+ if (aliasService != null) {
+ try {
+ char[] defaultUser = aliasService.getPasswordFromAliasForGateway(DEFAULT_USER_ALIAS);
+ if (defaultUser != null) {
+ username = new String(defaultUser);
+ }
+ } catch (AliasServiceException e) {
+ log.aliasServiceUserError(DEFAULT_USER_ALIAS, e.getLocalizedMessage());
+ }
+ }
+
+ // If username is still null
+ if (username == null) {
+ log.aliasServiceUserNotFound();
+ throw new ConfigurationException("No username is configured for Ambari service discovery.");
+ }
+ }
+
+ if (aliasService != null) {
+ // If no password alias is configured, then try the default alias
+ if (passwordAlias == null) {
+ passwordAlias = DEFAULT_PWD_ALIAS;
+ }
+
+ try {
+ char[] pwd = aliasService.getPasswordFromAliasForGateway(passwordAlias);
+ if (pwd != null) {
+ password = new String(pwd);
+ }
+
+ } catch (AliasServiceException e) {
+ log.aliasServicePasswordError(passwordAlias, e.getLocalizedMessage());
+ }
+ }
+
+ // If the password could not be determined
+ if (password == null) {
+ log.aliasServicePasswordNotFound();
+ throw new ConfigurationException("No password is configured for Ambari service discovery.");
+ }
+
+ // Add an auth header if credentials are available
+ String encodedCreds =
+ org.apache.commons.codec.binary.Base64.encodeBase64String((username + ":" + password).getBytes());
+ request.addHeader(new BasicHeader("Authorization", "Basic " + encodedCreds));
+
+ response = httpClient.execute(request);
+
+ if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) {
+ HttpEntity entity = response.getEntity();
+ if (entity != null) {
+ result = (JSONObject) JSONValue.parse((EntityUtils.toString(entity)));
+ log.debugJSON(result.toJSONString());
+ } else {
+ log.noJSON(url);
+ }
+ } else {
+ log.unexpectedRestResponseStatusCode(url, response.getStatusLine().getStatusCode());
+ }
+
+ } catch (IOException e) {
+ log.restInvocationError(url, e);
+ } finally {
+ if(response != null) {
+ try {
+ response.close();
+ } catch (IOException e) {
+ // Ignore
+ }
+ }
+ }
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/ServiceURLPropertyConfig.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/ServiceURLPropertyConfig.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/ServiceURLPropertyConfig.java
index 3330cc3..deb5bb3 100644
--- a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/ServiceURLPropertyConfig.java
+++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/ServiceURLPropertyConfig.java
@@ -110,7 +110,7 @@ class ServiceURLPropertyConfig {
}
}
} catch (Exception e) {
- log.failedToLoadServiceDiscoveryConfiguration(e);
+ log.failedToLoadServiceDiscoveryURLDefConfiguration(e);
} finally {
try {
source.close();
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider b/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider
new file mode 100644
index 0000000..d9b2b05
--- /dev/null
+++ b/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider
@@ -0,0 +1,19 @@
+##########################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##########################################################################
+
+org.apache.hadoop.gateway.topology.discovery.ambari.AmbariClusterConfigurationMonitorProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java b/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java
new file mode 100644
index 0000000..2d8b276
--- /dev/null
+++ b/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.topology.discovery.ambari;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.topology.discovery.ServiceDiscoveryConfig;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class AmbariConfigurationMonitorTest {
+
+ private File dataDir = null;
+
+ @Before
+ public void setup() throws Exception {
+ File targetDir = new File( System.getProperty("user.dir"), "target");
+ File tempDir = new File(targetDir, this.getClass().getName() + "__data__" + UUID.randomUUID());
+ FileUtils.forceMkdir(tempDir);
+ dataDir = tempDir;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ dataDir.delete();
+ }
+
+ @Test
+ public void testPollingMonitor() throws Exception {
+ final String addr1 = "http://host1:8080";
+ final String addr2 = "http://host2:8080";
+ final String cluster1Name = "Cluster_One";
+ final String cluster2Name = "Cluster_Two";
+
+
+ GatewayConfig config = EasyMock.createNiceMock(GatewayConfig.class);
+ EasyMock.expect(config.getGatewayDataDir()).andReturn(dataDir.getAbsolutePath()).anyTimes();
+ EasyMock.expect(config.getClusterMonitorPollingInterval(AmbariConfigurationMonitor.getType()))
+ .andReturn(10)
+ .anyTimes();
+ EasyMock.replay(config);
+
+ // Create the monitor
+ TestableAmbariConfigurationMonitor monitor = new TestableAmbariConfigurationMonitor(config);
+
+ // Clear the system property now that the monitor has been initialized
+ System.clearProperty(AmbariConfigurationMonitor.INTERVAL_PROPERTY_NAME);
+
+
+ // Sequence of config changes for testing monitoring for updates
+ Map<String, Map<String, List<List<AmbariCluster.ServiceConfiguration>>>> updateConfigurations = new HashMap<>();
+
+ updateConfigurations.put(addr1, new HashMap<>());
+ updateConfigurations.get(addr1).put(cluster1Name, Arrays.asList(Arrays.asList(createTestServiceConfig("zoo.cfg", "3"),
+ createTestServiceConfig("hive-site", "2")),
+ Arrays.asList(createTestServiceConfig("zoo.cfg", "3"),
+ createTestServiceConfig("hive-site", "3")),
+ Arrays.asList(createTestServiceConfig("zoo.cfg", "2"),
+ createTestServiceConfig("hive-site", "1"))));
+
+ updateConfigurations.put(addr2, new HashMap<>());
+ updateConfigurations.get(addr2).put(cluster2Name, Arrays.asList(Arrays.asList(createTestServiceConfig("zoo.cfg", "1"),
+ createTestServiceConfig("hive-site", "1")),
+ Collections.singletonList(createTestServiceConfig("zoo.cfg", "1")),
+ Arrays.asList(createTestServiceConfig("zoo.cfg", "1"),
+ createTestServiceConfig("hive-site", "2"))));
+
+ updateConfigurations.get(addr2).put(cluster1Name, Arrays.asList(Arrays.asList(createTestServiceConfig("zoo.cfg", "2"),
+ createTestServiceConfig("hive-site", "4")),
+ Arrays.asList(createTestServiceConfig("zoo.cfg", "3"),
+ createTestServiceConfig("hive-site", "4"),
+ createTestServiceConfig("yarn-site", "1")),
+ Arrays.asList(createTestServiceConfig("zoo.cfg", "1"),
+ createTestServiceConfig("hive-site", "2"))));
+
+ Map<String, Map<String, Integer>> configChangeIndex = new HashMap<>();
+ configChangeIndex.put(addr1, new HashMap<>());
+ configChangeIndex.get(addr1).put(cluster1Name, 0);
+ configChangeIndex.get(addr1).put(cluster2Name, 0);
+ configChangeIndex.put(addr2, new HashMap<>());
+ configChangeIndex.get(addr2).put(cluster2Name, 0);
+
+ // Setup the initial test update data
+ // Cluster 1 data change
+ monitor.addTestConfigVersion(addr1, cluster1Name, "zoo.cfg", "2");
+ monitor.addTestConfigVersion(addr1, cluster1Name, "hive-site", "1");
+
+ // Cluster 2 NO data change
+ monitor.addTestConfigVersion(addr2, cluster1Name, "zoo.cfg", "1");
+ monitor.addTestConfigVersion(addr2, cluster1Name, "hive-site", "1");
+
+ // Cluster 3 data change
+ monitor.addTestConfigVersion(addr2, cluster2Name, "zoo.cfg", "1");
+ monitor.addTestConfigVersion(addr2, cluster2Name, "hive-site", "2");
+
+ Map<String, Map<String, AmbariCluster.ServiceConfiguration>> initialAmbariClusterConfigs = new HashMap<>();
+
+ Map<String, AmbariCluster.ServiceConfiguration> cluster1Configs = new HashMap<>();
+ AmbariCluster.ServiceConfiguration zooCfg = createTestServiceConfig("zoo.cfg", "1");
+ cluster1Configs.put("ZOOKEEPER", zooCfg);
+
+ AmbariCluster.ServiceConfiguration hiveSite = createTestServiceConfig("hive-site", "1");
+ cluster1Configs.put("Hive", hiveSite);
+
+ initialAmbariClusterConfigs.put(cluster1Name, cluster1Configs);
+ AmbariCluster cluster1 = createTestCluster(cluster1Name, initialAmbariClusterConfigs);
+
+ // Tell the monitor about the cluster configurations
+ monitor.addClusterConfigVersions(cluster1, createTestDiscoveryConfig(addr1));
+
+ monitor.addClusterConfigVersions(createTestCluster(cluster2Name, initialAmbariClusterConfigs),
+ createTestDiscoveryConfig(addr2));
+
+ monitor.addClusterConfigVersions(createTestCluster(cluster1Name, initialAmbariClusterConfigs),
+ createTestDiscoveryConfig(addr2));
+
+ final Map<String, Map<String, Integer>> changeNotifications = new HashMap<>();
+ monitor.addListener((src, cname) -> {
+// System.out.println("Cluster config changed: " + cname + " @ " + src);
+ // Record the notification
+ Integer notificationCount = changeNotifications.computeIfAbsent(src, s -> new HashMap<>())
+ .computeIfAbsent(cname, c -> Integer.valueOf(0));
+ changeNotifications.get(src).put(cname, (notificationCount+=1));
+
+ // Update the config version
+ int changeIndex = configChangeIndex.get(src).get(cname);
+ if (changeIndex < updateConfigurations.get(src).get(cname).size()) {
+ List<AmbariCluster.ServiceConfiguration> changes = updateConfigurations.get(src).get(cname).get(changeIndex);
+
+// System.out.println("Applying config update " + changeIndex + " to " + cname + " @ " + src + " ...");
+ for (AmbariCluster.ServiceConfiguration change : changes) {
+ monitor.updateConfigState(src, cname, change.getType(), change.getVersion());
+// System.out.println(" Updated " + change.getType() + " to version " + change.getVersion());
+ }
+
+ // Increment the change index
+ configChangeIndex.get(src).replace(cname, changeIndex + 1);
+
+// System.out.println("Monitor config updated for " + cname + " @ " + src + " : " + changeIndex );
+ }
+ });
+
+ try {
+ monitor.start();
+
+ long expiration = System.currentTimeMillis() + (1000 * 30);
+ while (!areChangeUpdatesExhausted(updateConfigurations, configChangeIndex)
+ && (System.currentTimeMillis() < expiration)) {
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException e) {
+ //
+ }
+ }
+
+ } finally {
+ monitor.stop();
+ }
+
+ assertNotNull("Expected changes to have been reported for source 1.",
+ changeNotifications.get(addr1));
+
+ assertEquals("Expected changes to have been reported.",
+ 3, changeNotifications.get(addr1).get(cluster1Name).intValue());
+
+ assertNotNull("Expected changes to have been reported for source 2.",
+ changeNotifications.get(addr2));
+
+ assertEquals("Expected changes to have been reported.",
+ 3, changeNotifications.get(addr2).get(cluster2Name).intValue());
+
+ assertNull("Expected changes to have been reported.",
+ changeNotifications.get(addr2).get(cluster1Name));
+ }
+
+
+ private static boolean areChangeUpdatesExhausted(Map<String, Map<String, List<List<AmbariCluster.ServiceConfiguration>>>> updates,
+ Map<String, Map<String, Integer>> configChangeIndeces) {
+ boolean isExhausted = true;
+
+ for (String address : updates.keySet()) {
+ Map<String, List<List<AmbariCluster.ServiceConfiguration>>> clusterConfigs = updates.get(address);
+ for (String clusterName : clusterConfigs.keySet()) {
+ Integer configChangeCount = clusterConfigs.get(clusterName).size();
+ if (configChangeIndeces.get(address).containsKey(clusterName)) {
+ if (configChangeIndeces.get(address).get(clusterName) < configChangeCount) {
+ isExhausted = false;
+ break;
+ }
+ }
+ }
+ }
+
+ return isExhausted;
+ }
+
+ /**
+ *
+ * @param name The cluster name
+ * @param serviceConfigs A map of service configurations (keyed by service name)
+ *
+ * @return
+ */
+ private AmbariCluster createTestCluster(String name,
+ Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigs) {
+ AmbariCluster c = EasyMock.createNiceMock(AmbariCluster.class);
+ EasyMock.expect(c.getName()).andReturn(name).anyTimes();
+ EasyMock.expect(c.getServiceConfigurations()).andReturn(serviceConfigs).anyTimes();
+ EasyMock.replay(c);
+ return c;
+ }
+
+ private AmbariCluster.ServiceConfiguration createTestServiceConfig(String name, String version) {
+ AmbariCluster.ServiceConfiguration sc = EasyMock.createNiceMock(AmbariCluster.ServiceConfiguration.class);
+ EasyMock.expect(sc.getType()).andReturn(name).anyTimes();
+ EasyMock.expect(sc.getVersion()).andReturn(version).anyTimes();
+ EasyMock.replay(sc);
+ return sc;
+ }
+
+ private ServiceDiscoveryConfig createTestDiscoveryConfig(String address) {
+ return createTestDiscoveryConfig(address, null, null);
+ }
+
+ private ServiceDiscoveryConfig createTestDiscoveryConfig(String address, String username, String pwdAlias) {
+ ServiceDiscoveryConfig sdc = EasyMock.createNiceMock(ServiceDiscoveryConfig.class);
+ EasyMock.expect(sdc.getAddress()).andReturn(address).anyTimes();
+ EasyMock.expect(sdc.getUser()).andReturn(username).anyTimes();
+ EasyMock.expect(sdc.getPasswordAlias()).andReturn(pwdAlias).anyTimes();
+ EasyMock.replay(sdc);
+ return sdc;
+ }
+
+ /**
+ * AmbariConfigurationMonitor extension that replaces the collection of updated configuration data with a static
+ * mechanism rather than the REST invocation mechanism.
+ */
+ private static final class TestableAmbariConfigurationMonitor extends AmbariConfigurationMonitor {
+
+ Map<String, Map<String, Map<String, String>>> configVersionData = new HashMap<>();
+
+ TestableAmbariConfigurationMonitor(GatewayConfig config) {
+ super(config, null);
+ }
+
+ void addTestConfigVersion(String address, String clusterName, String configType, String configVersion) {
+ configVersionData.computeIfAbsent(address, a -> new HashMap<>())
+ .computeIfAbsent(clusterName, cl -> new HashMap<>())
+ .put(configType, configVersion);
+ }
+
+ void addTestConfigVersions(String address, String clusterName, Map<String, String> configVersions) {
+ configVersionData.computeIfAbsent(address, a -> new HashMap<>())
+ .computeIfAbsent(clusterName, cl -> new HashMap<>())
+ .putAll(configVersions);
+ }
+
+ void updateTestConfigVersion(String address, String clusterName, String configType, String updatedVersions) {
+ configVersionData.computeIfAbsent(address, a -> new HashMap<>())
+ .computeIfAbsent(clusterName, cl -> new HashMap<>())
+ .replace(configType, updatedVersions);
+ }
+
+ void updateTestConfigVersions(String address, String clusterName, Map<String, String> updatedVersions) {
+ configVersionData.computeIfAbsent(address, a -> new HashMap<>())
+ .computeIfAbsent(clusterName, cl -> new HashMap<>())
+ .replaceAll((k,v) -> updatedVersions.get(k));
+ }
+
+ void updateConfigState(String address, String clusterName, String configType, String configVersion) {
+ configVersionsLock.writeLock().lock();
+ try {
+ if (ambariClusterConfigVersions.containsKey(address)) {
+ ambariClusterConfigVersions.get(address).get(clusterName).replace(configType, configVersion);
+ }
+ } finally {
+ configVersionsLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ Map<String, String> getUpdatedConfigVersions(String address, String clusterName) {
+ Map<String, Map<String, String>> clusterConfigVersions = configVersionData.get(address);
+ if (clusterConfigVersions != null) {
+ return clusterConfigVersions.get(clusterName);
+ }
+ return null;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryTest.java b/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryTest.java
index f7f0553..d4dad95 100644
--- a/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryTest.java
+++ b/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryTest.java
@@ -119,26 +119,38 @@ public class AmbariServiceDiscoveryTest {
*/
private static final class TestAmbariServiceDiscovery extends AmbariServiceDiscovery {
+ final static String CLUSTER_PLACEHOLDER = TestRESTInvoker.CLUSTER_PLACEHOLDER;
+
+ TestAmbariServiceDiscovery(String clusterName) {
+ super(new TestRESTInvoker(clusterName));
+ }
+
+ }
+
+ private static final class TestRESTInvoker extends RESTInvoker {
+
final static String CLUSTER_PLACEHOLDER = "CLUSTER_NAME";
private Map<String, JSONObject> cannedResponses = new HashMap<>();
- TestAmbariServiceDiscovery(String clusterName) {
+ TestRESTInvoker(String clusterName) {
+ super(null);
+
cannedResponses.put(AmbariServiceDiscovery.AMBARI_CLUSTERS_URI,
- (JSONObject) JSONValue.parse(CLUSTERS_JSON_TEMPLATE.replaceAll(CLUSTER_PLACEHOLDER,
- clusterName)));
+ (JSONObject) JSONValue.parse(CLUSTERS_JSON_TEMPLATE.replaceAll(CLUSTER_PLACEHOLDER,
+ clusterName)));
cannedResponses.put(String.format(AmbariServiceDiscovery.AMBARI_HOSTROLES_URI, clusterName),
- (JSONObject) JSONValue.parse(HOSTROLES_JSON_TEMPLATE.replaceAll(CLUSTER_PLACEHOLDER,
- clusterName)));
+ (JSONObject) JSONValue.parse(HOSTROLES_JSON_TEMPLATE.replaceAll(CLUSTER_PLACEHOLDER,
+ clusterName)));
cannedResponses.put(String.format(AmbariServiceDiscovery.AMBARI_SERVICECONFIGS_URI, clusterName),
- (JSONObject) JSONValue.parse(SERVICECONFIGS_JSON_TEMPLATE.replaceAll(CLUSTER_PLACEHOLDER,
- clusterName)));
+ (JSONObject) JSONValue.parse(SERVICECONFIGS_JSON_TEMPLATE.replaceAll(CLUSTER_PLACEHOLDER,
+ clusterName)));
}
@Override
- protected JSONObject invokeREST(String url, String username, String passwordAlias) {
+ JSONObject invoke(String url, String username, String passwordAlias) {
return cannedResponses.get(url.substring(url.indexOf("/api")));
}
}
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-release/home/conf/gateway-site.xml
----------------------------------------------------------------------
diff --git a/gateway-release/home/conf/gateway-site.xml b/gateway-release/home/conf/gateway-site.xml
index e06db72..fec5e87 100644
--- a/gateway-release/home/conf/gateway-site.xml
+++ b/gateway-release/home/conf/gateway-site.xml
@@ -73,4 +73,16 @@ limitations under the License.
<description>Enable/Disable cookie scoping feature.</description>
</property>
+ <property>
+ <name>gateway.cluster.config.monitor.ambari.enabled</name>
+ <value>false</value>
+ <description>Enable/disable Ambari cluster configuration monitoring.</description>
+ </property>
+
+ <property>
+ <name>gateway.cluster.config.monitor.ambari.interval</name>
+ <value>60</value>
+ <description>The interval (in seconds) for polling Ambari for cluster configuration changes.</description>
+ </property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java
index ab0ab39..92b02ea 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java
@@ -596,4 +596,20 @@ public interface GatewayMessages {
text = "Correcting the suspect permissions for the remote configuration registry entry \"{0}\"." )
void correctingSuspectWritableRemoteConfigurationEntry(String entryPath);
+ @Message(level = MessageLevel.INFO,
+ text = "A cluster configuration change was noticed for {1} @ {0}")
+ void noticedClusterConfigurationChange(final String source, final String clusterName);
+
+
+ @Message(level = MessageLevel.INFO,
+ text = "Triggering topology regeneration for descriptor {2} because of change to the {1} @ {0} configuration.")
+ void triggeringTopologyRegeneration(final String source, final String clusterName, final String affected);
+
+
+ @Message(level = MessageLevel.ERROR,
+ text = "Encountered an error while responding to {1} @ {0} configuration change: {2}")
+ void errorRespondingToConfigChange(final String source,
+ final String clusterName,
+ @StackTrace(level = MessageLevel.DEBUG) Exception e);
+
}
http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java
index f6bb9b0..95ae1f2 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java
@@ -170,6 +170,11 @@ public class GatewayConfigImpl extends Configuration implements GatewayConfig {
public static final String MIME_TYPES_TO_COMPRESS = GATEWAY_CONFIG_FILE_PREFIX
+ ".gzip.compress.mime.types";
+ public static final String CLUSTER_CONFIG_MONITOR_PREFIX = GATEWAY_CONFIG_FILE_PREFIX + ".cluster.config.monitor.";
+ public static final String CLUSTER_CONFIG_MONITOR_INTERVAL_SUFFIX = ".interval";
+ public static final String CLUSTER_CONFIG_MONITOR_ENABLED_SUFFIX = ".enabled";
+
+
// These config property names are not inline with the convention of using the
// GATEWAY_CONFIG_FILE_PREFIX as is done by those above. These are left for
// backward compatibility.
@@ -942,6 +947,16 @@ public class GatewayConfigImpl extends Configuration implements GatewayConfig {
}
@Override
+ public int getClusterMonitorPollingInterval(String type) {
+ return getInt(CLUSTER_CONFIG_MONITOR_PREFIX + type.toLowerCase() + CLUSTER_CONFIG_MONITOR_INTERVAL_SUFFIX, -1);
+ }
+
+ @Override
+ public boolean isClusterMonitorEnabled(String type) {
+ return getBoolean(CLUSTER_CONFIG_MONITOR_PREFIX + type.toLowerCase() + CLUSTER_CONFIG_MONITOR_ENABLED_SUFFIX, true);
+ }
+
+ @Override
public List<String> getRemoteRegistryConfigurationNames() {
List<String> result = new ArrayList<>();