You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ace.apache.org by br...@apache.org on 2013/05/07 12:25:15 UTC
svn commit: r1479841 [3/3] - in /ace/trunk: org.apache.ace.agent.itest/
org.apache.ace.agent.itest/.settings/ org.apache.ace.agent.itest/resources/
org.apache.ace.agent.itest/src/ org.apache.ace.agent.itest/src/org/
org.apache.ace.agent.itest/src/org/a...
Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTask.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTask.java?rev=1479841&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTask.java (added)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTask.java Tue May 7 10:25:13 2013
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.logging;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.net.ConnectException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.ace.connectionfactory.ConnectionFactory;
+import org.apache.ace.discovery.Discovery;
+import org.apache.ace.identification.Identification;
+import org.apache.ace.log.LogDescriptor;
+import org.apache.ace.log.LogEvent;
+import org.apache.ace.log.target.store.LogStore;
+import org.apache.ace.range.RangeIterator;
+import org.apache.ace.range.SortedRangeSet;
+import org.osgi.service.log.LogService;
+
+//FIXME This is a copy of the class in org.apache.ace.log; it is private there and may be better located here.
+
+// TODO there are two versions of this class around, the other one being the server.LogSyncTask,
+// and both are fairly similar
+/**
+ * Runnable task that pushes locally stored log events to a remote endpoint. For every local log it queries the
+ * remote for the event range it already holds and writes the events that appear to be missing there to a single
+ * shared "send" connection.
+ */
+public class LogSyncTask implements Runnable {
+
+    private static final String COMMAND_QUERY = "query";
+    private static final String COMMAND_SEND = "send";
+    private static final String PARAMETER_TARGETID = "tid";
+    private static final String PARAMETER_LOGID = "logid";
+
+    // injected by dependencymanager
+    private volatile Discovery m_discovery;
+    private volatile Identification m_identification;
+    private volatile LogService m_log;
+    private volatile LogStore m_LogStore;
+    private volatile ConnectionFactory m_connectionFactory;
+
+    private final String m_endpoint;
+
+    /**
+     * Creates a new task.
+     *
+     * @param endpoint
+     *            Path, relative to the discovered host URL, under which the "query" and "send" commands are addressed.
+     */
+    public LogSyncTask(String endpoint) {
+        m_endpoint = endpoint;
+    }
+
+    /**
+     * Synchronize the log events available remote with the events available locally.
+     * <p>
+     * Returns without doing anything when no host is discovered (a warning is logged) or when the discovered URL uses
+     * the "file" protocol. I/O failures are logged and not propagated.
+     */
+    public void run() {
+        URL host = m_discovery.discover();
+
+        if (host == null) {
+            // expected if there's no discovered
+            // ps or relay server
+            m_log.log(LogService.LOG_WARNING, "Unable to synchronize log with remote (endpoint=" + m_endpoint + ") - none available");
+            return;
+        }
+
+        if ("file".equals(host.getProtocol())) {
+            // if the discovery URL is a file, we cannot sync, so we silently return here
+            return;
+        }
+
+        String targetId = m_identification.getID();
+        URLConnection sendConnection = null;
+        try {
+            sendConnection = m_connectionFactory.createConnection(new URL(host, m_endpoint + "/" + COMMAND_SEND));
+            sendConnection.setDoOutput(true);
+            if (sendConnection instanceof HttpURLConnection) {
+                // ACE-294: enable streaming mode causing only small amounts of memory to be
+                // used for this commit. Otherwise, the entire input stream is cached into
+                // memory prior to sending it to the server...
+                ((HttpURLConnection) sendConnection).setChunkedStreamingMode(8192);
+            }
+
+            long[] logIDs = m_LogStore.getLogIDs();
+            for (int i = 0; i < logIDs.length; i++) {
+                URL url = new URL(host, m_endpoint + "/" + COMMAND_QUERY + "?" + PARAMETER_TARGETID + "=" + targetId + "&" + PARAMETER_LOGID + "=" + logIDs[i]);
+
+                URLConnection queryConnection = m_connectionFactory.createConnection(url);
+                // TODO: make sure no actual call is made using sendConnection
+                // when there's nothing to sync
+                InputStream queryInput = queryConnection.getInputStream();
+                try {
+                    synchronizeLog(logIDs[i], queryInput, sendConnection);
+                }
+                finally {
+                    // synchronizeLog() returns early for an empty local log without ever reading
+                    // (and thus closing) the query stream, so always release it here.
+                    closeQuietly(queryInput);
+                }
+            }
+
+            // Make sure to send the actual POST request...
+            sendConnection.getContent();
+        }
+        catch (ConnectException e) {
+            m_log.log(LogService.LOG_WARNING, "Unable to connect to remote (endpoint=" + m_endpoint + ")");
+        }
+        catch (IOException e) {
+            m_log.log(LogService.LOG_ERROR, "Unable to (fully) synchronize log with remote (endpoint=" + m_endpoint + ")", e);
+        }
+        finally {
+            if (sendConnection instanceof HttpURLConnection) {
+                ((HttpURLConnection) sendConnection).disconnect();
+            }
+        }
+    }
+
+    /**
+     * Synchronizes a single log (there can be multiple log/logid's per target).
+     * <p>
+     * When the local store holds no events for the log, this method returns immediately without reading from
+     * <code>queryInput</code>; the caller remains responsible for closing that stream in this case.
+     *
+     * @param logID
+     *            ID of the log to synchronize.
+     * @param queryInput
+     *            Stream pointing to a query result for the events available remotely for this log id
+     * @param sendConnection
+     *            Connection whose output stream the events that are missing on the remote side are written to.
+     * @throws java.io.IOException
+     *             If synchronization could not be completed due to an I/O failure.
+     */
+    protected void synchronizeLog(long logID, InputStream queryInput, URLConnection sendConnection) throws IOException {
+        long highestLocal = m_LogStore.getHighestID(logID);
+        if (highestLocal == 0) {
+            // No events, no need to synchronize
+            return;
+        }
+
+        SortedRangeSet localRange = new SortedRangeSet("1-" + highestLocal);
+        SortedRangeSet remoteRange = getDescriptor(queryInput).getRangeSet();
+        // presumably the ids present locally but absent remotely - confirm against SortedRangeSet.diffDest()
+        SortedRangeSet delta = remoteRange.diffDest(localRange);
+        RangeIterator rangeIterator = delta.iterator();
+
+        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(sendConnection.getOutputStream()));
+
+        if (rangeIterator.hasNext()) {
+            long lowest = rangeIterator.next();
+            long highest = delta.getHigh();
+            if (lowest <= highest) {
+                // fetch one contiguous span and filter it down to the ids the range iterator yields
+                List<?> events = m_LogStore.get(logID, lowest, Math.min(highestLocal, highest));
+                for (Object candidate : events) {
+                    LogEvent current = (LogEvent) candidate;
+                    // advance the wanted id until it catches up with the current event
+                    while ((current.getID() > lowest) && rangeIterator.hasNext()) {
+                        lowest = rangeIterator.next();
+                    }
+                    if (current.getID() == lowest) {
+                        // before we send the LogEvent to the other side, we fill out the
+                        // appropriate identification
+                        LogEvent event = new LogEvent(m_identification.getID(), current);
+                        writer.write(event.toRepresentation() + "\n");
+                    }
+                }
+            }
+        }
+
+        writer.flush();
+    }
+
+    /**
+     * Retrieves a LogDescriptor object from the specified stream. Only the first line of the stream is read; the
+     * stream is closed before this method returns.
+     *
+     * @param queryInput
+     *            Stream containing a LogDescriptor object.
+     * @return LogDescriptor object reflecting the range contained in the stream.
+     * @throws java.io.IOException
+     *             If no range could be determined due to an I/O failure, an empty stream or a malformed range.
+     */
+    protected LogDescriptor getDescriptor(InputStream queryInput) throws IOException {
+        BufferedReader queryReader = null;
+        try {
+            queryReader = new BufferedReader(new InputStreamReader(queryInput));
+            String rangeString = queryReader.readLine();
+            if (rangeString == null) {
+                throw new IOException("Could not construct LogDescriptor from stream because stream is empty");
+            }
+            try {
+                return new LogDescriptor(rangeString);
+            }
+            catch (IllegalArgumentException iae) {
+                // keep the original failure as cause so the parse problem stays diagnosable
+                throw new IOException("Could not determine highest remote event id, received malformed event range (" + rangeString + ")", iae);
+            }
+        }
+        finally {
+            if (queryReader != null) {
+                try {
+                    queryReader.close();
+                }
+                catch (Exception ignored) {
+                    // not much we can do
+                }
+            }
+        }
+    }
+
+    /**
+     * Closes the given stream, ignoring any failure; closing is best-effort cleanup only.
+     */
+    private static void closeQuietly(InputStream stream) {
+        if (stream != null) {
+            try {
+                stream.close();
+            }
+            catch (IOException ignored) {
+                // not much we can do
+            }
+        }
+    }
+}
Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTaskFactory.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTaskFactory.java?rev=1479841&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTaskFactory.java (added)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/logging/LogSyncTaskFactory.java Tue May 7 10:25:13 2013
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.logging;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.ace.agent.spi.ComponentFactoryBase;
+import org.apache.ace.connectionfactory.ConnectionFactory;
+import org.apache.ace.discovery.Discovery;
+import org.apache.ace.identification.Identification;
+import org.apache.ace.log.target.store.LogStore;
+import org.apache.ace.scheduler.constants.SchedulerConstants;
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.log.LogService;
+
+/**
+ * Creates executor whiteboard {@link Runnable} service components, each with a {@link LogSyncTask} implementation,
+ * for every configured store unless explicitly disabled.
+ *
+ */
+public class LogSyncTaskFactory extends ComponentFactoryBase {
+
+    /**
+     * Creates one log sync component per store listed (comma separated) under {@link LogFactory#LOG_STORES}. A store
+     * is skipped when its <code>&lt;LOG_STORES&gt;.&lt;store&gt;.sync</code> setting equals "false" (ignoring case
+     * and surrounding whitespace). Blank entries in the list are ignored.
+     *
+     * @param context
+     *            The Bundle Context
+     * @param manager
+     *            The Dependency Manager
+     * @param logService
+     *            The Log Service
+     * @param configuration
+     *            The agent configuration
+     * @return A set of components, not <code>null</code>; empty when no stores are configured
+     */
+    @Override
+    public Set<Component> createComponents(BundleContext context, DependencyManager manager, LogService logService, Map<String, String> configuration) {
+
+        Set<Component> components = new HashSet<Component>();
+        String value = configuration.get(LogFactory.LOG_STORES);
+        if (value == null) {
+            // nothing configured: honour the "never null" contract instead of failing on split()
+            logService.log(LogService.LOG_DEBUG, "No log stores configured for agent " + getAgentIdentifier(configuration));
+            return components;
+        }
+
+        for (String entry : value.split(",")) {
+            // trim once so the ".sync" lookup and the component use the same store name
+            String store = entry.trim();
+            if (store.length() == 0) {
+                continue;
+            }
+
+            String sync = configuration.get(LogFactory.LOG_STORES + "." + store + ".sync");
+            if (sync != null && "false".equalsIgnoreCase(sync.trim())) {
+                logService.log(LogService.LOG_DEBUG, "Log sync disabled for agent " + getAgentIdentifier(configuration) + "/" + store);
+            }
+            else {
+                components.add(createLogSyncComponent(context, manager, logService, configuration, store));
+            }
+        }
+        return components;
+    }
+
+    /**
+     * Builds the {@link Runnable} service component wrapping a {@link LogSyncTask} for a single store. The store name
+     * is also passed to the task as its endpoint.
+     */
+    private Component createLogSyncComponent(BundleContext context, DependencyManager manager, LogService logService, Map<String, String> configuration, String store) {
+
+        String description = "Task that synchronizes log store " + store + " for agent=" + getAgentIdentifier(configuration) + " on the target and server";
+
+        Properties props = getAgentproperties(configuration);
+        props.put(LogFactory.LOG_NAME, store);
+
+        // NOTE(review): every store of every agent registers under the same scheduler name. The original
+        // code computed an unused "<agent>-<store>" name - confirm whether that was meant to be used here.
+        props.put(SchedulerConstants.SCHEDULER_NAME_KEY, LogSyncTask.class.getSimpleName());
+        props.put(SchedulerConstants.SCHEDULER_DESCRIPTION_KEY, description);
+        // presumably the scheduling interval in milliseconds - TODO confirm against the scheduler
+        props.put(SchedulerConstants.SCHEDULER_RECIPE, "2000");
+
+        Component component = manager.createComponent()
+            .setInterface(Runnable.class.getName(), props)
+            .setImplementation(new LogSyncTask(store))
+            .add(manager.createServiceDependency()
+                .setService(ConnectionFactory.class, getAgentFilter(configuration, null))
+                .setRequired(true))
+            .add(manager.createServiceDependency()
+                .setService(LogStore.class, getAgentFilter(configuration, "(" + LogFactory.LOG_NAME + "=" + store + ")"))
+                .setRequired(true))
+            .add(manager.createServiceDependency()
+                .setService(Discovery.class, getAgentFilter(configuration, null))
+                .setRequired(true))
+            .add(manager.createServiceDependency()
+                .setService(Identification.class, getAgentFilter(configuration, null))
+                .setRequired(true))
+            .add(manager.createServiceDependency()
+                .setService(LogService.class).setRequired(false));
+
+        return component;
+    }
+}
Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/packageinfo
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/packageinfo?rev=1479841&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/packageinfo (added)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/packageinfo Tue May 7 10:25:13 2013
@@ -0,0 +1 @@
+version 1.0
\ No newline at end of file
Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactory.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactory.java?rev=1479841&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactory.java (added)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactory.java Tue May 7 10:25:13 2013
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.spi;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.log.LogService;
+
+/**
+ * SPI for Management Agent component factories. The create method is called for every individual agent configuration.
+ * The factory can return zero or more components and should not add them to the manager. Factories are created and
+ * disposed as required; they must have a public default constructor, and are expected to be stateless and thread-safe.
+ *
+ */
+public interface ComponentFactory {
+
+ /**
+ * Returns zero or more service components for the specified agent configuration.
+ *
+ * @param context
+ * The Bundle Context
+ * @param manager
+ * The Dependency Manager
+ * @param logService
+ * The Log Service
+ * @param configuration
+ * The agent configuration
+ * @return A set of components, possibly empty but never <code>null</code>
+ * @throws ConfigurationException
+ * If there is a fatal problem.
+ */
+ Set<Component> createComponents(BundleContext context, DependencyManager manager, LogService logService, Map<String, String> configuration) throws ConfigurationException;
+}
Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactoryBase.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactoryBase.java?rev=1479841&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactoryBase.java (added)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/ComponentFactoryBase.java Tue May 7 10:25:13 2013
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.spi;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Component factory base class that provides some convenience methods for concrete implementations.
+ */
+public abstract class ComponentFactoryBase implements ComponentFactory {
+
+    /** Configuration key, service property name and filter attribute that carry the agent identifier. */
+    private static final String AGENT_KEY = "agent";
+
+    /**
+     * Looks up the agent identifier in a configuration.
+     *
+     * @param configuration
+     *            The configuration
+     * @return The value stored under the "agent" key, as returned by the map
+     */
+    protected String getAgentIdentifier(Map<String, String> configuration) {
+        return configuration.get(AGENT_KEY);
+    }
+
+    /**
+     * Creates a fresh, mutable set of service properties that already contains the agent identifier.
+     *
+     * @param configuration
+     *            The configuration
+     * @return The properties
+     */
+    protected Properties getAgentproperties(Map<String, String> configuration) {
+        String agentId = getAgentIdentifier(configuration);
+        Properties agentProperties = new Properties();
+        agentProperties.put(AGENT_KEY, agentId);
+        return agentProperties;
+    }
+
+    /**
+     * Builds a service filter that matches on the agent identifier. When a base filter is given, the result is the
+     * conjunction of the agent clause and that base filter.
+     *
+     * @param configuration
+     *            The configuration
+     * @param base
+     *            The optional base filter, may be <code>null</code>
+     * @return The filter
+     */
+    protected String getAgentFilter(Map<String, String> configuration, String base) {
+        String agentClause = "(" + AGENT_KEY + "=" + getAgentIdentifier(configuration) + ")";
+        return (base == null) ? agentClause : "(&" + agentClause + base + ")";
+    }
+}
Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/OneComponentFactoryBase.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/OneComponentFactoryBase.java?rev=1479841&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/OneComponentFactoryBase.java (added)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/OneComponentFactoryBase.java Tue May 7 10:25:13 2013
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.spi;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.log.LogService;
+
+/**
+ * Convenience base class for component factories that return just one component.
+ *
+ */
+public abstract class OneComponentFactoryBase extends ComponentFactoryBase {
+
+    /**
+     * Delegates to {@link #createComponent} and wraps its result: an empty set when it returns <code>null</code>,
+     * otherwise a new set holding just that component.
+     */
+    @Override
+    public final Set<Component> createComponents(BundleContext context, DependencyManager manager, LogService logService, Map<String, String> configuration) throws ConfigurationException {
+        Component single = createComponent(context, manager, logService, configuration);
+        if (single == null) {
+            return Collections.emptySet();
+        }
+        Set<Component> result = new HashSet<Component>();
+        result.add(single);
+        return result;
+    }
+
+    /**
+     * Creates the one component for the given agent configuration.
+     *
+     * @param context
+     *            The Bundle Context
+     * @param manager
+     *            The Dependency manager
+     * @param logService
+     *            The Log Service
+     * @param configuration
+     *            The agent configuration
+     * @return A component, or <code>null</code> when none should be created
+     * @throws ConfigurationException
+     *             If there is a fatal problem
+     */
+    public abstract Component createComponent(BundleContext context, DependencyManager manager, LogService logService, Map<String, String> configuration) throws ConfigurationException;
+}
Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/packageinfo
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/packageinfo?rev=1479841&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/packageinfo (added)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/spi/packageinfo Tue May 7 10:25:13 2013
@@ -0,0 +1 @@
+version 1.0
\ No newline at end of file