You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ace.apache.org by br...@apache.org on 2013/05/07 12:25:15 UTC

svn commit: r1479841 [3/3] - in /ace/trunk: org.apache.ace.agent.itest/ org.apache.ace.agent.itest/.settings/ org.apache.ace.agent.itest/resources/ org.apache.ace.agent.itest/src/ org.apache.ace.agent.itest/src/org/ org.apache.ace.agent.itest/src/org/a...

Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTask.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTask.java?rev=1479841&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTask.java (added)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTask.java Tue May  7 10:25:13 2013
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.logging;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.net.ConnectException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.ace.connectionfactory.ConnectionFactory;
+import org.apache.ace.discovery.Discovery;
+import org.apache.ace.identification.Identification;
+import org.apache.ace.log.LogDescriptor;
+import org.apache.ace.log.LogEvent;
+import org.apache.ace.log.target.store.LogStore;
+import org.apache.ace.range.RangeIterator;
+import org.apache.ace.range.SortedRangeSet;
+import org.osgi.service.log.LogService;
+
+//FIXME This is a copy of the code in org.apache.ace.log; it is private there and may be better located here.
+
+// TODO there are two versions of this class around, the other one being the server.LogSyncTask,
+// and both are fairly similar
+/**
+ * Task that pushes locally stored log events to a remote log endpoint. On every run it asks the remote, per local
+ * log, which event range it already has, and then writes the events found to be missing to a single "send"
+ * connection.
+ */
+public class LogSyncTask implements Runnable {
+
+    private static final String COMMAND_QUERY = "query";
+    private static final String COMMAND_SEND = "send";
+    private static final String PARAMETER_TARGETID = "tid";
+    private static final String PARAMETER_LOGID = "logid";
+
+    // injected by dependencymanager
+    private volatile Discovery m_discovery;
+    private volatile Identification m_identification;
+    private volatile LogService m_log;
+    private volatile LogStore m_LogStore;
+    private volatile ConnectionFactory m_connectionFactory;
+
+    // path of the remote log endpoint, resolved against the discovered host URL
+    private final String m_endpoint;
+
+    /**
+     * Creates a new task for the given endpoint.
+     * 
+     * @param endpoint
+     *            The endpoint path that is resolved against the discovered host for the query and send commands.
+     */
+    public LogSyncTask(String endpoint) {
+        m_endpoint = endpoint;
+    }
+
+    /**
+     * Synchronize the log events available remote with the events available locally.
+     * <p>
+     * I/O failures are logged and not propagated. The send connection is disconnected afterwards when it is an
+     * HTTP connection, and every query stream is closed once its log has been handled.
+     */
+    public void run() {
+        URL host = m_discovery.discover();
+
+        if (host == null) {
+            // expected if there's no discovered
+            // ps or relay server
+            m_log.log(LogService.LOG_WARNING, "Unable to synchronize log with remote (endpoint=" + m_endpoint + ") - none available");
+            return;
+        }
+
+        if ("file".equals(host.getProtocol())) {
+            // if the discovery URL is a file, we cannot sync, so we silently return here
+            return;
+        }
+
+        String targetId = m_identification.getID();
+        URLConnection sendConnection = null;
+        try {
+            sendConnection = m_connectionFactory.createConnection(new URL(host, m_endpoint + "/" + COMMAND_SEND));
+            sendConnection.setDoOutput(true);
+            if (sendConnection instanceof HttpURLConnection) {
+                // ACE-294: enable streaming mode causing only small amounts of memory to be
+                // used for this commit. Otherwise, the entire input stream is cached into
+                // memory prior to sending it to the server...
+                ((HttpURLConnection) sendConnection).setChunkedStreamingMode(8192);
+            }
+
+            long[] logIDs = m_LogStore.getLogIDs();
+            for (int i = 0; i < logIDs.length; i++) {
+                URL url = new URL(host, m_endpoint + "/" + COMMAND_QUERY + "?" + PARAMETER_TARGETID + "=" + targetId + "&" + PARAMETER_LOGID + "=" + logIDs[i]);
+
+                URLConnection queryConnection = m_connectionFactory.createConnection(url);
+                // TODO: make sure no actual call is made using sendConnection
+                // when there's nothing to sync
+                InputStream queryInput = queryConnection.getInputStream();
+                try {
+                    synchronizeLog(logIDs[i], queryInput, sendConnection);
+                }
+                finally {
+                    // synchronizeLog only closes the stream when it actually reads the descriptor;
+                    // close it here as well so it is released on the early-return and failure paths
+                    closeQuietly(queryInput);
+                }
+            }
+
+            // Make sure to send the actual POST request...
+            sendConnection.getContent();
+        }
+        catch (ConnectException e) {
+            m_log.log(LogService.LOG_WARNING, "Unable to connect to remote (endpoint=" + m_endpoint + ")");
+        }
+        catch (IOException e) {
+            m_log.log(LogService.LOG_ERROR, "Unable to (fully) synchronize log with remote (endpoint=" + m_endpoint + ")", e);
+        }
+        finally {
+            if (sendConnection instanceof HttpURLConnection) {
+                ((HttpURLConnection) sendConnection).disconnect();
+            }
+        }
+    }
+
+    /**
+     * Synchronizes a single log (there can be multiple log/logid's per target).
+     * 
+     * @param logID
+     *            ID of the log to synchronize.
+     * @param queryInput
+     *            Stream pointing to a query result for the events available remotely for this log id
+     * @param sendConnection
+     *            .getOutputStream() Stream to write the events to that are missing on the remote side.
+     * @throws java.io.IOException
+     *             If synchronization could not be completed due to an I/O failure.
+     */
+    protected void synchronizeLog(long logID, InputStream queryInput, URLConnection sendConnection) throws IOException {
+        long highestLocal = m_LogStore.getHighestID(logID);
+        if (highestLocal == 0) {
+            // No events, no need to synchronize
+            return;
+        }
+
+        SortedRangeSet localRange = new SortedRangeSet("1-" + highestLocal);
+        SortedRangeSet remoteRange = getDescriptor(queryInput).getRangeSet();
+        // presumably the IDs that are in the local range but not in the remote one - confirm against SortedRangeSet
+        SortedRangeSet delta = remoteRange.diffDest(localRange);
+        RangeIterator rangeIterator = delta.iterator();
+
+        // the writer is flushed but deliberately not closed: the same output stream is reused for every log
+        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(sendConnection.getOutputStream()));
+
+        if (rangeIterator.hasNext()) {
+            long lowest = rangeIterator.next();
+            long highest = delta.getHigh();
+            if (lowest <= highest) {
+                List events = m_LogStore.get(logID, lowest, Math.min(highestLocal, highest));
+                for (Object item : events) {
+                    LogEvent current = (LogEvent) item;
+                    // advance through the delta until it catches up with the ID of the current event
+                    while ((current.getID() > lowest) && rangeIterator.hasNext()) {
+                        lowest = rangeIterator.next();
+                    }
+                    if (current.getID() == lowest) {
+                        // before we send the LogEvent to the other side, we fill out the
+                        // appropriate identification
+                        LogEvent event = new LogEvent(m_identification.getID(), current);
+                        writer.write(event.toRepresentation() + "\n");
+                    }
+                }
+            }
+        }
+
+        writer.flush();
+    }
+
+    /**
+     * Retrieves a LogDescriptor object from the specified stream. The stream is closed before this method returns.
+     * 
+     * @param queryInput
+     *            Stream containing a LogDescriptor object.
+     * @return LogDescriptor object reflecting the range contained in the stream.
+     * @throws java.io.IOException
+     *             If no range could be determined due to an I/O failure, an empty stream or a malformed range.
+     */
+    protected LogDescriptor getDescriptor(InputStream queryInput) throws IOException {
+        BufferedReader queryReader = null;
+        try {
+            queryReader = new BufferedReader(new InputStreamReader(queryInput));
+            String rangeString = queryReader.readLine();
+            if (rangeString == null) {
+                throw new IOException("Could not construct LogDescriptor from stream because stream is empty");
+            }
+            try {
+                return new LogDescriptor(rangeString);
+            }
+            catch (IllegalArgumentException iae) {
+                // keep the original exception as cause so the parse failure remains diagnosable
+                throw new IOException("Could not determine highest remote event id, received malformed event range (" + rangeString + ")", iae);
+            }
+        }
+        finally {
+            if (queryReader != null) {
+                try {
+                    queryReader.close();
+                }
+                catch (Exception ex) {
+                    // not much we can do
+                }
+            }
+        }
+    }
+
+    /**
+     * Closes the given stream, ignoring any failure to do so.
+     * 
+     * @param stream
+     *            The stream to close, may be <code>null</code>.
+     */
+    private static void closeQuietly(InputStream stream) {
+        if (stream != null) {
+            try {
+                stream.close();
+            }
+            catch (IOException ignored) {
+                // best effort: the stream is being discarded anyway
+            }
+        }
+    }
+}

Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTaskFactory.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTaskFactory.java?rev=1479841&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTaskFactory.java (added)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTaskFactory.java Tue May  7 10:25:13 2013
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.logging;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.ace.agent.spi.ComponentFactoryBase;
+import org.apache.ace.connectionfactory.ConnectionFactory;
+import org.apache.ace.discovery.Discovery;
+import org.apache.ace.identification.Identification;
+import org.apache.ace.log.target.store.LogStore;
+import org.apache.ace.scheduler.constants.SchedulerConstants;
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.log.LogService;
+
+/**
+ * Creates a executor whiteboard {@link Runnable} service components with a {@link LogSyncTask} implementation for every
+ * configured store unless explicitly disabled.
+ * 
+ */
+public class LogSyncTaskFactory extends ComponentFactoryBase {
+
+    @Override
+    public Set<Component> createComponents(BundleContext context, DependencyManager manager, LogService logService, Map<String, String> configuration) {
+
+        Set<Component> components = new HashSet<Component>();
+        String value = configuration.get(LogFactory.LOG_STORES);
+        String[] stores = value.split(",");
+        for (String store : stores) {
+
+            String sync = configuration.get(LogFactory.LOG_STORES + "." + store + ".sync");
+            if (sync != null && sync.trim().toLowerCase().equals("false")) {
+                System.err.println("Disabled " + getAgentIdentifier(configuration) + "/" + store);
+                logService.log(LogService.LOG_DEBUG, "Log sync disabled for agent " + getAgentIdentifier(configuration) + "/" + store);
+            }
+            else {
+                components.add(createLogSyncComponent(context, manager, logService, configuration, store.trim()));
+            }
+        }
+        return components;
+    }
+
+    private Component createLogSyncComponent(BundleContext context, DependencyManager manager, LogService logService, Map<String, String> configuration, String store) {
+
+        String schedulerName = getAgentIdentifier(configuration) + "-" + store;
+        String description = "Task that synchronizes log store " + store + " for agent=" + getAgentIdentifier(configuration) + " on the target and server";
+
+        Properties props = getAgentproperties(configuration);
+        props.put(LogFactory.LOG_NAME, store);
+
+        props.put(SchedulerConstants.SCHEDULER_NAME_KEY, LogSyncTask.class.getSimpleName());
+        props.put(SchedulerConstants.SCHEDULER_DESCRIPTION_KEY, description);
+        props.put(SchedulerConstants.SCHEDULER_RECIPE, "2000");
+
+        Component component = manager.createComponent()
+            .setInterface(Runnable.class.getName(), props)
+            .setImplementation(new LogSyncTask(store))
+            .add(manager.createServiceDependency()
+                .setService(ConnectionFactory.class, getAgentFilter(configuration, null))
+                .setRequired(true))
+            .add(manager.createServiceDependency()
+                .setService(LogStore.class, getAgentFilter(configuration, "(" + LogFactory.LOG_NAME + "=" + store + ")"))
+                .setRequired(true))
+            .add(manager.createServiceDependency()
+                .setService(Discovery.class, getAgentFilter(configuration, null))
+                .setRequired(true))
+            .add(manager.createServiceDependency()
+                .setService(Identification.class, getAgentFilter(configuration, null))
+                .setRequired(true))
+            .add(manager.createServiceDependency()
+                .setService(LogService.class).setRequired(false));
+
+        return component;
+    }
+}

Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/packageinfo
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/packageinfo?rev=1479841&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/packageinfo (added)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/packageinfo Tue May  7 10:25:13 2013
@@ -0,0 +1 @@
+version 1.0
\ No newline at end of file

Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactory.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactory.java?rev=1479841&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactory.java (added)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactory.java Tue May  7 10:25:13 2013
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.spi;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.log.LogService;
+
+/**
+ * SPI for Management Agent component factories. The create method is called for every individual agent configuration.
+ * The factory can return zero or more components and should not add them to the manager. Factories are created and
+ * disposed as required. They must have a public default constructor, and are expected to be stateless and
+ * thread-safe.
+ * 
+ */
+public interface ComponentFactory {
+
+    /**
+     * Return zero or more service components for the specified agent configuration.
+     * 
+     * @param context
+     *            The Bundle Context
+     * @param manager
+     *            The Dependency Manager
+     * @param logService
+     *            The Log Service
+     * @param configuration
+     *            The agent configuration
+     * @return A set of components, not <code>null</code>
+     * @throws ConfigurationException
+     *             If there is a fatal problem.
+     */
+    Set<Component> createComponents(BundleContext context, DependencyManager manager, LogService logService, Map<String, String> configuration) throws ConfigurationException;
+}

Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactoryBase.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactoryBase.java?rev=1479841&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactoryBase.java (added)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactoryBase.java Tue May  7 10:25:13 2013
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.spi;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Base class for component factories, offering convenience methods that concrete implementations can build on.
+ */
+public abstract class ComponentFactoryBase implements ComponentFactory {
+
+    /** Key under which the agent identifier is stored in configurations, service properties and filters. */
+    private static final String AGENT_KEY = "agent";
+
+    /**
+     * Looks up the agent identifier in a configuration.
+     * 
+     * @param configuration
+     *            The configuration
+     * @return The identifier as stored under the "agent" key
+     */
+    protected String getAgentIdentifier(Map<String, String> configuration) {
+        return configuration.get(AGENT_KEY);
+    }
+
+    /**
+     * Creates a fresh, mutable set of service properties that already carries the agent identifier.
+     * 
+     * @param configuration
+     *            The configuration
+     * @return The properties
+     */
+    protected Properties getAgentproperties(Map<String, String> configuration) {
+        Properties agentProperties = new Properties();
+        agentProperties.put(AGENT_KEY, getAgentIdentifier(configuration));
+        return agentProperties;
+    }
+
+    /**
+     * Builds a service filter scoped to the agent identifier. When a base filter is given, the result is the
+     * conjunction of the agent filter and that base filter.
+     * 
+     * @param configuration
+     *            The configuration
+     * @param base
+     *            The optional base filter, may be <code>null</code>
+     * @return The filter
+     */
+    protected String getAgentFilter(Map<String, String> configuration, String base) {
+        String agentFilter = "(" + AGENT_KEY + "=" + getAgentIdentifier(configuration) + ")";
+        if (base != null) {
+            return "(&" + agentFilter + base + ")";
+        }
+        return agentFilter;
+    }
+}

Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/OneComponentFactoryBase.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/OneComponentFactoryBase.java?rev=1479841&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/OneComponentFactoryBase.java (added)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/OneComponentFactoryBase.java Tue May  7 10:25:13 2013
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.spi;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.log.LogService;
+
+/**
+ * Convenience base class for component factories that produce at most a single component per agent configuration.
+ * 
+ */
+public abstract class OneComponentFactoryBase extends ComponentFactoryBase {
+
+    /**
+     * Delegates to {@link #createComponent} and wraps its result in a set; a <code>null</code> result yields an
+     * empty set.
+     */
+    @Override
+    public final Set<Component> createComponents(BundleContext context, DependencyManager manager, LogService logService, Map<String, String> configuration) throws ConfigurationException {
+        Component single = createComponent(context, manager, logService, configuration);
+        if (single == null) {
+            return Collections.emptySet();
+        }
+        Set<Component> result = new HashSet<Component>();
+        result.add(single);
+        return result;
+    }
+
+    /**
+     * Creates the component for the given agent configuration.
+     * 
+     * @param context
+     *            The Bundle Context
+     * @param manager
+     *            The Dependency manager
+     * @param logService
+     *            The Log Service
+     * @param configuration
+     *            The agent configuration
+     * @return A component, or <code>null</code> if none should be created
+     * @throws ConfigurationException
+     *             If there is a fatal problem
+     */
+    public abstract Component createComponent(BundleContext context, DependencyManager manager, LogService logService, Map<String, String> configuration) throws ConfigurationException;
+}

Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/packageinfo
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/packageinfo?rev=1479841&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/packageinfo (added)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/packageinfo Tue May  7 10:25:13 2013
@@ -0,0 +1 @@
+version 1.0
\ No newline at end of file