You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by tu...@apache.org on 2011/12/08 20:25:33 UTC
svn commit: r1212060 [3/8] - in /hadoop/common/trunk/hadoop-hdfs-project: ./
hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/ hadoop-hdfs-httpfs/src/main/
hadoop-hdfs-httpfs/src/main/conf/ hadoop-hdfs-httpfs/src/main/java/
hadoop-hdfs-httpfs/src/main/java/o...
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/BaseService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/BaseService.java?rev=1212060&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/BaseService.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/BaseService.java Thu Dec 8 19:25:28 2011
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.lib.server;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.lib.util.ConfigurationUtils;
+
+import java.util.Map;
+
+/**
+ * Convenience class implementing the {@link Service} interface.
+ */
+public abstract class BaseService implements Service {
+ private String prefix;
+ private Server server;
+ private Configuration serviceConfig;
+
+ /**
+ * Service constructor.
+ *
+ * @param prefix service prefix.
+ */
+ public BaseService(String prefix) {
+ this.prefix = prefix;
+ }
+
+ /**
+ * Initializes the service.
+ * <p/>
+ * It collects all service properties (properties having the
+ * <code>#SERVER#.#SERVICE#.</code> prefix). The property names are then
+ * trimmed from the <code>#SERVER#.#SERVICE#.</code> prefix.
+ * <p/>
+ * After collecting the service properties it delegates to the
+ * {@link #init()} method.
+ *
+ * @param server the server initializing the service, gives access to the
+ * server context.
+ *
+ * @throws ServiceException thrown if the service could not be initialized.
+ */
+ @Override
+ public final void init(Server server) throws ServiceException {
+ this.server = server;
+ String servicePrefix = getPrefixedName("");
+ serviceConfig = new Configuration(false);
+ for (Map.Entry<String, String> entry : ConfigurationUtils.resolve(server.getConfig())) {
+ String key = entry.getKey();
+ if (key.startsWith(servicePrefix)) {
+ serviceConfig.set(key.substring(servicePrefix.length()), entry.getValue());
+ }
+ }
+ init();
+ }
+
+
+ /**
+ * Post initializes the service. This method is called by the
+ * {@link Server} after all services of the server have been initialized.
+ * <p/>
+ * This method does a NOP.
+ *
+ * @throws ServiceException thrown if the service could not be
+ * post-initialized.
+ */
+ @Override
+ public void postInit() throws ServiceException {
+ }
+
+ /**
+ * Destroys the service. This method is called once, when the
+ * {@link Server} owning the service is being destroyed.
+ * <p/>
+ * This method does a NOP.
+ */
+ @Override
+ public void destroy() {
+ }
+
+ /**
+ * Returns the service dependencies of this service. The service will be
+ * initialized only if all the service dependencies are already initialized.
+ * <p/>
+ * This method returns an empty array (size 0)
+ *
+ * @return an empty array (size 0).
+ */
+ @Override
+ public Class[] getServiceDependencies() {
+ return new Class[0];
+ }
+
+ /**
+ * Notification callback when the server changes its status.
+ * <p/>
+ * This method does a NOP.
+ *
+ * @param oldStatus old server status.
+ * @param newStatus new server status.
+ *
+ * @throws ServiceException thrown if the service could not process the status change.
+ */
+ @Override
+ public void serverStatusChange(Server.Status oldStatus, Server.Status newStatus) throws ServiceException {
+ }
+
+ /**
+ * Returns the service prefix.
+ *
+ * @return the service prefix.
+ */
+ protected String getPrefix() {
+ return prefix;
+ }
+
+ /**
+ * Returns the server owning the service.
+ *
+ * @return the server owning the service.
+ */
+ protected Server getServer() {
+ return server;
+ }
+
+ /**
+ * Returns the full prefixed name of a service property.
+ *
+ * @param name of the property.
+ *
+ * @return prefixed name of the property.
+ */
+ protected String getPrefixedName(String name) {
+ return server.getPrefixedName(prefix + "." + name);
+ }
+
+ /**
+ * Returns the service configuration properties. Property
+ * names are trimmed off from its prefix.
+ * <p/>
+ * The service configuration properties are all properties
+ * with names starting with <code>#SERVER#.#SERVICE#.</code>
+ * in the server configuration.
+ *
+ * @return the service configuration properties with names
+ * trimmed off from their <code>#SERVER#.#SERVICE#.</code>
+ * prefix.
+ */
+ protected Configuration getServiceConfig() {
+ return serviceConfig;
+ }
+
+ /**
+ * Initializes the service.
+ * <p/>
+ * This method is called by {@link #init(Server)} after all service properties
+ * (properties prefixed with <code>#SERVER#.#SERVICE#.</code>) have been collected.
+ *
+ * @throws ServiceException thrown if the service could not be initialized.
+ */
+ protected abstract void init() throws ServiceException;
+
+}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Server.java?rev=1212060&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Server.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Server.java Thu Dec 8 19:25:28 2011
@@ -0,0 +1,766 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.lib.server;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.lib.util.Check;
+import org.apache.hadoop.lib.util.ConfigurationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.PropertyConfigurator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A Server class provides standard configuration, logging and {@link Service}
+ * lifecycle management.
+ * <p/>
+ * A Server normally has a home directory, a configuration directory, a temp
+ * directory and logs directory.
+ * <p/>
+ * The Server configuration is loaded from 2 overlapped files,
+ * <code>#SERVER#-default.xml</code> and <code>#SERVER#-site.xml</code>. The
+ * default file is loaded from the classpath, the site file is loaded from the
+ * configuration directory.
+ * <p/>
+ * The Server collects all configuration properties prefixed with
+ * <code>#SERVER#</code>. The property names are then trimmed from the
+ * <code>#SERVER#</code> prefix.
+ * <p/>
+ * The Server log configuration is loaded from the
+ * <code>#SERVER#-log4j.properties</code> file in the configuration directory.
+ * <p/>
+ * The lifecycle of the server is defined by the {@link Server.Status} enum.
+ * When a server is created its status is UNDEF, while being initialized it is
+ * BOOTING, and once initialization is complete it transitions by default to NORMAL.
+ * The <code>#SERVER#.startup.status</code> configuration property can be used
+ * to specify a different startup status (NORMAL, ADMIN or HALTED).
+ * <p/>
+ * Service classes are defined in the <code>#SERVER#.services</code> and
+ * <code>#SERVER#.services.ext</code> properties. They are loaded in order
+ * (services first, then services.ext).
+ * <p/>
+ * Before initializing the services, they are traversed and duplicate service
+ * interfaces are removed from the service list. The last service using a given
+ * interface wins (this enables a simple override mechanism).
+ * <p/>
+ * After the services have been resolved by interface de-duplication they are
+ * initialized in order. Once all services are initialized they are
+ * post-initialized (this enables late/conditional service bindings).
+ * <p/>
+ */
+public class Server {
+ private Logger log;
+
+ /**
+ * Server property name that defines the service classes.
+ */
+ public static final String CONF_SERVICES = "services";
+
+ /**
+ * Server property name that defines the service extension classes.
+ */
+ public static final String CONF_SERVICES_EXT = "services.ext";
+
+ /**
+ * Server property name that defines server startup status.
+ */
+ public static final String CONF_STARTUP_STATUS = "startup.status";
+
+ /**
+ * Enumeration that defines the server status.
+ */
+ public enum Status {
+ UNDEF(false, false),
+ BOOTING(false, true),
+ HALTED(true, true),
+ ADMIN(true, true),
+ NORMAL(true, true),
+ SHUTTING_DOWN(false, true),
+ SHUTDOWN(false, false);
+
+ private boolean settable;
+ private boolean operational;
+
+ /**
+ * Status constructor.
+ *
+ * @param settable indicates if the status is settable.
+ * @param operational indicates if the server is operational
+ * when in this status.
+ */
+ private Status(boolean settable, boolean operational) {
+ this.settable = settable;
+ this.operational = operational;
+ }
+
+ /**
+ * Returns if this server status is operational.
+ *
+ * @return if this server status is operational.
+ */
+ public boolean isOperational() {
+ return operational;
+ }
+ }
+
+ /**
+ * Name of the log4j configuration file the Server will load from the
+ * classpath if the <code>#SERVER#-log4j.properties</code> is not defined
+ * in the server configuration directory.
+ */
+ public static final String DEFAULT_LOG4J_PROPERTIES = "default-log4j.properties";
+
+ private Status status;
+ private String name;
+ private String homeDir;
+ private String configDir;
+ private String logDir;
+ private String tempDir;
+ private Configuration config;
+ private Map<Class, Service> services = new LinkedHashMap<Class, Service>();
+
+ /**
+ * Creates a server instance.
+ * <p/>
+ * The config, log and temp directories are all under the specified home directory.
+ *
+ * @param name server name.
+ * @param homeDir server home directory.
+ */
+ public Server(String name, String homeDir) {
+ this(name, homeDir, null);
+ }
+
+ /**
+ * Creates a server instance.
+ *
+ * @param name server name.
+ * @param homeDir server home directory.
+ * @param configDir config directory.
+ * @param logDir log directory.
+ * @param tempDir temp directory.
+ */
+ public Server(String name, String homeDir, String configDir, String logDir, String tempDir) {
+ this(name, homeDir, configDir, logDir, tempDir, null);
+ }
+
+ /**
+ * Creates a server instance.
+ * <p/>
+ * The config, log and temp directories are all under the specified home directory.
+ * <p/>
+ * It uses the provided configuration instead of loading it from the config dir.
+ *
+ * @param name server name.
+ * @param homeDir server home directory.
+ * @param config server configuration.
+ */
+ public Server(String name, String homeDir, Configuration config) {
+ this(name, homeDir, homeDir + "/conf", homeDir + "/log", homeDir + "/temp", config);
+ }
+
+ /**
+ * Creates a server instance.
+ * <p/>
+ * It uses the provided configuration instead of loading it from the config dir.
+ *
+ * @param name server name.
+ * @param homeDir server home directory.
+ * @param configDir config directory.
+ * @param logDir log directory.
+ * @param tempDir temp directory.
+ * @param config server configuration.
+ */
+ public Server(String name, String homeDir, String configDir, String logDir, String tempDir, Configuration config) {
+ this.name = Check.notEmpty(name, "name").trim().toLowerCase();
+ this.homeDir = Check.notEmpty(homeDir, "homeDir");
+ this.configDir = Check.notEmpty(configDir, "configDir");
+ this.logDir = Check.notEmpty(logDir, "logDir");
+ this.tempDir = Check.notEmpty(tempDir, "tempDir");
+ checkAbsolutePath(homeDir, "homeDir");
+ checkAbsolutePath(configDir, "configDir");
+ checkAbsolutePath(logDir, "logDir");
+ checkAbsolutePath(tempDir, "tempDir");
+ if (config != null) {
+ this.config = new Configuration(false);
+ ConfigurationUtils.copy(config, this.config);
+ }
+ status = Status.UNDEF;
+ }
+
+ /**
+ * Validates that the specified value is an absolute path (starts with '/').
+ *
+ * @param value value to verify it is an absolute path.
+ * @param name name to use in the exception if the value is not an absolute
+ * path.
+ *
+ * @return the value.
+ *
+ * @throws IllegalArgumentException thrown if the value is not an absolute
+ * path.
+ */
+ private String checkAbsolutePath(String value, String name) {
+ if (!value.startsWith("/")) {
+ throw new IllegalArgumentException(
+ MessageFormat.format("[{0}] must be an absolute path [{1}]", name, value));
+ }
+ return value;
+ }
+
+ /**
+ * Returns the current server status.
+ *
+ * @return the current server status.
+ */
+ public Status getStatus() {
+ return status;
+ }
+
+ /**
+ * Sets a new server status.
+ * <p/>
+ * The status must be settable.
+ * <p/>
+ * All services will be notified of the status change via the
+ * {@link Service#serverStatusChange(Status, Status)} method. If a service
+ * throws an exception during the notification, the server will be destroyed.
+ *
+ * @param status status to set.
+ *
+ * @throws ServerException thrown if the server has been destroyed because of
+ * a failed notification to a service.
+ */
+ public void setStatus(Status status) throws ServerException {
+ Check.notNull(status, "status");
+ if (status.settable) {
+ if (status != this.status) {
+ Status oldStatus = this.status;
+ this.status = status;
+ for (Service service : services.values()) {
+ try {
+ service.serverStatusChange(oldStatus, status);
+ } catch (Exception ex) {
+ log.error("Service [{}] exception during status change to [{}] -server shutting down-, {}",
+ new Object[]{service.getInterface().getSimpleName(), status, ex.getMessage(), ex});
+ destroy();
+ throw new ServerException(ServerException.ERROR.S11, service.getInterface().getSimpleName(),
+ status, ex.getMessage(), ex);
+ }
+ }
+ }
+ } else {
+ throw new IllegalArgumentException("Status [" + status + " is not settable");
+ }
+ }
+
+ /**
+ * Verifies the server is operational.
+ *
+ * @throws IllegalStateException thrown if the server is not operational.
+ */
+ protected void ensureOperational() {
+ if (!getStatus().isOperational()) {
+ throw new IllegalStateException("Server is not running");
+ }
+ }
+
+ /**
+ * Convenience method that returns a resource as inputstream from the
+ * classpath.
+ * <p/>
+ * It first attempts to use the Thread's context classloader and if not
+ * set it uses the <code>Server</code> class classloader.
+ *
+ * @param name resource to retrieve.
+ *
+ * @return inputstream with the resource, NULL if the resource does not
+ * exist.
+ */
+ static InputStream getResource(String name) {
+ Check.notEmpty(name, "name");
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ if (cl == null) {
+ cl = Server.class.getClassLoader();
+ }
+ return cl.getResourceAsStream(name);
+ }
+
+ /**
+ * Initializes the Server.
+ * <p/>
+ * The initialization steps are:
+ * <ul>
+ * <li>It verifies the server home and temp directories exist</li>
+ * <li>Loads the Server <code>#SERVER#-default.xml</code>
+ * configuration file from the classpath</li>
+ * <li>Initializes log4j logging. If the
+ * <code>#SERVER#-log4j.properties</code> file does not exist in the config
+ * directory it loads <code>default-log4j.properties</code> from the classpath
+ * </li>
+ * <li>Loads the <code>#SERVER#-site.xml</code> file from the server config
+ * directory and merges it with the default configuration.</li>
+ * <li>Loads the services</li>
+ * <li>Initializes the services</li>
+ * <li>Post-initializes the services</li>
+ * <li>Sets the server startup status</li>
+ * </ul>
+ * @throws ServerException thrown if the server could not be initialized.
+ */
+ public void init() throws ServerException {
+ if (status != Status.UNDEF) {
+ throw new IllegalStateException("Server already initialized");
+ }
+ status = Status.BOOTING;
+ verifyDir(homeDir);
+ verifyDir(tempDir);
+ Properties serverInfo = new Properties();
+ try {
+ InputStream is = getResource(name + ".properties");
+ serverInfo.load(is);
+ is.close();
+ } catch (IOException ex) {
+ throw new RuntimeException("Could not load server information file: " + name + ".properties");
+ }
+ initLog();
+ log.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
+ log.info("Server [{}] starting", name);
+ log.info(" Built information:");
+ log.info(" Version : {}", serverInfo.getProperty(name + ".version", "undef"));
+ log.info(" Source Repository : {}", serverInfo.getProperty(name + ".source.repository", "undef"));
+ log.info(" Source Revision : {}", serverInfo.getProperty(name + ".source.revision", "undef"));
+ log.info(" Built by : {}", serverInfo.getProperty(name + ".build.username", "undef"));
+ log.info(" Built timestamp : {}", serverInfo.getProperty(name + ".build.timestamp", "undef"));
+ log.info(" Runtime information:");
+ log.info(" Home dir: {}", homeDir);
+ log.info(" Config dir: {}", (config == null) ? configDir : "-");
+ log.info(" Log dir: {}", logDir);
+ log.info(" Temp dir: {}", tempDir);
+ initConfig();
+ log.debug("Loading services");
+ List<Service> list = loadServices();
+ try {
+ log.debug("Initializing services");
+ initServices(list);
+ log.info("Services initialized");
+ } catch (ServerException ex) {
+ log.error("Services initialization failure, destroying initialized services");
+ destroyServices();
+ throw ex;
+ }
+ Status status = Status.valueOf(getConfig().get(getPrefixedName(CONF_STARTUP_STATUS), Status.NORMAL.toString()));
+ setStatus(status);
+ log.info("Server [{}] started!, status [{}]", name, status);
+ }
+
+ /**
+ * Verifies the specified directory exists.
+ *
+ * @param dir directory to verify it exists.
+ *
+ * @throws ServerException thrown if the directory does not exist or if the
+ * path is not a directory.
+ */
+ private void verifyDir(String dir) throws ServerException {
+ File file = new File(dir);
+ if (!file.exists()) {
+ throw new ServerException(ServerException.ERROR.S01, dir);
+ }
+ if (!file.isDirectory()) {
+ throw new ServerException(ServerException.ERROR.S02, dir);
+ }
+ }
+
+ /**
+ * Initializes Log4j logging.
+ *
+ * @throws ServerException thrown if Log4j could not be initialized.
+ */
+ protected void initLog() throws ServerException {
+ verifyDir(logDir);
+ LogManager.resetConfiguration();
+ File log4jFile = new File(configDir, name + "-log4j.properties");
+ if (log4jFile.exists()) {
+ PropertyConfigurator.configureAndWatch(log4jFile.toString(), 10 * 1000); //every 10 secs
+ log = LoggerFactory.getLogger(Server.class);
+ } else {
+ Properties props = new Properties();
+ try {
+ InputStream is = getResource(DEFAULT_LOG4J_PROPERTIES);
+ props.load(is);
+ } catch (IOException ex) {
+ throw new ServerException(ServerException.ERROR.S03, DEFAULT_LOG4J_PROPERTIES, ex.getMessage(), ex);
+ }
+ PropertyConfigurator.configure(props);
+ log = LoggerFactory.getLogger(Server.class);
+ log.warn("Log4j [{}] configuration file not found, using default configuration from classpath", log4jFile);
+ }
+ }
+
+ /**
+ * Loads and initializes the server configuration.
+ *
+ * @throws ServerException thrown if the configuration could not be loaded/initialized.
+ */
+ protected void initConfig() throws ServerException {
+ verifyDir(configDir);
+ File file = new File(configDir);
+ Configuration defaultConf;
+ String defaultConfig = name + "-default.xml";
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ InputStream inputStream = classLoader.getResourceAsStream(defaultConfig);
+ if (inputStream == null) {
+ log.warn("Default configuration file not available in classpath [{}]", defaultConfig);
+ defaultConf = new Configuration(false);
+ } else {
+ try {
+ defaultConf = new Configuration(false);
+ ConfigurationUtils.load(defaultConf, inputStream);
+ } catch (Exception ex) {
+ throw new ServerException(ServerException.ERROR.S03, defaultConfig, ex.getMessage(), ex);
+ }
+ }
+
+ if (config == null) {
+ Configuration siteConf;
+ File siteFile = new File(file, name + "-site.xml");
+ if (!siteFile.exists()) {
+ log.warn("Site configuration file [{}] not found in config directory", siteFile);
+ siteConf = new Configuration(false);
+ } else {
+ if (!siteFile.isFile()) {
+ throw new ServerException(ServerException.ERROR.S05, siteFile.getAbsolutePath());
+ }
+ try {
+ log.debug("Loading site configuration from [{}]", siteFile);
+ inputStream = new FileInputStream(siteFile);
+ siteConf = new Configuration(false);
+ ConfigurationUtils.load(siteConf, inputStream);
+ } catch (IOException ex) {
+ throw new ServerException(ServerException.ERROR.S06, siteFile, ex.getMessage(), ex);
+ }
+ }
+
+ config = new Configuration(false);
+ ConfigurationUtils.copy(siteConf, config);
+ }
+
+ ConfigurationUtils.injectDefaults(defaultConf, config);
+
+ for (String name : System.getProperties().stringPropertyNames()) {
+ String value = System.getProperty(name);
+ if (name.startsWith(getPrefix() + ".")) {
+ config.set(name, value);
+ if (name.endsWith(".password") || name.endsWith(".secret")) {
+ value = "*MASKED*";
+ }
+ log.info("System property sets {}: {}", name, value);
+ }
+ }
+
+ log.debug("Loaded Configuration:");
+ log.debug("------------------------------------------------------");
+ for (Map.Entry<String, String> entry : config) {
+ String name = entry.getKey();
+ String value = config.get(entry.getKey());
+ if (name.endsWith(".password") || name.endsWith(".secret")) {
+ value = "*MASKED*";
+ }
+ log.debug(" {}: {}", entry.getKey(), value);
+ }
+ log.debug("------------------------------------------------------");
+ }
+
+ /**
+ * Loads the specified services.
+ *
+ * @param classes services classes to load.
+ * @param list list of loaded service in order of appearance in the
+ * configuration.
+ *
+ * @throws ServerException thrown if a service class could not be loaded.
+ */
+ private void loadServices(Class[] classes, List<Service> list) throws ServerException {
+ for (Class klass : classes) {
+ try {
+ Service service = (Service) klass.newInstance();
+ log.debug("Loading service [{}] implementation [{}]", service.getInterface(),
+ service.getClass());
+ if (!service.getInterface().isInstance(service)) {
+ throw new ServerException(ServerException.ERROR.S04, klass, service.getInterface().getName());
+ }
+ list.add(service);
+ } catch (ServerException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new ServerException(ServerException.ERROR.S07, klass, ex.getMessage(), ex);
+ }
+ }
+ }
+
+ /**
+ * Loads services defined in <code>services</code> and
+ * <code>services.ext</code> and de-dups them.
+ *
+ * @return List of final services to initialize.
+ *
+ * @throws ServerException thrown if the services could not be loaded.
+ */
+ protected List<Service> loadServices() throws ServerException {
+ try {
+ Map<Class, Service> map = new LinkedHashMap<Class, Service>();
+ Class[] classes = getConfig().getClasses(getPrefixedName(CONF_SERVICES));
+ Class[] classesExt = getConfig().getClasses(getPrefixedName(CONF_SERVICES_EXT));
+ List<Service> list = new ArrayList<Service>();
+ loadServices(classes, list);
+ loadServices(classesExt, list);
+
+ //removing duplicate services, strategy: last one wins
+ for (Service service : list) {
+ if (map.containsKey(service.getInterface())) {
+ log.debug("Replacing service [{}] implementation [{}]", service.getInterface(),
+ service.getClass());
+ }
+ map.put(service.getInterface(), service);
+ }
+ list = new ArrayList<Service>();
+ for (Map.Entry<Class, Service> entry : map.entrySet()) {
+ list.add(entry.getValue());
+ }
+ return list;
+ } catch (RuntimeException ex) {
+ throw new ServerException(ServerException.ERROR.S08, ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Initializes the list of services.
+ *
+ * @param services services to initialize, it must be a de-duped list of
+ * services.
+ *
+ * @throws ServerException thrown if the services could not be initialized.
+ */
+ protected void initServices(List<Service> services) throws ServerException {
+ for (Service service : services) {
+ log.debug("Initializing service [{}]", service.getInterface());
+ checkServiceDependencies(service);
+ service.init(this);
+ this.services.put(service.getInterface(), service);
+ }
+ for (Service service : services) {
+ service.postInit();
+ }
+ }
+
+ /**
+ * Checks if all service dependencies of a service are available.
+ *
+ * @param service service to check if all its dependencies are available.
+ *
+ * @throws ServerException thrown if a service dependency is missing.
+ */
+ protected void checkServiceDependencies(Service service) throws ServerException {
+ if (service.getServiceDependencies() != null) {
+ for (Class dependency : service.getServiceDependencies()) {
+ if (services.get(dependency) == null) {
+ throw new ServerException(ServerException.ERROR.S10, service.getClass(), dependency);
+ }
+ }
+ }
+ }
+
+ /**
+ * Destroys the server services.
+ */
+ protected void destroyServices() {
+ List<Service> list = new ArrayList<Service>(services.values());
+ Collections.reverse(list);
+ for (Service service : list) {
+ try {
+ log.debug("Destroying service [{}]", service.getInterface());
+ service.destroy();
+ } catch (Throwable ex) {
+ log.error("Could not destroy service [{}], {}",
+ new Object[]{service.getInterface(), ex.getMessage(), ex});
+ }
+ }
+ log.info("Services destroyed");
+ }
+
+ /**
+ * Destroys the server.
+ * <p/>
+ * All services are destroyed in reverse order of initialization, then the
+ * Log4j framework is shutdown.
+ */
+ public void destroy() {
+ ensureOperational();
+ destroyServices();
+ log.info("Server [{}] shutdown!", name);
+ log.info("======================================================");
+ if (!Boolean.getBoolean("test.circus")) {
+ LogManager.shutdown();
+ }
+ status = Status.SHUTDOWN;
+ }
+
+ /**
+ * Returns the name of the server.
+ *
+ * @return the server name.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Returns the server prefix for server configuration properties.
+ * <p/>
+ * By default it is the server name.
+ *
+ * @return the prefix for server configuration properties.
+ */
+ public String getPrefix() {
+ return getName();
+ }
+
+ /**
+ * Returns the prefixed name of a server property.
+ *
+ * @param name of the property.
+ *
+ * @return prefixed name of the property.
+ */
+ public String getPrefixedName(String name) {
+ return getPrefix() + "." + Check.notEmpty(name, "name");
+ }
+
+ /**
+ * Returns the server home dir.
+ *
+ * @return the server home dir.
+ */
+ public String getHomeDir() {
+ return homeDir;
+ }
+
+ /**
+ * Returns the server config dir.
+ *
+ * @return the server config dir.
+ */
+ public String getConfigDir() {
+ return configDir;
+ }
+
+ /**
+ * Returns the server log dir.
+ *
+ * @return the server log dir.
+ */
+ public String getLogDir() {
+ return logDir;
+ }
+
+ /**
+ * Returns the server temp dir.
+ *
+ * @return the server temp dir.
+ */
+ public String getTempDir() {
+ return tempDir;
+ }
+
+ /**
+ * Returns the server configuration.
+ *
+ * @return the server configuration.
+ */
+ public Configuration getConfig() {
+ return config;
+
+ }
+
+ /**
+ * Returns the {@link Service} associated to the specified interface.
+ *
+ * @param serviceKlass service interface.
+ *
+ * @return the service implementation.
+ */
+ @SuppressWarnings("unchecked")
+ public <T> T get(Class<T> serviceKlass) {
+ ensureOperational();
+ Check.notNull(serviceKlass, "serviceKlass");
+ return (T) services.get(serviceKlass);
+ }
+
+ /**
+ * Adds a service programmatically.
+ * <p/>
+ * If a service with the same interface exists, it will be destroyed and
+ * removed before the given one is initialized and added.
+ * <p/>
+ * If an exception is thrown the server is destroyed.
+ *
+ * @param klass service class to add.
+ *
+ * @throws ServerException thrown if the service could not be initialized/added
+ * to the server.
+ */
+ public void setService(Class<? extends Service> klass) throws ServerException {
+ ensureOperational();
+ Check.notNull(klass, "serviceKlass");
+ if (getStatus() == Status.SHUTTING_DOWN) {
+ throw new IllegalStateException("Server shutting down");
+ }
+ try {
+ Service newService = klass.newInstance();
+ Service oldService = services.get(newService.getInterface());
+ if (oldService != null) {
+ try {
+ oldService.destroy();
+ } catch (Throwable ex) {
+ log.error("Could not destroy service [{}], {}",
+ new Object[]{oldService.getInterface(), ex.getMessage(), ex});
+ }
+ }
+ newService.init(this);
+ services.put(newService.getInterface(), newService);
+ } catch (Exception ex) {
+ log.error("Could not set service [{}] programmatically -server shutting down-, {}", klass, ex);
+ destroy();
+ throw new ServerException(ServerException.ERROR.S09, klass, ex.getMessage(), ex);
+ }
+ }
+
+}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/ServerException.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/ServerException.java?rev=1212060&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/ServerException.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/ServerException.java Thu Dec 8 19:25:28 2011
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.lib.server;
+
+import org.apache.hadoop.lib.lang.XException;
+
+/**
+ * Exception thrown by the {@link Server} class.
+ */
+public class ServerException extends XException {
+
+ /**
+ * Error codes use by the {@link Server} class.
+ */
+ public static enum ERROR implements XException.ERROR {
+ S01("Dir [{0}] does not exist"),
+ S02("[{0}] is not a directory"),
+ S03("Could not load file from classpath [{0}], {1}"),
+ S04("Service [{0}] does not implement declared interface [{1}]"),
+ S05("[{0}] is not a file"),
+ S06("Could not load file [{0}], {1}"),
+ S07("Could not instanciate service class [{0}], {1}"),
+ S08("Could not load service classes, {0}"),
+ S09("Could not set service [{0}] programmatically -server shutting down-, {1}"),
+ S10("Service [{0}] requires service [{1}]"),
+ S11("Service [{0}] exception during status change to [{1}] -server shutting down-, {2}");
+
+ private String msg;
+
+ /**
+ * Constructor for the error code enum.
+ *
+ * @param msg message template.
+ */
+ private ERROR(String msg) {
+ this.msg = msg;
+ }
+
+ /**
+ * Returns the message template for the error code.
+ *
+ * @return the message template for the error code.
+ */
+ @Override
+ public String getTemplate() {
+ return msg;
+ }
+ }
+
+ /**
+ * Constructor for sub-classes.
+ *
+ * @param error error code for the XException.
+ * @param params parameters to use when creating the error message
+ * with the error code template.
+ */
+ protected ServerException(XException.ERROR error, Object... params) {
+ super(error, params);
+ }
+
+ /**
+ * Creates an server exception using the specified error code.
+ * The exception message is resolved using the error code template
+ * and the passed parameters.
+ *
+ * @param error error code for the XException.
+ * @param params parameters to use when creating the error message
+ * with the error code template.
+ */
+ public ServerException(ERROR error, Object... params) {
+ super(error, params);
+ }
+
+}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Service.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Service.java?rev=1212060&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Service.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Service.java Thu Dec 8 19:25:28 2011
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.lib.server;
+
+/**
+ * Service interface for components to be managed by the {@link Server} class.
+ */
+public interface Service {
+
+ /**
+ * Initializes the service. This method is called once, when the
+ * {@link Server} owning the service is being initialized.
+ *
+ * @param server the server initializing the service, give access to the
+ * server context.
+ *
+ * @throws ServiceException thrown if the service could not be initialized.
+ */
+ public void init(Server server) throws ServiceException;
+
+ /**
+ * Post initializes the service. This method is called by the
+ * {@link Server} after all services of the server have been initialized.
+ *
+ * @throws ServiceException thrown if the service could not be
+ * post-initialized.
+ */
+ public void postInit() throws ServiceException;
+
+ /**
+ * Destroy the services. This method is called once, when the
+ * {@link Server} owning the service is being destroyed.
+ */
+ public void destroy();
+
+ /**
+ * Returns the service dependencies of this service. The service will be
+ * instantiated only if all the service dependencies are already initialized.
+ *
+ * @return the service dependencies.
+ */
+ public Class[] getServiceDependencies();
+
+ /**
+ * Returns the interface implemented by this service. This interface is used
+ * the {@link Server} when the {@link Server#get(Class)} method is used to
+ * retrieve a service.
+ *
+ * @return the interface that identifies the service.
+ */
+ public Class getInterface();
+
+ /**
+ * Notification callback when the server changes its status.
+ *
+ * @param oldStatus old server status.
+ * @param newStatus new server status.
+ *
+ * @throws ServiceException thrown if the service could not process the status change.
+ */
+ public void serverStatusChange(Server.Status oldStatus, Server.Status newStatus) throws ServiceException;
+
+}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/ServiceException.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/ServiceException.java?rev=1212060&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/ServiceException.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/ServiceException.java Thu Dec 8 19:25:28 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.lib.server;
+
+import org.apache.hadoop.lib.lang.XException;
+
+/**
+ * Exception thrown by {@link Service} implementations.
+ */
+public class ServiceException extends ServerException {
+
+ /**
+ * Creates an service exception using the specified error code.
+ * The exception message is resolved using the error code template
+ * and the passed parameters.
+ *
+ * @param error error code for the XException.
+ * @param params parameters to use when creating the error message
+ * with the error code template.
+ */
+ public ServiceException(XException.ERROR error, Object... params) {
+ super(error, params);
+ }
+
+}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/FileSystemAccess.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/FileSystemAccess.java?rev=1212060&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/FileSystemAccess.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/FileSystemAccess.java Thu Dec 8 19:25:28 2011
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.lib.service;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+
+ /**
+ * Service interface giving access to Hadoop {@link FileSystem} instances on
+ * behalf of a user.
+ */
+ public interface FileSystemAccess {
+
+ /**
+ * Unit of work that runs against a {@link FileSystem}.
+ *
+ * @param <T> type of the value produced by the work.
+ */
+ interface FileSystemExecutor<T> {
+
+ /**
+ * Runs the work against the given file system.
+ *
+ * @param fs file system to operate on.
+ *
+ * @return the result of the work.
+ *
+ * @throws IOException if the work fails.
+ */
+ T execute(FileSystem fs) throws IOException;
+ }
+
+ /**
+ * Runs an executor against a file system obtained for the given user and
+ * configuration.
+ *
+ * @param user user the file system is obtained for.
+ * @param conf configuration used to obtain the file system.
+ * @param executor work to run.
+ *
+ * @return whatever the executor returns.
+ *
+ * @throws FileSystemAccessException if the work could not be carried out.
+ */
+ <T> T execute(String user, Configuration conf, FileSystemExecutor<T> executor) throws FileSystemAccessException;
+
+ /**
+ * Creates a file system for the given user and configuration. The caller
+ * hands it back with {@link #releaseFileSystem(FileSystem)}.
+ *
+ * @param user user the file system is created for.
+ * @param conf configuration used to create the file system.
+ *
+ * @return the file system.
+ *
+ * @throws IOException if the file system could not be created.
+ * @throws FileSystemAccessException if the request is rejected or fails.
+ */
+ FileSystem createFileSystem(String user, Configuration conf) throws IOException, FileSystemAccessException;
+
+ /**
+ * Releases a file system obtained with
+ * {@link #createFileSystem(String, Configuration)}.
+ *
+ * @param fs file system to release.
+ *
+ * @throws IOException if the release fails.
+ */
+ void releaseFileSystem(FileSystem fs) throws IOException;
+
+ /**
+ * Returns the default configuration of the service.
+ *
+ * @return the default configuration.
+ */
+ Configuration getDefaultConfiguration();
+
+ }
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/FileSystemAccessException.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/FileSystemAccessException.java?rev=1212060&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/FileSystemAccessException.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/FileSystemAccessException.java Thu Dec 8 19:25:28 2011
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.lib.service;
+
+import org.apache.hadoop.lib.lang.XException;
+
+ /**
+ * Exception thrown by {@link FileSystemAccess} implementations.
+ */
+ public class FileSystemAccessException extends XException {
+
+ /**
+ * Error codes, with their message templates, of the file system access
+ * service.
+ */
+ public enum ERROR implements XException.ERROR {
+ H01("Service property [{0}] not defined"),
+ H02("Kerberos initialization failed, {0}"),
+ H03("FileSystemExecutor error, {0}"),
+ H04("JobClientExecutor error, {0}"),
+ H05("[{0}] validation failed, {1}"),
+ H06("Property [{0}] not defined in configuration object"),
+ H07("[{0}] not healthy, {1}"),
+ // H08 is raised with the message of the wrapped exception as its first
+ // parameter; an empty template would drop that message.
+ H08("{0}"),
+ H09("Invalid FileSystemAccess security mode [{0}]");
+
+ // set once by the constructor, enum constants must not be mutable.
+ private final String template;
+
+ /**
+ * Constructor for the error code enum.
+ *
+ * @param template message template.
+ */
+ ERROR(String template) {
+ this.template = template;
+ }
+
+ /**
+ * Returns the message template for the error code.
+ *
+ * @return the message template for the error code.
+ */
+ @Override
+ public String getTemplate() {
+ return template;
+ }
+ }
+
+ /**
+ * Creates a file system access exception using the specified error code.
+ *
+ * @param error error code for the exception.
+ * @param params parameters passed on, together with the error code, to the
+ * {@link XException} constructor.
+ */
+ public FileSystemAccessException(ERROR error, Object... params) {
+ super(error, params);
+ }
+
+ }
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/Groups.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/Groups.java?rev=1212060&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/Groups.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/Groups.java Thu Dec 8 19:25:28 2011
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.lib.service;
+
+import java.io.IOException;
+import java.util.List;
+
+ /**
+ * Service interface to look up the groups of a user.
+ */
+ public interface Groups {
+
+ /**
+ * Returns the groups of the given user.
+ *
+ * @param user user name.
+ *
+ * @return the groups of the user.
+ *
+ * @throws IOException if the groups could not be obtained.
+ */
+ List<String> getGroups(String user) throws IOException;
+
+ }
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/Instrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/Instrumentation.java?rev=1212060&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/Instrumentation.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/Instrumentation.java Thu Dec 8 19:25:28 2011
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.lib.service;
+
+import java.util.Map;
+
+ /**
+ * Service interface to record counters, crons, variables and samplers,
+ * organized by group and name, and to obtain a snapshot of them.
+ */
+ public interface Instrumentation {
+
+ /**
+ * Handle that is started and stopped around a piece of work; both
+ * operations return a {@code Cron} so calls can be chained.
+ */
+ interface Cron {
+
+ Cron start();
+
+ Cron stop();
+ }
+
+ /**
+ * Source of a value that is read on demand.
+ *
+ * @param <T> type of the value.
+ */
+ interface Variable<T> {
+
+ T getValue();
+ }
+
+ /**
+ * Creates a new cron.
+ *
+ * @return the new cron.
+ */
+ Cron createCron();
+
+ /**
+ * Increments the counter identified by group and name.
+ *
+ * @param group counter group.
+ * @param name counter name.
+ * @param count increment.
+ */
+ void incr(String group, String name, long count);
+
+ /**
+ * Records a cron under the given group and name.
+ *
+ * @param group cron group.
+ * @param name cron name.
+ * @param cron cron to record.
+ */
+ void addCron(String group, String name, Cron cron);
+
+ /**
+ * Registers a variable under the given group and name.
+ *
+ * @param group variable group.
+ * @param name variable name.
+ * @param variable source of the value.
+ */
+ void addVariable(String group, String name, Variable<?> variable);
+
+ /**
+ * Registers a sampler under the given group and name. Sampling happens
+ * once a second.
+ *
+ * @param group sampler group.
+ * @param name sampler name.
+ * @param samplingSize sampling size.
+ * @param variable source of the sampled value.
+ */
+ void addSampler(String group, String name, int samplingSize, Variable<Long> variable);
+
+ /**
+ * Returns a snapshot of the instrumentation data.
+ *
+ * @return the snapshot, a map of maps.
+ */
+ Map<String, Map<String, ?>> getSnapshot();
+
+ }
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/ProxyUser.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/ProxyUser.java?rev=1212060&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/ProxyUser.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/ProxyUser.java Thu Dec 8 19:25:28 2011
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.lib.service;
+
+import java.io.IOException;
+import java.security.AccessControlException;
+
+ /**
+ * Service interface to validate proxy user requests.
+ */
+ public interface ProxyUser {
+
+ /**
+ * Validates a request made by a proxy user, from a host, on behalf of
+ * another user.
+ *
+ * @param proxyUser user making the request.
+ * @param proxyHost host the request comes from.
+ * @param doAsUser user the request is made on behalf of.
+ *
+ * @throws IOException if the validation could not be carried out.
+ * @throws AccessControlException if the validation fails.
+ */
+ void validate(String proxyUser, String proxyHost, String doAsUser) throws IOException, AccessControlException;
+
+ }
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/Scheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/Scheduler.java?rev=1212060&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/Scheduler.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/Scheduler.java Thu Dec 8 19:25:28 2011
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.lib.service;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+ /**
+ * Service interface to schedule work given as a {@link Callable} or as a
+ * {@link Runnable}.
+ */
+ public interface Scheduler {
+
+ /**
+ * Schedules a callable.
+ *
+ * @param callable work to schedule.
+ * @param delay delay, expressed in <code>unit</code>.
+ * @param interval interval, expressed in <code>unit</code>.
+ * @param unit time unit of <code>delay</code> and <code>interval</code>.
+ */
+ void schedule(Callable<?> callable, long delay, long interval, TimeUnit unit);
+
+ /**
+ * Schedules a runnable.
+ *
+ * @param runnable work to schedule.
+ * @param delay delay, expressed in <code>unit</code>.
+ * @param interval interval, expressed in <code>unit</code>.
+ * @param unit time unit of <code>delay</code> and <code>interval</code>.
+ */
+ void schedule(Runnable runnable, long delay, long interval, TimeUnit unit);
+
+ }
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java?rev=1212060&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java Thu Dec 8 19:25:28 2011
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.lib.service.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.lib.server.BaseService;
+import org.apache.hadoop.lib.server.ServiceException;
+import org.apache.hadoop.lib.service.FileSystemAccess;
+import org.apache.hadoop.lib.service.FileSystemAccessException;
+import org.apache.hadoop.lib.service.Instrumentation;
+import org.apache.hadoop.lib.util.Check;
+import org.apache.hadoop.lib.util.ConfigurationUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class FileSystemAccessService extends BaseService implements FileSystemAccess {
+ private static final Logger LOG = LoggerFactory.getLogger(FileSystemAccessService.class);
+
+ // value handed to the BaseService constructor.
+ public static final String PREFIX = "hadoop";
+
+ // group under which this service registers its variable, sampler and crons.
+ private static final String INSTRUMENTATION_GROUP = "hadoop";
+
+ // service configuration properties read by init(); the authentication type
+ // defaults to "simple", the only other accepted value is "kerberos".
+ public static final String AUTHENTICATION_TYPE = "authentication.type";
+ public static final String KERBEROS_KEYTAB = "authentication.kerberos.keytab";
+ public static final String KERBEROS_PRINCIPAL = "authentication.kerberos.principal";
+
+ // service configuration property holding the accepted name nodes.
+ public static final String NAME_NODE_WHITELIST = "name.node.whitelist";
+
+ // service configuration properties starting with this prefix are copied,
+ // with the prefix removed, into the service Hadoop configuration.
+ private static final String HADOOP_CONF_PREFIX = "conf:";
+
+ // property of the caller provided configuration that names the name node.
+ private static final String NAME_NODE_PROPERTY = "fs.default.name";
+
+ // passes PREFIX on to the BaseService constructor.
+ public FileSystemAccessService() {
+ super(PREFIX);
+ }
+
+ // lower-cased whitelist entries, assigned in init().
+ private Collection<String> nameNodeWhitelist;
+
+ // Hadoop configuration built in init() from the "conf:" properties.
+ Configuration serviceHadoopConf;
+
+ // counter incremented by createFileSystem() and decremented by
+ // releaseFileSystem().
+ private AtomicInteger unmanagedFileSystems = new AtomicInteger();
+
+ /**
+ * Initializes the service: configures the Hadoop security for the
+ * configured authentication type (logging in from the keytab in the
+ * Kerberos case), builds the service Hadoop configuration out of the
+ * <code>conf:</code> prefixed properties and loads the name node whitelist.
+ *
+ * @throws ServiceException thrown if a Kerberos property is empty, if the
+ * Kerberos login fails or if the authentication type is not valid.
+ */
+ @Override
+ protected void init() throws ServiceException {
+ LOG.info("Using FileSystemAccess JARs version [{}]", VersionInfo.getVersion());
+ String authType = getServiceConfig().get(AUTHENTICATION_TYPE, "simple").trim();
+ if ("kerberos".equals(authType)) {
+ String serverName = getServer().getName();
+ String defaultKeytab = System.getProperty("user.home") + "/" + serverName + ".keytab";
+ String keytab = getServiceConfig().get(KERBEROS_KEYTAB, defaultKeytab).trim();
+ if (keytab.isEmpty()) {
+ throw new ServiceException(FileSystemAccessException.ERROR.H01, KERBEROS_KEYTAB);
+ }
+ String defaultPrincipal = serverName + "/localhost@LOCALHOST";
+ String principal = getServiceConfig().get(KERBEROS_PRINCIPAL, defaultPrincipal).trim();
+ if (principal.isEmpty()) {
+ throw new ServiceException(FileSystemAccessException.ERROR.H01, KERBEROS_PRINCIPAL);
+ }
+ Configuration kerberosConf = new Configuration();
+ kerberosConf.set("hadoop.security.authentication", "kerberos");
+ UserGroupInformation.setConfiguration(kerberosConf);
+ try {
+ UserGroupInformation.loginUserFromKeytab(principal, keytab);
+ } catch (IOException ex) {
+ throw new ServiceException(FileSystemAccessException.ERROR.H02, ex.getMessage(), ex);
+ }
+ LOG.info("Using FileSystemAccess Kerberos authentication, principal [{}] keytab [{}]", principal, keytab);
+ } else if ("simple".equals(authType)) {
+ Configuration simpleConf = new Configuration();
+ simpleConf.set("hadoop.security.authentication", "simple");
+ UserGroupInformation.setConfiguration(simpleConf);
+ LOG.info("Using FileSystemAccess simple/pseudo authentication, principal [{}]", System.getProperty("user.name"));
+ } else {
+ throw new ServiceException(FileSystemAccessException.ERROR.H09, authType);
+ }
+
+ // only the 'conf:' properties make it, unprefixed, into the Hadoop conf.
+ serviceHadoopConf = new Configuration(false);
+ for (Map.Entry property : getServiceConfig()) {
+ String key = (String) property.getKey();
+ if (!key.startsWith(HADOOP_CONF_PREFIX)) {
+ continue;
+ }
+ String hadoopKey = key.substring(HADOOP_CONF_PREFIX.length());
+ serviceHadoopConf.set(hadoopKey, (String) property.getValue());
+ }
+ setRequiredServiceHadoopConf(serviceHadoopConf);
+
+ LOG.debug("FileSystemAccess default configuration:");
+ for (Map.Entry property : serviceHadoopConf) {
+ LOG.debug(" {} = {}", property.getKey(), property.getValue());
+ }
+
+ Collection<String> whitelist = getServiceConfig().getTrimmedStringCollection(NAME_NODE_WHITELIST);
+ nameNodeWhitelist = toLowerCase(whitelist);
+ }
+
+ /**
+ * Registers with the {@link Instrumentation} service a variable and a
+ * sampler, both named <code>unmanaged.fs</code>, that report the current
+ * value of the unmanaged file systems counter.
+ *
+ * @throws ServiceException thrown if the post-initialization fails.
+ */
+ @Override
+ public void postInit() throws ServiceException {
+ super.postInit();
+ Instrumentation instrumentation = getServer().get(Instrumentation.class);
+
+ Instrumentation.Variable<Integer> currentCount = new Instrumentation.Variable<Integer>() {
+ @Override
+ public Integer getValue() {
+ return unmanagedFileSystems.get();
+ }
+ };
+ instrumentation.addVariable(INSTRUMENTATION_GROUP, "unmanaged.fs", currentCount);
+
+ Instrumentation.Variable<Long> sampledCount = new Instrumentation.Variable<Long>() {
+ @Override
+ public Long getValue() {
+ return unmanagedFileSystems.longValue();
+ }
+ };
+ instrumentation.addSampler(INSTRUMENTATION_GROUP, "unmanaged.fs", 60, sampledCount);
+ }
+
+ /**
+ * Returns a new set with the lower-cased form of every given value.
+ *
+ * @param collection values to lower-case.
+ *
+ * @return a new set with the lower-cased values.
+ */
+ private Set<String> toLowerCase(Collection<String> collection) {
+ Set<String> lowerCased = new HashSet<String>();
+ for (String element : collection) {
+ String lower = element.toLowerCase();
+ lowerCased.add(lower);
+ }
+ return lowerCased;
+ }
+
+ /**
+ * Returns {@link FileSystemAccess} as the interface of this service.
+ *
+ * @return <code>FileSystemAccess.class</code>.
+ */
+ @Override
+ public Class getInterface() {
+ return FileSystemAccess.class;
+ }
+
+ /**
+ * Returns the dependencies of this service: the {@link Instrumentation}
+ * service, which this class obtains from the server in
+ * <code>postInit()</code> and <code>execute()</code>.
+ *
+ * @return an array with <code>Instrumentation.class</code>.
+ */
+ @Override
+ public Class[] getServiceDependencies() {
+ return new Class[]{Instrumentation.class};
+ }
+
+ /**
+ * Creates a proxy user UGI for the given user on top of the login user.
+ *
+ * @param user user to create the proxy UGI for.
+ *
+ * @return the proxy user UGI.
+ *
+ * @throws IOException thrown if the login user could not be obtained.
+ */
+ protected UserGroupInformation getUGI(String user) throws IOException {
+ UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+ return UserGroupInformation.createProxyUser(user, loginUser);
+ }
+
+ /**
+ * Sets in the given configuration the properties this service requires:
+ * <code>fs.hdfs.impl.disable.cache</code> is set to <code>true</code>.
+ * Called by <code>init()</code> on the service Hadoop configuration.
+ * <p/>
+ * NOTE(review): presumably this keeps file system instances from being
+ * shared, so closing one cannot affect another request -confirm against
+ * the Hadoop FileSystem cache behavior-.
+ *
+ * @param conf configuration to set the required properties in.
+ */
+ protected void setRequiredServiceHadoopConf(Configuration conf) {
+ conf.set("fs.hdfs.impl.disable.cache", "true");
+ }
+
+ /**
+ * Creates a new Hadoop configuration and copies into it, in this order,
+ * the service Hadoop configuration and the given configuration.
+ *
+ * @param conf configuration copied last into the new configuration.
+ *
+ * @return the new configuration.
+ */
+ protected Configuration createHadoopConf(Configuration conf) {
+ Configuration merged = new Configuration();
+ // the copy order matters: the given configuration is applied last.
+ ConfigurationUtils.copy(serviceHadoopConf, merged);
+ ConfigurationUtils.copy(conf, merged);
+ return merged;
+ }
+
+ /**
+ * Creates the configuration used to create a file system. This
+ * implementation delegates to <code>createHadoopConf()</code>.
+ *
+ * @param conf caller provided configuration.
+ *
+ * @return the configuration returned by <code>createHadoopConf()</code>.
+ */
+ protected Configuration createNameNodeConf(Configuration conf) {
+ return createHadoopConf(conf);
+ }
+
+ /**
+ * Obtains a file system for the given configuration using
+ * <code>FileSystem.get()</code>.
+ *
+ * @param namenodeConf configuration to obtain the file system with.
+ *
+ * @return the file system.
+ *
+ * @throws IOException thrown if the file system could not be obtained.
+ */
+ protected FileSystem createFileSystem(Configuration namenodeConf) throws IOException {
+ return FileSystem.get(namenodeConf);
+ }
+
+ /**
+ * Closes the given file system.
+ *
+ * @param fs file system to close.
+ *
+ * @throws IOException thrown if the file system could not be closed.
+ */
+ protected void closeFileSystem(FileSystem fs) throws IOException {
+ fs.close();
+ }
+
+ /**
+ * Validates a name node against the whitelist. An empty whitelist, or a
+ * whitelist containing <code>*</code>, accepts any name node; otherwise
+ * the lower-cased name node must be in the whitelist.
+ *
+ * @param namenode name node to validate, the authority of the name node
+ * URI. It is <code>null</code> when the URI has no authority.
+ *
+ * @throws FileSystemAccessException thrown, with error code H05, if the
+ * name node is not in the whitelist.
+ */
+ protected void validateNamenode(String namenode) throws FileSystemAccessException {
+ if (nameNodeWhitelist.isEmpty() || nameNodeWhitelist.contains("*")) {
+ return;
+ }
+ // a URI without authority yields a null name node, it cannot match any
+ // whitelist entry; reject it instead of failing with a NullPointerException.
+ if (namenode == null || !nameNodeWhitelist.contains(namenode.toLowerCase())) {
+ throw new FileSystemAccessException(FileSystemAccessException.ERROR.H05, namenode, "not in whitelist");
+ }
+ }
+
+ /**
+ * Hook called by <code>execute()</code> with the file system, before the
+ * executor is run. This implementation does nothing.
+ *
+ * @param fileSystem file system the executor is about to use.
+ *
+ * @throws FileSystemAccessException not thrown by this implementation.
+ */
+ protected void checkNameNodeHealth(FileSystem fileSystem) throws FileSystemAccessException {
+ }
+
+ /**
+ * Runs an executor, as a proxy user of the given user, against a file
+ * system created for the given configuration. The file system is closed
+ * when the executor finishes, and the time taken by the executor is
+ * recorded as a cron named after the simple class name of the executor.
+ *
+ * @param user user to run the executor as.
+ * @param conf configuration, it must define the name node property.
+ * @param executor executor to run.
+ *
+ * @return the value returned by the executor.
+ *
+ * @throws FileSystemAccessException thrown with H06 if the name node
+ * property is not defined, with the error of the name node validation if
+ * the name node is rejected, or with H03 for any other failure.
+ */
+ @Override
+ public <T> T execute(String user, final Configuration conf, final FileSystemExecutor<T> executor)
+ throws FileSystemAccessException {
+ Check.notEmpty(user, "user");
+ Check.notNull(conf, "conf");
+ Check.notNull(executor, "executor");
+ if (conf.get(NAME_NODE_PROPERTY) == null || conf.getTrimmed(NAME_NODE_PROPERTY).length() == 0) {
+ throw new FileSystemAccessException(FileSystemAccessException.ERROR.H06, NAME_NODE_PROPERTY);
+ }
+ try {
+ validateNamenode(new URI(conf.get(NAME_NODE_PROPERTY)).getAuthority());
+ UserGroupInformation ugi = getUGI(user);
+ return ugi.doAs(new PrivilegedExceptionAction<T>() {
+ public T run() throws Exception {
+ // everything that may fail without a file system is done first, so
+ // nothing can throw between the creation of the file system and the
+ // try block that guarantees it gets closed.
+ Instrumentation instrumentation = getServer().get(Instrumentation.class);
+ Instrumentation.Cron cron = instrumentation.createCron();
+ Configuration namenodeConf = createNameNodeConf(conf);
+ FileSystem fs = createFileSystem(namenodeConf);
+ try {
+ checkNameNodeHealth(fs);
+ cron.start();
+ return executor.execute(fs);
+ } finally {
+ try {
+ cron.stop();
+ instrumentation.addCron(INSTRUMENTATION_GROUP, executor.getClass().getSimpleName(), cron);
+ } finally {
+ // the file system is closed even if the instrumentation fails.
+ closeFileSystem(fs);
+ }
+ }
+ }
+ });
+ } catch (FileSystemAccessException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new FileSystemAccessException(FileSystemAccessException.ERROR.H03, ex);
+ }
+ }
+
+ public FileSystem createFileSystemInternal(String user, final Configuration conf)
+ throws IOException, FileSystemAccessException {
+ Check.notEmpty(user, "user");
+ Check.notNull(conf, "conf");
+ try {
+ validateNamenode(new URI(conf.get(NAME_NODE_PROPERTY)).getAuthority());
+ UserGroupInformation ugi = getUGI(user);
+ return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ public FileSystem run() throws Exception {
+ Configuration namenodeConf = createNameNodeConf(conf);
+ return createFileSystem(namenodeConf);
+ }
+ });
+ } catch (IOException ex) {
+ throw ex;
+ } catch (FileSystemAccessException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new FileSystemAccessException(FileSystemAccessException.ERROR.H08, ex.getMessage(), ex);
+ }
+ }
+
+ @Override
+ public FileSystem createFileSystem(String user, final Configuration conf) throws IOException,
+ FileSystemAccessException {
+ unmanagedFileSystems.incrementAndGet();
+ return createFileSystemInternal(user, conf);
+ }
+
+ /**
+ * Decrements the unmanaged file systems counter and closes the given
+ * file system. The counter is decremented first, so it is updated even
+ * if the close fails.
+ *
+ * @param fs file system to release.
+ *
+ * @throws IOException thrown if the file system could not be closed.
+ */
+ @Override
+ public void releaseFileSystem(FileSystem fs) throws IOException {
+ unmanagedFileSystems.decrementAndGet();
+ closeFileSystem(fs);
+ }
+
+
+ /**
+ * Returns a new configuration holding a copy of the service Hadoop
+ * configuration, so callers can modify it freely.
+ *
+ * @return a copy of the service Hadoop configuration.
+ */
+ @Override
+ public Configuration getDefaultConfiguration() {
+ Configuration defaults = new Configuration(false);
+ ConfigurationUtils.copy(serviceHadoopConf, defaults);
+ return defaults;
+ }
+
+}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/instrumentation/InstrumentationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/instrumentation/InstrumentationService.java?rev=1212060&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/instrumentation/InstrumentationService.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/instrumentation/InstrumentationService.java Thu Dec 8 19:25:28 2011
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.lib.service.instrumentation;
+
+import org.apache.hadoop.lib.server.BaseService;
+import org.apache.hadoop.lib.server.ServiceException;
+import org.apache.hadoop.lib.service.Instrumentation;
+import org.apache.hadoop.lib.service.Scheduler;
+import org.json.simple.JSONAware;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONStreamAware;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class InstrumentationService extends BaseService implements Instrumentation {
+ // Prefix handed to the BaseService constructor.
+ public static final String PREFIX = "instrumentation";
+ // Service configuration key for the size given to every Timer created by getToAdd(); init() defaults it to 10.
+ public static final String CONF_TIMERS_SIZE = "timers.size";
+
+ // Value of CONF_TIMERS_SIZE, read in init().
+ private int timersSize;
+ // One lock per kind of element; each is passed to getToAdd() and taken only when a group or element is missing.
+ private Lock counterLock;
+ private Lock timerLock;
+ private Lock variableLock;
+ // Also held while samplersList is modified (addSampler) or iterated (SamplersRunnable).
+ private Lock samplerLock;
+ // Registries, all shaped as: group name -> element name -> element.
+ private Map<String, Map<String, AtomicLong>> counters;
+ private Map<String, Map<String, Timer>> timers;
+ private Map<String, Map<String, VariableHolder>> variables;
+ private Map<String, Map<String, Sampler>> samplers;
+ // Flat list of the samplers registered through addSampler(); iterated by SamplersRunnable.
+ private List<Sampler> samplersList;
+ // Section name -> section content; built in init() and returned as-is by getSnapshot().
+ private Map<String, Map<String, ?>> all;
+
+ /**
+  * Creates the service, passing {@link #PREFIX} to the <code>BaseService</code> constructor.
+  */
+ public InstrumentationService() {
+ super(PREFIX);
+ }
+
+ /**
+  * Initializes the service state: reads the timers size from the service
+  * configuration, creates the locks and the per-kind registries, registers the
+  * JVM memory variables and assembles the snapshot map.
+  *
+  * @throws ServiceException declared by the overridden method.
+  */
+ @Override
+ @SuppressWarnings("unchecked")
+ public void init() throws ServiceException {
+   timersSize = getServiceConfig().getInt(CONF_TIMERS_SIZE, 10);
+
+   counterLock = new ReentrantLock();
+   timerLock = new ReentrantLock();
+   variableLock = new ReentrantLock();
+   samplerLock = new ReentrantLock();
+
+   counters = new ConcurrentHashMap<String, Map<String, AtomicLong>>();
+   timers = new ConcurrentHashMap<String, Map<String, Timer>>();
+   variables = new ConcurrentHashMap<String, Map<String, VariableHolder>>();
+   samplers = new ConcurrentHashMap<String, Map<String, Sampler>>();
+   samplersList = new ArrayList<Sampler>();
+
+   // The memory figures are queried from the Runtime each time a variable is read.
+   Map<String, VariableHolder> jvm = new ConcurrentHashMap<String, VariableHolder>();
+   jvm.put("free.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
+     public Long getValue() {
+       return Runtime.getRuntime().freeMemory();
+     }
+   }));
+   jvm.put("max.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
+     public Long getValue() {
+       return Runtime.getRuntime().maxMemory();
+     }
+   }));
+   jvm.put("total.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
+     public Long getValue() {
+       return Runtime.getRuntime().totalMemory();
+     }
+   }));
+
+   // LinkedHashMap: sections are iterated in the order they are inserted here.
+   Map<String, Map<String, ?>> sections = new LinkedHashMap<String, Map<String, ?>>();
+   sections.put("os-env", System.getenv());
+   sections.put("sys-props", (Map<String, ?>) (Map) System.getProperties());
+   sections.put("jvm", jvm);
+   sections.put("counters", (Map) counters);
+   sections.put("timers", (Map) timers);
+   sections.put("variables", (Map) variables);
+   sections.put("samplers", (Map) samplers);
+   all = sections;
+ }
+
+ /**
+  * Schedules the samplers runnable to run every second, starting immediately,
+  * if the server provides a {@link Scheduler}. Does nothing otherwise.
+  *
+  * @throws ServiceException declared by the overridden method.
+  */
+ @Override
+ public void postInit() throws ServiceException {
+   Scheduler samplingScheduler = getServer().get(Scheduler.class);
+   if (samplingScheduler == null) {
+     return;
+   }
+   samplingScheduler.schedule(new SamplersRunnable(), 0, 1, TimeUnit.SECONDS);
+ }
+
+ /**
+  * Returns the interface implemented by this service.
+  *
+  * @return <code>Instrumentation.class</code>.
+  */
+ @Override
+ public Class getInterface() {
+ return Instrumentation.class;
+ }
+
+ /**
+  * Returns the element registered under <code>group</code>/<code>name</code> in the
+  * given registry, creating the group map and/or the element when missing.
+  * <p/>
+  * Lookups are first done without holding the lock. The lock is acquired only
+  * when the group or the element is not found, and the lookup is then repeated
+  * while holding the lock before anything is created.
+  *
+  * @param group group name.
+  * @param name element name within the group.
+  * @param klass class of the element; <code>Timer</code> is created with the
+  * configured timers size, any other class via <code>klass.newInstance()</code>.
+  * @param lock lock taken while creating a missing group or element.
+  * @param map registry (group name to element name to element) to look into.
+  * @return the existing or newly created element.
+  * @throws RuntimeException wrapping any exception thrown while creating the element.
+  */
+ @SuppressWarnings("unchecked")
+ private <T> T getToAdd(String group, String name, Class<T> klass, Lock lock, Map<String, Map<String, T>> map) {
+ // Tracks whether this call holds the lock, so the finally block releases it exactly once.
+ boolean locked = false;
+ try {
+ Map<String, T> groupMap = map.get(group);
+ if (groupMap == null) {
+ lock.lock();
+ locked = true;
+ // Look again under the lock: the group may have been added in the meantime.
+ groupMap = map.get(group);
+ if (groupMap == null) {
+ groupMap = new ConcurrentHashMap<String, T>();
+ map.put(group, groupMap);
+ }
+ }
+ T element = groupMap.get(name);
+ if (element == null) {
+ // The lock may already be held if the group had to be created above.
+ if (!locked) {
+ lock.lock();
+ locked = true;
+ }
+ // Look again under the lock: the element may have been added in the meantime.
+ element = groupMap.get(name);
+ if (element == null) {
+ try {
+ if (klass == Timer.class) {
+ // Timer has no no-arg constructor; it needs the configured size.
+ element = (T) new Timer(timersSize);
+ } else {
+ element = klass.newInstance();
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ groupMap.put(name, element);
+ }
+ }
+ return element;
+ } finally {
+ if (locked) {
+ lock.unlock();
+ }
+ }
+ }
+
+ /**
+  * Stopwatch that accumulates "own" time across start/stop laps and, once
+  * ended, records the "total" wall-clock time elapsed since the first start.
+  * <p/>
+  * After {@link #end()} has been called the cron is considered used and any
+  * further <code>start()</code>/<code>stop()</code> call fails. This is tracked
+  * with an explicit flag rather than by testing <code>total != 0</code>, which
+  * could not detect a cron whose total elapsed time was zero milliseconds.
+  */
+ static class Cron implements Instrumentation.Cron {
+   // Time of the first start(), 0 while never started.
+   long start;
+   // Time the current lap started, 0 while not running.
+   long lapStart;
+   // Sum of the durations of all completed laps.
+   long own;
+   // Time between the first start() and end(); set by end().
+   long total;
+   // Set by end(); makes the "already used" check independent of the value of total.
+   private boolean ended;
+
+   /**
+    * Starts the cron, or starts a new lap if it is currently stopped. Has no
+    * effect if the cron is already running.
+    *
+    * @return this cron.
+    * @throws IllegalStateException if the cron has already been ended.
+    */
+   public Cron start() {
+     if (ended) {
+       throw new IllegalStateException("Cron already used");
+     }
+     if (start == 0) {
+       start = System.currentTimeMillis();
+       lapStart = start;
+     } else if (lapStart == 0) {
+       lapStart = System.currentTimeMillis();
+     }
+     return this;
+   }
+
+   /**
+    * Stops the current lap, adding its duration to the own time. Has no effect
+    * if the cron is not running.
+    *
+    * @return this cron.
+    * @throws IllegalStateException if the cron has already been ended.
+    */
+   public Cron stop() {
+     if (ended) {
+       throw new IllegalStateException("Cron already used");
+     }
+     if (lapStart > 0) {
+       own += System.currentTimeMillis() - lapStart;
+       lapStart = 0;
+     }
+     return this;
+   }
+
+   /**
+    * Stops the cron and computes the total time. A cron that was never started
+    * gets a total of zero instead of the time elapsed since the epoch.
+    *
+    * @throws IllegalStateException if the cron has already been ended.
+    */
+   void end() {
+     stop();
+     total = (start == 0) ? 0 : System.currentTimeMillis() - start;
+     ended = true;
+   }
+
+ }
+
+ /**
+  * Keeps the total and own times of the last <code>size</code> crons in two
+  * circular arrays and reports the most recent values and their averages.
+  * <p/>
+  * Access to the arrays is serialized with {@link #lock}.
+  */
+ static class Timer implements JSONAware, JSONStreamAware {
+   // Indexes into the array returned by getValues().
+   static final int LAST_TOTAL = 0;
+   static final int LAST_OWN = 1;
+   static final int AVG_TOTAL = 2;
+   static final int AVG_OWN = 3;
+
+   Lock lock = new ReentrantLock();
+   private long[] own;
+   private long[] total;
+   // Index of the most recently written slot, -1 while nothing has been recorded.
+   private int last;
+   // True once the last slot of the arrays has been written at least once.
+   private boolean full;
+   private int size;
+
+   /**
+    * Creates a timer that keeps the given number of entries.
+    *
+    * @param size number of crons kept for the averages.
+    */
+   public Timer(int size) {
+     this.size = size;
+     own = new long[size];
+     total = new long[size];
+     for (int i = 0; i < size; i++) {
+       own[i] = -1;
+       total[i] = -1;
+     }
+     last = -1;
+   }
+
+   /**
+    * Returns the last and average total/own times, indexed by
+    * <code>LAST_TOTAL</code>, <code>LAST_OWN</code>, <code>AVG_TOTAL</code> and
+    * <code>AVG_OWN</code>.
+    * <p/>
+    * If no cron has been added yet all four values are <code>-1</code>, the same
+    * marker used for unwritten slots. Without this guard the method would index
+    * the arrays with <code>-1</code> and divide by zero.
+    *
+    * @return a new four element array with the values.
+    */
+   long[] getValues() {
+     lock.lock();
+     try {
+       long[] values = new long[4];
+       if (last < 0) {
+         values[LAST_TOTAL] = -1;
+         values[LAST_OWN] = -1;
+         values[AVG_TOTAL] = -1;
+         values[AVG_OWN] = -1;
+         return values;
+       }
+       values[LAST_TOTAL] = total[last];
+       values[LAST_OWN] = own[last];
+       // Only the slots written so far take part in the averages.
+       int limit = (full) ? size : (last + 1);
+       for (int i = 0; i < limit; i++) {
+         values[AVG_TOTAL] += total[i];
+         values[AVG_OWN] += own[i];
+       }
+       values[AVG_TOTAL] = values[AVG_TOTAL] / limit;
+       values[AVG_OWN] = values[AVG_OWN] / limit;
+       return values;
+     } finally {
+       lock.unlock();
+     }
+   }
+
+   /**
+    * Ends the given cron and records its total and own times in the next slot,
+    * overwriting the oldest entry once the arrays are full.
+    *
+    * @param cron cron to end and record.
+    */
+   void addCron(Cron cron) {
+     cron.end();
+     lock.lock();
+     try {
+       last = (last + 1) % size;
+       full = full || last == (size - 1);
+       total[last] = cron.total;
+       own[last] = cron.own;
+     } finally {
+       lock.unlock();
+     }
+   }
+
+   // Builds the JSON view of the current values.
+   @SuppressWarnings("unchecked")
+   private JSONObject getJSON() {
+     long[] values = getValues();
+     JSONObject json = new JSONObject();
+     json.put("lastTotal", values[LAST_TOTAL]);
+     json.put("lastOwn", values[LAST_OWN]);
+     json.put("avgTotal", values[AVG_TOTAL]);
+     json.put("avgOwn", values[AVG_OWN]);
+     return json;
+   }
+
+   @Override
+   public String toJSONString() {
+     return getJSON().toJSONString();
+   }
+
+   @Override
+   public void writeJSONString(Writer out) throws IOException {
+     getJSON().writeJSONString(out);
+   }
+
+ }
+
+ /**
+  * Creates a new cron. The returned cron has not been started.
+  *
+  * @return a new <code>Cron</code> instance.
+  */
+ @Override
+ public Cron createCron() {
+ return new Cron();
+ }
+
+ /**
+  * Adds <code>count</code> to the counter registered under the given group and
+  * name, creating the counter if it does not exist yet.
+  *
+  * @param group counter group.
+  * @param name counter name.
+  * @param count amount to add to the counter.
+  */
+ @Override
+ public void incr(String group, String name, long count) {
+   getToAdd(group, name, AtomicLong.class, counterLock, counters).addAndGet(count);
+ }
+
+ /**
+  * Records the given cron in the timer registered under the given group and
+  * name, creating the timer if it does not exist yet.
+  *
+  * @param group timer group.
+  * @param name timer name.
+  * @param cron cron to record; it is cast to this service's <code>Cron</code> class.
+  */
+ @Override
+ public void addCron(String group, String name, Instrumentation.Cron cron) {
+   // The timer is looked up (and created if needed) before the cast is evaluated.
+   getToAdd(group, name, Timer.class, timerLock, timers).addCron((Cron) cron);
+ }
+
+ /**
+  * Holds a {@link Variable} and renders its current value as the JSON object
+  * <code>{"value": ...}</code>. The variable is read every time the JSON is
+  * produced.
+  */
+ static class VariableHolder<E> implements JSONAware, JSONStreamAware {
+   Variable<E> var;
+
+   /**
+    * Creates a holder without a variable; one is expected to be assigned to
+    * <code>var</code> before the holder is rendered.
+    */
+   public VariableHolder() {
+   }
+
+   /**
+    * Creates a holder for the given variable.
+    *
+    * @param var variable to hold.
+    */
+   public VariableHolder(Variable<E> var) {
+     this.var = var;
+   }
+
+   // Wraps the current value of the variable in a JSON object.
+   @SuppressWarnings("unchecked")
+   private JSONObject asJson() {
+     JSONObject wrapper = new JSONObject();
+     wrapper.put("value", var.getValue());
+     return wrapper;
+   }
+
+   @Override
+   public String toJSONString() {
+     return asJson().toJSONString();
+   }
+
+   @Override
+   public void writeJSONString(Writer out) throws IOException {
+     String rendered = toJSONString();
+     out.write(rendered);
+   }
+
+ }
+
+ /**
+  * Registers a variable under the given group and name. If a holder is already
+  * registered under that group and name, its variable is replaced.
+  *
+  * @param group variable group.
+  * @param name variable name.
+  * @param variable variable to register.
+  */
+ @Override
+ public void addVariable(String group, String name, Variable<?> variable) {
+ VariableHolder holder = getToAdd(group, name, VariableHolder.class, variableLock, variables);
+ holder.var = variable;
+ }
+
+ /**
+  * Keeps the most recent samples of a variable in a circular array, together
+  * with their running sum, and reports the average of the samples taken.
+  * <p/>
+  * The class does no locking of its own.
+  */
+ static class Sampler implements JSONAware, JSONStreamAware {
+ Variable<Long> variable;
+ // Circular array of samples; slots not yet written hold 0.
+ long[] values;
+ // Running sum of the contents of values.
+ private AtomicLong sum;
+ // Index of the slot the next sample is written to.
+ private int last;
+ // True once the last slot of values has been written at least once.
+ private boolean full;
+
+ // Sets the variable to sample and allocates storage for 'size' samples.
+ // NOTE(review): 'full' is not reset here, so calling init() again on a sampler that was
+ // already full keeps reporting it as full - confirm whether re-initialization is expected.
+ void init(int size, Variable<Long> variable) {
+ this.variable = variable;
+ values = new long[size];
+ sum = new AtomicLong();
+ last = 0;
+ }
+
+ // Takes one sample: overwrites the oldest slot and adjusts the running sum by
+ // the difference between the new value and the value it replaces.
+ void sample() {
+ int index = last;
+ long valueGoingOut = values[last];
+ full = full || last == (values.length - 1);
+ last = (last + 1) % values.length;
+ values[index] = variable.getValue();
+ sum.addAndGet(-valueGoingOut + values[index]);
+ }
+
+ // Average of the samples taken so far. Before the first sample the divisor
+ // is 1, which avoids a division by zero.
+ double getRate() {
+ return ((double) sum.get()) / ((full) ? values.length : ((last == 0) ? 1 : last));
+ }
+
+ // JSON view: "sampler" is the rate, "size" the number of samples it is based on.
+ @SuppressWarnings("unchecked")
+ private JSONObject getJSON() {
+ JSONObject json = new JSONObject();
+ json.put("sampler", getRate());
+ json.put("size", (full) ? values.length : last);
+ return json;
+ }
+
+ @Override
+ public String toJSONString() {
+ return getJSON().toJSONString();
+ }
+
+ @Override
+ public void writeJSONString(Writer out) throws IOException {
+ out.write(toJSONString());
+ }
+ }
+
+ /**
+  * Registers a sampler for the given variable under the given group and name
+  * and (re)initializes it with the given sampling size.
+  * <p/>
+  * The sampler is added to the list of samplers to run only if it is not
+  * already there. Registering the same group and name more than once would
+  * otherwise add the same sampler instance to the list again and have it
+  * sampled several times on every run.
+  *
+  * @param group sampler group.
+  * @param name sampler name.
+  * @param samplingSize number of samples the sampler keeps.
+  * @param variable variable to sample.
+  */
+ @Override
+ public void addSampler(String group, String name, int samplingSize, Variable<Long> variable) {
+   Sampler sampler = getToAdd(group, name, Sampler.class, samplerLock, samplers);
+   samplerLock.lock();
+   try {
+     sampler.init(samplingSize, variable);
+     // Sampler does not override equals(), so contains() is an identity check.
+     if (!samplersList.contains(sampler)) {
+       samplersList.add(sampler);
+     }
+   } finally {
+     samplerLock.unlock();
+   }
+ }
+
+ /**
+  * Runnable that takes one sample from every registered sampler. It is
+  * scheduled by <code>postInit()</code>.
+  */
+ class SamplersRunnable implements Runnable {
+
+ // Holds the sampler lock for the whole pass over the samplers list.
+ @Override
+ public void run() {
+ samplerLock.lock();
+ try {
+ for (Sampler sampler : samplersList) {
+ sampler.sample();
+ }
+ } finally {
+ samplerLock.unlock();
+ }
+ }
+ }
+
+ /**
+  * Returns the map of instrumentation sections built in <code>init()</code>.
+  * <p/>
+  * The internal map itself is returned, not a copy.
+  *
+  * @return the sections map, keyed by section name.
+  */
+ @Override
+ public Map<String, Map<String, ?>> getSnapshot() {
+ return all;
+ }
+
+
+}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/scheduler/SchedulerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/scheduler/SchedulerService.java?rev=1212060&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/scheduler/SchedulerService.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/scheduler/SchedulerService.java Thu Dec 8 19:25:28 2011
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.lib.service.scheduler;
+
+import org.apache.hadoop.lib.lang.RunnableCallable;
+import org.apache.hadoop.lib.server.BaseService;
+import org.apache.hadoop.lib.server.Server;
+import org.apache.hadoop.lib.server.ServiceException;
+import org.apache.hadoop.lib.service.Instrumentation;
+import org.apache.hadoop.lib.service.Scheduler;
+import org.apache.hadoop.lib.util.Check;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.MessageFormat;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class SchedulerService extends BaseService implements Scheduler {
+ private static final Logger LOG = LoggerFactory.getLogger(SchedulerService.class);
+
+ // Instrumentation group under which executions, skips, failures and timings are reported.
+ private static final String INST_GROUP = "scheduler";
+
+ // Prefix handed to the BaseService constructor.
+ public static final String PREFIX = "scheduler";
+
+ // Service configuration key for the size of the scheduler thread pool; init() defaults it to 5.
+ public static final String CONF_THREADS = "threads";
+
+ // Created in init(), shut down in destroy().
+ private ScheduledExecutorService scheduler;
+
+ /**
+  * Creates the service, passing {@link #PREFIX} to the <code>BaseService</code> constructor.
+  */
+ public SchedulerService() {
+ super(PREFIX);
+ }
+
+ /**
+  * Creates the scheduler thread pool, sized from the {@link #CONF_THREADS}
+  * service configuration property (5 if not set).
+  *
+  * @throws ServiceException declared by the overridden method.
+  */
+ @Override
+ public void init() throws ServiceException {
+   scheduler = new ScheduledThreadPoolExecutor(getServiceConfig().getInt(CONF_THREADS, 5));
+   LOG.debug("Scheduler started");
+ }
+
+ /**
+  * Shuts the scheduler down and waits up to 30 seconds, in one second steps,
+  * for it to terminate.
+  * <p/>
+  * If the waiting thread is interrupted the wait is abandoned and the
+  * interrupt status of the thread is restored, so callers further up the stack
+  * can still observe the interruption.
+  */
+ @Override
+ public void destroy() {
+   try {
+     long limit = System.currentTimeMillis() + 30 * 1000;
+     scheduler.shutdownNow();
+     while (!scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
+       LOG.debug("Waiting for scheduler to shutdown");
+       if (System.currentTimeMillis() > limit) {
+         LOG.warn("Gave up waiting for scheduler to shutdown");
+         break;
+       }
+     }
+     if (scheduler.isTerminated()) {
+       LOG.debug("Scheduler shutdown");
+     }
+   } catch (InterruptedException ex) {
+     LOG.warn(ex.getMessage(), ex);
+     // awaitTermination() cleared the interrupt status when it threw; set it again.
+     Thread.currentThread().interrupt();
+   }
+ }
+
+ /**
+  * Returns the services this service depends on.
+  *
+  * @return an array holding <code>Instrumentation.class</code>.
+  */
+ @Override
+ public Class[] getServiceDependencies() {
+ return new Class[]{Instrumentation.class};
+ }
+
+ /**
+  * Returns the interface implemented by this service.
+  *
+  * @return <code>Scheduler.class</code>.
+  */
+ @Override
+ public Class getInterface() {
+ return Scheduler.class;
+ }
+
+ /**
+  * Schedules a callable for repeated execution with a fixed delay between the
+  * end of one execution and the start of the next.
+  * <p/>
+  * Each run is instrumented under the <code>scheduler</code> group using the
+  * simple class name of the callable: <code>.execs</code>, <code>.skips</code>
+  * and <code>.fails</code> counters plus a timer. Runs are skipped while the
+  * server status is <code>HALTED</code>. Exceptions thrown by the callable are
+  * logged and counted, never propagated to the executor.
+  *
+  * @param callable callable to execute, cannot be NULL.
+  * @param delay delay before the first execution.
+  * @param interval delay between the end of one execution and the start of the next.
+  * @param unit time unit of <code>delay</code> and <code>interval</code>.
+  * @throws IllegalStateException if the scheduler has been shut down.
+  */
+ @Override
+ public void schedule(final Callable<?> callable, long delay, long interval, TimeUnit unit) {
+   Check.notNull(callable, "callable");
+   if (!scheduler.isShutdown()) {
+     // Labels follow the order of the arguments (the original message had them swapped).
+     LOG.debug("Scheduling callable [{}], delay [{}], interval [{}] in [{}]",
+       new Object[]{callable, delay, interval, unit});
+     Runnable r = new Runnable() {
+       public void run() {
+         String instrName = callable.getClass().getSimpleName();
+         Instrumentation instr = getServer().get(Instrumentation.class);
+         if (getServer().getStatus() == Server.Status.HALTED) {
+           LOG.debug("Skipping [{}], server status [{}]", callable, getServer().getStatus());
+           instr.incr(INST_GROUP, instrName + ".skips", 1);
+         } else {
+           LOG.debug("Executing [{}]", callable);
+           instr.incr(INST_GROUP, instrName + ".execs", 1);
+           Instrumentation.Cron cron = instr.createCron().start();
+           try {
+             callable.call();
+           } catch (Exception ex) {
+             instr.incr(INST_GROUP, instrName + ".fails", 1);
+             LOG.error("Error executing [{}], {}", new Object[]{callable, ex.getMessage(), ex});
+           } finally {
+             instr.addCron(INST_GROUP, instrName, cron.stop());
+           }
+         }
+       }
+     };
+     scheduler.scheduleWithFixedDelay(r, delay, interval, unit);
+   } else {
+     // MessageFormat needs an indexed placeholder; "{}" makes format() itself throw
+     // an IllegalArgumentException instead of producing the message.
+     throw new IllegalStateException(
+       MessageFormat.format("Scheduler shutting down, ignoring scheduling of [{0}]", callable));
+   }
+ }
+
+ /**
+  * Schedules a runnable for repeated execution by wrapping it in a
+  * <code>RunnableCallable</code> and delegating to the callable variant of
+  * this method.
+  *
+  * @param runnable runnable to execute.
+  * @param delay delay before the first execution.
+  * @param interval interval between executions.
+  * @param unit time unit of <code>delay</code> and <code>interval</code>.
+  */
+ @Override
+ public void schedule(Runnable runnable, long delay, long interval, TimeUnit unit) {
+ schedule((Callable<?>) new RunnableCallable(runnable), delay, interval, unit);
+ }
+
+}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/security/GroupsService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/security/GroupsService.java?rev=1212060&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/security/GroupsService.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/security/GroupsService.java Thu Dec 8 19:25:28 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.lib.service.security;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.lib.server.BaseService;
+import org.apache.hadoop.lib.server.ServiceException;
+import org.apache.hadoop.lib.service.Groups;
+import org.apache.hadoop.lib.util.ConfigurationUtils;
+
+import java.io.IOException;
+import java.util.List;
+
+ /**
+  * Service that resolves the groups of a user by delegating to Hadoop's
+  * <code>org.apache.hadoop.security.Groups</code>.
+  */
+ public class GroupsService extends BaseService implements Groups {
+   private static final String PREFIX = "groups";
+
+   // Hadoop groups resolver, created in init().
+   private org.apache.hadoop.security.Groups hGroups;
+
+   /**
+    * Creates the service, passing the service prefix to the
+    * <code>BaseService</code> constructor.
+    */
+   public GroupsService() {
+     super(PREFIX);
+   }
+
+   /**
+    * Creates the Hadoop groups resolver from a new configuration into which
+    * the service configuration has been copied.
+    *
+    * @throws ServiceException declared by the overridden method.
+    */
+   @Override
+   protected void init() throws ServiceException {
+     Configuration groupsConf = new Configuration(false);
+     ConfigurationUtils.copy(getServiceConfig(), groupsConf);
+     hGroups = new org.apache.hadoop.security.Groups(groupsConf);
+   }
+
+   /**
+    * Returns the interface implemented by this service.
+    *
+    * @return <code>Groups.class</code>.
+    */
+   @Override
+   public Class getInterface() {
+     return Groups.class;
+   }
+
+   /**
+    * Returns the groups of the given user as reported by the Hadoop groups
+    * resolver.
+    *
+    * @param user user name.
+    * @return the groups of the user.
+    * @throws IOException propagated from the Hadoop groups resolver.
+    */
+   @Override
+   public List<String> getGroups(String user) throws IOException {
+     List<String> userGroups = hGroups.getGroups(user);
+     return userGroups;
+   }
+
+ }