You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ace.apache.org by ma...@apache.org on 2009/06/27 17:53:26 UTC
svn commit: r788992 [2/25] - in /incubator/ace/trunk: gateway/ gateway/src/
gateway/src/net/ gateway/src/net/luminis/ gateway/src/net/luminis/liq/
gateway/src/net/luminis/liq/bootstrap/
gateway/src/net/luminis/liq/bootstrap/multigateway/ gateway/src/ne...
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/configurator/Configurator.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/configurator/Configurator.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/configurator/Configurator.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/configurator/Configurator.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,385 @@
+package net.luminis.liq.configurator;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.service.log.LogService;
+
+/**
+ * Configures bundles managed by the <code>ConfigurationAdmin</code>. This Configurator uses text files containing
+ * properties as configuration files. When a configuration file is added, its properties are read and added. If the
+ * configuration file is removed, its properties are removed as well.
+ * <p>
+ * The configuration files should be stored in the configuration directory (often the 'conf' directory) of the OSGi framework and
+ * should have the format: <pid>.cfg
+ * <p>
+ * Note: this Configurator is based upon the principle in the FileInstall bundle Peter Kriens wrote. (see
+ * http://www.aqute.biz/Code/FileInstall for more information)
+ */
+public class Configurator implements Runnable {
+
+ private static final String DELIM_START = "${"; // opens a variable placeholder in a property value
+ private static final String DELIM_STOP = "}"; // closes a variable placeholder in a property value
+ private static final FileFilter FILENAME_FILTER = new FileFilter() { // selects the entries that are scanned
+ public boolean accept(File file) {
+ return file.getName().endsWith(".cfg") || file.isDirectory(); // *.cfg files, plus directories (treated as factory pids)
+ }
+ };
+ private static final String FACTORY_INSTANCE_KEY = "factory.instance.pid"; // property holding "<factoryPid>_<pid>" of a factory instance
+
+ private volatile LogService m_log; /* injected by dependency manager */
+ private volatile ConfigurationAdmin m_configAdmin; /* injected by dependency manager */
+ private volatile BundleContext m_context; /* injected by dependency manager */
+
+ private final File m_configDir; // directory that is polled for configuration files
+ private final long m_pollInterval; // value passed to Thread.sleep() between two polls
+ private final Map m_checksums = new HashMap(); // pid -> xor(lastModified, length)
+ private final Map m_foundFactories = new HashMap(); // factoryPid -> (instancePid -> xor(lastModified, length))
+ private Thread m_configThread; // polling thread; null until start() and again after stop()
+
+ /**
+  * Creates a configurator that polls the given directory.
+  *
+  * @param dir The directory to scan for configuration files; must be an existing directory.
+  * @param pollInterval The time to sleep between two scans (passed to <code>Thread.sleep()</code>); must not be negative.
+  * @throws IllegalArgumentException If <code>dir</code> is <code>null</code> or not a directory, or if the interval is negative.
+  */
+ public Configurator(File dir, long pollInterval) {
+     boolean usableDirectory = (dir != null) && dir.isDirectory();
+     if (!usableDirectory || (pollInterval < 0)) {
+         throw new IllegalArgumentException("Bad arguments; either not an existing directory or an invalid interval.");
+     }
+     m_pollInterval = pollInterval;
+     m_configDir = dir;
+ }
+
+ /**
+  * Starts the polling thread as a daemon thread. Calling this method while the configurator is already
+  * running has no effect; previously a second call tried to reconfigure and restart the live thread, which
+  * fails with an <code>IllegalThreadStateException</code>.
+  */
+ synchronized void start() {
+     if (m_configThread == null) {
+         m_configThread = new Thread(this, "LiQ-Configurator");
+         // The daemon flag may only be set before the thread is started.
+         m_configThread.setDaemon(true);
+         m_configThread.start();
+     }
+ }
+
+ /**
+  * Stops the polling thread and waits until it has ended. Does nothing when the configurator is not running.
+  *
+  * @throws InterruptedException If the calling thread is interrupted while waiting for the polling thread to end.
+  */
+ synchronized void stop() throws InterruptedException {
+     if (m_configThread == null) {
+         // Never started, or already stopped.
+         return;
+     }
+     // Join in stop to prevent race condition, careful with bundle location setting to null
+     m_configThread.interrupt();
+     m_configThread.join();
+     m_configThread = null;
+     // Forget all files seen so far, for plain and for factory configurations alike, so that a
+     // subsequent start() processes every file again.
+     m_checksums.clear();
+     m_foundFactories.clear();
+ }
+
+ /**
+  * Polling loop executed by the configurator thread: applies the configurations found in the configuration
+  * directory, sleeps for the poll interval given at construction time and repeats until the thread is
+  * interrupted.
+  */
+ public void run() {
+     try {
+         for (;;) {
+             if (Thread.interrupted()) {
+                 return;
+             }
+             doConfigs();
+             Thread.sleep(m_pollInterval);
+         }
+     }
+     catch (InterruptedException interrupted) {
+         // Interrupted while sleeping: we are requested to stop.
+     }
+ }
+
+ /**
+  * Performs one scan of the configuration directory. New and changed configuration files are processed,
+  * and configurations whose file has disappeared are deleted. A file counts as changed when the xor of its
+  * modification time and its length differs from the value remembered from the previous scan.
+  * <p>
+  * Sub directories are treated as factory pids; their contents are handled by
+  * <code>doFactoryConfigs</code>. When such a directory disappears, all instances that were found in it
+  * are deleted as well.
+  */
+ private void doConfigs() {
+     // Whatever is still in these sets after the scan no longer exists on disk.
+     Set removedPids = new HashSet(m_checksums.keySet());
+     Set removedFactoryPids = new HashSet(m_foundFactories.keySet());
+
+     File[] files = m_configDir.listFiles(FILENAME_FILTER);
+     for (int i = 0; (files != null) && (i < files.length); i++) {
+         File file = files[i];
+         String pid = parsePid(file);
+
+         if (file.isDirectory()) {
+             removedFactoryPids.remove(pid);
+             // listFiles() returns null if the directory cannot be read (any more); skip it in that
+             // case and keep what we know about it until it can be listed again.
+             File[] instances = file.listFiles(FILENAME_FILTER);
+             if (instances != null) {
+                 doFactoryConfigs(pid, instances);
+             }
+         }
+         else {
+             Long newChecksum = new Long(file.lastModified() ^ file.length());
+             Long oldChecksum = (Long) m_checksums.get(pid); // may be null, intended
+             if (!newChecksum.equals(oldChecksum)) {
+                 m_checksums.put(pid, newChecksum);
+                 processConfigFile(file, null);
+             }
+             removedPids.remove(pid);
+         }
+     }
+     for (Iterator e = removedPids.iterator(); e.hasNext();) {
+         String pid = (String) e.next();
+         deleteConfig(pid, null);
+         m_checksums.remove(pid);
+     }
+     for (Iterator e = removedFactoryPids.iterator(); e.hasNext();) {
+         String factoryPid = (String) e.next();
+         // An empty listing makes doFactoryConfigs() delete every instance known for this factory.
+         doFactoryConfigs(factoryPid, new File[0]);
+         m_foundFactories.remove(factoryPid);
+     }
+ }
+
+ /**
+  * Synchronizes the instances of one managed service factory with the files found in its directory. New
+  * and changed files are processed, instances whose file has disappeared are deleted.
+  *
+  * @param factoryPid The factory pid, which is the name of the directory.
+  * @param newInstances The current listing of that directory. A <code>null</code> listing, which is what
+  *        <code>File.listFiles()</code> returns for a directory that cannot be read, is ignored.
+  */
+ private void doFactoryConfigs(String factoryPid, File[] newInstances) {
+     if (newInstances == null) {
+         return;
+     }
+     Map instances = (Map) m_foundFactories.get(factoryPid);
+     if (instances == null) {
+         instances = new HashMap();
+         m_foundFactories.put(factoryPid, instances);
+     }
+     Set instancesPids = new HashSet(instances.keySet());
+
+     for (int j = 0; j < newInstances.length; j++) {
+         File instanceConfigFile = newInstances[j];
+         if (instanceConfigFile.isDirectory()) {
+             // The file filter also accepts directories; a nested directory is not a configuration
+             // file and cannot be read as one.
+             continue;
+         }
+         String instancePid = parsePid(instanceConfigFile);
+
+         Long newChecksum = new Long(instanceConfigFile.lastModified() ^ instanceConfigFile.length());
+         Long oldChecksum = (Long) instances.get(instancePid); // may be null, intended
+         if (!newChecksum.equals(oldChecksum)) {
+             instances.put(instancePid, newChecksum);
+             processConfigFile(instanceConfigFile, factoryPid);
+         }
+         instancesPids.remove(instancePid);
+     }
+
+     // What is left was seen in a previous scan, but is gone now.
+     for (Iterator e = instancesPids.iterator(); e.hasNext(); ) {
+         String instancePid = (String) e.next();
+         deleteConfig(instancePid, factoryPid);
+         instances.remove(instancePid);
+     }
+ }
+
+ /**
+  * Reads the properties from the given configuration file, performs variable substitution on them and
+  * passes them on to <code>configure</code>. Whether the file is new or has changed is decided by the
+  * caller. Failures are logged and never propagated, so a single broken file cannot end the polling thread.
+  *
+  * @param configFile The configuration file to read.
+  * @param factoryPid The factory pid the file belongs to, or <code>null</code> for a plain configuration.
+  */
+ private void processConfigFile(File configFile, String factoryPid) {
+     InputStream in = null;
+     try {
+         in = new FileInputStream(configFile);
+         Properties properties = new Properties();
+         properties.load(in);
+         String pid = parsePid(configFile);
+         properties = substVars(properties);
+         configure(pid, factoryPid, properties);
+     }
+     catch (IOException ex) {
+         m_log.log(LogService.LOG_ERROR, "Unable to read configuration from file: " + configFile.getAbsolutePath(), ex);
+     }
+     catch (RuntimeException ex) {
+         // For instance the IllegalArgumentException that substVars() throws for a malformed or
+         // recursive placeholder. Left uncaught, it would terminate the polling thread.
+         m_log.log(LogService.LOG_ERROR, "Unable to process configuration file: " + configFile.getAbsolutePath(), ex);
+     }
+     finally {
+         if (in != null) {
+             try {
+                 in.close();
+             }
+             catch (Exception ex) {
+                 // Not much we can do
+             }
+         }
+     }
+ }
+
+ /**
+  * Updates the configuration identified by the given pid with the given properties. Keys that are already
+  * present in the existing configuration keep their existing value. For a factory instance the
+  * <code>factory.instance.pid</code> property is set to <code>factoryPid + "_" + pid</code>.
+  *
+  * @param pid The pid taken from the configuration file name.
+  * @param factoryPid The factory pid, or <code>null</code> for a plain configuration.
+  * @param properties The properties read from the file; this object is modified.
+  */
+ private void configure(String pid, String factoryPid, Properties properties) {
+     try {
+         Configuration config = getConfiguration(pid, factoryPid);
+         Dictionary existing = config.getProperties();
+         Enumeration existingKeys = (existing == null) ? null : existing.keys();
+         while ((existingKeys != null) && existingKeys.hasMoreElements()) {
+             String key = (String) existingKeys.nextElement();
+             if (!properties.containsKey(key)) {
+                 continue;
+             }
+             // FIXME: this is to prevent overwriting configurations, that were changed by other means than this class, every time this class is ran
+             // sadly, this also breaks editing a configuration file on the fly, degrading this class to do only first-time configurations (on a per key basis)
+             // ultimately we may want to use autoconf everywhere for managing configurations, at the moment this is not feasible yet
+             properties.put(key, existing.get(key));
+             m_log.log(LogService.LOG_DEBUG, "Using previously configured value for bundle=" + pid + " key=" + key);
+         }
+         if (factoryPid != null) {
+             String instanceId = factoryPid + "_" + pid;
+             properties.put(FACTORY_INSTANCE_KEY, instanceId);
+         }
+         config.update(properties);
+         m_log.log(LogService.LOG_DEBUG, "Updated configuration for pid '" + pid + "' (" + properties +")");
+     }
+     catch (IOException ex) {
+         m_log.log(LogService.LOG_ERROR, "Unable to update configuration for pid '" + pid + "'", ex);
+     }
+ }
+
+ /**
+  * Looks up the configuration for the given pid. For a plain configuration this delegates to
+  * <code>ConfigurationAdmin.getConfiguration</code>. For a factory instance the existing configurations
+  * are searched for one whose <code>factory.instance.pid</code> property equals
+  * <code>factoryPid + "_" + pid</code>; if none is found (or the lookup fails), a new factory
+  * configuration is created.
+  *
+  * @param pid The pid taken from the configuration file name.
+  * @param factoryPid The factory pid, or <code>null</code> for a plain configuration.
+  * @return The matching or newly created configuration.
+  * @throws IOException If the <code>ConfigurationAdmin</code> fails to access its storage.
+  */
+ private Configuration getConfiguration(String pid, String factoryPid) throws IOException {
+     if (factoryPid == null) {
+         return m_configAdmin.getConfiguration(pid, null);
+     }
+
+     String filter = "(" + FACTORY_INSTANCE_KEY + "=" + factoryPid + "_" + pid + ")";
+     Configuration[] matches = null;
+     try {
+         matches = m_configAdmin.listConfigurations(filter);
+     }
+     catch (InvalidSyntaxException e) {
+         m_log.log(LogService.LOG_ERROR, "Exception during lookup of configuration of managed service factory instance '" + pid + "'", e);
+     }
+
+     boolean found = (matches != null) && (matches.length > 0);
+     return found ? matches[0] : m_configAdmin.createFactoryConfiguration(factoryPid, null);
+ }
+
+ /**
+  * Deletes the configuration that belongs to the given pid from the <code>ConfigurationAdmin</code>.
+  * Any failure is logged and not propagated.
+  *
+  * @param pid The pid of the configuration to delete.
+  * @param factoryPid The factory pid if the configuration is a factory instance, otherwise <code>null</code>.
+  */
+ protected void deleteConfig(String pid, String factoryPid) {
+     try {
+         getConfiguration(pid, factoryPid).delete();
+         m_log.log(LogService.LOG_DEBUG, "Removed configuration for pid '" + pid + "'");
+     }
+     catch (Exception e) {
+         m_log.log(LogService.LOG_ERROR, "Unable to remove configuration for pid '" + pid + "'", e);
+     }
+ }
+
+ /**
+  * Determines the pid that belongs to the given file. For a directory (a factory pid) this is the
+  * directory name, for a configuration file it is the file name without the <code>.cfg</code> extension.
+  * A file name that does not end in <code>.cfg</code> is returned unchanged instead of being truncated
+  * blindly by four characters.
+  *
+  * @param file The configuration file or factory directory.
+  * @return The pid derived from the name of the file.
+  */
+ protected String parsePid(File file) {
+     String name = file.getName();
+     String extension = ".cfg";
+     if (!file.isDirectory() && name.endsWith(extension)) {
+         return name.substring(0, name.length() - extension.length());
+     }
+     // factory pid (directory), or a name without the expected extension
+     return name;
+ }
+
+
+ /**
+  * Performs variable substitution on the value of every property in the given set. The values are
+  * replaced in place.
+  *
+  * @param properties Set of properties to apply substitution on.
+  * @return The same <code>Properties</code> object, with all variables substituted.
+  * @throws IllegalArgumentException If a value contains a malformed or recursive variable placeholder.
+  */
+ private Properties substVars(Properties properties) {
+     Enumeration names = properties.propertyNames();
+     while (names.hasMoreElements()) {
+         String key = (String) names.nextElement();
+         String resolved = substVars(properties.getProperty(key), key, null, properties);
+         properties.setProperty(key, resolved);
+     }
+     return properties;
+ }
+
+ /**
+  * <p>
+  * Performs property variable substitution on the specified value. If the value contains the syntax
+  * <tt>${&lt;prop-name&gt;}</tt>, the placeholder is replaced by the value of that property. The
+  * configuration properties are consulted first, then the properties of the bundle context; an unknown
+  * property is replaced by the empty string. Multiple placeholders may exist in the value as well as
+  * nested placeholders, which are substituted from inner most to outer most.
+  * </p>
+  * <p>
+  * A value without any stop delimiter, including a value with an unterminated <tt>${</tt>, contains no
+  * complete placeholder and is returned unchanged.
+  * </p>
+  *
+  * @param val The string on which to perform property substitution.
+  * @param currentKey The key of the property being evaluated, used to detect cycles.
+  * @param cycleMap Map of variable references used to detect nested cycles, or <code>null</code> to start a new one.
+  * @param configProps Set of configuration properties, may be <code>null</code>.
+  * @return The value of the specified string after property substitution.
+  * @throws IllegalArgumentException If a stop delimiter is found without a preceding start delimiter, or in
+  *         case of a recursive variable reference.
+  */
+ private String substVars(String val, String currentKey, Map cycleMap, Properties configProps) throws IllegalArgumentException {
+     // If there is currently no cycle map, then create
+     // one for detecting cycles for this invocation.
+     if (cycleMap == null) {
+         cycleMap = new HashMap();
+     }
+
+     // Put the current key in the cycle map.
+     cycleMap.put(currentKey, currentKey);
+
+     // Assume we have a value that is something like:
+     // "leading ${foo.${bar}} middle ${baz} trailing"
+
+     // Find the first ending '}' variable delimiter, which
+     // will correspond to the first deepest nested variable
+     // placeholder.
+     int stopDelim = val.indexOf(DELIM_STOP);
+
+     // Without a stop delimiter there is no complete placeholder. This also covers an
+     // unterminated "${", which used to end up in substring(start, -1) and fail with a
+     // StringIndexOutOfBoundsException.
+     if (stopDelim < 0) {
+         return val;
+     }
+
+     // Find the matching starting "${" variable delimiter by moving forward to the last
+     // start delimiter that lies before the stop delimiter we have found.
+     int startDelim = val.indexOf(DELIM_START);
+     int idx = val.indexOf(DELIM_START, startDelim + DELIM_START.length());
+     while ((idx >= 0) && (idx < stopDelim)) {
+         startDelim = idx;
+         idx = val.indexOf(DELIM_START, startDelim + DELIM_START.length());
+     }
+
+     // At this point, we found a stop delimiter without a start,
+     // so throw an exception.
+     if ((startDelim < 0) || (startDelim > stopDelim)) {
+         throw new IllegalArgumentException("stop delimiter with no start delimiter: " + val);
+     }
+
+     // At this point, we have found a variable placeholder so
+     // we must perform a variable substitution on it.
+     // Using the start and stop delimiter indices, extract
+     // the first, deepest nested variable placeholder.
+     String variable = val.substring(startDelim + DELIM_START.length(), stopDelim);
+
+     // Verify that this is not a recursive variable reference.
+     if (cycleMap.get(variable) != null) {
+         throw new IllegalArgumentException("recursive variable reference: " + variable);
+     }
+
+     // Get the value of the deepest nested variable placeholder.
+     // Try the configuration properties first.
+     String substValue = (configProps != null) ? configProps.getProperty(variable, null) : null;
+     if (substValue == null) {
+         // Fall back to the bundle context; unknown properties are replaced by an empty string.
+         substValue = m_context.getProperty(variable);
+         if (substValue == null) {
+             substValue = "";
+         }
+     }
+
+     // Remove the found variable from the cycle map, since
+     // it may appear more than once in the value and we don't
+     // want such situations to appear as a recursive reference.
+     cycleMap.remove(variable);
+
+     // Append the leading characters, the substituted value of
+     // the variable, and the trailing characters to get the new
+     // value.
+     val = val.substring(0, startDelim) + substValue + val.substring(stopDelim + DELIM_STOP.length(), val.length());
+
+     // Now perform substitution again, since there could still
+     // be substitutions to make.
+     return substVars(val, currentKey, cycleMap, configProps);
+ }
+}
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/Deployment.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/Deployment.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/Deployment.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/Deployment.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,51 @@
+package net.luminis.liq.deployment;
+
+import java.io.InputStream;
+
+import org.osgi.framework.Version;
+
+/**
+ * Service that abstracts the actual implementation that manages components that are to be deployed.
+ * Implementations of this interface could for example make use of the <code>DeploymentAdmin</code>
+ * from the OSGi mobile spec to actually deploy packages. The objects used as arguments and return values
+ * must all be of the same type; which type that is depends on the implementation.
+ */
+public interface Deployment {
+
+    /**
+     * Deploys the contents of the stream onto the system.
+     *
+     * @param inputStream Stream containing new components.
+     * @return The update package that was installed; may be <code>null</code> if the implementation does not support this.
+     * @throws Exception If the specified stream could not be deployed.
+     */
+    Object install(InputStream inputStream) throws Exception;
+
+    /**
+     * Returns the name of the specified update package. Guaranteed to work with <code>Object</code>s
+     * returned by the same implementation of this interface.
+     *
+     * @param object The update package.
+     * @return The name of the update package.
+     * @throws IllegalArgumentException When the specified object is not a valid update package; only objects
+     *         returned by the same implementation of this interface should be used.
+     */
+    String getName(Object object) throws IllegalArgumentException;
+
+    /**
+     * Returns the version of the specified update package. Guaranteed to work with <code>Object</code>s
+     * returned by the same implementation of this interface.
+     *
+     * @param object The update package.
+     * @return The version of the update package.
+     * @throws IllegalArgumentException When the specified object is not a valid update package; only objects
+     *         returned by the same implementation of this interface should be used.
+     */
+    Version getVersion(Object object) throws IllegalArgumentException;
+
+    /**
+     * Retrieves the installed update packages.
+     *
+     * @return The installed update packages, or an empty array if none are available.
+     */
+    Object[] list();
+
+}
\ No newline at end of file
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/deploymentadmin/Activator.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/deploymentadmin/Activator.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/deploymentadmin/Activator.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/deploymentadmin/Activator.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,23 @@
+package net.luminis.liq.deployment.deploymentadmin;
+
+import net.luminis.liq.deployment.Deployment;
+
+import org.apache.felix.dependencymanager.DependencyActivatorBase;
+import org.apache.felix.dependencymanager.DependencyManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.deploymentadmin.DeploymentAdmin;
+import org.osgi.service.log.LogService;
+
+/**
+ * Registers the <code>DeploymentAdminDeployer</code> as <code>Deployment</code> service. The service requires
+ * a <code>DeploymentAdmin</code>; the <code>LogService</code> dependency is optional.
+ */
+public class Activator extends DependencyActivatorBase {
+    public void init(BundleContext context, DependencyManager manager) throws Exception {
+        String serviceName = Deployment.class.getName();
+        manager.add(createService()
+            .setInterface(serviceName, null)
+            .setImplementation(DeploymentAdminDeployer.class)
+            .add(createServiceDependency().setService(DeploymentAdmin.class).setRequired(true))
+            .add(createServiceDependency().setService(LogService.class).setRequired(false)));
+    }
+
+    public void destroy(BundleContext context, DependencyManager manager) throws Exception {
+        // Nothing to clean up here.
+    }
+}
\ No newline at end of file
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/deploymentadmin/DeploymentAdminDeployer.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/deploymentadmin/DeploymentAdminDeployer.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/deploymentadmin/DeploymentAdminDeployer.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/deploymentadmin/DeploymentAdminDeployer.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,44 @@
+package net.luminis.liq.deployment.deploymentadmin;
+
+import java.io.InputStream;
+
+import net.luminis.liq.deployment.Deployment;
+
+import org.osgi.framework.Version;
+import org.osgi.service.deploymentadmin.DeploymentAdmin;
+import org.osgi.service.deploymentadmin.DeploymentPackage;
+import org.osgi.service.log.LogService;
+
+/**
+ * Implementation of the <code>Deployment</code> interface that uses the <code>DeploymentAdmin</code>
+ * to deploy components. The update packages handled by this implementation are
+ * <code>DeploymentPackage</code> objects.
+ */
+public class DeploymentAdminDeployer implements Deployment {
+    private volatile LogService m_log; /* will be injected by dependencymanager */
+    private volatile DeploymentAdmin m_admin; /* will be injected by dependencymanager */
+
+    /**
+     * Returns the name of the given deployment package.
+     *
+     * @throws IllegalArgumentException If the argument is not a <code>DeploymentPackage</code>.
+     */
+    public String getName(Object object) throws IllegalArgumentException {
+        return toDeploymentPackage(object).getName();
+    }
+
+    /**
+     * Returns the version of the given deployment package.
+     *
+     * @throws IllegalArgumentException If the argument is not a <code>DeploymentPackage</code>.
+     */
+    public Version getVersion(Object object) throws IllegalArgumentException {
+        return toDeploymentPackage(object).getVersion();
+    }
+
+    /**
+     * Installs the deployment package contained in the stream through the <code>DeploymentAdmin</code> and
+     * logs the name and version of the installed package.
+     *
+     * @return The installed <code>DeploymentPackage</code>.
+     */
+    public Object install(InputStream inputStream) throws Exception {
+        DeploymentPackage installed = m_admin.installDeploymentPackage(inputStream);
+        String name = installed.getName();
+        Version version = installed.getVersion();
+        m_log.log(LogService.LOG_INFO, "Deployment Package installed: name=" + name + " version=" + version);
+        return installed;
+    }
+
+    /**
+     * Returns the deployment packages known to the <code>DeploymentAdmin</code>.
+     */
+    public Object[] list() {
+        // DeploymentAdmin spec says this call should never return null
+        return m_admin.listDeploymentPackages();
+    }
+
+    /** Casts the argument to a <code>DeploymentPackage</code>, rejecting anything else. */
+    private static DeploymentPackage toDeploymentPackage(Object object) {
+        if (object instanceof DeploymentPackage) {
+            return (DeploymentPackage) object;
+        }
+        throw new IllegalArgumentException("Argument is not a DeploymentPackage");
+    }
+}
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/task/Activator.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/task/Activator.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/task/Activator.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/task/Activator.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,51 @@
+package net.luminis.liq.deployment.task;
+
+import java.util.Dictionary;
+import java.util.Properties;
+
+import net.luminis.liq.deployment.Deployment;
+import net.luminis.liq.discovery.Discovery;
+import net.luminis.liq.identification.Identification;
+import net.luminis.liq.scheduler.constants.SchedulerConstants;
+
+import org.apache.felix.dependencymanager.DependencyActivatorBase;
+import org.apache.felix.dependencymanager.DependencyManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.log.LogService;
+
+/**
+ * Registers the <code>DeploymentUpdateTask</code> and the <code>DeploymentCheckTask</code> as
+ * <code>Runnable</code> services, each with scheduler properties (description, name and recipe).
+ */
+public class Activator extends DependencyActivatorBase {
+    public void init(BundleContext context, DependencyManager manager) throws Exception {
+        Dictionary updateProperties = schedulerProperties(
+            "Task that synchronizes the artifacts (bundles, resources) installed on this gateway with the server.",
+            DeploymentUpdateTask.class);
+        Dictionary checkProperties = schedulerProperties(
+            "Task that checks for updates for gateway on the server.",
+            DeploymentCheckTask.class);
+
+        addTask(manager, DeploymentUpdateTask.class, updateProperties);
+        addTask(manager, DeploymentCheckTask.class, checkProperties);
+    }
+
+    public void destroy(BundleContext context, DependencyManager manager) throws Exception {
+        // Nothing to clean up here.
+    }
+
+    /** Builds the scheduler properties for a task; both tasks use the recipe "5000". */
+    private Dictionary schedulerProperties(String description, Class task) {
+        Dictionary properties = new Properties();
+        properties.put(SchedulerConstants.SCHEDULER_DESCRIPTION_KEY, description);
+        properties.put(SchedulerConstants.SCHEDULER_NAME_KEY, task.getName());
+        properties.put(SchedulerConstants.SCHEDULER_RECIPE, "5000");
+        return properties;
+    }
+
+    /**
+     * Registers a task as <code>Runnable</code> service. Deployment, Identification and Discovery are
+     * required dependencies; EventAdmin and LogService are optional.
+     */
+    private void addTask(DependencyManager manager, Class implementation, Dictionary properties) {
+        manager.add(createService()
+            .setInterface(Runnable.class.getName(), properties)
+            .setImplementation(implementation)
+            .add(createServiceDependency().setService(Deployment.class).setRequired(true))
+            .add(createServiceDependency().setService(Identification.class).setRequired(true))
+            .add(createServiceDependency().setService(Discovery.class).setRequired(true))
+            .add(createServiceDependency().setService(EventAdmin.class).setRequired(false))
+            .add(createServiceDependency().setService(LogService.class).setRequired(false)));
+    }
+}
\ No newline at end of file
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/task/DeploymentCheckTask.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/task/DeploymentCheckTask.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/task/DeploymentCheckTask.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/task/DeploymentCheckTask.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,52 @@
+package net.luminis.liq.deployment.task;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Properties;
+
+import org.osgi.framework.Version;
+import org.osgi.service.event.Event;
+import org.osgi.service.log.LogService;
+
+/**
+ * Task that checks for a new version and sends out an event if there is a new version. It does not actually
+ * download or install it.
+ */
+public class DeploymentCheckTask extends DeploymentTaskBase implements Runnable {
+ private static final String TOPIC_UPDATE_AVAILABLE = "net/luminis/liq/deployment/UPDATEAVAILABLE";
+
+ /**
+ * When run a check is made if a higher version is available on the remote. If so, send out an event.
+ */
+ public void run() {
+ try {
+ String gatewayID = m_identification.getID();
+ URL host = m_discovery.discover();
+
+ Version highestLocalVersion = getHighestLocalVersion();
+
+ if (host == null) {
+ //expected if there's no discovered
+ //ps or relay server
+ m_log.log(LogService.LOG_INFO, "Highest remote: unknown / Highest local: " + highestLocalVersion);
+ return;
+ }
+
+ URL url = new URL(host, "deployment/" + gatewayID + "/versions/");
+ Version highestRemoteVersion = getHighestRemoteVersion(url);
+ m_log.log(LogService.LOG_INFO, "Highest remote: " + highestRemoteVersion + " / Highest local: " + highestLocalVersion);
+ if ((highestRemoteVersion != null) && ((highestLocalVersion == null) || (highestRemoteVersion.compareTo(highestLocalVersion) > 0))) {
+ Properties properties = new Properties();
+ properties.put("deploymentpackage.localversion", ((highestLocalVersion == null) ? Version.emptyVersion : highestLocalVersion));
+ properties.put("deploymentpackage.remoteversion", highestRemoteVersion);
+ m_eventAdmin.postEvent(new Event(TOPIC_UPDATE_AVAILABLE, properties));
+ }
+ }
+ catch (MalformedURLException e) {
+ m_log.log(LogService.LOG_ERROR, "Error creating endpoint url",e );
+ }
+ catch (Exception e) {
+ m_log.log(LogService.LOG_ERROR, "Error checking for update", e);
+ }
+ }
+}
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/task/DeploymentTaskBase.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/task/DeploymentTaskBase.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/task/DeploymentTaskBase.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/task/DeploymentTaskBase.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,140 @@
+package net.luminis.liq.deployment.task;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import net.luminis.liq.deployment.Deployment;
+import net.luminis.liq.discovery.Discovery;
+import net.luminis.liq.identification.Identification;
+
+import org.osgi.framework.Version;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.log.LogService;
+
+public class DeploymentTaskBase {
+
+ // Topic of the event that is posted right before a deployment package is installed. This is a
+ // constant, so it is declared static instead of being stored in every instance.
+ private static final String TOPIC_DEPLOYMENTPACKAGE_INSTALL = "net/luminis/liq/deployment/INSTALL";
+
+ // injected by dependencymanager
+ protected volatile Deployment m_deployer;
+ protected volatile Identification m_identification;
+ protected volatile Discovery m_discovery;
+ protected volatile LogService m_log;
+ protected volatile EventAdmin m_eventAdmin;
+
+ /**
+  * Downloads and installs the version specified by <code>highestRemoteVersion</code>. The package is
+  * retrieved from the URL formed by resolving the remote version against <code>url</code>; if a local
+  * version is given, <code>?current=&lt;local version&gt;</code> is appended. Before the installation
+  * starts, an event with the download URL and the version is posted.
+  *
+  * @param url Base URL for retrieving a specific version
+  * @param highestRemoteVersion The version to retrieve and install
+  * @param highestLocalVersion The current version or <code>null</code> in case of none.
+  * @throws IOException If the package cannot be retrieved.
+  * @throws Exception If the package cannot be installed.
+  */
+ public void installVersion(URL url, Version highestRemoteVersion, Version highestLocalVersion) throws IOException, Exception {
+     m_log.log(LogService.LOG_INFO, "Installing version: " + highestRemoteVersion);
+     InputStream packageStream = null;
+     try {
+         String version = (highestLocalVersion == null)
+             ? highestRemoteVersion.toString()
+             : highestRemoteVersion.toString() + "?current=" + highestLocalVersion.toString();
+         URL dataURL = new URL(url, version);
+         packageStream = dataURL.openStream();
+
+         // Post event for auditlog
+         // NOTE(review): "deploymentpackage.version" carries the "?current=..." suffix whenever a
+         // local version exists - confirm that the consumers of this event expect that.
+         Dictionary eventProperties = new Properties();
+         eventProperties.put("deploymentpackage.url", dataURL.toString());
+         eventProperties.put("deploymentpackage.version", version);
+         m_eventAdmin.postEvent(new Event(TOPIC_DEPLOYMENTPACKAGE_INSTALL, eventProperties));
+
+         m_deployer.install(packageStream);
+     }
+     finally {
+         if (packageStream != null) {
+             try {
+                 packageStream.close();
+             }
+             catch (Exception ex) {
+                 // Not much we can do.
+             }
+         }
+     }
+ }
+
+ /**
+  * Determines the highest version among the packages reported by the deployer.
+  *
+  * @return The highest installed version or <code>null</code> if no version is available locally.
+  */
+ public Version getHighestLocalVersion() {
+     Object[] installed = m_deployer.list();
+     List localVersions = new ArrayList(installed.length);
+     int index = 0;
+     while (index < installed.length) {
+         Version version = m_deployer.getVersion(installed[index]);
+         localVersions.add(version);
+         index++;
+     }
+     return getHighestVersion(localVersions);
+ }
+
+ /**
+  * Reads the versions offered by the remote, one version per line, and returns the highest one. Lines
+  * that cannot be parsed as a version are logged and skipped.
+  *
+  * @param url The URL to be used to retrieve the versions available on the remote.
+  * @return The highest version available on the remote or <code>null</code> if no versions were available or the remote could not be reached.
+  */
+ public Version getHighestRemoteVersion(URL url) {
+     BufferedReader reader = null;
+     try {
+         reader = new BufferedReader(new InputStreamReader(url.openStream()));
+
+         List remoteVersions = new ArrayList();
+         String line;
+         while ((line = reader.readLine()) != null) {
+             Version parsed;
+             try {
+                 parsed = Version.parseVersion(line);
+             }
+             catch (IllegalArgumentException iae) {
+                 m_log.log(LogService.LOG_WARNING, "Received malformed version, ignoring: " + line);
+                 continue;
+             }
+             // NOTE(review): this is an identity comparison on purpose; presumably parseVersion()
+             // hands out the shared emptyVersion instance for blank lines - confirm against the
+             // OSGi Version documentation.
+             if (parsed != Version.emptyVersion) {
+                 remoteVersions.add(parsed);
+             }
+         }
+         return getHighestVersion(remoteVersions);
+     }
+     catch (IOException ioe) {
+         return null;
+     }
+     finally {
+         if (reader != null) {
+             try {
+                 reader.close();
+             }
+             catch (Exception ex) {
+                 // not much we can do
+             }
+         }
+     }
+ }
+
+ /**
+  * Returns the highest version in the given list, or <code>null</code> if the list is empty. When
+  * several versions compare as equal, the first one encountered is returned.
+  */
+ private Version getHighestVersion(List versions) {
+     Version highest = null;
+     Iterator candidates = versions.iterator();
+     while (candidates.hasNext()) {
+         Version candidate = (Version) candidates.next();
+         if ((highest == null) || (candidate.compareTo(highest) > 0)) {
+             highest = candidate;
+         }
+     }
+     return highest;
+ }
+
+}
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/task/DeploymentUpdateTask.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/task/DeploymentUpdateTask.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/task/DeploymentUpdateTask.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/deployment/task/DeploymentUpdateTask.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,53 @@
+package net.luminis.liq.deployment.task;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.osgi.framework.Version;
+import org.osgi.service.log.LogService;
+
+/**
+ * Implementation of the <code>Updater</code> interface that updates software configurations by using the
+ * <code>DeploymentService</code> to determine the current local version and to actually install new versions.
+ */
+public class DeploymentUpdateTask extends DeploymentTaskBase implements Runnable {
+ /**
+ * When run a check is made if a higher version is available on the remote. If so, an attempt is made to install
+ * this new version.
+ */
+ public void run() {
+ try {
+ String gatewayID = m_identification.getID();
+ URL host = m_discovery.discover();
+
+ Version highestLocalVersion = getHighestLocalVersion();
+
+ if (host == null) {
+ //expected if there's no discovered
+ //ps or relay server
+ m_log.log(LogService.LOG_INFO, "Highest remote: unknown / Highest local: " + highestLocalVersion);
+ return;
+ }
+
+
+ URL url = new URL(host, "deployment/" + gatewayID + "/versions/");
+ Version highestRemoteVersion = getHighestRemoteVersion(url);
+
+ m_log.log(LogService.LOG_INFO, "Highest remote: " + highestRemoteVersion + " / Highest local: " + highestLocalVersion);
+ if ((highestRemoteVersion != null) && ((highestLocalVersion == null) || (highestRemoteVersion.compareTo(highestLocalVersion) > 0))) {
+ // no local version or local version lower than remote, install the update
+ installVersion(url, highestRemoteVersion, highestLocalVersion);
+ }
+ }
+ catch (MalformedURLException e) {
+ m_log.log(LogService.LOG_ERROR, "Error creating endpoint url", e);
+ }
+ catch (IOException e) {
+ m_log.log(LogService.LOG_ERROR, "Error accessing resources", e);
+ }
+ catch (Exception e) {
+ m_log.log(LogService.LOG_ERROR, "Error installing update", e);
+ }
+ }
+}
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/property/Activator.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/property/Activator.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/property/Activator.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/property/Activator.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,34 @@
+package net.luminis.liq.discovery.property;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import net.luminis.liq.discovery.Discovery;
+import net.luminis.liq.discovery.property.constants.DiscoveryConstants;
+
+import org.apache.felix.dependencymanager.DependencyActivatorBase;
+import org.apache.felix.dependencymanager.DependencyManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.service.log.LogService;
+
+public class Activator extends DependencyActivatorBase {
+
+ public void init(BundleContext context, DependencyManager manager) throws Exception {
+ Dictionary properties = new Hashtable();
+ properties.put(Constants.SERVICE_PID, DiscoveryConstants.DISCOVERY_PID);
+ manager.add(createService()
+ .setInterface(new String[] {Discovery.class.getName()}, properties)
+ .setImplementation(PropertyBasedDiscovery.class)
+ .add(createConfigurationDependency()
+ .setPid(DiscoveryConstants.DISCOVERY_PID))
+ .add(createServiceDependency()
+ .setService(LogService.class)
+ .setRequired(false)));
+ }
+
+ public void destroy(BundleContext context, DependencyManager manager) throws Exception {
+ // do nothing
+ }
+
+}
\ No newline at end of file
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/property/PropertyBasedDiscovery.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/property/PropertyBasedDiscovery.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/property/PropertyBasedDiscovery.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/property/PropertyBasedDiscovery.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,41 @@
+package net.luminis.liq.discovery.property;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Dictionary;
+
+import net.luminis.liq.discovery.Discovery;
+import net.luminis.liq.discovery.property.constants.DiscoveryConstants;
+
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedService;
+import org.osgi.service.log.LogService;
+
+/**
+ * Simple implementation of the <code>Discovery</code> interface. It 'discovers'
+ * the Provisioning Server by implementing the <code>ManagedService</code> and having the
+ * location configured by <code>ConfigurationAdmin</code>. If no configuration or a <code>null</code>
+ * configuration has been supplied by <code>ConfigurationAdmin</code> the location stored
+ * in <code>GatewayConstants.DISCOVERY_DEFAULT_URL</code> will be used.
+ */
+public class PropertyBasedDiscovery implements Discovery, ManagedService {
+
+ volatile public LogService m_log; /* will be injected by dependencymanager */
+ private URL m_serverURL; /* managed by configadmin */
+
+ public synchronized void updated(Dictionary dictionary) throws ConfigurationException {
+ try {
+ if(dictionary != null) {
+ m_serverURL = new URL((String) dictionary.get(DiscoveryConstants.DISCOVERY_URL_KEY));
+ }
+ }
+ catch (MalformedURLException e) {
+ throw new ConfigurationException(DiscoveryConstants.DISCOVERY_URL_KEY, "Malformed URL", e);
+ }
+ }
+
+ public synchronized URL discover() {
+ return m_serverURL;
+ }
+
+}
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/property/constants/DiscoveryConstants.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/property/constants/DiscoveryConstants.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/property/constants/DiscoveryConstants.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/property/constants/DiscoveryConstants.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,6 @@
+package net.luminis.liq.discovery.property.constants;
+
+/**
+ * Constants shared by the property based discovery bundle.
+ */
+public interface DiscoveryConstants {
+    /** Service PID under which the property based discovery service is registered and configured. */
+    String DISCOVERY_PID = "net.luminis.liq.discovery.property";
+    /** Configuration key that holds the URL of the server. */
+    String DISCOVERY_URL_KEY = "serverURL";
+}
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/upnp/Activator.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/upnp/Activator.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/upnp/Activator.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/upnp/Activator.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,39 @@
+package net.luminis.liq.discovery.upnp;
+
+import net.luminis.liq.discovery.Discovery;
+
+import org.apache.felix.dependencymanager.DependencyActivatorBase;
+import org.apache.felix.dependencymanager.DependencyManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.log.LogService;
+import org.osgi.service.upnp.UPnPDevice;
+
+public class Activator extends DependencyActivatorBase {
+
+ public void init(BundleContext context, DependencyManager manager) throws Exception {
+
+
+ StringBuffer deviceFilter = new StringBuffer();
+ deviceFilter.append("(")
+ .append(UPnPDevice.TYPE).append("=")
+ .append(UPnPBasedDiscovery.DEVICE_TYPE).append(")");
+
+ manager.add(createService()
+ .setInterface(new String[] {Discovery.class.getName()}, null)
+ .setImplementation(UPnPBasedDiscovery.class)
+ .add(createServiceDependency()
+ .setService(LogService.class)
+ .setRequired(false))
+ //not required
+ .add(createServiceDependency()
+ .setService(UPnPDevice.class, deviceFilter.toString())
+ .setCallbacks("added", "removed")
+ .setRequired(false))
+ );
+ }
+
+ public void destroy(BundleContext context, DependencyManager manager) throws Exception {
+ // do nothing
+ }
+
+}
\ No newline at end of file
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/upnp/UPnPBasedDiscovery.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/upnp/UPnPBasedDiscovery.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/upnp/UPnPBasedDiscovery.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/discovery/upnp/UPnPBasedDiscovery.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,113 @@
+package net.luminis.liq.discovery.upnp;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.List;
+
+import net.luminis.liq.discovery.Discovery;
+
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.log.LogService;
+import org.osgi.service.upnp.UPnPAction;
+import org.osgi.service.upnp.UPnPDevice;
+import org.osgi.service.upnp.UPnPService;
+
+/**
+ * Simple implementation of the <code>Discovery</code> interface. It 'discovers'
+ * the Provisioning Server by means of UPnP.
+ */
+public class UPnPBasedDiscovery implements Discovery {
+
+ final static String DEVICE_TYPE = "urn:schemas-upnp-org:device:ProvisioningDevice:1";
+ final static String SERVICE_ID = "urn:upnp-org:serviceId:LocationService:1";
+ final static String ACTION_GET_LOCATION = "GetLocation";
+ final static String ACTION_GET_TYPE = "GetServerType";
+ final static String ACTION_GET_LOAD = "GetServerLoad";
+
+ volatile public LogService m_log; /* will be injected by dependencymanager */
+
+ private List m_devices;
+
+ public void start() {
+ m_devices = new ArrayList();
+ }
+
+ public void added(ServiceReference ref, Object device) {
+ if (device instanceof UPnPDevice) {
+ m_devices.add(device);
+ }
+ }
+
+
+ public void removed(Object device) {
+ m_devices.remove(device);
+ }
+
+
+
+ public synchronized URL discover() {
+
+ try {
+ return getLocation();
+ }
+ catch (Exception e) {
+ m_log.log(LogService.LOG_DEBUG, "unable to retrieve location property", e);
+ }
+
+ return null;
+ }
+
+
+ private URL getLocation() {
+ UPnPAction action = getAction(ACTION_GET_LOCATION);
+ try {
+ Dictionary dict = action.invoke(null);
+ String location = (String)dict.get(action.getOutputArgumentNames()[0]);
+ return new URL(location);
+ }
+ catch (Exception e) {}
+ return null;
+ }
+
+ private String getType() {
+ UPnPAction action = getAction(ACTION_GET_TYPE);
+ try {
+ Dictionary dict = action.invoke(null);
+ return (String)dict.get(action.getOutputArgumentNames()[0]);
+ }
+ catch (Exception e) {}
+ return "Unknown";
+ }
+
+ private int getLoad() {
+ UPnPAction action = getAction(ACTION_GET_LOAD);
+ try {
+ Dictionary dict = action.invoke(null);
+ Integer val = (Integer)dict.get(action.getOutputArgumentNames()[0]);
+ return val.intValue();
+ }
+ catch (Exception e) {
+ //ignore, just report worst case
+ }
+ return 100;
+ }
+
+ private UPnPAction getAction(String name) {
+ UPnPDevice device = null;
+
+ //zero-order implementation
+ if ( m_devices.size() > 0 ) {
+ device = (UPnPDevice)m_devices.iterator().next();
+ }
+
+ if (device != null) {
+ UPnPService svc = device.getService(SERVICE_ID);
+ if (svc != null) {
+ return svc.getAction(name);
+ }
+ }
+ return null;
+ }
+
+}
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/Activator.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/Activator.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/Activator.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/Activator.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,100 @@
+package net.luminis.liq.gateway.log;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import net.luminis.liq.discovery.Discovery;
+import net.luminis.liq.gateway.log.store.LogStore;
+import net.luminis.liq.gateway.log.task.LogSyncTask;
+import net.luminis.liq.identification.Identification;
+import net.luminis.liq.log.Log;
+import net.luminis.liq.scheduler.constants.SchedulerConstants;
+
+import org.apache.felix.dependencymanager.DependencyActivatorBase;
+import org.apache.felix.dependencymanager.DependencyManager;
+import org.apache.felix.dependencymanager.Service;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedServiceFactory;
+import org.osgi.service.log.LogService;
+
+/**
+ * Managed service factory that, for every factory configuration, publishes a <code>Log</code> service and a
+ * <code>Runnable</code> log synchronization task for the log name given in the configuration.
+ */
+public class Activator extends DependencyActivatorBase implements ManagedServiceFactory {
+
+    private static final String LOG_NAME = "name";
+
+    private DependencyManager m_manager;
+    private final Map m_logInstances = new HashMap(); // String -> Service
+    private final Map m_syncInstances = new HashMap(); // String -> Service
+    private volatile LogService m_log;
+
+    public void init(BundleContext context, DependencyManager manager) throws Exception {
+        m_manager = manager;
+        Properties props = new Properties();
+        props.put(Constants.SERVICE_PID, "net.luminis.liq.gateway.log.factory");
+        manager.add(createService()
+            .setInterface(ManagedServiceFactory.class.getName(), props)
+            .setImplementation(this)
+            .add(createServiceDependency().setService(LogService.class).setRequired(false)));
+    }
+
+    public void destroy(BundleContext context, DependencyManager manager) throws Exception {
+    }
+
+    /**
+     * Removes the log and sync services that were created for the given factory instance, if any.
+     *
+     * @param pid the PID of the deleted factory configuration.
+     */
+    public synchronized void deleted(String pid) {
+        Service log = (Service) m_logInstances.remove(pid);
+        if (log != null) {
+            m_manager.remove(log);
+        }
+        Service sync = (Service) m_syncInstances.remove(pid);
+        if (sync != null) {
+            m_manager.remove(sync);
+        }
+    }
+
+    public String getName() {
+        return "Log Factory";
+    }
+
+    /**
+     * Creates the log and sync services for a new factory instance. Updates of an instance that is already
+     * configured are ignored (and logged).
+     *
+     * @param pid the PID of the factory configuration.
+     * @param dict the configuration; must contain a non-empty log name.
+     * @throws ConfigurationException if the log name is missing or empty.
+     */
+    public synchronized void updated(String pid, Dictionary dict) throws ConfigurationException {
+        String name = (String) dict.get(LOG_NAME);
+        if ((name == null) || "".equals(name)) {
+            throw new ConfigurationException(LOG_NAME, "Log name has to be specified.");
+        }
+
+        Service service = (Service) m_logInstances.get(pid);
+        if (service == null) {
+            String storeFilter = logStoreFilter(name);
+
+            // publish log service
+            Properties props = new Properties();
+            props.put(LOG_NAME, name);
+            Service log = m_manager.createService()
+                .setInterface(Log.class.getName(), props)
+                .setImplementation(LogImpl.class)
+                .add(createServiceDependency().setService(LogStore.class, storeFilter).setRequired(true))
+                .add(createServiceDependency().setService(LogService.class).setRequired(false));
+
+            // publish log sync task service
+            Dictionary properties = new Properties();
+            properties.put(SchedulerConstants.SCHEDULER_DESCRIPTION_KEY, "Task that synchronizes log store with id=" + name + " on the gateway and server");
+            properties.put(SchedulerConstants.SCHEDULER_NAME_KEY, name);
+            properties.put(SchedulerConstants.SCHEDULER_RECIPE, "2000");
+            Service sync = m_manager.createService()
+                .setInterface(Runnable.class.getName(), properties)
+                .setImplementation(new LogSyncTask(name))
+                .add(createServiceDependency().setService(LogStore.class, storeFilter).setRequired(true))
+                .add(createServiceDependency().setService(Discovery.class).setRequired(true))
+                .add(createServiceDependency().setService(Identification.class).setRequired(true))
+                .add(createServiceDependency().setService(LogService.class).setRequired(false));
+
+            m_logInstances.put(pid, log);
+            m_syncInstances.put(pid, sync);
+            m_manager.add(log);
+            m_manager.add(sync);
+        } else {
+            m_log.log(LogService.LOG_INFO, "Ignoring configuration update because factory instance was already configured: " + name);
+        }
+    }
+
+    /**
+     * Builds the filter that selects the <code>LogStore</code> registered for the given log name.
+     * NOTE(review): the name is inserted verbatim, so it is assumed not to contain filter meta characters.
+     */
+    private static String logStoreFilter(String name) {
+        return "(&(" + Constants.OBJECTCLASS + "=" + LogStore.class.getName() + ")(name=" + name + "))";
+    }
+
+}
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/LogImpl.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/LogImpl.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/LogImpl.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/LogImpl.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,29 @@
+package net.luminis.liq.gateway.log;
+
+import java.io.IOException;
+import java.util.Dictionary;
+
+import net.luminis.liq.gateway.log.store.LogStore;
+import net.luminis.liq.log.LogEvent;
+import net.luminis.liq.log.Log;
+
+import org.osgi.service.log.LogService;
+
+/**
+ * <code>Log</code> implementation that hands every event to the <code>LogStore</code> and falls back to the
+ * regular <code>LogService</code> when the event cannot be stored.
+ */
+public class LogImpl implements Log {
+    private volatile LogStore m_store;
+    private volatile LogService m_log;
+
+    /**
+     * Stores an event of the given type with the given properties.
+     *
+     * @param type the type of the event.
+     * @param properties the properties of the event.
+     */
+    public void log(int type, Dictionary properties) {
+        try {
+            m_store.put(type, properties);
+        }
+        catch (NullPointerException e) {
+            reportUnstoredEvent(type, properties, e);
+        }
+        catch (IOException e) {
+            reportUnstoredEvent(type, properties, e);
+        }
+    }
+
+    /*
+     * if we cannot store the event, we log it to the normal log as extensively as possible
+     */
+    private void reportUnstoredEvent(int type, Dictionary properties, Exception cause) {
+        m_log.log(LogService.LOG_WARNING, "Could not store event: " + (new LogEvent("", 0, 0, 0, type, properties)).toRepresentation(), cause);
+    }
+}
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/store/LogStore.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/store/LogStore.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/store/LogStore.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/store/LogStore.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,62 @@
+package net.luminis.liq.gateway.log.store;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.List;
+
+import net.luminis.liq.log.LogEvent;
+
+/**
+ * Log store service interface for the gateways. Implementations of this interface provide persistent
+ * storage for log data.
+ */
+public interface LogStore {
+
+    /**
+     * Creates a new event from the given type and properties, writes it to the store and returns it.
+     *
+     * @param type the type of the event.
+     * @param props the properties of the event.
+     * @return the event that was created and persisted.
+     * @throws IOException in case of any IO error.
+     */
+    LogEvent put(int type, Dictionary props) throws IOException;
+
+    /**
+     * Returns all events of the given log.
+     *
+     * @param logID the id of the log.
+     * @return a list of the LogEvents that are currently in the log with the given id.
+     * @throws IOException in case of any IO error.
+     */
+    List/*<LogEvent>*/get(long logID) throws IOException;
+
+    /**
+     * Returns the events of the given log whose id lies between the given lower and upper bound.
+     *
+     * @param logID the id of the log.
+     * @param from the lower bound.
+     * @param to the upper bound.
+     * @return a list of the LogEvents that are currently in the log with the given id and have an id in the
+     *         range of the given bounds.
+     * @throws IOException in case of any IO error.
+     */
+    List/*<LogEvent>*/get(long logID, long from, long to) throws IOException;
+
+    /**
+     * Returns the highest id of any LogEvent entry in the given log.
+     *
+     * @param logID the id of the log.
+     * @return the id of the highest LogEvent entry in the given log.
+     * @throws IOException in case of any IO error.
+     */
+    long getHighestID(long logID) throws IOException;
+
+    /**
+     * Returns the ids of all logs that are available in this store.
+     *
+     * @return an array with the ids of all available logs in this store.
+     * @throws IOException in case of any IO error.
+     */
+    long[] getLogIDs() throws IOException;
+}
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/store/impl/Activator.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/store/impl/Activator.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/store/impl/Activator.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/store/impl/Activator.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,91 @@
+package net.luminis.liq.gateway.log.store.impl;
+
+import java.io.File;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import net.luminis.liq.gateway.log.store.LogStore;
+import net.luminis.liq.identification.Identification;
+
+import org.apache.felix.dependencymanager.DependencyActivatorBase;
+import org.apache.felix.dependencymanager.DependencyManager;
+import org.apache.felix.dependencymanager.Service;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedServiceFactory;
+import org.osgi.service.log.LogService;
+
+public class Activator extends DependencyActivatorBase implements ManagedServiceFactory {
+
+ private static final String LOG_NAME = "name";
+
+ private DependencyManager m_manager;
+ private BundleContext m_context;
+ private final Map m_instances = new HashMap(); // String -> Service
+ private volatile LogService m_log;
+
+ public void init(BundleContext context, DependencyManager manager) throws Exception {
+ m_context = context;
+ m_manager = manager;
+ Properties props = new Properties();
+ props.put(Constants.SERVICE_PID, "net.luminis.liq.gateway.log.store.factory");
+ manager.add(createService()
+ .setInterface(ManagedServiceFactory.class.getName(), props)
+ .setImplementation(this)
+ .add(createServiceDependency().setService(LogService.class).setRequired(false)));
+ }
+
+ public void destroy(BundleContext context, DependencyManager manager) throws Exception {
+ // Nothing we need to do
+ }
+
+ public synchronized void deleted(String pid) {
+ Service log = (Service) m_instances.remove(pid);
+ if (log != null) {
+ m_manager.remove(log);
+ delete(new File(m_context.getDataFile(""), pid));
+ }
+ }
+
+ public String getName() {
+ return "Log Store Factory";
+ }
+
+ private void delete(File root) {
+ if (root.isDirectory()) {
+ File[] files = root.listFiles();
+ for (int i = 0; i < files.length; i++) {
+ delete(files[i]);
+ }
+ }
+ root.delete();
+ }
+
+ public synchronized void updated(String pid, Dictionary dict) throws ConfigurationException {
+ String name = (String) dict.get(LOG_NAME);
+ if ((name == null) || "".equals(name)) {
+ throw new ConfigurationException(LOG_NAME, "Log name has to be specified.");
+ }
+
+ Service service = (Service) m_instances.get(pid);
+ if (service == null) {
+ Properties props = new Properties();
+ props.put(LOG_NAME, name);
+ File baseDir = new File(m_context.getDataFile(""), pid);
+ service = m_manager.createService()
+ .setInterface(LogStore.class.getName(), props)
+ .setImplementation(new LogStoreImpl(baseDir))
+ .add(createServiceDependency().setService(Identification.class).setRequired(true))
+ .add(createServiceDependency().setService(LogService.class).setRequired(false));
+ m_instances.put(pid, service);
+ m_manager.add(service);
+ } else {
+ m_log.log(LogService.LOG_INFO, "Ignoring configuration update because factory instance was already configured: " + name);
+ }
+ }
+
+
+}
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/store/impl/LogStoreImpl.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/store/impl/LogStoreImpl.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/store/impl/LogStoreImpl.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/store/impl/LogStoreImpl.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,468 @@
+package net.luminis.liq.gateway.log.store.impl;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.List;
+
+import net.luminis.liq.gateway.log.store.LogStore;
+import net.luminis.liq.identification.Identification;
+import net.luminis.liq.log.LogEvent;
+
+import org.osgi.service.log.LogService;
+
+/**
+ * This class provides an implementation of the LogStore service. It tries to repair broken log files to make them readable
+ * again. However, this might lead to loss of data. Additionally, a new file is used when an error is detected.
+ */
+public class LogStoreImpl implements LogStore {
+ // injected by dependencymanager
+ volatile Identification m_identification;
+ volatile LogService m_log;
+
+ // The current store
+ private Store m_store = null;
+ private final File m_baseDir;
+
+    /**
+     * Creates a new instance that keeps its log files in the <code>store</code> subdirectory of the given
+     * directory.
+     *
+     * @param baseDir Root directory to use for storage.
+     */
+    public LogStoreImpl(File baseDir) {
+        File storeDir = new File(baseDir, "store");
+        m_baseDir = storeDir;
+    }
+
+    /**
+     * Init the current store: the log file with the highest id in the base directory is reopened, or a new
+     * log is created when there is none. Files whose name is not a log id are ignored. IO problems while
+     * opening the store are logged rather than propagated.
+     *
+     * @throws IOException if the base directory cannot be listed.
+     * @throws IllegalArgumentException if the base directory does not exist and cannot be created.
+     */
+    protected synchronized void start() throws IOException {
+        if (!m_baseDir.isDirectory() && !m_baseDir.mkdirs()) {
+            throw new IllegalArgumentException("Need valid dir");
+        }
+        long current = -1;
+        File[] files = (File[]) notNull(m_baseDir.listFiles());
+        for (int i = 0; i < files.length; i++) {
+            String fileName = files[i].getName();
+            try {
+                current = Math.max(Long.parseLong(fileName), current);
+            }
+            catch (NumberFormatException nfe) {
+                // a stray file must not prevent the store from starting
+                m_log.log(LogService.LOG_WARNING, "Ignoring unexpected file in log store: " + fileName);
+            }
+        }
+        try {
+            if (current == -1) {
+                m_store = newStore();
+            }
+            else {
+                m_store = createStore(current);
+                try {
+                    m_store.init();
+                }
+                catch (IOException ex) {
+                    handleException(m_store, ex);
+                }
+            }
+        }
+        catch (IOException ex) {
+            // We should be able to recover from the error.
+            m_log.log(LogService.LOG_ERROR, "Exception during log store init", ex);
+        }
+    }
+
+    /**
+     * Close the current store, if there is one. The reference to the store is dropped even when closing it
+     * fails.
+     *
+     * @throws IOException in case of any IO error.
+     */
+    protected synchronized void stop() throws IOException {
+        Store store = m_store;
+        m_store = null;
+        // start() may have failed to open a store, in which case there is nothing to close
+        if (store != null) {
+            store.close();
+        }
+    }
+
+    /**
+     * Creates the file for a new log and a store object on top of it. The log id starts at the current time in
+     * milliseconds and is incremented until a file with that name could be created.
+     *
+     * @return a store object for a new log.
+     * @throws IOException in case of any IO error.
+     */
+    protected Store newStore() throws IOException {
+        long id = System.currentTimeMillis();
+        File candidate = new File(m_baseDir, String.valueOf(id));
+
+        while (!candidate.createNewFile()) {
+            // name already taken, try the next id
+            id++;
+            candidate = new File(m_baseDir, String.valueOf(id));
+        }
+
+        return new Store(new File(m_baseDir, String.valueOf(id)), id);
+    }
+
+    /**
+     * Opens a store object on the file of the given log. Not meant for creating a new log, see
+     * {@link #newStore()} for that.
+     *
+     * @param id the id of the log.
+     * @return a new store object for the given log.
+     * @throws IOException in case of an IO error.
+     */
+    protected Store createStore(long id) throws IOException {
+        File logFile = new File(m_baseDir, String.valueOf(id));
+        return new Store(logFile, id);
+    }
+
+    /**
+     * Reads the given log from the beginning and collects the entries whose id lies within the given bounds
+     * (both inclusive).
+     *
+     * @param logID the id of the log.
+     * @param from the lower bound of the range.
+     * @param to the upper bound of the range.
+     * @return a list of entries from the given log in the given range.
+     * @throws IOException in case of any IO error.
+     */
+    public synchronized List get(long logID, long from, long to) throws IOException {
+        Store store = getLog(logID);
+        List events = new ArrayList();
+        try {
+            for (store.reset(); store.hasNext(); ) {
+                LogEvent event = new LogEvent(new String(store.read()));
+                if ((event.getID() < from) || (event.getID() > to)) {
+                    // outside the requested range
+                    continue;
+                }
+                events.add(event);
+            }
+        }
+        catch (Exception ex) {
+            handleException(store, ex);
+        }
+        finally {
+            closeIfNeeded(store);
+        }
+        return events;
+    }
+
+    /**
+     * Try to repair the given store, log the given exception and rethrow it. In case the store is the current log switch to a
+     * new one if possible.
+     *
+     * @param store the store to repair/close.
+     * @param exception the exception to log and rethrow.
+     * @throws IOException the given exception if it is an IOException, else an IOException that carries the message of the
+     *         given exception and has the given exception as its cause.
+     */
+    protected void handleException(Store store, Exception exception) throws IOException {
+        m_log.log(LogService.LOG_WARNING, "Exception accessing the log: " + store.getId(), exception);
+        // NOTE(review): if creating the new store fails, the broken store below is neither truncated nor closed.
+        if (store == m_store) {
+            m_store = newStore();
+        }
+
+        try {
+            store.truncate();
+        }
+        catch (IOException ex) {
+            m_log.log(LogService.LOG_WARNING, "Exception during truncate: " + store.getId(), ex);
+        }
+        try {
+            store.close();
+        }
+        catch (IOException ex) {
+            // Not much we can do
+        }
+        if (exception instanceof IOException) {
+            throw (IOException) exception;
+        }
+        // keep the original exception attached so its stack trace is not lost
+        IOException wrapped = new IOException("Unable to read log entry: " + exception.getMessage());
+        wrapped.initCause(exception);
+        throw wrapped;
+    }
+
+    /**
+     * Returns every entry of the given log, i.e. the entries with an id from 0 up to and including
+     * <code>Long.MAX_VALUE</code>.
+     *
+     * @param logID the id of the log.
+     * @return a list of all entries in this log.
+     * @throws IOException in case of any IO error.
+     */
+    public List get(long logID) throws IOException {
+        final long lowest = 0;
+        final long highest = Long.MAX_VALUE;
+        return get(logID, lowest, highest);
+    }
+
+ /**
+ * Get the current log ids.
+ *
+ * @return the ids of the current logs.
+ */
+ public long[] getLogIDs() throws IOException {
+ File[] files = (File[]) notNull(m_baseDir.listFiles());
+ long[] result = new long[files.length];
+ for (int i = 0; i < files.length; i++) {
+ result[i] = Long.parseLong(files[i].getName());
+ }
+ return result;
+ }
+
+    /**
+     * Builds a LogEvent for the current log, using the next free entry id and the current time, and appends
+     * it to the current store.
+     *
+     * @param type the type the event.
+     * @param props the properties of the event.
+     * @return the new event.
+     * @throws IOException in case of any IO error.
+     */
+    public synchronized LogEvent put(int type, Dictionary props) throws IOException {
+        LogEvent created = null;
+        try {
+            LogEvent event = new LogEvent(m_identification.getID(), m_store.getId(), getNextID(), System.currentTimeMillis(), type, props);
+            byte[] record = event.toRepresentation().getBytes();
+            m_store.append(record);
+            created = event;
+        }
+        catch (IOException ex) {
+            handleException(m_store, ex);
+        }
+        return created;
+    }
+
+    /**
+     * Determines the highest entry id of the given log by initializing its store and asking it for the id of
+     * its current record.
+     *
+     * @param logID the id of the log.
+     * @return the id of the highest entry.
+     * @throws IOException in case of any IO error.
+     */
+    public synchronized long getHighestID(long logID) throws IOException {
+        Store store = getLog(logID);
+        long highest = -1;
+        try {
+            store.init();
+            highest = store.getCurrent();
+        }
+        catch (IOException ex) {
+            handleException(store, ex);
+        }
+        finally {
+            closeIfNeeded(store);
+        }
+        return highest;
+    }
+
+    /**
+     * Closes the given store unless it is the current store, which has to stay open. IO errors are ignored.
+     *
+     * @param store the store to close.
+     */
+    protected void closeIfNeeded(Store store) {
+        if (store == m_store) {
+            return;
+        }
+        try {
+            store.close();
+        }
+        catch (IOException ex) {
+            // Not much we can do;
+        }
+    }
+
+    /**
+     * Returns the Store object for the log with the given id: the current store when the ids match, a newly
+     * created store object otherwise.
+     *
+     * @param logID the id for which to return (and possibly create) a store.
+     * @return either a new or the current Store object.
+     * @throws IOException in case of any IO error.
+     */
+    protected Store getLog(long logID) throws IOException {
+        return (m_store.getId() == logID) ? m_store : createStore(logID);
+    }
+
+    /**
+     * Computes the id for the next entry of the current store: one more than the id of its current record.
+     *
+     * @return the next free log id of the current store.
+     * @throws IOException in case of any IO error.
+     */
+    protected long getNextID() throws IOException {
+        long lastUsed = m_store.getCurrent();
+        return lastUsed + 1;
+    }
+
+    /*
+     * Return the target unless it is null, in which case an IOException is thrown.
+     */
+    private Object notNull(Object target) throws IOException {
+        if (target != null) {
+            return target;
+        }
+        throw new IOException("Unknown IO error while trying to access the store.");
+    }
+
+ /**
+  * The general idea is to provide easy access to a file of records. Every record is stored as a four byte length
+  * (written with <code>writeInt</code>) followed by that many bytes of data. It supports iterating over records both
+  * by skipping and by reading. Furthermore, files can be truncated. Most methods will make an effort to reset the file
+  * pointer to where it was before the call in case of an error -- hence, a call to truncate after an IOException might
+  * make the store readable again.
+  */
+ class Store {
+     private final RandomAccessFile m_store;
+     private final long m_id;
+     // File offset of the start of the record that was most recently read, skipped or appended.
+     private long m_current;
+
+     /**
+      * Create a new File based Store.
+      *
+      * @param store the file to use as backend; it is opened in "rwd" mode.
+      * @param id the log id of the store
+      * @throws IOException in case the file is not rw.
+      */
+     Store(File store, long id) throws IOException {
+         m_store = new RandomAccessFile(store, "rwd");
+         m_id = id;
+     }
+
+     /**
+      * Get the id of the current record, i.e. the record that was most recently read, skipped or appended. The
+      * record is read again and parsed as a <code>LogEvent</code>; the file pointer is restored afterwards.
+      *
+      * @return the id of the current record, or 0 if the file is empty.
+      * @throws IOException in case of an IO error.
+      */
+     public long getCurrent() throws IOException {
+         long pos = m_store.getFilePointer();
+         if (m_store.length() == 0) {
+             return 0;
+         }
+         long result = 0;
+         try {
+             m_store.seek(m_current);
+             // Decoded with the platform default charset, like any other new String(byte[]) call.
+             result = new LogEvent(new String(read())).getID();
+             m_store.seek(pos);
+         }
+         catch (IOException ex) {
+             handle(pos, ex);
+         }
+         return result;
+     }
+
+     /**
+      * Get the log id of this store.
+      *
+      * @return the log id of this store.
+      */
+     public long getId() {
+         return m_id;
+     }
+
+     /**
+      * Reset the store to the beginning of the records
+      *
+      * @throws IOException in case of an IO error.
+      */
+     public void reset() throws IOException {
+         m_store.seek(0);
+         m_current = 0;
+     }
+
+     /**
+      * Determine whether there are any records left based on the current position.
+      *
+      * @return <code>true</code> if there are still records to be read.
+      * @throws IOException in case of an IO error.
+      */
+     public boolean hasNext() throws IOException {
+         return m_store.getFilePointer() < m_store.length();
+     }
+
+     /**
+      * Read the next record and make it the current record.
+      *
+      * @return the data of the next record, or <code>null</code> if the file pointer is at or past the end of the
+      *         file.
+      * @throws IOException in case of any IO error, including a negative record length or a record that extends
+      *         past the end of the file (reported as an EOFException). The file pointer is reset to where it was
+      *         before the call.
+      */
+     public byte[] read() throws IOException {
+         long pos = m_store.getFilePointer();
+         try {
+             if (pos < m_store.length()) {
+                 int next = m_store.readInt();
+                 if (next < 0) {
+                     // A corrupt length must surface as an IOException, not as a NegativeArraySizeException.
+                     throw new IOException("Invalid record length: " + next);
+                 }
+                 if (m_store.length() - m_store.getFilePointer() < next) {
+                     // Fail before allocating a buffer for data that is not there; readFully would report the
+                     // same condition as an EOFException.
+                     throw new EOFException("Unexpected end of file");
+                 }
+                 byte[] entry = new byte[next];
+                 m_store.readFully(entry);
+                 m_current = pos;
+                 return entry;
+             }
+         }
+         catch (IOException ex) {
+             handle(pos, ex);
+         }
+         return null;
+     }
+
+     /**
+      * Make sure the store is readable. As a result, the store is at the end of the records.
+      *
+      * @throws IOException in case of any IO error.
+      */
+     public void init() throws IOException {
+         reset();
+         try {
+             while (true) {
+                 skip();
+             }
+         }
+         catch (EOFException ex) {
+             // done
+         }
+     }
+
+     /**
+      * Skip the next record if there is any.
+      *
+      * @throws IOException in case of any IO error, if the record length is negative or if there is no record left.
+      */
+     public void skip() throws IOException {
+         long pos = m_store.getFilePointer();
+         try {
+             int next = m_store.readInt();
+             if (next < 0) {
+                 // A negative length would move the file pointer backwards (a length of -4 lands on this very
+                 // record again), which makes init() loop forever.
+                 throw new IOException("Invalid record length: " + next);
+             }
+             if (m_store.length() < next + m_store.getFilePointer()) {
+                 throw new IOException("Unexpected end of file");
+             }
+             m_store.seek(m_store.getFilePointer() + next);
+             m_current = pos;
+         }
+         catch (IOException ex) {
+             handle(pos, ex);
+         }
+     }
+
+     /**
+      * Store the given record data as the next record. The record is always written at the end of the file and
+      * becomes the current record.
+      *
+      * @param entry the data of the record to store.
+      * @throws IOException in case of any IO error.
+      */
+     public void append(byte[] entry) throws IOException {
+         long pos = m_store.getFilePointer();
+         try {
+             m_store.seek(m_store.length());
+             long current = m_store.getFilePointer();
+             m_store.writeInt(entry.length);
+             m_store.write(entry);
+             m_current = current;
+         }
+         catch (IOException ex) {
+             handle(pos, ex);
+         }
+     }
+
+     /**
+      * Try to truncate the store at the current position of the file pointer.
+      *
+      * @throws IOException in case of any IO error.
+      */
+     public void truncate() throws IOException {
+         m_store.setLength(m_store.getFilePointer());
+     }
+
+     /**
+      * Release any resources.
+      *
+      * @throws IOException in case of any IO error.
+      */
+     public void close() throws IOException {
+         m_store.close();
+     }
+
+     /*
+      * Try to move the file pointer back to the given position and rethrow the given exception. A failure of the
+      * seek itself is only logged.
+      */
+     private void handle(long pos, IOException exception) throws IOException {
+         try {
+             m_store.seek(pos);
+         }
+         catch (IOException ex) {
+             m_log.log(LogService.LOG_WARNING, "Exception during seek!", ex);
+         }
+         throw exception;
+     }
+ }
+}
\ No newline at end of file
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/task/Connection.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/task/Connection.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/task/Connection.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/task/Connection.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,74 @@
+package net.luminis.liq.gateway.log.task;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.net.URLConnection;
+
+/**
+ * Helper class that abstracts handling of a URLConnection somewhat.
+ *
+ */
+public class Connection {
+ private URLConnection m_connection;
+
+ public Connection(URL url) throws IOException {
+ m_connection = url.openConnection();
+ }
+
+ /**
+ * Enables the retrieving of input using this connection and returns an inputstream
+ * to the connection.
+ *
+ * @return Inputstream to the connection.
+ * @throws IOException If I/O problems occur.
+ */
+ public InputStream getInputStream() throws IOException {
+ m_connection.setDoInput(true);
+ return m_connection.getInputStream();
+ }
+
+ /**
+ * Enables the sending of output using this connection and returns an outputstream
+ * to the connection.
+ *
+ * @return Outputstream to the connection.
+ * @throws IOException If I/O problems occur.
+ */
+ public OutputStream getOutputStream() throws IOException {
+ m_connection.setDoOutput(true);
+ return m_connection.getOutputStream();
+ }
+
+ /**
+ * Should be called when a <code>Connection</code> is used to do a POST (write to it's outputstream)
+ * without reading it's inputstream (the response). Calling this will make sure the POST request is sent.
+ * If no data was written to the connection nothing is done.
+ */
+ public void close() {
+ if (m_connection.getDoOutput()) {
+ try {
+ m_connection.getOutputStream().close();
+ }
+ catch (IOException e) {
+ // not much we can do
+ }
+ try {
+ m_connection.getContent();
+ }
+ catch (IOException e) {
+ // not much we can do
+ }
+ }
+ if (m_connection.getDoInput()) {
+ try {
+ m_connection.getInputStream().close();
+ }
+ catch (IOException e) {
+ // not much we can do
+ }
+ }
+ }
+
+}
\ No newline at end of file
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/task/LogSyncTask.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/task/LogSyncTask.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/task/LogSyncTask.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/gateway/log/task/LogSyncTask.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,140 @@
+package net.luminis.liq.gateway.log.task;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.net.URL;
+import java.util.List;
+
+import net.luminis.liq.discovery.Discovery;
+import net.luminis.liq.gateway.log.store.LogStore;
+import net.luminis.liq.identification.Identification;
+import net.luminis.liq.log.LogDescriptor;
+import net.luminis.liq.log.LogEvent;
+import net.luminis.liq.repository.RangeIterator;
+import net.luminis.liq.repository.SortedRangeSet;
+
+import org.osgi.service.log.LogService;
+
+ /**
+  * Task that synchronizes the local log store with a remote endpoint: for every local log it queries which events
+  * the remote side has, and sends the events that are missing there.
+  */
+ public class LogSyncTask implements Runnable {
+
+     private static final String COMMAND_QUERY = "query";
+     private static final String COMMAND_SEND = "send";
+     private static final String PARAMETER_GATEWAYID = "gwid";
+     private static final String PARAMETER_LOGID = "logid";
+
+     // injected by dependencymanager
+     private volatile Discovery m_discovery;
+     private volatile Identification m_identification;
+     private volatile LogService m_log;
+     private volatile LogStore m_LogStore;
+
+     private final String m_endpoint;
+
+     /**
+      * Creates a task that synchronizes with the given endpoint.
+      *
+      * @param endpoint Path of the endpoint, resolved against the discovered host; the command names are appended to it.
+      */
+     public LogSyncTask(String endpoint) {
+         m_endpoint = endpoint;
+     }
+
+     /**
+      * Synchronize the log events available remote with the events available locally.
+      */
+     public void run() {
+         String gatewayID = m_identification.getID();
+         URL host = m_discovery.discover();
+
+         if (host == null) {
+             //expected if there's no discovered
+             //ps or relay server
+             m_log.log(LogService.LOG_WARNING, "Unable to synchronize log with remote (endpoint=" + m_endpoint + ") - none available");
+             return;
+         }
+
+         Connection sendConnection = null;
+         try {
+             sendConnection = new Connection(new URL(host, m_endpoint + "/" + COMMAND_SEND));
+             long[] logIDs = m_LogStore.getLogIDs();
+             for (int i = 0; i < logIDs.length; i++) {
+                 Connection queryConnection = new Connection(new URL(host, m_endpoint + "/" + COMMAND_QUERY + "?" + PARAMETER_GATEWAYID + "=" + gatewayID + "&" + PARAMETER_LOGID + "=" + logIDs[i]));
+                 try {
+                     // TODO: make sure no actual call is made using sendConnection when there's nothing to sync
+                     synchronizeLog(logIDs[i], queryConnection.getInputStream(), sendConnection);
+                 }
+                 finally {
+                     // synchronizeLog does not read (and close) the query result in every case, so release it here.
+                     queryConnection.close();
+                 }
+             }
+         }
+         catch (IOException e) {
+             m_log.log(LogService.LOG_ERROR, "Unable to (fully) synchronize log with remote (endpoint=" + m_endpoint + ")", e);
+         }
+         finally {
+             if (sendConnection != null) {
+                 sendConnection.close();
+             }
+         }
+     }
+
+     /**
+      * Synchronizes a single log (there can be multiple log/logid's per gateway).
+      *
+      * @param logID ID of the log to synchronize.
+      * @param queryInput Stream pointing to a query result for the events available remotely for this log id
+      * @param sendConnection Connection whose output stream is used to write the events to that are missing on the
+      *        remote side. The output stream is only requested if there is at least one such event.
+      * @throws IOException If synchronization could not be completed due to an I/O failure.
+      */
+     protected void synchronizeLog(long logID, InputStream queryInput, Connection sendConnection) throws IOException {
+         long highestLocal = m_LogStore.getHighestID(logID);
+         if (highestLocal == 0) {
+             // No events, no need to synchronize
+             return;
+         }
+         SortedRangeSet localRange = new SortedRangeSet("1-" + highestLocal);
+         SortedRangeSet remoteRange = getDescriptor(queryInput).getRangeSet();
+         SortedRangeSet delta = remoteRange.diffDest(localRange);
+         RangeIterator rangeIterator = delta.iterator();
+         if (!rangeIterator.hasNext()) {
+             // Nothing is missing remotely; do not request the output stream, as that alone turns the send
+             // connection into a request with a body.
+             return;
+         }
+         List events = m_LogStore.get(logID, 1, highestLocal);
+         BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(sendConnection.getOutputStream()));
+
+         while (rangeIterator.hasNext()) {
+             // Note the -1: Events are 1-based, but the list is 0-based.
+             LogEvent event = (LogEvent) events.get((int) rangeIterator.next() - 1);
+             writer.write(event.toRepresentation() + "\n");
+         }
+         // Only flush: the underlying stream is closed when the send connection is closed.
+         writer.flush();
+     }
+
+     /**
+      * Retrieves a LogDescriptor object from the specified stream. Only the first line of the stream is read; the
+      * stream is closed afterwards.
+      *
+      * @param queryInput Stream containing a LogDescriptor object.
+      * @return LogDescriptor object reflecting the range contained in the stream.
+      * @throws IOException If no range could be determined due to an I/O failure, an empty stream or a malformed range.
+      */
+     protected LogDescriptor getDescriptor(InputStream queryInput) throws IOException {
+         BufferedReader queryReader = null;
+         try {
+             queryReader = new BufferedReader(new InputStreamReader(queryInput));
+             String rangeString = queryReader.readLine();
+             if (rangeString != null) {
+                 try {
+                     return new LogDescriptor(rangeString);
+                 }
+                 catch (IllegalArgumentException iae) {
+                     IOException ioe = new IOException("Could not determine highest remote event id, received malformed event range (" + rangeString + ")");
+                     // keep the original failure available for diagnosis
+                     ioe.initCause(iae);
+                     throw ioe;
+                 }
+             } else {
+                 throw new IOException("Could not construct LogDescriptor from stream because stream is empty");
+             }
+         }
+         finally {
+             if (queryReader != null) {
+                 try {
+                     queryReader.close();
+                 }
+                 catch (Exception ex) {
+                     // not much we can do
+                 }
+             }
+         }
+     }
+
+ }
Added: incubator/ace/trunk/gateway/src/net/luminis/liq/identification/ifconfig/Activator.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/gateway/src/net/luminis/liq/identification/ifconfig/Activator.java?rev=788992&view=auto
==============================================================================
--- incubator/ace/trunk/gateway/src/net/luminis/liq/identification/ifconfig/Activator.java (added)
+++ incubator/ace/trunk/gateway/src/net/luminis/liq/identification/ifconfig/Activator.java Sat Jun 27 15:53:04 2009
@@ -0,0 +1,23 @@
+package net.luminis.liq.identification.ifconfig;
+
+import net.luminis.liq.identification.Identification;
+
+import org.apache.felix.dependencymanager.DependencyActivatorBase;
+import org.apache.felix.dependencymanager.DependencyManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.log.LogService;
+
+public class Activator extends DependencyActivatorBase {
+
+ public synchronized void init(BundleContext context, DependencyManager manager) throws Exception {
+ manager.add(createService()
+ .setInterface(new String[] {Identification.class.getName()}, null)
+ .setImplementation(IfconfigIdentification.class)
+ .add(createServiceDependency().setService(LogService.class).setRequired(false))
+ );
+ }
+
+ public void destroy(BundleContext context, DependencyManager manager) throws Exception {
+ // do nothing
+ }
+}
\ No newline at end of file