You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by km...@apache.org on 2014/07/18 15:24:26 UTC
git commit: KNOX-402: New GatewayService - TopologyService
Repository: knox
Updated Branches:
refs/heads/master a970502aa -> 2adc917e4
KNOX-402: New GatewayService - TopologyService
Project: http://git-wip-us.apache.org/repos/asf/knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/2adc917e
Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/2adc917e
Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/2adc917e
Branch: refs/heads/master
Commit: 2adc917e486860cf0d009bd50786d5c5bb87c374
Parents: a970502
Author: Kevin Minder <ke...@hortonworks.com>
Authored: Fri Jul 18 09:23:54 2014 -0400
Committer: Kevin Minder <ke...@hortonworks.com>
Committed: Fri Jul 18 09:23:54 2014 -0400
----------------------------------------------------------------------
.../apache/hadoop/gateway/GatewayServer.java | 81 ++---
.../gateway/services/CLIGatewayServices.java | 11 +
.../services/DefaultGatewayServices.java | 11 +
.../topology/impl/DefaultTopologyService.java | 321 +++++++++++++++++++
.../topology/file/FileTopologyProvider.java | 249 --------------
.../org/apache/hadoop/gateway/util/KnoxCLI.java | 12 +-
.../topology/DefaultTopologyServiceTest.java | 208 ++++++++++++
.../topology/file/FileTopologyProviderTest.java | 205 ------------
.../gateway/services/GatewayServices.java | 3 +-
.../services/topology/TopologyService.java | 41 +++
.../hadoop/gateway/GatewayDeployFuncTest.java | 4 +-
11 files changed, 625 insertions(+), 521 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java
index 9fc0dc2..92faa17 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java
@@ -34,12 +34,12 @@ import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
import org.apache.hadoop.gateway.i18n.resources.ResourcesFactory;
import org.apache.hadoop.gateway.services.GatewayServices;
import org.apache.hadoop.gateway.services.ServiceLifecycleException;
+import org.apache.hadoop.gateway.services.topology.TopologyService;
import org.apache.hadoop.gateway.services.registry.ServiceRegistry;
import org.apache.hadoop.gateway.services.security.SSLService;
import org.apache.hadoop.gateway.topology.Topology;
import org.apache.hadoop.gateway.topology.TopologyEvent;
import org.apache.hadoop.gateway.topology.TopologyListener;
-import org.apache.hadoop.gateway.topology.file.FileTopologyProvider;
import org.apache.log4j.PropertyConfigurator;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
@@ -48,7 +48,6 @@ import org.eclipse.jetty.server.handler.ErrorHandler;
import org.eclipse.jetty.webapp.WebAppContext;
import org.jboss.shrinkwrap.api.exporter.ExplodedExporter;
import org.jboss.shrinkwrap.api.spec.WebArchive;
-import org.xml.sax.SAXException;
import java.io.File;
import java.io.FileOutputStream;
@@ -68,20 +67,20 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
public class GatewayServer {
- private static GatewayResources res = ResourcesFactory.get( GatewayResources.class );
- private static GatewayMessages log = MessagesFactory.get( GatewayMessages.class );
- private static Auditor auditor = AuditServiceFactory.getAuditService().getAuditor( AuditConstants.DEFAULT_AUDITOR_NAME,
- AuditConstants.KNOX_SERVICE_NAME, AuditConstants.KNOX_COMPONENT_NAME );
+ private static GatewayResources res = ResourcesFactory.get(GatewayResources.class);
+ private static GatewayMessages log = MessagesFactory.get(GatewayMessages.class);
+ private static Auditor auditor = AuditServiceFactory.getAuditService().getAuditor(AuditConstants.DEFAULT_AUDITOR_NAME,
+ AuditConstants.KNOX_SERVICE_NAME, AuditConstants.KNOX_COMPONENT_NAME);
private static GatewayServer server;
private static GatewayServices services;
-
+
private static Properties buildProperties;
private Server jetty;
private ErrorHandler errorHandler;
private GatewayConfig config;
private ContextHandlerCollection contexts;
- private FileTopologyProvider monitor;
+ private TopologyService monitor;
private TopologyListener listener;
private Map<String, WebAppContext> deployments;
@@ -94,8 +93,7 @@ public class GatewayServer {
} else if( cmd.hasOption( GatewayCommandLine.VERSION_LONG ) ) {
printVersion();
} else if( cmd.hasOption( GatewayCommandLine.REDEPLOY_LONG ) ) {
- GatewayConfig config = new GatewayConfigImpl();
- redeployTopologies( config, cmd.getOptionValue( GatewayCommandLine.REDEPLOY_LONG ) );
+ redeployTopologies( cmd.getOptionValue( GatewayCommandLine.REDEPLOY_LONG ) );
} else {
buildProperties = loadBuildProperties();
services = instantiateGatewayServices();
@@ -177,12 +175,12 @@ public class GatewayServer {
private static void configureKerberosSecurity( GatewayConfig config ) {
System.setProperty(GatewayConfig.HADOOP_KERBEROS_SECURED, "true");
System.setProperty(GatewayConfig.KRB5_CONFIG, config.getKerberosConfig());
- System.setProperty(GatewayConfig.KRB5_DEBUG,
+ System.setProperty(GatewayConfig.KRB5_DEBUG,
Boolean.toString(config.isKerberosDebugEnabled()));
System.setProperty(GatewayConfig.KRB5_LOGIN_CONFIG, config.getKerberosLoginConfig());
System.setProperty(GatewayConfig.KRB5_USE_SUBJECT_CREDS_ONLY, "false");
}
-
+
private static Properties loadBuildProperties() {
Properties properties = new Properties();
InputStream inputStream = GatewayServer.class.getClassLoader().getResourceAsStream( "build.properties" );
@@ -205,50 +203,11 @@ public class GatewayServer {
input.close();
}
- private static void redeployTopology( Topology topology ) {
- File topologyFile = new File( topology.getUri() );
- long start = System.currentTimeMillis();
- long limit = 1000L; // One second.
- long elapsed = 1;
- while( elapsed <= limit ) {
- try {
- long origTimestamp = topologyFile.lastModified();
- long setTimestamp = Math.max( System.currentTimeMillis(), topologyFile.lastModified() + elapsed );
- if( topologyFile.setLastModified( setTimestamp ) ) {
- long newTimstamp = topologyFile.lastModified();
- if( newTimstamp > origTimestamp ) {
- break;
- } else {
- Thread.sleep( 10 );
- elapsed = System.currentTimeMillis() - start;
- continue;
- }
- } else {
- log.failedToRedeployTopology( topology.getName() );
- break;
- }
- } catch( InterruptedException e ) {
- log.failedToRedeployTopology( topology.getName(), e );
- e.printStackTrace();
- }
- }
- }
- public static void redeployTopologies( GatewayConfig config, String topologyName ) {
- try {
- File topologiesDir = calculateAbsoluteTopologiesDir( config );
- FileTopologyProvider provider = new FileTopologyProvider( topologiesDir );
- provider.reloadTopologies();
- for( Topology topology : provider.getTopologies() ) {
- if( topologyName == null || topologyName.equals( topology.getName() ) ) {
- redeployTopology( topology );
- }
- }
- } catch( SAXException e ) {
- log.failedToRedeployTopologies( e );
- } catch( IOException e ) {
- log.failedToRedeployTopologies( e );
- }
+ public static void redeployTopologies( String topologyName ) {
+ TopologyService ts = getGatewayServices().getService(GatewayServices.TOPOLOGY_SERVICE);
+ ts.reloadTopologies();
+ ts.redeployTopologies(topologyName);
}
public static GatewayServer startGateway( GatewayConfig config, GatewayServices svcs ) {
@@ -307,7 +266,7 @@ public class GatewayServer {
// jetty.start();
// }
-
+
private synchronized void start() throws Exception {
// Create the global context handler.
@@ -329,7 +288,7 @@ public class GatewayServer {
if (config.isSSLEnabled()) {
SSLService ssl = services.getService("SSLService");
String keystoreFileName = config.getGatewaySecurityDir() + File.separatorChar + "keystores" + File.separatorChar + "gateway.jks";
- Connector connector = (Connector) ssl.buildSSlConnector( keystoreFileName );
+ Connector connector = (Connector) ssl.buildSSlConnector(keystoreFileName);
connector.setHost(address.getHostName());
connector.setPort(address.getPort());
jetty.addConnector(connector);
@@ -345,15 +304,15 @@ public class GatewayServer {
// Create a dir/file based cluster topology provider.
File topologiesDir = calculateAbsoluteTopologiesDir();
- monitor = new FileTopologyProvider( topologiesDir );
- monitor.addTopologyChangeListener( listener );
+ monitor = services.getService(GatewayServices.TOPOLOGY_SERVICE);
+ monitor.addTopologyChangeListener(listener);
// Load the current topologies.
- log.loadingTopologiesFromDirectory( topologiesDir.getAbsolutePath() );
+ log.loadingTopologiesFromDirectory(topologiesDir.getAbsolutePath());
monitor.reloadTopologies();
// Start the topology monitor.
- log.monitoringTopologyChangesInDirectory( topologiesDir.getAbsolutePath() );
+ log.monitoringTopologyChangesInDirectory(topologiesDir.getAbsolutePath());
monitor.startMonitor();
}
http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-server/src/main/java/org/apache/hadoop/gateway/services/CLIGatewayServices.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/CLIGatewayServices.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/CLIGatewayServices.java
index 66f22b8..6fc10a2 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/CLIGatewayServices.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/CLIGatewayServices.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.gateway.deploy.DeploymentContext;
import org.apache.hadoop.gateway.descriptor.FilterParamDescriptor;
import org.apache.hadoop.gateway.descriptor.ResourceDescriptor;
import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.hadoop.gateway.services.topology.impl.DefaultTopologyService;
import org.apache.hadoop.gateway.services.security.impl.DefaultAliasService;
import org.apache.hadoop.gateway.services.security.impl.DefaultCryptoService;
import org.apache.hadoop.gateway.services.security.impl.DefaultKeystoreService;
@@ -66,6 +67,10 @@ public class CLIGatewayServices implements GatewayServices {
crypto.setAliasService(alias);
crypto.init(config, options);
services.put(CRYPTO_SERVICE, crypto);
+
+ DefaultTopologyService tops = new DefaultTopologyService();
+ tops.init( config, options );
+ services.put(TOPOLOGY_SERVICE, tops);
}
public void start() throws ServiceLifecycleException {
@@ -75,6 +80,9 @@ public class CLIGatewayServices implements GatewayServices {
DefaultAliasService alias = (DefaultAliasService) services.get(ALIAS_SERVICE);
alias.start();
+
+ DefaultTopologyService tops = (DefaultTopologyService)services.get(TOPOLOGY_SERVICE);
+ tops.start();
}
public void stop() throws ServiceLifecycleException {
@@ -84,6 +92,9 @@ public class CLIGatewayServices implements GatewayServices {
DefaultAliasService alias = (DefaultAliasService) services.get(ALIAS_SERVICE);
alias.stop();
+
+ DefaultTopologyService tops = (DefaultTopologyService)services.get(TOPOLOGY_SERVICE);
+ tops.stop();
}
/* (non-Javadoc)
http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java
index eacd124..367901d 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/DefaultGatewayServices.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.gateway.deploy.DeploymentContext;
import org.apache.hadoop.gateway.descriptor.FilterParamDescriptor;
import org.apache.hadoop.gateway.descriptor.ResourceDescriptor;
import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.hadoop.gateway.services.topology.impl.DefaultTopologyService;
import org.apache.hadoop.gateway.services.hostmap.impl.DefaultHostMapperService;
import org.apache.hadoop.gateway.services.registry.impl.DefaultServiceRegistryService;
import org.apache.hadoop.gateway.services.security.KeystoreServiceException;
@@ -96,6 +97,10 @@ public class DefaultGatewayServices implements GatewayServices {
DefaultServerInfoService sis = new DefaultServerInfoService();
sis.init( config, options );
services.put( SERVER_INFO_SERVICE, sis );
+
+ DefaultTopologyService tops = new DefaultTopologyService();
+ tops.init( config, options );
+ services.put( TOPOLOGY_SERVICE, tops );
}
public void start() throws ServiceLifecycleException {
@@ -111,6 +116,9 @@ public class DefaultGatewayServices implements GatewayServices {
ServerInfoService sis = (ServerInfoService) services.get(SERVER_INFO_SERVICE);
sis.start();
+
+ DefaultTopologyService tops = (DefaultTopologyService)services.get(TOPOLOGY_SERVICE);
+ tops.start();
}
public void stop() throws ServiceLifecycleException {
@@ -126,6 +134,9 @@ public class DefaultGatewayServices implements GatewayServices {
ServerInfoService sis = (ServerInfoService) services.get(SERVER_INFO_SERVICE);
sis.stop();
+
+ DefaultTopologyService tops = (DefaultTopologyService)services.get(TOPOLOGY_SERVICE);
+ tops.stop();
}
/* (non-Javadoc)
http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultTopologyService.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultTopologyService.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultTopologyService.java
new file mode 100644
index 0000000..516cd24
--- /dev/null
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultTopologyService.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.gateway.services.topology.impl;
+
+
+import org.apache.commons.digester3.Digester;
+import org.apache.commons.digester3.binder.DigesterLoader;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.monitor.FileAlterationListener;
+import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
+import org.apache.commons.io.monitor.FileAlterationMonitor;
+import org.apache.commons.io.monitor.FileAlterationObserver;
+import org.apache.hadoop.gateway.GatewayMessages;
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.hadoop.gateway.services.ServiceLifecycleException;
+import org.apache.hadoop.gateway.services.topology.TopologyService;
+import org.apache.hadoop.gateway.topology.TopologyListener;
+import org.apache.hadoop.gateway.topology.TopologyMonitor;
+import org.apache.hadoop.gateway.topology.builder.TopologyBuilder;
+import org.apache.hadoop.gateway.topology.Topology;
+import org.apache.hadoop.gateway.topology.TopologyProvider;
+import org.apache.hadoop.gateway.topology.TopologyEvent;
+import org.apache.hadoop.gateway.topology.xml.AmbariFormatXmlTopologyRules;
+import org.apache.hadoop.gateway.topology.xml.KnoxFormatXmlTopologyRules;
+import org.xml.sax.SAXException;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Collections;
+
+
+import static org.apache.commons.digester3.binder.DigesterLoader.newLoader;
+
+
+// File-system-backed TopologyService: watches the gateway topology directory with a
+// commons-io FileAlterationMonitor, parses *.xml / *.conf topology files via commons-digester,
+// diffs old vs. new topology maps into CREATED/UPDATED/DELETED events for registered
+// TopologyListeners, and "redeploys" a topology by bumping its file's last-modified time.
+// This class is its own FileFilter (accept) and its own file-change listener (onFile*).
+public class DefaultTopologyService
+ extends FileAlterationListenerAdaptor
+ implements TopologyService, TopologyMonitor, TopologyProvider, FileFilter, FileAlterationListener {
+
+ // File extensions treated as topology descriptors by accept().
+ private static final List<String> SUPPORTED_TOPOLOGY_FILE_EXTENSIONS = new ArrayList<String>();
+ static {
+ SUPPORTED_TOPOLOGY_FILE_EXTENSIONS.add("xml");
+ SUPPORTED_TOPOLOGY_FILE_EXTENSIONS.add("conf");
+ }
+ private static GatewayMessages log = MessagesFactory.get(GatewayMessages.class);
+ // Shared digester config; accepts both Knox-format and Ambari-format topology XML.
+ private static DigesterLoader digesterLoader = newLoader(new KnoxFormatXmlTopologyRules(), new AmbariFormatXmlTopologyRules());
+ private FileAlterationMonitor monitor;
+ private File directory;
+ // NOTE(review): plain HashSet, mutated by addTopologyChangeListener() and iterated by
+ // notifyChangeListeners() (which can run on the monitor thread) — appears to assume all
+ // listeners are registered before startMonitor(); confirm with callers.
+ private Set<TopologyListener> listeners;
+ // volatile so getTopologies()/reloadTopologies() readers see the latest swapped-in map.
+ private volatile Map<File, Topology> topologies;
+
+ // Loads one topology file, retrying parse/read failures for up to TIMEOUT ms with DELAY ms
+ // pauses (covers files caught mid-write by the monitor). After the timeout window the last
+ // IOException/SAXException is rethrown. InterruptedException propagates from Thread.sleep.
+ private Topology loadTopology(File file) throws IOException, SAXException, URISyntaxException, InterruptedException {
+ final long TIMEOUT = 250; //ms
+ final long DELAY = 50; //ms
+ log.loadingTopologyFile(file.getAbsolutePath());
+ Topology topology;
+ long start = System.currentTimeMillis();
+ while (true) {
+ try {
+ topology = loadTopologyAttempt(file);
+ break;
+ } catch (IOException e) {
+ if (System.currentTimeMillis() - start < TIMEOUT) {
+ log.failedToLoadTopologyRetrying(file.getAbsolutePath(), Long.toString(DELAY), e);
+ Thread.sleep(DELAY);
+ } else {
+ throw e;
+ }
+ } catch (SAXException e) {
+ if (System.currentTimeMillis() - start < TIMEOUT) {
+ log.failedToLoadTopologyRetrying(file.getAbsolutePath(), Long.toString(DELAY), e);
+ Thread.sleep(DELAY);
+ } else {
+ throw e;
+ }
+ }
+ }
+ return topology;
+ }
+
+ // Single parse attempt: digest the XML into a TopologyBuilder, then stamp the built
+ // Topology with its source URI, its file name minus extension, and the file mtime.
+ private Topology loadTopologyAttempt(File file) throws IOException, SAXException, URISyntaxException {
+ Topology topology;
+ Digester digester = digesterLoader.newDigester();
+ TopologyBuilder topologyBuilder = digester.parse(FileUtils.openInputStream(file));
+ topology = topologyBuilder.build();
+ topology.setUri(file.toURI());
+ topology.setName(FilenameUtils.removeExtension(file.getName()));
+ topology.setTimestamp(file.lastModified());
+ return topology;
+ }
+
+ // Forces a redeploy by advancing the topology file's last-modified time so the directory
+ // monitor observes an UPDATE. Retries for up to ~1s because some filesystems have coarse
+ // (e.g. 1s) mtime granularity and setLastModified may not produce a visibly newer stamp.
+ // NOTE(review): InterruptedException is logged but the interrupt flag is not restored
+ // (no Thread.currentThread().interrupt()) and the loop may spin again; printStackTrace
+ // also bypasses the logging framework — flagged for cleanup.
+ private void redeployTopology(Topology topology) {
+ File topologyFile = new File(topology.getUri());
+ long start = System.currentTimeMillis();
+ long limit = 1000L; // One second.
+ long elapsed = 1;
+ while (elapsed <= limit) {
+ try {
+ long origTimestamp = topologyFile.lastModified();
+ long setTimestamp = Math.max(System.currentTimeMillis(), topologyFile.lastModified() + elapsed);
+ if (topologyFile.setLastModified(setTimestamp)) {
+ long newTimstamp = topologyFile.lastModified();
+ if (newTimstamp > origTimestamp) {
+ break;
+ } else {
+ Thread.sleep(10);
+ elapsed = System.currentTimeMillis() - start;
+ continue;
+ }
+ } else {
+ // setLastModified returned false (e.g. no write permission); give up on this file.
+ log.failedToRedeployTopology(topology.getName());
+ break;
+ }
+ } catch (InterruptedException e) {
+ log.failedToRedeployTopology(topology.getName(), e);
+ e.printStackTrace();
+ }
+ }
+ }
+
+ // Diffs the previous and freshly loaded topology maps into an ordered event list:
+ // files gone -> DELETED, files with a newer timestamp -> UPDATED, new files -> CREATED.
+ private List<TopologyEvent> createChangeEvents(
+ Map<File, Topology> oldTopologies,
+ Map<File, Topology> newTopologies) {
+ ArrayList<TopologyEvent> events = new ArrayList<TopologyEvent>();
+ // Go through the old topologies and find anything that was deleted.
+ for (File file : oldTopologies.keySet()) {
+ if (!newTopologies.containsKey(file)) {
+ events.add(new TopologyEvent(TopologyEvent.Type.DELETED, oldTopologies.get(file)));
+ }
+ }
+ // Go through the new topologies and figure out what was updated vs added.
+ for (File file : newTopologies.keySet()) {
+ if (oldTopologies.containsKey(file)) {
+ Topology oldTopology = oldTopologies.get(file);
+ Topology newTopology = newTopologies.get(file);
+ if (newTopology.getTimestamp() > oldTopology.getTimestamp()) {
+ events.add(new TopologyEvent(TopologyEvent.Type.UPDATED, newTopologies.get(file)));
+ }
+ } else {
+ events.add(new TopologyEvent(TopologyEvent.Type.CREATED, newTopologies.get(file)));
+ }
+ }
+ return events;
+ }
+
+ // Resolves the configured topology directory to an absolute File.
+ private File calculateAbsoluteTopologiesDir(GatewayConfig config) {
+
+ File topoDir = new File(config.getGatewayTopologyDir());
+ topoDir = topoDir.getAbsoluteFile();
+ return topoDir;
+ }
+
+ // Wires this object into the monitor as both the FileFilter and the change listener.
+ // The topology map starts EMPTY here; the first real load happens via reloadTopologies().
+ private void initListener(FileAlterationMonitor monitor, File directory) {
+ this.directory = directory;
+ this.monitor = monitor;
+
+
+ FileAlterationObserver observer = new FileAlterationObserver(this.directory, this);
+ observer.addListener(this);
+ monitor.addObserver(observer);
+
+ this.listeners = new HashSet<TopologyListener>();
+ this.topologies = new HashMap<File, Topology>(); //loadTopologies( this.directory );
+ }
+
+ // Convenience overload with a 1s polling interval.
+ // NOTE(review): the declared IOException/SAXException are not visibly thrown by anything
+ // in this call chain — possibly carried over from the old FileTopologyProvider; confirm.
+ private void initListener(File directory) throws IOException, SAXException {
+ initListener(new FileAlterationMonitor(1000L), directory);
+ }
+
+ // Loads every accepted file in the directory into a fresh map. A file that fails to load
+ // is logged and skipped, so one bad topology does not block the others.
+ private Map<File, Topology> loadTopologies(File directory) {
+ Map<File, Topology> map = new HashMap<File, Topology>();
+ if (directory.exists() && directory.canRead()) {
+ for (File file : directory.listFiles(this)) {
+ try {
+ map.put(file, loadTopology(file));
+ } catch (IOException e) {
+ // Maybe it makes sense to throw exception
+ log.failedToLoadTopology(file.getAbsolutePath(), e);
+ } catch (SAXException e) {
+ // Maybe it makes sense to throw exception
+ log.failedToLoadTopology(file.getAbsolutePath(), e);
+ } catch (Exception e) {
+ // Maybe it makes sense to throw exception
+ log.failedToLoadTopology(file.getAbsolutePath(), e);
+ }
+ }
+ }
+ return map;
+ }
+
+ // Redeploys the named topology, or ALL currently loaded topologies when topologyName is null.
+ public void redeployTopologies(String topologyName) {
+
+ for (Topology topology : getTopologies()) {
+ if (topologyName == null || topologyName.equals(topology.getName())) {
+ redeployTopology(topology);
+ }
+ }
+
+ }
+
+ // Reloads the directory, swaps in the new map, and notifies listeners of the diff.
+ // Synchronized so concurrent reloads (monitor thread vs. explicit calls) serialize;
+ // any failure is logged rather than propagated to the caller.
+ public void reloadTopologies() {
+ try {
+ synchronized (this) {
+ Map<File, Topology> oldTopologies = topologies;
+ Map<File, Topology> newTopologies = loadTopologies(directory);
+ List<TopologyEvent> events = createChangeEvents(oldTopologies, newTopologies);
+ topologies = newTopologies;
+ notifyChangeListeners(events);
+ }
+ } catch (Exception e) {
+ // Maybe it makes sense to throw exception
+ log.failedToReloadTopologies(e);
+ }
+ }
+
+ // Delivers the event batch to each listener; a RuntimeException from one listener is
+ // logged and does not prevent delivery to the remaining listeners.
+ private void notifyChangeListeners(List<TopologyEvent> events) {
+ for (TopologyListener listener : listeners) {
+ try {
+ listener.handleTopologyEvent(events);
+ } catch (RuntimeException e) {
+ log.failedToHandleTopologyEvents(e);
+ }
+ }
+ }
+
+ // Returns an unmodifiable view over a snapshot read of the volatile topology map.
+ public Collection<Topology> getTopologies() {
+ Map<File, Topology> map = topologies;
+ return Collections.unmodifiableCollection(map.values());
+ }
+
+ @Override
+ public void addTopologyChangeListener(TopologyListener listener) {
+ listeners.add(listener);
+ }
+
+ // Starts/stops the background directory polling thread.
+ @Override
+ public void startMonitor() throws Exception {
+ monitor.start();
+ }
+
+ @Override
+ public void stopMonitor() throws Exception {
+ monitor.stop();
+ }
+
+ // FileFilter: accept readable, non-directory files with a supported extension (xml/conf).
+ @Override
+ public boolean accept(File file) {
+ boolean accept = false;
+ if (!file.isDirectory() && file.canRead()) {
+ String extension = FilenameUtils.getExtension(file.getName());
+ if (SUPPORTED_TOPOLOGY_FILE_EXTENSIONS.contains(extension)) {
+ accept = true;
+ }
+ }
+ return accept;
+ }
+
+ // Any create/delete/change in the watched directory triggers a full reload-and-diff.
+ @Override
+ public void onFileCreate(File file) {
+ onFileChange(file);
+ }
+
+ @Override
+ public void onFileDelete(java.io.File file) {
+ onFileChange(file);
+ }
+
+ @Override
+ public void onFileChange(File file) {
+ reloadTopologies();
+ }
+
+ // Service lifecycle stop: intentionally a no-op — monitoring is controlled separately
+ // via startMonitor()/stopMonitor().
+ @Override
+ public void stop() {
+
+ }
+
+ // Service lifecycle start: intentionally a no-op; see stop().
+ @Override
+ public void start() {
+
+ }
+
+ // Service init: resolve the topology dir from config and wire up the monitor/observer.
+ // NOTE(review): wrapping only getMessage() discards the original exception as cause and
+ // loses its stack trace — ServiceLifecycleException should be given the cause if it has
+ // such a constructor; flagged for follow-up.
+ @Override
+ public void init(GatewayConfig config, Map<String, String> options) throws ServiceLifecycleException {
+
+ try {
+ initListener(calculateAbsoluteTopologiesDir(config));
+ } catch (IOException io) {
+ throw new ServiceLifecycleException(io.getMessage());
+ } catch (SAXException sax) {
+ throw new ServiceLifecycleException(sax.getMessage());
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/file/FileTopologyProvider.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/file/FileTopologyProvider.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/file/FileTopologyProvider.java
deleted file mode 100644
index cd0567d..0000000
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/file/FileTopologyProvider.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.gateway.topology.file;
-
-import org.apache.commons.digester3.Digester;
-import org.apache.commons.digester3.binder.DigesterLoader;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.io.monitor.FileAlterationListener;
-import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
-import org.apache.commons.io.monitor.FileAlterationMonitor;
-import org.apache.commons.io.monitor.FileAlterationObserver;
-import org.apache.hadoop.gateway.GatewayMessages;
-import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
-import org.apache.hadoop.gateway.topology.Topology;
-import org.apache.hadoop.gateway.topology.TopologyEvent;
-import org.apache.hadoop.gateway.topology.TopologyListener;
-import org.apache.hadoop.gateway.topology.TopologyMonitor;
-import org.apache.hadoop.gateway.topology.TopologyProvider;
-import org.apache.hadoop.gateway.topology.builder.TopologyBuilder;
-import org.apache.hadoop.gateway.topology.xml.AmbariFormatXmlTopologyRules;
-import org.apache.hadoop.gateway.topology.xml.KnoxFormatXmlTopologyRules;
-import org.xml.sax.SAXException;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.commons.digester3.binder.DigesterLoader.newLoader;
-
-//import org.codehaus.plexus.util.FileUtils;
-
-public class FileTopologyProvider
- extends FileAlterationListenerAdaptor
- implements TopologyProvider, TopologyMonitor, FileFilter, FileAlterationListener {
-
- private static GatewayMessages log = MessagesFactory.get( GatewayMessages.class );
- private static DigesterLoader digesterLoader = newLoader( new KnoxFormatXmlTopologyRules(), new AmbariFormatXmlTopologyRules() );
- private static final List<String> SUPPORTED_TOPOLOGY_FILE_EXTENSIONS = new ArrayList<String>();
- static {
- SUPPORTED_TOPOLOGY_FILE_EXTENSIONS.add("xml");
- SUPPORTED_TOPOLOGY_FILE_EXTENSIONS.add("conf");
- }
-
- private FileAlterationMonitor monitor;
- private File directory;
- private Set<TopologyListener> listeners;
- private volatile Map<File,Topology> topologies;
-
- FileTopologyProvider( FileAlterationMonitor monitor, File directory ) {
- this.directory = directory;
- this.monitor = monitor;
-
- FileAlterationObserver observer = new FileAlterationObserver( this.directory, this );
- observer.addListener( this );
- monitor.addObserver( observer );
-
- this.listeners = new HashSet<TopologyListener>();
- this.topologies = new HashMap<File,Topology>(); //loadTopologies( this.directory );
- }
-
- public FileTopologyProvider( File directory ) throws IOException, SAXException {
- this( new FileAlterationMonitor( 1000L ), directory );
- }
-
- private static Topology loadTopology( File file ) throws IOException, SAXException, URISyntaxException, InterruptedException {
- final long TIMEOUT = 250; //ms
- final long DELAY = 50; //ms
- log.loadingTopologyFile( file.getAbsolutePath() );
- Topology topology;
- long start = System.currentTimeMillis();
- while( true ) {
- try {
- topology = loadTopologyAttempt( file );
- break;
- } catch ( IOException e ) {
- if( System.currentTimeMillis() - start < TIMEOUT ) {
- log.failedToLoadTopologyRetrying( file.getAbsolutePath(), Long.toString( DELAY ), e );
- Thread.sleep( DELAY );
- } else {
- throw e;
- }
- } catch ( SAXException e ) {
- if( System.currentTimeMillis() - start < TIMEOUT ) {
- log.failedToLoadTopologyRetrying( file.getAbsolutePath(), Long.toString( DELAY ), e );
- Thread.sleep( DELAY );
- } else {
- throw e;
- }
- }
- }
- return topology;
- }
-
- private static Topology loadTopologyAttempt( File file ) throws IOException, SAXException, URISyntaxException {
- Topology topology;Digester digester = digesterLoader.newDigester();
- TopologyBuilder topologyBuilder = digester.parse( FileUtils.openInputStream( file ) );
- topology = topologyBuilder.build();
- topology.setUri( file.toURI() );
- topology.setName( FilenameUtils.removeExtension( file.getName() ) );
- topology.setTimestamp( file.lastModified() );
- return topology;
- }
-
- private Map<File, Topology> loadTopologies( File directory ) {
- Map<File, Topology> map = new HashMap<File, Topology>();
- if( directory.exists() && directory.canRead() ) {
- for( File file : directory.listFiles( this ) ) {
- try {
- map.put( file, loadTopology( file ) );
- } catch( IOException e ) {
- // Maybe it makes sense to throw exception
- log.failedToLoadTopology( file.getAbsolutePath(), e );
- } catch( SAXException e ) {
- // Maybe it makes sense to throw exception
- log.failedToLoadTopology( file.getAbsolutePath(), e );
- } catch ( Exception e ) {
- // Maybe it makes sense to throw exception
- log.failedToLoadTopology( file.getAbsolutePath(), e );
- }
- }
- }
- return map;
- }
-
- public void reloadTopologies() {
- try {
- synchronized ( this ) {
- Map<File,Topology> oldTopologies = topologies;
- Map<File,Topology> newTopologies = loadTopologies( directory );
- List<TopologyEvent> events = createChangeEvents( oldTopologies, newTopologies );
- topologies = newTopologies;
- notifyChangeListeners( events );
- }
- }
- catch( Exception e ) {
- // Maybe it makes sense to throw exception
- log.failedToReloadTopologies( e );
- }
- }
-
- private static List<TopologyEvent> createChangeEvents(
- Map<File,Topology> oldTopologies,
- Map<File,Topology> newTopologies ) {
- ArrayList<TopologyEvent> events = new ArrayList<TopologyEvent>();
- // Go through the old topologies and find anything that was deleted.
- for( File file : oldTopologies.keySet() ) {
- if( !newTopologies.containsKey( file ) ) {
- events.add( new TopologyEvent( TopologyEvent.Type.DELETED, oldTopologies.get( file ) ) );
- }
- }
- // Go through the new topologies and figure out what was updated vs added.
- for( File file : newTopologies.keySet() ) {
- if( oldTopologies.containsKey( file ) ) {
- Topology oldTopology = oldTopologies.get( file );
- Topology newTopology = newTopologies.get( file );
- if( newTopology.getTimestamp() > oldTopology.getTimestamp() ) {
- events.add( new TopologyEvent( TopologyEvent.Type.UPDATED, newTopologies.get( file ) ) );
- }
- } else {
- events.add( new TopologyEvent( TopologyEvent.Type.CREATED, newTopologies.get( file ) ) );
- }
- }
- return events ;
- }
-
- private void notifyChangeListeners( List<TopologyEvent> events ) {
- for( TopologyListener listener : listeners ) {
- try {
- listener.handleTopologyEvent( events );
- } catch( RuntimeException e ) {
- log.failedToHandleTopologyEvents( e );
- }
- }
- }
-
- @Override
- public Collection<Topology> getTopologies() {
- Map<File,Topology> map = topologies;
- return Collections.unmodifiableCollection( map.values() );
- }
-
- @Override
- public void addTopologyChangeListener( TopologyListener listener ) {
- listeners.add( listener );
- }
-
- @Override
- public void startMonitor() throws Exception {
- monitor.start();
- }
-
- @Override
- public void stopMonitor() throws Exception {
- monitor.stop();
- }
-
- @Override
- public boolean accept( File file ) {
- boolean accept = false;
- if( !file.isDirectory() && file.canRead() ) {
- String extension = FilenameUtils.getExtension( file.getName() );
- if( SUPPORTED_TOPOLOGY_FILE_EXTENSIONS.contains( extension ) ) {
- accept = true;
- }
- }
- return accept;
- }
-
- @Override
- public void onFileCreate( File file ) {
- onFileChange( file );
- }
-
- @Override
- public void onFileDelete(java.io.File file) {
- onFileChange( file );
- }
-
- @Override
- public void onFileChange( File file ) {
- reloadTopologies();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-server/src/main/java/org/apache/hadoop/gateway/util/KnoxCLI.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/util/KnoxCLI.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/util/KnoxCLI.java
index c754fd7..499e001 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/util/KnoxCLI.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/util/KnoxCLI.java
@@ -20,13 +20,13 @@ package org.apache.hadoop.gateway.util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.gateway.GatewayCommandLine;
-import org.apache.hadoop.gateway.GatewayServer;
import org.apache.hadoop.gateway.config.GatewayConfig;
import org.apache.hadoop.gateway.config.impl.GatewayConfigImpl;
import org.apache.hadoop.gateway.services.CLIGatewayServices;
import org.apache.hadoop.gateway.services.GatewayServices;
import org.apache.hadoop.gateway.services.Service;
import org.apache.hadoop.gateway.services.ServiceLifecycleException;
+import org.apache.hadoop.gateway.services.topology.TopologyService;
import org.apache.hadoop.gateway.services.security.AliasService;
import org.apache.hadoop.gateway.services.security.KeystoreService;
import org.apache.hadoop.gateway.services.security.KeystoreServiceException;
@@ -274,6 +274,11 @@ public class KnoxCLI extends Configured implements Tool {
KeystoreService ks = services.getService(GatewayServices.KEYSTORE_SERVICE);
return ks;
}
+
+ protected TopologyService getTopologyService() {
+ TopologyService ts = services.getService(GatewayServices.TOPOLOGY_SERVICE);
+ return ts;
+ }
}
private class AliasListCommand extends Command {
@@ -586,8 +591,9 @@ public class KnoxCLI extends Configured implements Tool {
@Override
public void execute() throws Exception {
- GatewayConfig config = new GatewayConfigImpl();
- GatewayServer.redeployTopologies( config, cluster );
+ TopologyService ts = getTopologyService();
+ ts.reloadTopologies();
+ ts.redeployTopologies(cluster);
}
@Override
http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-server/src/test/java/org/apache/hadoop/gateway/services/topology/DefaultTopologyServiceTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/services/topology/DefaultTopologyServiceTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/services/topology/DefaultTopologyServiceTest.java
new file mode 100644
index 0000000..d381aa8
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/services/topology/DefaultTopologyServiceTest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.services.topology;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.monitor.FileAlterationMonitor;
+import org.apache.commons.io.monitor.FileAlterationObserver;
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.services.topology.impl.DefaultTopologyService;
+import org.apache.hadoop.gateway.topology.*;
+import org.apache.hadoop.test.TestUtils;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.*;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.core.IsNull.notNullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class DefaultTopologyServiceTest {
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ private File createDir() throws IOException {
+ return TestUtils.createTempDir(this.getClass().getSimpleName() + "-");
+ }
+
+ private File createFile(File parent, String name, String resource, long timestamp) throws IOException {
+ File file = new File(parent, name);
+ if (!file.exists()) {
+ FileUtils.touch(file);
+ }
+ InputStream input = ClassLoader.getSystemResourceAsStream(resource);
+ OutputStream output = FileUtils.openOutputStream(file);
+ IOUtils.copy(input, output);
+ output.flush();
+ input.close();
+ output.close();
+ file.setLastModified(timestamp);
+ assertTrue("Failed to create test file " + file.getAbsolutePath(), file.exists());
+ assertTrue("Failed to populate test file " + file.getAbsolutePath(), file.length() > 0);
+
+ return file;
+ }
+
+ @Test
+ public void testGetTopologies() throws Exception {
+
+ File dir = createDir();
+ long time = dir.lastModified();
+ try {
+ createFile(dir, "one.xml", "org/apache/hadoop/gateway/topology/file/topology-one.xml", time);
+
+ TestTopologyListener topoListener = new TestTopologyListener();
+ FileAlterationMonitor monitor = new FileAlterationMonitor(Long.MAX_VALUE);
+
+ TopologyService provider = new DefaultTopologyService();
+ Map<String, String> c = new HashMap<String, String>();
+
+ GatewayConfig config = EasyMock.createNiceMock(GatewayConfig.class);
+ EasyMock.expect(config.getGatewayTopologyDir()).andReturn(dir.toString()).anyTimes();
+ EasyMock.replay(config);
+
+ provider.init(config, c);
+
+
+ provider.addTopologyChangeListener(topoListener);
+
+ provider.reloadTopologies();
+
+
+ Collection<Topology> topologies = provider.getTopologies();
+ assertThat(topologies, notNullValue());
+ assertThat(topologies.size(), is(1));
+ Topology topology = topologies.iterator().next();
+ assertThat(topology.getName(), is("one"));
+ assertThat(topology.getTimestamp(), is(time));
+ assertThat(topoListener.events.size(), is(1));
+ topoListener.events.clear();
+
+ // Add a file to the directory.
+ File two = createFile(dir, "two.xml", "org/apache/hadoop/gateway/topology/file/topology-two.xml", 1L);
+ provider.reloadTopologies();
+ topologies = provider.getTopologies();
+ assertThat(topologies.size(), is(2));
+ Set<String> names = new HashSet<String>(Arrays.asList("one", "two"));
+ Iterator<Topology> iterator = topologies.iterator();
+ topology = iterator.next();
+ assertThat(names, hasItem(topology.getName()));
+ names.remove(topology.getName());
+ topology = iterator.next();
+ assertThat(names, hasItem(topology.getName()));
+ names.remove(topology.getName());
+ assertThat(names.size(), is(0));
+ assertThat(topoListener.events.size(), is(1));
+ List<TopologyEvent> events = topoListener.events.get(0);
+ assertThat(events.size(), is(1));
+ TopologyEvent event = events.get(0);
+ assertThat(event.getType(), is(TopologyEvent.Type.CREATED));
+ assertThat(event.getTopology(), notNullValue());
+
+ // Update a file in the directory.
+ two = createFile(dir, "two.xml", "org/apache/hadoop/gateway/topology/file/topology-three.xml", 2L);
+ provider.reloadTopologies();
+ topologies = provider.getTopologies();
+ assertThat(topologies.size(), is(2));
+ names = new HashSet<String>(Arrays.asList("one", "two"));
+ iterator = topologies.iterator();
+ topology = iterator.next();
+ assertThat(names, hasItem(topology.getName()));
+ names.remove(topology.getName());
+ topology = iterator.next();
+ assertThat(names, hasItem(topology.getName()));
+ names.remove(topology.getName());
+ assertThat(names.size(), is(0));
+
+ // Remove a file from the directory.
+ two.delete();
+ provider.reloadTopologies();
+ topologies = provider.getTopologies();
+ assertThat(topologies.size(), is(1));
+ topology = topologies.iterator().next();
+ assertThat(topology.getName(), is("one"));
+ assertThat(topology.getTimestamp(), is(time));
+ } finally {
+ FileUtils.deleteQuietly(dir);
+ }
+ }
+
+ private void kickMonitor(FileAlterationMonitor monitor) {
+ for (FileAlterationObserver observer : monitor.getObservers()) {
+ observer.checkAndNotify();
+ }
+ }
+
+ @Test
+ public void testProviderParamsOrderIsPreserved() {
+
+ Provider provider = new Provider();
+ String names[] = {"ldapRealm=",
+ "ldapContextFactory",
+ "ldapRealm.contextFactory",
+ "ldapGroupRealm",
+ "ldapGroupRealm.contextFactory",
+ "ldapGroupRealm.contextFactory.systemAuthenticationMechanism"
+ };
+
+ Param param = null;
+ for (String name : names) {
+ param = new Param();
+ param.setName(name);
+ param.setValue(name);
+ provider.addParam(param);
+
+ }
+ Map<String, String> params = provider.getParams();
+ Set<String> keySet = params.keySet();
+ Iterator<String> iter = keySet.iterator();
+ int i = 0;
+ while (iter.hasNext()) {
+ assertTrue(iter.next().equals(names[i++]));
+ }
+
+ }
+
+ private class TestTopologyListener implements TopologyListener {
+
+ public ArrayList<List<TopologyEvent>> events = new ArrayList<List<TopologyEvent>>();
+
+ @Override
+ public void handleTopologyEvent(List<TopologyEvent> events) {
+ this.events.add(events);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-server/src/test/java/org/apache/hadoop/gateway/topology/file/FileTopologyProviderTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/topology/file/FileTopologyProviderTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/topology/file/FileTopologyProviderTest.java
deleted file mode 100644
index 2e20376..0000000
--- a/gateway-server/src/test/java/org/apache/hadoop/gateway/topology/file/FileTopologyProviderTest.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.gateway.topology.file;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.core.IsNull.notNullValue;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.monitor.FileAlterationMonitor;
-import org.apache.commons.io.monitor.FileAlterationObserver;
-import org.apache.hadoop.gateway.topology.Param;
-import org.apache.hadoop.gateway.topology.Provider;
-import org.apache.hadoop.gateway.topology.Topology;
-import org.apache.hadoop.gateway.topology.TopologyEvent;
-import org.apache.hadoop.gateway.topology.TopologyListener;
-import org.apache.hadoop.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FileTopologyProviderTest {
-
- @Before
- public void setUp() throws Exception {
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
- private File createDir() throws IOException {
- return TestUtils.createTempDir( this.getClass().getSimpleName() + "-" );
- }
-
- private File createFile( File parent, String name, String resource, long timestamp ) throws IOException {
- File file = new File( parent, name );
- if( !file.exists() ) {
- FileUtils.touch( file );
- }
- InputStream input = ClassLoader.getSystemResourceAsStream( resource );
- OutputStream output = FileUtils.openOutputStream( file );
- IOUtils.copy( input, output );
- output.flush();
- input.close();
- output.close();
- file.setLastModified( timestamp );
- assertTrue( "Failed to create test file " + file.getAbsolutePath(), file.exists() );
- assertTrue( "Failed to populate test file " + file.getAbsolutePath(), file.length() > 0 );
-
- return file;
- }
-
- @Test
- public void testGetTopologies() throws Exception {
-
- File dir = createDir();
- long time = dir.lastModified();
- try {
- createFile( dir, "one.xml", "org/apache/hadoop/gateway/topology/file/topology-one.xml", time );
-
- TestTopologyListener topoListener = new TestTopologyListener();
- FileAlterationMonitor monitor = new FileAlterationMonitor( Long.MAX_VALUE );
- FileTopologyProvider provider = new FileTopologyProvider( monitor, dir );
- provider.addTopologyChangeListener( topoListener );
-
- kickMonitor( monitor );
-
- Collection<Topology> topologies = provider.getTopologies();
- assertThat( topologies, notNullValue() );
- assertThat( topologies.size(), is( 1 ) );
- Topology topology = topologies.iterator().next();
- assertThat( topology.getName(), is( "one" ) );
- assertThat( topology.getTimestamp(), is( time ) );
- assertThat( topoListener.events.size(), is( 1 ) );
- topoListener.events.clear();
-
- // Add a file to the directory.
- File two = createFile( dir, "two.xml", "org/apache/hadoop/gateway/topology/file/topology-two.xml", 1L );
- kickMonitor( monitor );
- topologies = provider.getTopologies();
- assertThat( topologies.size(), is( 2 ) );
- Set<String> names = new HashSet<String>( Arrays.asList( "one", "two" ) );
- Iterator<Topology> iterator = topologies.iterator();
- topology = iterator.next();
- assertThat( names, hasItem( topology.getName() ) );
- names.remove( topology.getName() );
- topology = iterator.next();
- assertThat( names, hasItem( topology.getName() ) );
- names.remove( topology.getName() );
- assertThat( names.size(), is( 0 ) );
- assertThat( topoListener.events.size(), is( 1 ) );
- List<TopologyEvent> events = topoListener.events.get( 0 );
- assertThat( events.size(), is( 1 ) );
- TopologyEvent event = events.get( 0 );
- assertThat( event.getType(), is( TopologyEvent.Type.CREATED ) );
- assertThat( event.getTopology(), notNullValue() );
-
- // Update a file in the directory.
- two = createFile( dir, "two.xml", "org/apache/hadoop/gateway/topology/file/topology-three.xml", 2L );
- kickMonitor( monitor );
- topologies = provider.getTopologies();
- assertThat( topologies.size(), is( 2 ) );
- names = new HashSet<String>( Arrays.asList( "one", "two" ) );
- iterator = topologies.iterator();
- topology = iterator.next();
- assertThat( names, hasItem( topology.getName() ) );
- names.remove( topology.getName() );
- topology = iterator.next();
- assertThat( names, hasItem( topology.getName() ) );
- names.remove( topology.getName() );
- assertThat( names.size(), is( 0 ) );
-
- // Remove a file from the directory.
- two.delete();
- kickMonitor( monitor );
- topologies = provider.getTopologies();
- assertThat( topologies.size(), is( 1 ) );
- topology = topologies.iterator().next();
- assertThat( topology.getName(), is( "one" ) );
- assertThat( topology.getTimestamp(), is( time ) );
- } finally {
- FileUtils.deleteQuietly( dir );
- }
- }
-
- private void kickMonitor( FileAlterationMonitor monitor ) {
- for( FileAlterationObserver observer : monitor.getObservers() ) {
- observer.checkAndNotify();
- }
- }
-
- @Test
- public void testProviderParamsOrderIsPreserved() {
-
- Provider provider = new Provider();
- String names[] = {"ldapRealm=",
- "ldapContextFactory",
- "ldapRealm.contextFactory",
- "ldapGroupRealm",
- "ldapGroupRealm.contextFactory",
- "ldapGroupRealm.contextFactory.systemAuthenticationMechanism"
- };
-
- Param param = null;
- for (String name : names) {
- param = new Param();
- param.setName(name);
- param.setValue(name);
- provider.addParam(param);
-
- }
- Map<String, String> params = provider.getParams();
- Set<String> keySet = params.keySet();
- Iterator<String> iter = keySet.iterator();
- int i = 0;
- while (iter.hasNext()) {
- assertTrue(iter.next().equals(names[i++]));
- }
-
- }
-
- private class TestTopologyListener implements TopologyListener {
-
- public ArrayList<List<TopologyEvent>> events = new ArrayList<List<TopologyEvent>>();
-
- @Override
- public void handleTopologyEvent( List<TopologyEvent> events ) {
- this.events.add( events );
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java
index 9af6b96..33c844d 100644
--- a/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/GatewayServices.java
@@ -35,9 +35,10 @@ public interface GatewayServices extends Service, ProviderDeploymentContributor
public static final String SERVICE_REGISTRY_SERVICE = "ServiceRegistryService";
public static final String HOST_MAPPING_SERVICE = "HostMappingService";
public static final String SERVER_INFO_SERVICE = "ServerInfoService";
+ public static final String TOPOLOGY_SERVICE = "TopologyService";
public abstract Collection<String> getServiceNames();
public abstract <T> T getService( String serviceName );
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/topology/TopologyService.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/topology/TopologyService.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/topology/TopologyService.java
new file mode 100644
index 0000000..ab3bb70
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/topology/TopologyService.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.services.topology;
+
+import org.apache.hadoop.gateway.services.Service;
+import org.apache.hadoop.gateway.topology.Topology;
+import org.apache.hadoop.gateway.topology.TopologyListener;
+
+import java.util.Collection;
+
+
+public interface TopologyService extends Service {
+
+ public void reloadTopologies();
+
+ public void redeployTopologies(String topologyName);
+
+ public void addTopologyChangeListener(TopologyListener listener);
+
+ public void startMonitor() throws Exception;
+
+ public void stopMonitor() throws Exception;
+
+ public Collection<Topology> getTopologies();
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/2adc917e/gateway-test/src/test/java/org/apache/hadoop/gateway/GatewayDeployFuncTest.java
----------------------------------------------------------------------
diff --git a/gateway-test/src/test/java/org/apache/hadoop/gateway/GatewayDeployFuncTest.java b/gateway-test/src/test/java/org/apache/hadoop/gateway/GatewayDeployFuncTest.java
index a990e74..25f624c 100644
--- a/gateway-test/src/test/java/org/apache/hadoop/gateway/GatewayDeployFuncTest.java
+++ b/gateway-test/src/test/java/org/apache/hadoop/gateway/GatewayDeployFuncTest.java
@@ -231,7 +231,7 @@ public class GatewayDeployFuncTest {
// Redeploy and make sure the timestamp is updated.
topoTimestampBefore = descriptor.lastModified();
- GatewayServer.redeployTopologies( config, null );
+ GatewayServer.redeployTopologies( null );
writeTime = System.currentTimeMillis();
topoTimestampAfter = descriptor.lastModified();
assertThat( topoTimestampAfter, greaterThan( topoTimestampBefore ) );
@@ -248,7 +248,7 @@ public class GatewayDeployFuncTest {
// Redeploy and make sure the timestamp is updated.
topoTimestampBefore = descriptor.lastModified();
- GatewayServer.redeployTopologies( config, "test-cluster" );
+ GatewayServer.redeployTopologies( "test-cluster" );
writeTime = System.currentTimeMillis();
topoTimestampAfter = descriptor.lastModified();
assertThat( topoTimestampAfter, greaterThan( topoTimestampBefore ) );