You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by mo...@apache.org on 2018/01/11 17:39:17 UTC
[50/53] [abbrv] knox git commit: Merge branch 'master' into
KNOX-998-Package_Restructuring
http://git-wip-us.apache.org/repos/asf/knox/blob/e5fd0622/gateway-server/src/main/java/org/apache/knox/gateway/services/topology/impl/DefaultTopologyService.java
----------------------------------------------------------------------
diff --cc gateway-server/src/main/java/org/apache/knox/gateway/services/topology/impl/DefaultTopologyService.java
index c6e373d,0000000..543d294
mode 100644,000000..100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/services/topology/impl/DefaultTopologyService.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/services/topology/impl/DefaultTopologyService.java
@@@ -1,895 -1,0 +1,915 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.knox.gateway.services.topology.impl;
+
+
+import org.apache.commons.digester3.Digester;
+import org.apache.commons.digester3.binder.DigesterLoader;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.monitor.FileAlterationListener;
+import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
+import org.apache.commons.io.monitor.FileAlterationMonitor;
+import org.apache.commons.io.monitor.FileAlterationObserver;
+import org.apache.knox.gateway.GatewayMessages;
+import org.apache.knox.gateway.GatewayServer;
+import org.apache.knox.gateway.audit.api.Action;
+import org.apache.knox.gateway.audit.api.ActionOutcome;
+import org.apache.knox.gateway.audit.api.AuditServiceFactory;
+import org.apache.knox.gateway.audit.api.Auditor;
+import org.apache.knox.gateway.audit.api.ResourceType;
+import org.apache.knox.gateway.audit.log4j.audit.AuditConstants;
+import org.apache.knox.gateway.config.GatewayConfig;
+import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+import org.apache.knox.gateway.service.definition.ServiceDefinition;
+import org.apache.knox.gateway.services.GatewayServices;
+import org.apache.knox.gateway.services.ServiceLifecycleException;
+import org.apache.knox.gateway.services.security.AliasService;
+import org.apache.knox.gateway.services.topology.TopologyService;
+import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService;
+import org.apache.knox.gateway.topology.Topology;
+import org.apache.knox.gateway.topology.TopologyEvent;
+import org.apache.knox.gateway.topology.TopologyListener;
+import org.apache.knox.gateway.topology.TopologyMonitor;
+import org.apache.knox.gateway.topology.TopologyProvider;
+import org.apache.knox.gateway.topology.builder.TopologyBuilder;
+import org.apache.knox.gateway.topology.discovery.ClusterConfigurationMonitor;
+import org.apache.knox.gateway.topology.monitor.RemoteConfigurationMonitor;
+import org.apache.knox.gateway.topology.monitor.RemoteConfigurationMonitorFactory;
++import org.apache.knox.gateway.topology.simple.SimpleDescriptor;
++import org.apache.knox.gateway.topology.simple.SimpleDescriptorFactory;
+import org.apache.knox.gateway.topology.simple.SimpleDescriptorHandler;
+import org.apache.knox.gateway.topology.validation.TopologyValidator;
+import org.apache.knox.gateway.topology.xml.AmbariFormatXmlTopologyRules;
+import org.apache.knox.gateway.topology.xml.KnoxFormatXmlTopologyRules;
+import org.apache.knox.gateway.util.ServiceDefinitionsLoader;
+import org.eclipse.persistence.jaxb.JAXBContextProperties;
+import org.xml.sax.SAXException;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.commons.digester3.binder.DigesterLoader.newLoader;
+
+
+public class DefaultTopologyService
+ extends FileAlterationListenerAdaptor
+ implements TopologyService, TopologyMonitor, TopologyProvider, FileFilter, FileAlterationListener {
+
+ private static Auditor auditor = AuditServiceFactory.getAuditService().getAuditor(
+ AuditConstants.DEFAULT_AUDITOR_NAME, AuditConstants.KNOX_SERVICE_NAME,
+ AuditConstants.KNOX_COMPONENT_NAME);
+
+ private static final List<String> SUPPORTED_TOPOLOGY_FILE_EXTENSIONS = new ArrayList<String>();
+ static {
+ SUPPORTED_TOPOLOGY_FILE_EXTENSIONS.add("xml");
+ SUPPORTED_TOPOLOGY_FILE_EXTENSIONS.add("conf");
+ }
+
+ private static GatewayMessages log = MessagesFactory.get(GatewayMessages.class);
+ private static DigesterLoader digesterLoader = newLoader(new KnoxFormatXmlTopologyRules(), new AmbariFormatXmlTopologyRules());
+ private List<FileAlterationMonitor> monitors = new ArrayList<>();
+ private File topologiesDirectory;
+ private File sharedProvidersDirectory;
+ private File descriptorsDirectory;
+
+ private DescriptorsMonitor descriptorsMonitor;
+
+ private Set<TopologyListener> listeners;
+ private volatile Map<File, Topology> topologies;
+ private AliasService aliasService;
+
+ private RemoteConfigurationMonitor remoteMonitor = null;
+
+ /**
+  * Loads the topology described by the specified file, retrying failed attempts for a short time.
+  * <p>
+  * An attempt that fails with an IOException or SAXException is retried after a 50 ms pause as long as
+  * less than 250 ms have passed since the first attempt; after that the failure is rethrown.
+  *
+  * @param file The topology descriptor file.
+  *
+  * @return The loaded Topology, or null if the attempt produced no topology.
+  */
+ private Topology loadTopology(File file) throws IOException, SAXException, URISyntaxException, InterruptedException {
+   final long retryWindow = 250; //ms
+   final long retryDelay = 50; //ms
+   log.loadingTopologyFile(file.getAbsolutePath());
+   final long firstAttempt = System.currentTimeMillis();
+   for (;;) {
+     try {
+       return loadTopologyAttempt(file);
+     } catch (IOException | SAXException e) {
+       // Give up once the retry window has elapsed
+       if (System.currentTimeMillis() - firstAttempt >= retryWindow) {
+         throw e;
+       }
+       log.failedToLoadTopologyRetrying(file.getAbsolutePath(), Long.toString(retryDelay), e);
+       Thread.sleep(retryDelay);
+     }
+   }
+ }
+
+ /**
+  * Makes a single attempt to parse the specified file into a Topology.
+  * <p>
+  * The resulting topology takes its URI from the file, its name from the file name without extension,
+  * and its timestamp from the file's last-modified time.
+  *
+  * @param file The topology descriptor file.
+  *
+  * @return The parsed Topology, or null if the digester produced no builder.
+  */
+ private Topology loadTopologyAttempt(File file) throws IOException, SAXException, URISyntaxException {
+   Digester digester = digesterLoader.newDigester();
+   TopologyBuilder topologyBuilder;
+   // Close the stream explicitly; it was previously left open, leaking a file handle on every (re)load
+   try (java.io.InputStream input = FileUtils.openInputStream(file)) {
+     topologyBuilder = digester.parse(input);
+   }
+   if (null == topologyBuilder) {
+     return null;
+   }
+   Topology topology = topologyBuilder.build();
+   topology.setUri(file.toURI());
+   topology.setName(FilenameUtils.removeExtension(file.getName()));
+   topology.setTimestamp(file.lastModified());
+   return topology;
+ }
+
+ /**
+  * Triggers redeployment of a topology by advancing the last-modified timestamp of its file.
+  * <p>
+  * The timestamp update is retried for up to one second, until the file reports a timestamp newer than
+  * the one it had before the attempt. Failures are audited and logged rather than thrown.
+  *
+  * @param topology The topology to redeploy; its URI identifies the file to touch.
+  */
+ private void redeployTopology(Topology topology) {
+   File topologyFile = new File(topology.getUri());
+   try {
+     TopologyValidator tv = new TopologyValidator(topology);
+
+     // NOTE(review): deployTopology() treats a false result from validateTopology() as the failure case,
+     // while this check fails when the result is true. One of the two looks inverted; confirm against
+     // TopologyValidator before changing either.
+     if(tv.validateTopology()) {
+       throw new SAXException(tv.getErrorString());
+     }
+
+     long start = System.currentTimeMillis();
+     long limit = 1000L; // One second.
+     long elapsed = 1;
+     while (elapsed <= limit) {
+       try {
+         long origTimestamp = topologyFile.lastModified();
+         long setTimestamp = Math.max(System.currentTimeMillis(), topologyFile.lastModified() + elapsed);
+         if(!topologyFile.setLastModified(setTimestamp)) {
+           auditor.audit(Action.REDEPLOY, topology.getName(), ResourceType.TOPOLOGY,
+               ActionOutcome.FAILURE);
+           log.failedToRedeployTopology(topology.getName());
+           break;
+         }
+         long newTimestamp = topologyFile.lastModified();
+         if(newTimestamp > origTimestamp) {
+           break;
+         }
+         // The file system did not report a newer timestamp yet; wait briefly and try again
+         Thread.sleep(10);
+         elapsed = System.currentTimeMillis() - start;
+       } catch (InterruptedException e) {
+         auditor.audit(Action.REDEPLOY, topology.getName(), ResourceType.TOPOLOGY,
+             ActionOutcome.FAILURE);
+         log.failedToRedeployTopology(topology.getName(), e);
+         // Restore the interrupt status for the caller and stop retrying
+         Thread.currentThread().interrupt();
+         break;
+       }
+     }
+   } catch (SAXException e) {
+     auditor.audit(Action.REDEPLOY, topology.getName(), ResourceType.TOPOLOGY, ActionOutcome.FAILURE);
+     log.failedToRedeployTopology(topology.getName(), e);
+   }
+ }
+
+ /**
+  * Compares two topology snapshots and describes the differences as events.
+  *
+  * @param oldTopologies The previously loaded topologies, keyed by file.
+  * @param newTopologies The freshly loaded topologies, keyed by file.
+  *
+  * @return DELETED events for files only in the old snapshot, CREATED events for files only in the new
+  *         snapshot, and UPDATED events for files in both whose new timestamp is greater.
+  */
+ private List<TopologyEvent> createChangeEvents(
+     Map<File, Topology> oldTopologies,
+     Map<File, Topology> newTopologies) {
+   List<TopologyEvent> changes = new ArrayList<>();
+
+   // Anything present before but missing now has been deleted.
+   for (Map.Entry<File, Topology> previous : oldTopologies.entrySet()) {
+     if (!newTopologies.containsKey(previous.getKey())) {
+       changes.add(new TopologyEvent(TopologyEvent.Type.DELETED, previous.getValue()));
+     }
+   }
+
+   // Anything present now is either new, or updated if its timestamp advanced.
+   for (Map.Entry<File, Topology> current : newTopologies.entrySet()) {
+     File file = current.getKey();
+     Topology newTopology = current.getValue();
+     if (!oldTopologies.containsKey(file)) {
+       changes.add(new TopologyEvent(TopologyEvent.Type.CREATED, newTopology));
+     } else if (newTopology.getTimestamp() > oldTopologies.get(file).getTimestamp()) {
+       changes.add(new TopologyEvent(TopologyEvent.Type.UPDATED, newTopology));
+     }
+   }
+   return changes;
+ }
+
+ /**
+  * @param config The gateway configuration.
+  *
+  * @return The configured providers config directory as an absolute File.
+  */
+ private File calculateAbsoluteProvidersConfigDir(GatewayConfig config) {
+   return new File(config.getGatewayProvidersConfigDir()).getAbsoluteFile();
+ }
+
+ /**
+  * @param config The gateway configuration.
+  *
+  * @return The configured descriptors directory as an absolute File.
+  */
+ private File calculateAbsoluteDescriptorsDir(GatewayConfig config) {
+   return new File(config.getGatewayDescriptorsDir()).getAbsoluteFile();
+ }
+
+ /**
+  * @param config The gateway configuration.
+  *
+  * @return The configured topologies directory as an absolute File.
+  */
+ private File calculateAbsoluteTopologiesDir(GatewayConfig config) {
+   return new File(config.getGatewayTopologyDir()).getAbsoluteFile();
+ }
+
+ /**
+  * Determines the gateway configuration directory.
+  *
+  * @param config The gateway configuration.
+  *
+  * @return The configured conf directory, or the parent of the topologies directory when no conf
+  *         directory is configured, as an absolute File.
+  */
+ private File calculateAbsoluteConfigDir(GatewayConfig config) {
+   String confPath = config.getGatewayConfDir();
+   File dir;
+   if (confPath == null) {
+     dir = new File(config.getGatewayTopologyDir()).getParentFile();
+   } else {
+     dir = new File(confPath);
+   }
+   return dir.getAbsoluteFile();
+ }
+
+ /**
+  * Registers the monitor with this service and attaches an observer for the specified directory to it.
+  *
+  * @param monitor   The monitor that will poll the directory; it is added to the list of monitors.
+  * @param directory The directory to observe.
+  * @param filter    The filter selecting which files are observed.
+  * @param listener  The listener notified of changes to the observed files.
+  */
+ private void initListener(FileAlterationMonitor monitor,
+ File directory,
+ FileFilter filter,
+ FileAlterationListener listener) {
+ monitors.add(monitor);
+ FileAlterationObserver observer = new FileAlterationObserver(directory, filter);
+ observer.addListener(listener);
+ monitor.addObserver(observer);
+ }
+
+ /**
+  * Convenience overload which observes the specified directory using a new monitor with a 5 second interval.
+  *
+  * @param directory The directory to observe.
+  * @param filter    The filter selecting which files are observed.
+  * @param listener  The listener notified of changes to the observed files.
+  */
+ private void initListener(File directory, FileFilter filter, FileAlterationListener listener) throws IOException, SAXException {
+ // Increasing the monitoring interval to 5 seconds as profiling has shown
+ // this is rather expensive in terms of generated garbage objects.
+ initListener(new FileAlterationMonitor(5000L), directory, filter, listener);
+ }
+
+ /**
+  * Loads every topology file accepted by this service's file filter from the specified directory.
+  * <p>
+  * A file that cannot be loaded is audited and logged as a failure and then skipped; it never aborts
+  * the loading of the remaining files.
+  *
+  * @param directory The directory containing the topology files.
+  *
+  * @return The successfully loaded topologies keyed by file; empty if the directory is not a readable
+  *         directory or cannot be listed.
+  */
+ private Map<File, Topology> loadTopologies(File directory) {
+   Map<File, Topology> loaded = new HashMap<>();
+   if (!directory.isDirectory() || !directory.canRead()) {
+     return loaded;
+   }
+
+   File[] candidates = directory.listFiles(this);
+   if (candidates == null) {
+     return loaded;
+   }
+
+   for (File candidate : candidates) {
+     try {
+       Topology parsed = loadTopology(candidate);
+       if (parsed == null) {
+         auditor.audit(Action.LOAD, candidate.getAbsolutePath(), ResourceType.TOPOLOGY,
+             ActionOutcome.FAILURE);
+         log.failedToLoadTopology(candidate.getAbsolutePath());
+       } else {
+         loaded.put(candidate, parsed);
+       }
+     } catch (Exception e) {
+       // Maybe it makes sense to throw exception
+       auditor.audit(Action.LOAD, candidate.getAbsolutePath(), ResourceType.TOPOLOGY,
+           ActionOutcome.FAILURE);
+       log.failedToLoadTopology(candidate.getAbsolutePath(), e);
+     }
+   }
+   return loaded;
+ }
+
+ /**
+  * Sets the alias service, which init() passes on to the descriptors monitor it creates.
+  *
+  * @param as The alias service.
+  */
+ public void setAliasService(AliasService as) {
+ this.aliasService = as;
+ }
+
+ /**
+  * Writes the specified topology to the topologies directory as {@code <name>.xml} and reloads the topologies.
+  * <p>
+  * The topology is first marshalled to a temporary file, which is then renamed to the final name and
+  * validated. Failures are audited and logged; the topologies are reloaded in every case.
+  *
+  * @param t The topology to deploy.
+  */
+ public void deployTopology(Topology t){
+
+   try {
+     String basePath = topologiesDirectory.getAbsolutePath() + "/" + t.getName();
+     File tempFile = new File(basePath + ".xml.temp");
+
+     String pkgName = Topology.class.getPackage().getName();
+     String bindingFile = pkgName.replace(".", "/") + "/topology_binding-xml.xml";
+
+     Map<String, Object> jaxbProperties = new HashMap<>(1);
+     jaxbProperties.put(JAXBContextProperties.OXM_METADATA_SOURCE, bindingFile);
+     JAXBContext context = JAXBContext.newInstance(pkgName, Topology.class.getClassLoader(), jaxbProperties);
+
+     Marshaller marshaller = context.createMarshaller();
+     marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
+     marshaller.marshal(t, tempFile);
+
+     File topologyFile = new File(basePath + ".xml");
+     if(!tempFile.renameTo(topologyFile)) {
+       FileUtils.forceDelete(tempFile);
+       throw new IOException("Could not rename temp file");
+     }
+
+     // This code will check if the topology is valid, and retrieve the errors if it is not.
+     TopologyValidator validator = new TopologyValidator( topologyFile.getAbsolutePath() );
+     if( !validator.validateTopology() ){
+       throw new SAXException( validator.getErrorString() );
+     }
+
+   } catch (JAXBException | IOException | SAXException e) {
+     auditor.audit(Action.DEPLOY, t.getName(), ResourceType.TOPOLOGY, ActionOutcome.FAILURE);
+     log.failedToDeployTopology(t.getName(), e);
+   }
+   reloadTopologies();
+ }
+
+ /**
+  * Redeploys the topology with the specified name, or every known topology when the name is null.
+  *
+  * @param topologyName The name of the topology to redeploy; null selects all topologies.
+  */
+ public void redeployTopologies(String topologyName) {
+   for (Topology candidate : getTopologies()) {
+     boolean selected = (topologyName == null) || topologyName.equals(candidate.getName());
+     if (selected) {
+       redeployTopology(candidate);
+     }
+   }
+ }
+
+ /**
+  * Reloads the topologies from the topologies directory, replaces the current set with the result, and
+  * notifies the registered listeners of the differences. Any exception is logged rather than thrown.
+  */
+ public void reloadTopologies() {
+ try {
+ // Load, compare, swap and notify under one lock so that concurrent reloads are serialized
+ synchronized (this) {
+ Map<File, Topology> oldTopologies = topologies;
+ Map<File, Topology> newTopologies = loadTopologies(topologiesDirectory);
+ List<TopologyEvent> events = createChangeEvents(oldTopologies, newTopologies);
+ topologies = newTopologies;
+ notifyChangeListeners(events);
+ }
+ } catch (Exception e) {
+ // Maybe it makes sense to throw exception
+ log.failedToReloadTopologies(e);
+ }
+ }
+
+ /**
+  * Deletes every file in the topologies directory whose base name matches the name of the specified
+  * topology, and then reloads the topologies.
+  *
+  * @param t The topology to delete.
+  */
+ public void deleteTopology(Topology t) {
+   File dir = topologiesDirectory;
+   boolean listable = dir.isDirectory() && dir.canRead();
+
+   if (listable) {
+     for (File candidate : listFiles(dir)) {
+       if (FilenameUtils.getBaseName(candidate.getName()).equals(t.getName())) {
+         candidate.delete();
+       }
+     }
+   }
+   reloadTopologies();
+ }
+
+ /**
+  * Delivers the specified events to every registered topology listener.
+  * <p>
+  * A RuntimeException thrown by one listener is audited and logged, and does not prevent the remaining
+  * listeners from being notified.
+  *
+  * @param events The topology change events to deliver.
+  */
+ private void notifyChangeListeners(List<TopologyEvent> events) {
+ for (TopologyListener listener : listeners) {
+ try {
+ listener.handleTopologyEvent(events);
+ } catch (RuntimeException e) {
+ auditor.audit(Action.LOAD, "Topology_Event", ResourceType.TOPOLOGY, ActionOutcome.FAILURE);
+ log.failedToHandleTopologyEvents(e);
+ }
+ }
+ }
+
+ /**
+  * Collects the test URLs of the service definitions, keyed by service role.
+  * <p>
+  * The service definitions are only consulted when the topologies directory contains a file whose name
+  * without extension matches the name of the specified topology.
+  *
+  * @param t      The topology of interest.
+  * @param config The gateway configuration, which supplies the services directory.
+  *
+  * @return The test URLs per role; empty when no matching topology file exists.
+  */
+ public Map<String, List<String>> getServiceTestURLs(Topology t, GatewayConfig config) {
+   Map<String, List<String>> urlsByRole = new HashMap<>();
+
+   boolean topologyFileExists = false;
+   if (topologiesDirectory.isDirectory() && topologiesDirectory.canRead()) {
+     for (File candidate : listFiles(topologiesDirectory)) {
+       if (FilenameUtils.removeExtension(candidate.getName()).equals(t.getName())) {
+         topologyFileExists = true;
+       }
+     }
+   }
+
+   if (topologyFileExists) {
+     Set<ServiceDefinition> definitions =
+         ServiceDefinitionsLoader.getServiceDefinitions(new File(config.getGatewayServicesDir()));
+     for (ServiceDefinition definition : definitions) {
+       urlsByRole.put(definition.getRole(), definition.getTestURLs());
+     }
+   }
+   return urlsByRole;
+ }
+
+ /**
+  * @return An unmodifiable view of the currently loaded topologies.
+  */
+ public Collection<Topology> getTopologies() {
+   return Collections.unmodifiableCollection(topologies.values());
+ }
+
+ /**
+  * Writes the specified content to a file with the specified name in the shared providers directory.
+  *
+  * @param name    The name of the provider configuration file.
+  * @param content The content of the provider configuration.
+  *
+  * @return true, if the write succeeds; otherwise, false.
+  */
+ @Override
+ public boolean deployProviderConfiguration(String name, String content) {
+ return writeConfig(sharedProvidersDirectory, name, content);
+ }
+
+ /**
+  * @return The files in the shared providers directory which have a supported provider configuration
+  *         extension.
+  */
+ @Override
+ public Collection<File> getProviderConfigurations() {
+   List<File> matches = new ArrayList<>();
+   for (File candidate : listFiles(sharedProvidersDirectory)) {
+     String extension = FilenameUtils.getExtension(candidate.getName());
+     if (SharedProviderConfigMonitor.SUPPORTED_EXTENSIONS.contains(extension)) {
+       matches.add(candidate);
+     }
+   }
+   return matches;
+ }
+
+ /**
+  * Deletes the shared provider configuration with the specified base name, unless descriptors still
+  * reference it.
+  *
+  * @param name The base name of the provider configuration.
+  *
+  * @return true, if the file was deleted or did not exist; false, if the deletion failed or was
+  *         prevented because the configuration is still referenced.
+  */
+ @Override
+ public boolean deleteProviderConfiguration(String name) {
+   File providerConfig = getExistingFile(sharedProvidersDirectory, name);
+   if (providerConfig == null) {
+     return true; // If it already does NOT exist, then the delete effectively succeeded
+   }
+
+   List<String> references = descriptorsMonitor.getReferencingDescriptors(providerConfig.getAbsolutePath());
+   if (!references.isEmpty()) {
+     log.preventedDeletionOfSharedProviderConfiguration(providerConfig.getAbsolutePath());
+     return false;
+   }
+   return providerConfig.delete();
+ }
+
+ /**
+  * Writes the specified content to a file with the specified name in the descriptors directory.
+  *
+  * @param name    The name of the descriptor file.
+  * @param content The content of the descriptor.
+  *
+  * @return true, if the write succeeds; otherwise, false.
+  */
+ @Override
+ public boolean deployDescriptor(String name, String content) {
+ return writeConfig(descriptorsDirectory, name, content);
+ }
+
+ /**
+  * @return The files in the descriptors directory which have a supported descriptor extension.
+  */
+ @Override
+ public Collection<File> getDescriptors() {
+   List<File> matches = new ArrayList<>();
+   for (File candidate : listFiles(descriptorsDirectory)) {
+     if (DescriptorsMonitor.isDescriptorFile(candidate.getName())) {
+       matches.add(candidate);
+     }
+   }
+   return matches;
+ }
+
+ /**
+  * Deletes the descriptor with the specified base name.
+  *
+  * @param name The base name of the descriptor.
+  *
+  * @return true, if the file was deleted or did not exist; otherwise, false.
+  */
+ @Override
+ public boolean deleteDescriptor(String name) {
+   File descriptor = getExistingFile(descriptorsDirectory, name);
+   if (descriptor == null) {
+     return true;
+   }
+   return descriptor.delete();
+ }
+
+ /**
+  * Registers a listener to be notified of topology change events.
+  *
+  * @param listener The listener to add.
+  */
+ @Override
+ public void addTopologyChangeListener(TopologyListener listener) {
+ listeners.add(listener);
+ }
+
+ /**
+  * Starts all registered local file monitors and, if one has been initialized, the remote configuration
+  * monitor. A failure to start the remote monitor is logged rather than thrown.
+  */
+ @Override
+ public void startMonitor() throws Exception {
+ // Start the local configuration monitors
+ for (FileAlterationMonitor monitor : monitors) {
+ monitor.start();
+ }
+
+ // Start the remote configuration monitor, if it has been initialized
+ if (remoteMonitor != null) {
+ try {
+ remoteMonitor.start();
+ } catch (Exception e) {
+ log.remoteConfigurationMonitorStartFailure(remoteMonitor.getClass().getTypeName(), e.getLocalizedMessage(), e);
+ }
+ }
+ }
+
+ /**
+  * Stops all registered local file monitors and, if one has been initialized, the remote configuration
+  * monitor.
+  */
+ @Override
+ public void stopMonitor() throws Exception {
+ // Stop the local configuration monitors
+ for (FileAlterationMonitor monitor : monitors) {
+ monitor.stop();
+ }
+
+ // Stop the remote configuration monitor, if it has been initialized
+ if (remoteMonitor != null) {
+ remoteMonitor.stop();
+ }
+ }
+
+ /**
+  * Accepts readable, non-directory files which have a supported topology file extension.
+  *
+  * @param file The candidate file.
+  *
+  * @return true, if the file should be treated as a topology file; otherwise, false.
+  */
+ @Override
+ public boolean accept(File file) {
+   if (file.isDirectory() || !file.canRead()) {
+     return false;
+   }
+   return SUPPORTED_TOPOLOGY_FILE_EXTENSIONS.contains(FilenameUtils.getExtension(file.getName()));
+ }
+
+ /**
+  * Handles creation of a topology file in the same way as a change to one.
+  *
+  * @param file The created file.
+  */
+ @Override
+ public void onFileCreate(File file) {
+ onFileChange(file);
+ }
+
+ /**
+  * Handles deletion of a topology file: deletes any simple descriptor in the descriptors directory with
+  * the same base name, and then applies the regular change handling.
+  *
+  * @param file The deleted topology file.
+  */
+ @Override
+ public void onFileDelete(java.io.File file) {
+ // For full topology descriptors, we need to make sure to delete any corresponding simple descriptors to prevent
+ // unintended subsequent generation of the topology descriptor
+ for (String ext : DescriptorsMonitor.SUPPORTED_EXTENSIONS) {
+ File simpleDesc =
+ new File(descriptorsDirectory, FilenameUtils.getBaseName(file.getName()) + "." + ext);
+ if (simpleDesc.exists()) {
+ log.deletingDescriptorForTopologyDeletion(simpleDesc.getName(), file.getName());
+ simpleDesc.delete();
+ }
+ }
+
+ onFileChange(file);
+ }
+
+ /**
+  * Handles a change to a topology file by reloading all topologies.
+  *
+  * @param file The changed file; it is not inspected, since the whole directory is reloaded.
+  */
+ @Override
+ public void onFileChange(File file) {
+ reloadTopologies();
+ }
+
+ /**
+  * Service lifecycle hook; this implementation does nothing.
+  */
+ @Override
+ public void stop() {
+
+ }
+
+ /**
+  * Service lifecycle hook which registers a TopologyDiscoveryTrigger with the cluster configuration
+  * monitor service obtained from the gateway services.
+  */
+ @Override
+ public void start() {
+ // Register a cluster configuration monitor listener for change notifications
+ // NOTE(review): the looked-up service is used without a null check; confirm it is always registered
+ ClusterConfigurationMonitorService ccms =
+ GatewayServer.getGatewayServices().getService(GatewayServices.CLUSTER_CONFIGURATION_MONITOR_SERVICE);
+ ccms.addListener(new TopologyDiscoveryTrigger(this));
+ }
+
+ /**
+  * Initializes this service: resolves the topologies, descriptors and shared-providers directories,
+  * registers file monitors for each of them, processes the descriptors already present, and obtains the
+  * remote configuration monitor.
+  * <p>
+  * For each existing descriptor, the topology is (re)generated when no matching topology file exists or
+  * when the topology file is older than the descriptor; otherwise only the descriptor's provider
+  * configuration reference is recorded with the descriptors monitor.
+  *
+  * @param config  The gateway configuration.
+  * @param options The service options; not read by this method.
+  *
+  * @throws ServiceLifecycleException if an IOException or SAXException occurs during initialization.
+  */
+ @Override
+ public void init(GatewayConfig config, Map<String, String> options) throws ServiceLifecycleException {
+
+ try {
+ listeners = new HashSet<>();
+ topologies = new HashMap<>();
+
+ topologiesDirectory = calculateAbsoluteTopologiesDir(config);
+
+ File configDirectory = calculateAbsoluteConfigDir(config);
+ descriptorsDirectory = new File(configDirectory, "descriptors");
+ sharedProvidersDirectory = new File(configDirectory, "shared-providers");
+
+ // Add support for conf/topologies
+ initListener(topologiesDirectory, this, this);
+
+ // Add support for conf/descriptors
+ descriptorsMonitor = new DescriptorsMonitor(topologiesDirectory, aliasService);
+ initListener(descriptorsDirectory,
+ descriptorsMonitor,
+ descriptorsMonitor);
+ log.monitoringDescriptorChangesInDirectory(descriptorsDirectory.getAbsolutePath());
+
+ // Add support for conf/shared-providers
+ SharedProviderConfigMonitor spm = new SharedProviderConfigMonitor(descriptorsMonitor, descriptorsDirectory);
+ initListener(sharedProvidersDirectory, spm, spm);
+ log.monitoringProviderConfigChangesInDirectory(sharedProvidersDirectory.getAbsolutePath());
+
- // For all the descriptors currently in the descriptors dir at start-up time, trigger topology generation.
++ // For all the descriptors currently in the descriptors dir at start-up time, determine if topology regeneration
++ // is required.
+ // This happens prior to the start-up loading of the topologies.
+ String[] descriptorFilenames = descriptorsDirectory.list();
+ if (descriptorFilenames != null) {
+ for (String descriptorFilename : descriptorFilenames) {
+ if (DescriptorsMonitor.isDescriptorFile(descriptorFilename)) {
++ String topologyName = FilenameUtils.getBaseName(descriptorFilename);
++ File existingDescriptorFile = getExistingFile(descriptorsDirectory, topologyName);
++
+ // If there isn't a corresponding topology file, or if the descriptor has been modified since the
+ // corresponding topology file was generated, then trigger generation of one
- File matchingTopologyFile = getExistingFile(topologiesDirectory, FilenameUtils.getBaseName(descriptorFilename));
- if (matchingTopologyFile == null ||
- matchingTopologyFile.lastModified() < (new File(descriptorsDirectory, descriptorFilename)).lastModified()) {
- descriptorsMonitor.onFileChange(new File(descriptorsDirectory, descriptorFilename));
++ File matchingTopologyFile = getExistingFile(topologiesDirectory, topologyName);
++ if (matchingTopologyFile == null || matchingTopologyFile.lastModified() < existingDescriptorFile.lastModified()) {
++ descriptorsMonitor.onFileChange(existingDescriptorFile);
++ } else {
++ // If regeneration is NOT required, then we at least need to report the provider configuration
++ // reference relationship (KNOX-1144)
++ String normalizedDescriptorPath = FilenameUtils.normalize(existingDescriptorFile.getAbsolutePath());
++
++ // Parse the descriptor to determine the provider config reference
++ SimpleDescriptor sd = SimpleDescriptorFactory.parse(normalizedDescriptorPath);
++ if (sd != null) {
++ File referencedProviderConfig =
++ getExistingFile(sharedProvidersDirectory, FilenameUtils.getBaseName(sd.getProviderConfig()));
++ if (referencedProviderConfig != null) {
++ List<String> references =
++ descriptorsMonitor.getReferencingDescriptors(referencedProviderConfig.getAbsolutePath());
++ if (!references.contains(normalizedDescriptorPath)) {
++ references.add(normalizedDescriptorPath);
++ }
++ }
++ }
+ }
+ }
+ }
+ }
+
+ // Initialize the remote configuration monitor, if it has been configured
+ remoteMonitor = RemoteConfigurationMonitorFactory.get(config);
+
+ } catch (IOException | SAXException io) {
+ throw new ServiceLifecycleException(io.getMessage());
+ }
+ }
+
+ /**
+  * Lists the files in the specified directory without ever returning null, unlike File#listFiles().
+  *
+  * @param directory The directory whose files should be returned.
+  *
+  * @return A List of the Files in the directory; empty if the directory cannot be listed.
+  */
+ private static List<File> listFiles(File directory) {
+   File[] children = directory.listFiles();
+   return (children == null) ? Collections.<File>emptyList() : Arrays.asList(children);
+ }
+
+ /**
+  * Finds the first file in the specified directory whose base name (filename without extension) equals
+  * the specified basename.
+  *
+  * @param directory The directory in which to search.
+  * @param basename  The basename of interest.
+  *
+  * @return The matching File, or null if the directory contains no such file.
+  */
+ private static File getExistingFile(File directory, String basename) {
+   for (File candidate : listFiles(directory)) {
+     if (FilenameUtils.getBaseName(candidate.getName()).equals(basename)) {
+       return candidate;
+     }
+   }
+   return null;
+ }
+
+ /**
+  * Writes the specified content to the named file in the destination directory, logging the outcome.
+  *
+  * @param dest    The destination directory.
+  * @param name    The name of the file.
+  * @param content The contents of the file.
+  *
+  * @return true, if the write succeeds; otherwise, false.
+  */
+ private static boolean writeConfig(File dest, String name, String content) {
+   File target = new File(dest, name);
+   try {
+     FileUtils.writeStringToFile(target, content);
+     log.wroteConfigurationFile(target.getAbsolutePath());
+     return true;
+   } catch (IOException e) {
+     log.failedToWriteConfigurationFile(target.getAbsolutePath(), e);
+     return false;
+   }
+ }
+
+
+ /**
+ * Change handler for simple descriptors.
+ * <p>
+ * Generates topology files from simple descriptors, removes them again when a descriptor is deleted, and
+ * keeps track of which descriptors reference which shared provider configuration.
+ */
+ public static class DescriptorsMonitor extends FileAlterationListenerAdaptor
+ implements FileFilter {
+
+ static final List<String> SUPPORTED_EXTENSIONS = new ArrayList<String>();
+ static {
+ SUPPORTED_EXTENSIONS.add("json");
+ SUPPORTED_EXTENSIONS.add("yml");
+ SUPPORTED_EXTENSIONS.add("yaml");
+ }
+
+ private File topologiesDir;
+
+ private AliasService aliasService;
+
+ // Normalized provider configuration path -> normalized paths of the descriptors referencing it
+ private Map<String, List<String>> providerConfigReferences = new HashMap<>();
+
+
+ /**
+ * @param filename The name of the file to check.
+ *
+ * @return true, if the extension of the filename is a supported descriptor extension.
+ */
+ static boolean isDescriptorFile(String filename) {
+ return SUPPORTED_EXTENSIONS.contains(FilenameUtils.getExtension(filename));
+ }
+
+ public DescriptorsMonitor(File topologiesDir, AliasService aliasService) {
+ this.topologiesDir = topologiesDir;
+ this.aliasService = aliasService;
+ }
+
+ /**
+ * Returns the descriptors recorded as referencing the specified provider configuration.
+ * In the merged version of this method, an empty mutable list is registered and returned when nothing
+ * has been recorded for the normalized path yet.
+ *
+ * @param providerConfigPath The path of the provider configuration.
+ *
+ * @return The normalized paths of the referencing descriptors.
+ */
+ List<String> getReferencingDescriptors(String providerConfigPath) {
- List<String> result = providerConfigReferences.get(FilenameUtils.normalize(providerConfigPath));
- if (result == null) {
- result = Collections.emptyList();
- }
- return result;
++ String normalizedPath = FilenameUtils.normalize(providerConfigPath);
++ return providerConfigReferences.computeIfAbsent(normalizedPath, p -> new ArrayList<>());
+ }
+
+ /**
+ * Handles creation of a descriptor in the same way as a change to one.
+ */
+ @Override
+ public void onFileCreate(File file) {
+ onFileChange(file);
+ }
+
+ /**
+ * Handles deletion of a descriptor: deletes the topology files with the same base name and removes the
+ * descriptor from the first provider configuration reference list that contains it.
+ */
+ @Override
+ public void onFileDelete(File file) {
+ // For simple descriptors, we need to make sure to delete any corresponding full topology descriptors to trigger undeployment
+ for (String ext : DefaultTopologyService.SUPPORTED_TOPOLOGY_FILE_EXTENSIONS) {
+ File topologyFile =
+ new File(topologiesDir, FilenameUtils.getBaseName(file.getName()) + "." + ext);
+ if (topologyFile.exists()) {
+ log.deletingTopologyForDescriptorDeletion(topologyFile.getName(), file.getName());
+ topologyFile.delete();
+ }
+ }
+
+ String normalizedFilePath = FilenameUtils.normalize(file.getAbsolutePath());
+ String reference = null;
+ for (Map.Entry<String, List<String>> entry : providerConfigReferences.entrySet()) {
+ if (entry.getValue().contains(normalizedFilePath)) {
+ reference = entry.getKey();
+ break;
+ }
+ }
+
+ if (reference != null) {
+ providerConfigReferences.get(reference).remove(normalizedFilePath);
+ log.removedProviderConfigurationReference(normalizedFilePath, reference);
+ }
+ }
+
+ /**
+ * Handles a created or modified descriptor: generates the topology for it and records which provider
+ * configuration it references. Any exception is logged rather than thrown.
+ */
+ @Override
+ public void onFileChange(File file) {
+ try {
+ // When a simple descriptor has been created or modified, generate the new topology descriptor
+ Map<String, File> result = SimpleDescriptorHandler.handle(file, topologiesDir, aliasService);
+ log.generatedTopologyForDescriptorChange(result.get("topology").getName(), file.getName());
+
+ // Add the provider config reference relationship for handling updates to the provider config
+ String providerConfig = FilenameUtils.normalize(result.get("reference").getAbsolutePath());
+ if (!providerConfigReferences.containsKey(providerConfig)) {
+ providerConfigReferences.put(providerConfig, new ArrayList<String>());
+ }
+ List<String> refs = providerConfigReferences.get(providerConfig);
+ String descriptorName = FilenameUtils.normalize(file.getAbsolutePath());
+ if (!refs.contains(descriptorName)) {
+ // Need to check if descriptor had previously referenced another provider config, so it can be removed
+ for (List<String> descs : providerConfigReferences.values()) {
+ if (descs.contains(descriptorName)) {
+ descs.remove(descriptorName);
+ }
+ }
+
+ // Add the current reference relationship
+ refs.add(descriptorName);
+ log.addedProviderConfigurationReference(descriptorName, providerConfig);
+ }
+ } catch (Exception e) {
+ log.simpleDescriptorHandlingError(file.getName(), e);
+ }
+ }
+
+ /**
+ * Accepts readable, non-directory files which have a supported descriptor extension.
+ */
+ @Override
+ public boolean accept(File file) {
+ boolean accept = false;
+ if (!file.isDirectory() && file.canRead()) {
+ String extension = FilenameUtils.getExtension(file.getName());
+ if (SUPPORTED_EXTENSIONS.contains(extension)) {
+ accept = true;
+ }
+ }
+ return accept;
+ }
+ }
+
+ /**
+  * Change handler for shared provider configurations.
+  * <p>
+  * When a shared provider configuration is created, changed or deleted, the descriptors referencing it
+  * are 'touched' so that the descriptors monitor picks them up as changed.
+  */
+ public static class SharedProviderConfigMonitor extends FileAlterationListenerAdaptor
+     implements FileFilter {
+
+   static final List<String> SUPPORTED_EXTENSIONS = new ArrayList<>();
+   static {
+     SUPPORTED_EXTENSIONS.add("xml");
+   }
+
+   private DescriptorsMonitor descriptorsMonitor;
+   private File descriptorsDir;
+
+
+   SharedProviderConfigMonitor(DescriptorsMonitor descMonitor, File descriptorsDir) {
+     this.descriptorsMonitor = descMonitor;
+     this.descriptorsDir = descriptorsDir;
+   }
+
+   @Override
+   public void onFileCreate(File file) {
+     onFileChange(file);
+   }
+
+   @Override
+   public void onFileDelete(File file) {
+     onFileChange(file);
+   }
+
+   @Override
+   public void onFileChange(File file) {
+     // For shared provider configuration, we need to update any simple descriptors that reference it
+     for (File descriptor : getReferencingDescriptors(file)) {
+       descriptor.setLastModified(System.currentTimeMillis());
+     }
+   }
+
+   /**
+    * Determines the descriptors which reference the specified shared provider configuration.
+    * <p>
+    * The references are only looked up when the descriptors directory contains at least one descriptor
+    * file. Each referencing descriptor is returned exactly once; previously the complete reference list
+    * was appended once per descriptor file found in the directory, yielding duplicates.
+    *
+    * @param sharedProviderConfig The shared provider configuration file.
+    *
+    * @return The referencing descriptor files; possibly empty, never null.
+    */
+   private List<File> getReferencingDescriptors(File sharedProviderConfig) {
+     List<File> references = new ArrayList<>();
+
+     boolean descriptorsPresent = false;
+     for (File descriptor : listFiles(descriptorsDir)) {
+       if (DescriptorsMonitor.SUPPORTED_EXTENSIONS.contains(FilenameUtils.getExtension(descriptor.getName()))) {
+         descriptorsPresent = true;
+         break;
+       }
+     }
+
+     if (descriptorsPresent) {
+       String providerConfigPath = FilenameUtils.normalize(sharedProviderConfig.getAbsolutePath());
+       for (String reference : descriptorsMonitor.getReferencingDescriptors(providerConfigPath)) {
+         references.add(new File(reference));
+       }
+     }
+
+     return references;
+   }
+
+   /**
+    * Accepts readable, non-directory files which have a supported provider configuration extension.
+    */
+   @Override
+   public boolean accept(File file) {
+     boolean accept = false;
+     if (!file.isDirectory() && file.canRead()) {
+       String extension = FilenameUtils.getExtension(file.getName());
+       if (SUPPORTED_EXTENSIONS.contains(extension)) {
+         accept = true;
+       }
+     }
+     return accept;
+   }
+ }
+
+ /**
+  * Listener for cluster configuration change events, which 'touches' the affected descriptors so that the
+  * associated topologies are re-generated (including re-discovery).
+  */
+ private static class TopologyDiscoveryTrigger implements ClusterConfigurationMonitor.ConfigurationChangeListener {
+
+   private TopologyService topologyService = null;
+
+   TopologyDiscoveryTrigger(TopologyService topologyService) {
+     this.topologyService = topologyService;
+   }
+
+   /**
+    * Touches every descriptor whose content contains both the source and the cluster name. Any exception
+    * is logged rather than thrown, and ends the processing of the remaining descriptors.
+    *
+    * @param source      The source of the configuration change.
+    * @param clusterName The name of the cluster whose configuration changed.
+    */
+   @Override
+   public void onConfigurationChange(String source, String clusterName) {
+     log.noticedClusterConfigurationChange(source, clusterName);
+     try {
+       // Identify any descriptors associated with the cluster configuration change
+       for (File candidate : topologyService.getDescriptors()) {
+         String content = FileUtils.readFileToString(candidate);
+         boolean affected = content.contains(source) && content.contains(clusterName);
+         if (affected) {
+           log.triggeringTopologyRegeneration(source, clusterName, candidate.getAbsolutePath());
+           // 'Touch' the descriptor to trigger re-generation of the associated topology
+           candidate.setLastModified(System.currentTimeMillis());
+         }
+       }
+     } catch (Exception e) {
+       log.errorRespondingToConfigChange(source, clusterName, e);
+     }
+   }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/e5fd0622/gateway-server/src/main/java/org/apache/knox/gateway/topology/monitor/DefaultRemoteConfigurationMonitor.java
----------------------------------------------------------------------
diff --cc gateway-server/src/main/java/org/apache/knox/gateway/topology/monitor/DefaultRemoteConfigurationMonitor.java
index efafee0,0000000..37d1ca6
mode 100644,000000..100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/topology/monitor/DefaultRemoteConfigurationMonitor.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/topology/monitor/DefaultRemoteConfigurationMonitor.java
@@@ -1,228 -1,0 +1,246 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.knox.gateway.topology.monitor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.knox.gateway.GatewayMessages;
+import org.apache.knox.gateway.config.GatewayConfig;
+import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+import org.apache.knox.gateway.services.config.client.RemoteConfigurationRegistryClient.ChildEntryListener;
+import org.apache.knox.gateway.services.config.client.RemoteConfigurationRegistryClient.EntryListener;
+import org.apache.knox.gateway.services.config.client.RemoteConfigurationRegistryClient;
+import org.apache.knox.gateway.services.config.client.RemoteConfigurationRegistryClientService;
+import org.apache.zookeeper.ZooDefs;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
++import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+
+/**
+ * Monitors a remote configuration registry and mirrors its provider configurations and descriptors into the
+ * local gateway configuration directories.
+ * <p>
+ * On {@link #start()}, existing remote provider configurations are downloaded, and listeners are registered for
+ * additions/removals beneath the remote provider configuration and descriptor paths. Entry additions register a
+ * per-entry listener which writes changed content to the corresponding local file; entry removals delete the
+ * corresponding local file.
+ */
+class DefaultRemoteConfigurationMonitor implements RemoteConfigurationMonitor {
+
+    private static final String NODE_KNOX = "/knox";
+    private static final String NODE_KNOX_CONFIG = NODE_KNOX + "/config";
+    private static final String NODE_KNOX_PROVIDERS = NODE_KNOX_CONFIG + "/shared-providers";
+    private static final String NODE_KNOX_DESCRIPTORS = NODE_KNOX_CONFIG + "/descriptors";
+
+    private static final GatewayMessages log = MessagesFactory.get(GatewayMessages.class);
+
+    // N.B. This is ZooKeeper-specific, and should be abstracted when another registry is supported
+    private static final RemoteConfigurationRegistryClient.EntryACL AUTHENTICATED_USERS_ALL =
+        new RemoteConfigurationRegistryClient.EntryACL() {
+            public String getId() {
+                return "";
+            }
+
+            public String getType() {
+                return "auth";
+            }
+
+            public Object getPermissions() {
+                return ZooDefs.Perms.ALL;
+            }
+
+            public boolean canRead() {
+                return true;
+            }
+
+            public boolean canWrite() {
+                return true;
+            }
+        };
+
+    // Remains null when no registry client could be resolved; start() rejects that state, stop() tolerates it.
+    private final RemoteConfigurationRegistryClient client;
+
+    private final File providersDir;
+    private final File descriptorsDir;
+
+    /**
+     * @param config                The gateway configuration
+     * @param registryClientService The service from which the remote registry client should be acquired.
+     */
+    DefaultRemoteConfigurationMonitor(GatewayConfig                            config,
+                                      RemoteConfigurationRegistryClientService registryClientService) {
+        this.providersDir   = new File(config.getGatewayProvidersConfigDir());
+        this.descriptorsDir = new File(config.getGatewayDescriptorsDir());
+
+        RemoteConfigurationRegistryClient resolvedClient = null;
+        if (registryClientService != null) {
+            String clientName = config.getRemoteConfigurationMonitorClientName();
+            if (clientName != null) {
+                resolvedClient = registryClientService.get(clientName);
+                if (resolvedClient == null) {
+                    log.unresolvedClientConfigurationForRemoteMonitoring(clientName);
+                }
+            } else {
+                log.missingClientConfigurationForRemoteMonitoring();
+            }
+        }
+        this.client = resolvedClient;
+    }
+
+    /**
+     * Starts monitoring the remote registry.
+     *
+     * @throws IllegalStateException if no registry client was acquired, or if either of the remote configuration
+     *                               paths cannot be listed.
+     * @throws Exception             if downloading provider configurations or registering listeners fails.
+     */
+    @Override
+    public void start() throws Exception {
+        if (client == null) {
+            throw new IllegalStateException("Failed to acquire a remote configuration registry client.");
+        }
+
+        final String monitorSource = client.getAddress();
+        log.startingRemoteConfigurationMonitor(monitorSource);
+
+        // Ensure the existence of the expected entries and their associated ACLs
+        ensureEntries();
+
+        // Confirm access to the remote provider configs directory znode
+        List<String> providerConfigs = client.listChildEntries(NODE_KNOX_PROVIDERS);
+        if (providerConfigs == null) {
+            // Either the ZNode does not exist, or there is an authentication problem
+            throw new IllegalStateException("Unable to access remote path: " + NODE_KNOX_PROVIDERS);
+        }
+
+        // Download any existing provider configs in the remote registry, which either do not exist locally, or have
+        // been modified, so that they are certain to be present when this monitor downloads any descriptors that
+        // reference them.
+        downloadProviderConfigs(providerConfigs);
+
+        // Confirm access to the remote descriptors directory znode
+        List<String> descriptors = client.listChildEntries(NODE_KNOX_DESCRIPTORS);
+        if (descriptors == null) {
+            // Either the ZNode does not exist, or there is an authentication problem
+            throw new IllegalStateException("Unable to access remote path: " + NODE_KNOX_DESCRIPTORS);
+        }
+
+        // Register a listener for provider config znode additions/removals
+        client.addChildEntryListener(NODE_KNOX_PROVIDERS, new ConfigDirChildEntryListener(providersDir));
+
+        // Register a listener for descriptor znode additions/removals
+        client.addChildEntryListener(NODE_KNOX_DESCRIPTORS, new ConfigDirChildEntryListener(descriptorsDir));
+
+        log.monitoringRemoteConfigurationSource(monitorSource);
+    }
+
+    /**
+     * Stops monitoring by removing the listeners for the remote configuration paths. This is a no-op when no
+     * registry client was acquired.
+     */
+    @Override
+    public void stop() throws Exception {
+        if (client == null) {
+            // Nothing was ever registered, so there is nothing to remove
+            return;
+        }
+        client.removeEntryListener(NODE_KNOX_PROVIDERS);
+        client.removeEntryListener(NODE_KNOX_DESCRIPTORS);
+    }
+
+    /**
+     * Writes each listed remote provider configuration to the local providers directory, unless an identical local
+     * file already exists.
+     *
+     * @param providerConfigs The names of the entries beneath the remote provider configurations path.
+     */
+    private void downloadProviderConfigs(List<String> providerConfigs) throws Exception {
+        for (String providerConfig : providerConfigs) {
+            String remoteContent = client.getEntryData(NODE_KNOX_PROVIDERS + "/" + providerConfig);
+            if (remoteContent == null) {
+                // NOTE(review): presumably null means the entry could not be read (e.g., removed between listing
+                // and fetching); skip it rather than fail with a NullPointerException. Confirm against the
+                // getEntryData contract.
+                continue;
+            }
+
+            // NOTE(review): assumes the registry client decodes entry data as UTF-8, such that encoding as UTF-8
+            // reproduces the remote bytes regardless of the JVM default charset. Confirm against the client.
+            byte[] remoteBytes = remoteContent.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+            if (writeIfChanged(new File(providersDir, providerConfig), remoteBytes)) {
+                log.downloadedRemoteConfigFile(providersDir.getName(), providerConfig);
+            }
+        }
+    }
+
+    /**
+     * Writes the content to the local file if the file does not exist, or if its current content differs.
+     *
+     * @return true if the file was written; false if the existing file already had identical content.
+     */
+    private static boolean writeIfChanged(File localFile, byte[] content) throws IOException {
+        if (localFile.exists() && Arrays.equals(content, FileUtils.readFileToByteArray(localFile))) {
+            return false;
+        }
+        FileUtils.writeByteArrayToFile(localFile, content);
+        return true;
+    }
+
+    /**
+     * Maps a remote entry path to the file in the local directory named by the last segment of that path.
+     */
+    private static File localFileFor(File localDir, String path) {
+        return new File(localDir, path.substring(path.lastIndexOf('/') + 1));
+    }
+
+    private void ensureEntries() {
+        ensureEntry(NODE_KNOX);
+        ensureEntry(NODE_KNOX_CONFIG);
+        ensureEntry(NODE_KNOX_PROVIDERS);
+        ensureEntry(NODE_KNOX_DESCRIPTORS);
+    }
+
+    /**
+     * Creates the named entry if it does not exist; otherwise, validates the ACL of the existing entry.
+     */
+    private void ensureEntry(String name) {
+        if (!client.entryExists(name)) {
+            client.createEntry(name);
+            return;
+        }
+
+        // Validate the ACL
+        List<RemoteConfigurationRegistryClient.EntryACL> entryACLs = client.getACL(name);
+        for (RemoteConfigurationRegistryClient.EntryACL entryACL : entryACLs) {
+            // N.B. This is ZooKeeper-specific, and should be abstracted when another registry is supported
+            // For now, check for ZooKeeper world:anyone with ANY permissions (even read-only)
+            if ("world".equals(entryACL.getType()) && "anyone".equals(entryACL.getId())) {
+                log.suspectWritableRemoteConfigurationEntry(name);
+
+                // If the client is authenticated, but "anyone" can write the content, then the content may not
+                // be trustworthy.
+                if (client.isAuthenticationConfigured()) {
+                    log.correctingSuspectWritableRemoteConfigurationEntry(name);
+
+                    // Replace the existing ACL with one that permits only authenticated users
+                    client.setACL(name, Collections.singletonList(AUTHENTICATED_USERS_ALL));
+                }
+            }
+        }
+    }
+
+    /**
+     * Reacts to entries being added to, or removed from, a remote configuration path by managing the per-entry
+     * listeners and the corresponding files in the associated local directory.
+     */
+    private static class ConfigDirChildEntryListener implements ChildEntryListener {
+        private final File localDir;
+
+        ConfigDirChildEntryListener(File localDir) {
+            this.localDir = localDir;
+        }
+
+        @Override
+        public void childEvent(RemoteConfigurationRegistryClient client, Type type, String path) {
+            File localFile = localFileFor(localDir, path);
+
+            switch (type) {
+                case REMOVED:
+                    FileUtils.deleteQuietly(localFile);
+                    log.deletedRemoteConfigFile(localDir.getName(), localFile.getName());
+                    try {
+                        client.removeEntryListener(path);
+                    } catch (Exception e) {
+                        log.errorRemovingRemoteConfigurationListenerForPath(path, e);
+                    }
+                    break;
+                case ADDED:
+                    try {
+                        client.addEntryListener(path, new ConfigEntryListener(localDir));
+                    } catch (Exception e) {
+                        log.errorAddingRemoteConfigurationListenerForPath(path, e);
+                    }
+                    break;
+                default:
+                    // Other event types require no action here
+                    break;
+            }
+        }
+    }
+
+    /**
+     * Reacts to changes of a single remote entry by writing its content to, or deleting, the corresponding file in
+     * the associated local directory.
+     */
+    private static class ConfigEntryListener implements EntryListener {
+        private final File localDir;
+
+        ConfigEntryListener(File localDir) {
+            this.localDir = localDir;
+        }
+
+        @Override
+        public void entryChanged(RemoteConfigurationRegistryClient client, String path, byte[] data) {
+            File localFile = localFileFor(localDir, path);
+
+            if (data == null) {
+                // No data for the entry: remove the local copy
+                FileUtils.deleteQuietly(localFile);
+                log.deletedRemoteConfigFile(localDir.getName(), localFile.getName());
+                return;
+            }
+
+            try {
- FileUtils.writeByteArrayToFile(localFile, data);
- log.downloadedRemoteConfigFile(localDir.getName(), localFile.getName());
+                // If there is no corresponding local file, or the content is different from the existing local
+                // file, write the data to the local file.
+                if (writeIfChanged(localFile, data)) {
+                    log.downloadedRemoteConfigFile(localDir.getName(), localFile.getName());
+                }
+            } catch (IOException e) {
+                log.errorDownloadingRemoteConfiguration(path, e);
+            }
+        }
+    }
+
+}