You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ace.apache.org by ma...@apache.org on 2013/04/09 10:18:48 UTC
svn commit: r1465924 [2/3] - in /ace/trunk/org.apache.ace.log: ./ src/
src/org/ src/org/apache/ src/org/apache/ace/ src/org/apache/ace/log/
src/org/apache/ace/log/listener/ src/org/apache/ace/log/server/
src/org/apache/ace/log/server/servlet/ src/org/a...
Added: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/Activator.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/Activator.java?rev=1465924&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/Activator.java (added)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/Activator.java Tue Apr 9 08:18:47 2013
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.log.server.store.impl;
+
+import java.io.File;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.ace.log.server.store.LogStore;
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyActivatorBase;
+import org.apache.felix.dm.DependencyManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedServiceFactory;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.log.LogService;
+
+public class Activator extends DependencyActivatorBase implements ManagedServiceFactory {
+
+ private static final String LOG_NAME = "name";
+ private DependencyManager m_manager;
+ private final Map<String, Component> m_instances = new HashMap<String, Component>();
+ private BundleContext m_context;
+ private volatile LogService m_log;
+
+ @Override
+ public void init(BundleContext context, DependencyManager manager) throws Exception {
+ m_context = context;
+ m_manager = manager;
+ Properties props = new Properties();
+ props.put(Constants.SERVICE_PID, "org.apache.ace.server.log.store.factory");
+ manager.add(createComponent()
+ .setInterface(ManagedServiceFactory.class.getName(), props)
+ .setImplementation(this)
+ .add(createServiceDependency().setService(LogService.class).setRequired(false)));
+ }
+
+ @Override
+ public void destroy(BundleContext context, DependencyManager manager) throws Exception {
+ }
+
+ public void deleted(String pid) {
+ Component log = m_instances.remove(pid);
+ if (log != null) {
+ m_manager.remove(log);
+ delete(new File(m_context.getDataFile(""), pid));
+ }
+ }
+
+ private void delete(File root) {
+ if (root.isDirectory()) {
+ for (File file : root.listFiles()) {
+ delete(file);
+ }
+ }
+ root.delete();
+ }
+
+ public String getName() {
+ return "Log Store Factory";
+ }
+
+ @SuppressWarnings("unchecked")
+ public synchronized void updated(String pid, Dictionary dict) throws ConfigurationException {
+ String name = (String) dict.get(LOG_NAME);
+ if ((name == null) || "".equals(name)) {
+ throw new ConfigurationException(LOG_NAME, "Log name has to be specified.");
+ }
+
+ Component service = m_instances.get(pid);
+ if (service == null) {
+ Properties props = new Properties();
+ props.put(LOG_NAME, name);
+ File baseDir = new File(m_context.getDataFile(""), pid);
+ service = m_manager.createComponent()
+ .setInterface(LogStore.class.getName(), props)
+ .setImplementation(new LogStoreImpl(baseDir, name))
+ .add(createServiceDependency().setService(EventAdmin.class).setRequired(false));
+ m_instances.put(pid, service);
+ m_manager.add(service);
+ } else {
+ m_log.log(LogService.LOG_INFO, "Ignoring configuration update because factory instance was already configured: " + name);
+ }
+ }
+}
\ No newline at end of file
Added: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java?rev=1465924&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java (added)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java Tue Apr 9 08:18:47 2013
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.log.server.store.impl;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ace.log.LogDescriptor;
+import org.apache.ace.log.LogEvent;
+import org.apache.ace.log.server.store.LogStore;
+import org.apache.ace.range.Range;
+import org.apache.ace.range.SortedRangeSet;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+/**
+ * A simple, file based implementation of the LogStore interface. Every log is kept in a
+ * text file <code>store/&lt;hex encoded target id&gt;/&lt;log id&gt;</code> below the base
+ * directory, with one event representation per line.
+ */
+public class LogStoreImpl implements LogStore {
+
+    private volatile EventAdmin m_eventAdmin; /* Injected by dependency manager */
+
+    // the dir to store logs in - it is created in the start method
+    private final File m_dir;
+    private final String m_name;
+
+    // absolute path of a log file -> highest event id in that file; an entry is only kept
+    // while the file is believed to hold the ids 1..n in order, which allows appending
+    // without re-reading the file
+    private final Map<String, Long> m_fileToID = new HashMap<String, Long>();
+
+    /**
+     * @param baseDir the directory below which the "store" directory is kept.
+     * @param name the name of the log, used as a property of the events that are posted.
+     */
+    public LogStoreImpl(File baseDir, String name) {
+        m_name = name;
+        m_dir = new File(baseDir, "store");
+    }
+
+    /*
+     * init the dir in which to store logs in - throws IllegalArgumentException
+     * if we can't get it.
+     */
+    protected void start() throws IOException {
+        if (!m_dir.isDirectory() && !m_dir.mkdirs()) {
+            throw new IllegalArgumentException("Need valid dir");
+        }
+    }
+
+    /**
+     * Reads the log file that belongs to the descriptor and returns the events whose id is
+     * contained in the descriptor's range set. As a side effect the highest-id cache entry
+     * of the file is refreshed.
+     *
+     * @see org.apache.ace.log.server.store.LogStore#get(org.apache.ace.log.LogDescriptor)
+     */
+    public synchronized List<LogEvent> get(LogDescriptor descriptor) throws IOException {
+        final List<LogEvent> result = new ArrayList<LogEvent>();
+        final SortedRangeSet set = descriptor.getRangeSet();
+        File log = logFile(descriptor.getTargetID(), descriptor.getLogID());
+        if (!log.isFile()) {
+            return result;
+        }
+        String file = log.getAbsolutePath();
+        BufferedReader in = new BufferedReader(new FileReader(log));
+        try {
+            // counts the lines as long as line n carries id n; -1 as soon as that is not the case
+            long counter = 0;
+            for (String line = in.readLine(); line != null; line = in.readLine()) {
+                LogEvent event = new LogEvent(line);
+                long id = event.getID();
+                if ((counter != -1) && (++counter != id)) {
+                    counter = -1;
+                }
+                if (set.contains(id)) {
+                    result.add(event);
+                }
+            }
+            if (counter < 1) {
+                // empty file, or ids that are not exactly 1..n: nothing we can cache
+                m_fileToID.remove(file);
+            }
+            else {
+                m_fileToID.put(file, Long.valueOf(counter));
+            }
+        }
+        finally {
+            try {
+                in.close();
+            }
+            catch (IOException ignored) {
+                // Not much we can do
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Returns a descriptor with the ids that are stored for the given log. When the highest
+     * id is cached the range 1..highest is returned without touching the file.
+     *
+     * @see org.apache.ace.log.server.store.LogStore#getDescriptor(String, long)
+     */
+    public synchronized LogDescriptor getDescriptor(String targetID, long logID) throws IOException {
+        Long high = m_fileToID.get(logFile(targetID, logID).getAbsolutePath());
+        if (high != null) {
+            Range r = new Range(1, high);
+            return new LogDescriptor(targetID, logID, new SortedRangeSet(r.toRepresentation()));
+        }
+        List<LogEvent> events = get(new LogDescriptor(targetID, logID, SortedRangeSet.FULL_SET));
+
+        long[] idsArray = new long[events.size()];
+        int i = 0;
+        for (LogEvent e : events) {
+            idsArray[i++] = e.getID();
+        }
+        return new LogDescriptor(targetID, logID, new SortedRangeSet(idsArray));
+    }
+
+    /**
+     * Returns one descriptor for every file found in the directory of the given target.
+     *
+     * @see org.apache.ace.log.server.store.LogStore#getDescriptors(String)
+     */
+    public List<LogDescriptor> getDescriptors(String targetID) throws IOException {
+        File dir = new File(m_dir, targetIDToFilename(targetID));
+        List<LogDescriptor> result = new ArrayList<LogDescriptor>();
+        if (!dir.isDirectory()) {
+            return result;
+        }
+
+        for (String name : notNull(dir.list())) {
+            result.add(getDescriptor(targetID, Long.parseLong(name)));
+        }
+
+        return result;
+    }
+
+    /**
+     * Returns the descriptors of all logs of all targets found in the store directory.
+     *
+     * @see org.apache.ace.log.server.store.LogStore#getDescriptors()
+     */
+    public List<LogDescriptor> getDescriptors() throws IOException {
+        List<LogDescriptor> result = new ArrayList<LogDescriptor>();
+        for (String name : notNull(m_dir.list())) {
+            result.addAll(getDescriptors(filenameToTargetID(name)));
+        }
+        return result;
+    }
+
+    /**
+     * Groups the events by target and log and stores every group in its own log file.
+     *
+     * @see org.apache.ace.log.server.store.LogStore#put(java.util.List)
+     */
+    public void put(List<LogEvent> events) throws IOException {
+        for (Map.Entry<String, Map<Long, List<LogEvent>>> target : sort(events).entrySet()) {
+            for (Map.Entry<Long, List<LogEvent>> log : target.getValue().entrySet()) {
+                put(target.getKey(), log.getKey(), log.getValue());
+            }
+        }
+    }
+
+    /**
+     * Add a list of events to the log of the given ids. Events that are already stored are
+     * removed from the given list; when the remaining events cannot simply be appended, the
+     * stored events are added to the list, the list is sorted and the file is rewritten.
+     * For every event that is written an event is posted to the event admin.
+     *
+     * @param targetID
+     *            the id of the target to append to its log.
+     * @param logID
+     *            the id of the given target log.
+     * @param list
+     *            a list of events to store; note that this list is modified.
+     * @throws java.io.IOException
+     *             in case of any error.
+     */
+    protected synchronized void put(String targetID, Long logID, List<LogEvent> list) throws IOException {
+        if ((list == null) || list.isEmpty()) {
+            // nothing to add, so return
+            return;
+        }
+        // we actually need to distinguish between two scenarios here:
+        // 1. we can append events at the end of the existing file
+        // 2. we need to insert events in the existing file (meaning we have to
+        // rewrite basically the whole file)
+        String file = logFile(targetID, logID).getAbsolutePath();
+        Long highest = m_fileToID.get(file);
+        // "cached" means we know the file ends exactly before the first new event
+        boolean cached = (highest != null) && (highest.longValue() + 1 == list.get(0).getID());
+        List<LogEvent> events = null;
+        if (!cached) {
+            events = get(new LogDescriptor(targetID, logID, SortedRangeSet.FULL_SET));
+
+            // remove duplicates first
+            list.removeAll(events);
+        }
+
+        if (list.isEmpty()) {
+            // nothing to add anymore, so return
+            return;
+        }
+
+        File dir = new File(m_dir, targetIDToFilename(targetID));
+        if (!dir.isDirectory() && !dir.mkdirs()) {
+            throw new IOException("Unable to create backup store.");
+        }
+
+        boolean append = cached || events.isEmpty()
+            || (events.get(events.size() - 1).getID() < list.get(0).getID());
+        if (!append) {
+            // we have to merge the lists
+            list.addAll(events);
+            // and sort
+            Collections.sort(list);
+        }
+
+        // opened outside the try block so the finally clause never sees a null writer
+        PrintWriter out = new PrintWriter(new FileWriter(new File(dir, logID.toString()), append));
+        try {
+            // highest id written, or Long.MAX_VALUE once the ids turn out not to be strictly increasing
+            long high = 0;
+            for (LogEvent event : list) {
+                out.println(event.toRepresentation());
+                if (high < event.getID()) {
+                    high = event.getID();
+                }
+                else {
+                    high = Long.MAX_VALUE;
+                }
+                // send (eventadmin)event about a new (log)event being stored
+                Dictionary<String, Object> props = new Hashtable<String, Object>();
+                props.put(LogStore.EVENT_PROP_LOGNAME, m_name);
+                props.put(LogStore.EVENT_PROP_LOG_EVENT, event);
+                m_eventAdmin.postEvent(new Event(LogStore.EVENT_TOPIC, props));
+            }
+            // a PrintWriter never throws on write errors, it only records them
+            if (out.checkError()) {
+                m_fileToID.remove(file);
+                throw new IOException("Unable to write log events to " + file);
+            }
+            if (cached && (high < Long.MAX_VALUE)) {
+                m_fileToID.put(file, Long.valueOf(high));
+            }
+            else {
+                m_fileToID.remove(file);
+            }
+        }
+        finally {
+            out.close();
+        }
+    }
+
+    /**
+     * Sort the given list of events into a map of maps according to the
+     * targetID and the logID of each event.
+     *
+     * @param events
+     *            a list of events to sort.
+     * @return a map of maps that maps target ids to a map that maps log ids to
+     *         a list of events that have those ids.
+     */
+    @SuppressWarnings("boxing")
+    protected Map<String, Map<Long, List<LogEvent>>> sort(List<LogEvent> events) {
+        Map<String, Map<Long, List<LogEvent>>> result = new HashMap<String, Map<Long, List<LogEvent>>>();
+        for (LogEvent event : events) {
+            Map<Long, List<LogEvent>> target = result.get(event.getTargetID());
+            if (target == null) {
+                target = new HashMap<Long, List<LogEvent>>();
+                result.put(event.getTargetID(), target);
+            }
+
+            List<LogEvent> list = target.get(event.getLogID());
+            if (list == null) {
+                list = new ArrayList<LogEvent>();
+                target.put(event.getLogID(), list);
+            }
+
+            list.add(event);
+        }
+        return result;
+    }
+
+    /*
+     * returns the file in which the events of the given log of the given target are kept.
+     */
+    private File logFile(String targetID, long logID) {
+        return new File(new File(m_dir, targetIDToFilename(targetID)), String.valueOf(logID));
+    }
+
+    /*
+     * throw IOException in case the target is null else return the target.
+     */
+    private <T> T notNull(T target) throws IOException {
+        if (target == null) {
+            throw new IOException("Unknown IO error while trying to access the store.");
+        }
+        return target;
+    }
+
+    /*
+     * decodes a directory name created by targetIDToFilename: every two hex digits are one
+     * byte of the UTF-8 encoded target id.
+     */
+    private static String filenameToTargetID(String filename) {
+        byte[] bytes = new byte[filename.length() / 2];
+        for (int i = 0; i < bytes.length; i++) {
+            String hexValue = filename.substring(i * 2, (i + 1) * 2);
+            // Byte.parseByte rejects everything above 7f, so parse as int and narrow instead
+            bytes[i] = (byte) Integer.parseInt(hexValue, 16);
+        }
+
+        String result = null;
+        try {
+            result = new String(bytes, "UTF-8");
+        }
+        catch (UnsupportedEncodingException e) {
+            // UTF-8 is a mandatory encoding; this will never happen.
+        }
+        return result;
+    }
+
+    /*
+     * encodes a target id as exactly two lower case hex digits per byte of its UTF-8
+     * encoding. For pure ASCII ids the result is the same as before; bytes above 7f used to
+     * be written as eight digits ("ffffffxx"), which filenameToTargetID could not decode.
+     * NOTE(review): directories created with the old eight digit form are no longer found.
+     */
+    private static String targetIDToFilename(String targetID) {
+        StringBuilder result = new StringBuilder();
+
+        try {
+            for (byte b : targetID.getBytes("UTF-8")) {
+                // mask to get the unsigned value, otherwise negative bytes are sign extended
+                int unsigned = b & 0xff;
+                if (unsigned < 0x10) {
+                    result.append('0');
+                }
+                result.append(Integer.toHexString(unsigned));
+            }
+        }
+        catch (UnsupportedEncodingException e) {
+            // UTF-8 is a mandatory encoding; this will never happen.
+        }
+
+        return result.toString();
+    }
+}
\ No newline at end of file
Added: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/mongo/Activator.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/mongo/Activator.java?rev=1465924&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/mongo/Activator.java (added)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/mongo/Activator.java Tue Apr 9 08:18:47 2013
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.log.server.store.mongo;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.amdatu.mongo.MongoDBService;
+import org.apache.ace.log.server.store.LogStore;
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyActivatorBase;
+import org.apache.felix.dm.DependencyManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedServiceFactory;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.log.LogService;
+
+/**
+ * Bundle activator that also acts as a {@link ManagedServiceFactory}. For every factory
+ * configuration handed to {@link #updated(String, Dictionary)} it creates one
+ * {@link MongoLogStore} component and registers it under the {@link LogStore} interface.
+ */
+public class Activator extends DependencyActivatorBase implements ManagedServiceFactory {
+
+    private static final String LOG_NAME = "name";
+    private DependencyManager m_manager;
+    // configuration pid -> component created for it; only accessed while holding the lock on "this"
+    private final Map<String, Component> m_instances = new HashMap<String, Component>();
+    // assigned in init() but not read anywhere in this class
+    private BundleContext m_context;
+    // never assigned in this class; presumably injected through the optional LogService
+    // dependency declared in init() - TODO confirm
+    private volatile LogService m_log;
+
+    /**
+     * Remembers the context and manager and registers this activator as a
+     * {@link ManagedServiceFactory} with an optional dependency on {@link LogService}.
+     */
+    @Override
+    public void init(BundleContext context, DependencyManager manager) throws Exception {
+        m_context = context;
+        m_manager = manager;
+        Properties props = new Properties();
+        props.put(Constants.SERVICE_PID, "org.apache.ace.server.log.store.factory");
+        manager.add(createComponent()
+            .setInterface(ManagedServiceFactory.class.getName(), props)
+            .setImplementation(this)
+            .add(createServiceDependency().setService(LogService.class).setRequired(false)));
+    }
+
+    @Override
+    public void destroy(BundleContext context, DependencyManager manager) throws Exception {
+    }
+
+    /**
+     * Removes the log store component that was created for the given pid, if any.
+     *
+     * @param pid the pid of the factory configuration that was deleted.
+     */
+    @Override
+    public synchronized void deleted(String pid) {
+        // synchronized like updated(): both methods read and modify m_instances, which is a plain HashMap
+        Component log = m_instances.remove(pid);
+        if (log != null) {
+            m_manager.remove(log);
+        }
+    }
+
+    public String getName() {
+        return "Log Store Factory";
+    }
+
+    /**
+     * Creates a log store for the given pid the first time a configuration for that pid is
+     * seen. Later updates for the same pid are only logged and otherwise ignored.
+     *
+     * @param pid the pid of the factory configuration.
+     * @param dict the configuration; must contain a non-empty "name" entry.
+     * @throws ConfigurationException when the "name" entry is missing or empty.
+     */
+    public synchronized void updated(String pid, @SuppressWarnings("rawtypes") Dictionary dict) throws ConfigurationException {
+        String name = (String) dict.get(LOG_NAME);
+        if ((name == null) || "".equals(name)) {
+            throw new ConfigurationException(LOG_NAME, "Log name has to be specified.");
+        }
+
+        Component service = m_instances.get(pid);
+        if (service != null) {
+            m_log.log(LogService.LOG_INFO, "Ignoring configuration update because factory instance was already configured: " + name);
+            return;
+        }
+
+        Properties props = new Properties();
+        props.put(LOG_NAME, name);
+        // the store itself needs a MongoDBService; the EventAdmin dependency is optional
+        service = m_manager.createComponent()
+            .setInterface(LogStore.class.getName(), props)
+            .setImplementation(new MongoLogStore(name))
+            .add(createServiceDependency().setService(EventAdmin.class).setRequired(false))
+            .add(createServiceDependency().setService(MongoDBService.class).setRequired(true));
+        m_instances.put(pid, service);
+        m_manager.add(service);
+    }
+}
\ No newline at end of file
Added: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/mongo/MongoLogStore.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/mongo/MongoLogStore.java?rev=1465924&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/mongo/MongoLogStore.java (added)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/mongo/MongoLogStore.java Tue Apr 9 08:18:47 2013
@@ -0,0 +1,145 @@
+package org.apache.ace.log.server.store.mongo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.amdatu.mongo.MongoDBService;
+import org.apache.ace.log.LogDescriptor;
+import org.apache.ace.log.LogEvent;
+import org.apache.ace.log.server.store.LogStore;
+import org.apache.ace.range.Range;
+import org.apache.ace.range.SortedRangeSet;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.MapReduceCommand.OutputType;
+import com.mongodb.MapReduceOutput;
+
+/**
+ * A {@link LogStore} that keeps log events as documents in a MongoDB collection. The
+ * collection is named after the log; every document has the fields "targetId", "logId",
+ * "id", "time", "type" and "properties".
+ */
+public class MongoLogStore implements LogStore {
+    private final String m_logname;
+    // never assigned in this class; presumably injected by the dependency manager through
+    // the required MongoDBService dependency the Activator declares - TODO confirm
+    private volatile MongoDBService m_mongoDBService;
+
+    /**
+     * @param logname the name of the log, which is also the name of the collection used.
+     */
+    public MongoLogStore(String logname) {
+        this.m_logname = logname;
+    }
+
+    /**
+     * Returns the events of the log identified by the descriptor, ordered by ascending id.
+     * When the highest id of the descriptor's range set is positive, only events with an id
+     * up to and including that value are returned.
+     * <p>
+     * NOTE(review): only the upper bound of the range set is applied; ids that fall in a
+     * gap of the range set are returned as well - confirm that callers expect this.
+     */
+    @Override
+    public List<LogEvent> get(LogDescriptor range) throws IOException {
+        DBCollection collection = m_mongoDBService.getDB().getCollection(m_logname);
+        long high = range.getRangeSet().getHigh();
+
+        BasicDBObject filter = new BasicDBObject()
+            .append("targetId", range.getTargetID())
+            .append("logId", range.getLogID());
+        if (high > 0) {
+            filter.append("id", new BasicDBObject("$lte", high));
+        }
+
+        List<LogEvent> logevents = new ArrayList<LogEvent>();
+        DBCursor cursor = collection.find(filter);
+        try {
+            cursor.sort(new BasicDBObject("id", 1));
+            while (cursor.hasNext()) {
+                DBObject event = cursor.next();
+                String targetId = (String) event.get("targetId");
+                long logId = (Long) event.get("logId");
+                long id = (Long) event.get("id");
+                long time = (Long) event.get("time");
+                int type = (Integer) event.get("type");
+                Properties properties = new Properties();
+                DBObject propertiesDbObject = (DBObject) event.get("properties");
+                // a document without a "properties" field yields an event without properties
+                if (propertiesDbObject != null) {
+                    for (String key : propertiesDbObject.keySet()) {
+                        properties.put(key, propertiesDbObject.get(key));
+                    }
+                }
+
+                logevents.add(new LogEvent(targetId, logId, id, time, type, properties));
+            }
+        }
+        finally {
+            // make sure the cursor is released, also when converting a document fails
+            cursor.close();
+        }
+
+        return logevents;
+    }
+
+    /**
+     * Returns a descriptor with the range 1..highest stored id for the given log, or a
+     * descriptor with {@link SortedRangeSet#FULL_SET} when nothing is stored for that log.
+     * <p>
+     * NOTE(review): the file based store returns an empty range set for a log without
+     * events - confirm that FULL_SET is intended here.
+     */
+    @Override
+    public LogDescriptor getDescriptor(String targetID, long logID) throws IOException {
+        DBCollection collection = m_mongoDBService.getDB().getCollection(m_logname);
+
+        BasicDBObject filter = new BasicDBObject()
+            .append("targetId", targetID)
+            .append("logId", logID);
+
+        DBCursor cursor = collection.find(filter);
+        try {
+            // sorted descending on id, so the first document holds the highest id; only that one is needed
+            cursor.sort(new BasicDBObject("id", -1));
+            cursor.limit(1);
+            if (cursor.hasNext()) {
+                long high = (Long) cursor.next().get("id");
+                return new LogDescriptor(targetID, logID, new SortedRangeSet(
+                    new Range(1, high).toRepresentation()));
+            }
+            return new LogDescriptor(targetID, logID, SortedRangeSet.FULL_SET);
+        }
+        finally {
+            cursor.close();
+        }
+    }
+
+    /**
+     * Saves every event as a separate document.
+     * <p>
+     * NOTE(review): no check for already stored events is done here - presumably storing
+     * the same event twice results in two documents; verify.
+     */
+    @Override
+    public void put(List<LogEvent> events) throws IOException {
+        DBCollection collection = m_mongoDBService.getDB().getCollection(m_logname);
+
+        for (LogEvent event : events) {
+            DBObject dbObject = new BasicDBObject()
+                .append("targetId", event.getTargetID())
+                .append("logId", event.getLogID())
+                .append("id", event.getID())
+                .append("time", event.getTime())
+                .append("type", event.getType())
+                .append("properties", event.getProperties());
+
+            collection.save(dbObject);
+        }
+    }
+
+    /**
+     * Returns one descriptor for every log of the given target, or of all targets when the
+     * target id is <code>null</code>. The log ids are collected with a map-reduce job.
+     *
+     * @param targetID the target to return the descriptors for, may be <code>null</code>.
+     */
+    @Override
+    public List<LogDescriptor> getDescriptors(String targetID) throws IOException {
+        DBCollection collection = m_mongoDBService.getDB().getCollection(m_logname);
+        // The emitted value and the reduced value must have the same shape: reduce is not
+        // invoked for keys with a single value and may be invoked again on its own output.
+        String m = "function() {emit(this.targetId, {target: this.targetId, logIds: [this.logId]});}";
+        String r = "function(k, vals) {var result = {target: k, logIds: []}; vals.forEach(function(value) { result.logIds = result.logIds.concat(value.logIds); }); return result;}";
+        DBObject filter = new BasicDBObject();
+        if (targetID != null) {
+            filter.put("targetId", targetID);
+        }
+        MapReduceOutput mapReduce = collection.mapReduce(m, r, null, OutputType.INLINE, filter);
+
+        List<LogDescriptor> descriptors = new ArrayList<LogDescriptor>();
+        for (DBObject row : mapReduce.results()) {
+            DBObject value = (DBObject) row.get("value");
+            String targetId = (String) value.get("target");
+            List<?> logIds = (List<?>) value.get("logIds");
+            // the same log id is emitted once per event, so remove the duplicates
+            Set<Long> logIdsFiltered = new HashSet<Long>();
+            for (Object logId : logIds) {
+                // go through Number, in case the numbers do not come back as Long
+                logIdsFiltered.add(Long.valueOf(((Number) logId).longValue()));
+            }
+
+            for (long logId : logIdsFiltered) {
+                descriptors.add(getDescriptor(targetId, logId));
+            }
+        }
+
+        return descriptors;
+    }
+
+    /**
+     * Returns the descriptors of all logs of all targets.
+     */
+    @Override
+    public List<LogDescriptor> getDescriptors() throws IOException {
+        return getDescriptors(null);
+    }
+
+}
Added: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/packageinfo
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/packageinfo?rev=1465924&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/packageinfo (added)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/packageinfo Tue Apr 9 08:18:47 2013
@@ -0,0 +1 @@
+version 1.0
\ No newline at end of file
Added: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/Activator.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/Activator.java?rev=1465924&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/Activator.java (added)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/Activator.java Tue Apr 9 08:18:47 2013
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.log.server.task;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.ace.connectionfactory.ConnectionFactory;
+import org.apache.ace.discovery.Discovery;
+import org.apache.ace.log.LogSync;
+import org.apache.ace.log.server.store.LogStore;
+import org.apache.ace.log.server.task.LogSyncTask.Mode;
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyActivatorBase;
+import org.apache.felix.dm.DependencyManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedServiceFactory;
+import org.osgi.service.log.LogService;
+
+public class Activator extends DependencyActivatorBase implements ManagedServiceFactory {
+ private static final String KEY_LOG_NAME = "name";
+ private static final String KEY_MODE = "mode";
+
+ private final Map<String, Component> m_instances = new HashMap<String, Component>();
+ private volatile DependencyManager m_manager;
+
+ @Override
+ public void init(BundleContext context, DependencyManager manager) throws Exception {
+ m_manager = manager;
+ Properties props = new Properties();
+ props.put(Constants.SERVICE_PID, "org.apache.ace.server.log.task.factory");
+ manager.add(createComponent()
+ .setInterface(ManagedServiceFactory.class.getName(), props)
+ .setImplementation(this)
+ );
+ }
+
+ @Override
+ public void destroy(BundleContext context, DependencyManager manager) throws Exception {
+ }
+
+ public String getName() {
+ return "Log Sync Task Factory";
+ }
+
+ public synchronized void updated(String pid, Dictionary dict) throws ConfigurationException {
+ String name = (String) dict.get(KEY_LOG_NAME);
+ if ((name == null) || "".equals(name)) {
+ throw new ConfigurationException(KEY_LOG_NAME, "Log name has to be specified.");
+ }
+ Mode mode = Mode.PUSH;
+ String modeValue = (String) dict.get(KEY_MODE);
+ if ("pull".equals(modeValue)) {
+ mode = Mode.PULL;
+ }
+ else if ("pushpull".equals(modeValue)) {
+ mode = Mode.PUSHPULL;
+ }
+
+ Component oldComponent, newComponent;
+
+ Properties props = new Properties();
+ props.put(KEY_LOG_NAME, name);
+ props.put("taskName", LogSyncTask.class.getName());
+ props.put("description", "Syncs log (name=" + name + ", mode=" + mode.toString() + ") with a server.");
+ String filter = "(&(" + Constants.OBJECTCLASS + "=" + LogStore.class.getName() + ")(name=" + name + "))";
+ LogSyncTask service = new LogSyncTask(name, name, mode);
+ newComponent = m_manager.createComponent()
+ .setInterface(new String[] { Runnable.class.getName(), LogSync.class.getName() }, props)
+ .setImplementation(service)
+ .add(createServiceDependency().setService(ConnectionFactory.class).setRequired(true))
+ .add(createServiceDependency().setService(LogStore.class, filter).setRequired(true))
+ .add(createServiceDependency().setService(Discovery.class).setRequired(true))
+ .add(createServiceDependency().setService(LogService.class).setRequired(false));
+
+ synchronized (m_instances) {
+ oldComponent = m_instances.put(pid, newComponent);
+ }
+ if (oldComponent != null) {
+ m_manager.remove(oldComponent);
+ }
+ m_manager.add(newComponent);
+ }
+
+ public void deleted(String pid) {
+ Component component;
+ synchronized (m_instances) {
+ component = m_instances.remove(pid);
+ }
+ if (component != null) {
+ m_manager.remove(component);
+ }
+ }
+}
Added: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java?rev=1465924&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java (added)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/task/LogSyncTask.java Tue Apr 9 08:18:47 2013
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.log.server.task;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.ace.connectionfactory.ConnectionFactory;
+import org.apache.ace.discovery.Discovery;
+import org.apache.ace.log.LogDescriptor;
+import org.apache.ace.log.LogEvent;
+import org.apache.ace.log.LogSync;
+import org.apache.ace.log.server.store.LogStore;
+import org.apache.ace.range.SortedRangeSet;
+import org.osgi.service.log.LogService;
+
+ /**
+  * Task that synchronizes the local {@link LogStore} with a remote log endpoint. The remote host is
+  * obtained from the {@link Discovery} service, connections are created through the
+  * {@link ConnectionFactory}, and the direction of the synchronization (push, pull or both) is
+  * determined by the {@link Mode} this task was created with.
+  */
+ public class LogSyncTask implements Runnable, LogSync {
+
+     private static final String COMMAND_QUERY = "query";
+     private static final String COMMAND_SEND = "send";
+     private static final String COMMAND_RECEIVE = "receive";
+
+     private static final String TARGETID_KEY = "tid";
+     private static final String FILTER_KEY = "filter";
+     private static final String LOGID_KEY = "logid";
+     private static final String RANGE_KEY = "range";
+
+     // injected by dependencymanager
+     private volatile Discovery m_discovery;
+     private volatile LogService m_log;
+     private volatile LogStore m_logStore;
+     private volatile ConnectionFactory m_connectionFactory;
+
+     private final String m_endpoint;
+     private final String m_name;
+     private final Mode m_mode;
+
+     /** The direction(s) in which {@link #run()} synchronizes. */
+     public static enum Mode { PUSH, PULL, PUSHPULL };
+
+     /**
+      * Creates a new synchronization task.
+      *
+      * @param endpoint the path, relative to the discovered host URL, under which the
+      *        <code>query</code>, <code>send</code> and <code>receive</code> commands are addressed.
+      * @param name the name of the log, returned by {@link #getName()} and used in log messages.
+      * @param mode the direction(s) to synchronize in when {@link #run()} is invoked.
+      */
+     public LogSyncTask(String endpoint, String name, Mode mode) {
+         m_endpoint = endpoint;
+         m_name = name;
+         m_mode = mode;
+     }
+
+     /**
+      * Runs one synchronization in the configured mode. I/O failures are logged (including their
+      * cause) and never propagate to the caller.
+      */
+     public void run() {
+         try {
+             switch (m_mode) {
+                 case PULL:
+                     pull();
+                     break;
+                 case PUSH:
+                     push();
+                     break;
+                 case PUSHPULL:
+                     pushpull();
+                     break;
+             }
+         }
+         catch (IOException e) {
+             // MalformedURLException is an IOException as well, so it is handled here and its cause is logged too
+             m_log.log(LogService.LOG_ERROR, "Unable to (" + m_mode.toString() + ") synchronize log (name=" + m_name + ") with remote", e);
+         }
+     }
+
+     public boolean pull() throws IOException {
+         return synchronize(false, true);
+     }
+
+     public boolean push() throws IOException {
+         return synchronize(true, false);
+     }
+
+     public boolean pushpull() throws IOException {
+         return synchronize(true, true);
+     }
+
+     /**
+      * Synchronizes the local store with the discovered remote one.
+      *
+      * @param push whether local events that the remote lacks should be sent.
+      * @param pull whether remote events that are missing locally should be retrieved.
+      * @return <code>true</code> if, for at least one of the requested directions, there was a
+      *         difference between the local and remote ranges.
+      * @throws java.io.IOException if the remote ranges cannot be queried or the local descriptors
+      *         cannot be obtained.
+      */
+     private boolean synchronize(boolean push, boolean pull) throws IOException {
+         URL host = m_discovery.discover();
+
+         URLConnection queryConnection = m_connectionFactory.createConnection(new URL(host, m_endpoint + "/" + COMMAND_QUERY));
+
+         // getRanges() always closes the stream it is given; consume it right away so that a
+         // failure while talking to the local store cannot leave the query stream open
+         List<LogDescriptor> remoteRanges = getRanges(queryConnection.getInputStream());
+         List<LogDescriptor> localRanges = m_logStore.getDescriptors();
+
+         boolean result = false;
+         if (push) {
+             result |= doPush(host, localRanges, remoteRanges);
+         }
+         if (pull) {
+             result |= doPull(host, localRanges, remoteRanges);
+         }
+         return result;
+     }
+
+     /**
+      * Sends all events that are in the local ranges but not in the remote ranges to the
+      * <code>send</code> command of the remote endpoint. I/O failures are logged, not thrown.
+      *
+      * @param host the base URL of the remote host.
+      * @param localRanges the descriptors of the local store.
+      * @param remoteRanges the descriptors reported by the remote.
+      * @return <code>true</code> if there was anything to send.
+      */
+     protected boolean doPush(URL host, List<LogDescriptor> localRanges, List<LogDescriptor> remoteRanges) {
+         boolean result = false;
+         OutputStream sendOutput = null;
+         try {
+             URLConnection sendConnection = m_connectionFactory.createConnection(new URL(host, m_endpoint + "/" + COMMAND_SEND));
+
+             if (sendConnection instanceof HttpURLConnection) {
+                 // ACE-294: enable streaming mode causing only small amounts of memory to be
+                 // used for this commit. Otherwise, the entire input stream is cached into
+                 // memory prior to sending it to the server...
+                 ((HttpURLConnection) sendConnection).setChunkedStreamingMode(8192);
+             }
+             sendConnection.setDoOutput(true);
+
+             sendOutput = sendConnection.getOutputStream();
+
+             BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(sendOutput));
+             List<LogDescriptor> delta = calculateDelta(localRanges, remoteRanges);
+             result = !delta.isEmpty();
+             writeDelta(delta, writer);
+
+             // flush through the writer (which also flushes the underlying stream) so nothing that
+             // is still buffered by the writer is lost, even when writeDelta() is overridden
+             writer.flush();
+             sendOutput.close();
+
+             if (sendConnection instanceof HttpURLConnection) {
+                 HttpURLConnection conn = (HttpURLConnection) sendConnection;
+                 conn.getContent();
+                 conn.disconnect();
+             }
+         }
+         catch (IOException e) {
+             m_log.log(LogService.LOG_ERROR, "Unable to (fully) synchronize log with remote", e);
+         }
+         finally {
+             // covers the failure paths; closing an already closed stream is harmless
+             if (sendOutput != null) {
+                 try {
+                     sendOutput.close();
+                 }
+                 catch (Exception ex) {
+                     // not much we can do
+                 }
+             }
+         }
+         return result;
+     }
+
+     /**
+      * Writes the difference between local and remote to a writer.
+      * @param descriptors A list of LogDescriptors that identifies all local log entries that need to be written.
+      * @param writer A writer to write to.
+      * @throws java.io.IOException
+      */
+     protected void writeDelta(List<LogDescriptor> descriptors, Writer writer) throws IOException {
+         for (LogDescriptor l : descriptors) {
+             writeLogDescriptor(l, writer);
+         }
+     }
+
+     /**
+      * Writes the LogEvents described by the descriptor to the writer, one event representation
+      * per line, and flushes the writer afterwards.
+      * @param descriptor A LogDescriptor that identifies the events to be written.
+      * @param writer A writer to write the events to.
+      * @throws java.io.IOException Thrown when either the writer goes wrong, or there is a problem
+      * communicating with the local log store.
+      */
+     protected void writeLogDescriptor(LogDescriptor descriptor, Writer writer) throws IOException {
+         List<LogEvent> events = m_logStore.get(descriptor);
+         for (LogEvent event : events) {
+             writer.write(event.toRepresentation() + "\n");
+         }
+         writer.flush();
+     }
+
+     /**
+      * Retrieves all events that are in the remote ranges but not in the local ranges, using one
+      * <code>receive</code> request per descriptor, and stores them locally. A failure for one
+      * descriptor is logged and does not prevent the remaining descriptors from being processed.
+      *
+      * @param host the base URL of the remote host.
+      * @param localRanges the descriptors of the local store.
+      * @param remoteRanges the descriptors reported by the remote.
+      * @return <code>true</code> if there was anything to retrieve.
+      */
+     protected boolean doPull(URL host, List<LogDescriptor> localRanges, List<LogDescriptor> remoteRanges) {
+         List<LogDescriptor> delta = calculateDelta(remoteRanges, localRanges);
+         boolean result = !delta.isEmpty();
+         for (LogDescriptor l : delta) {
+             BufferedReader reader = null;
+             try {
+                 /*
+                  * The request currently contains a range. This is not yet supported by the servlet, but it will
+                  * simply be ignored.
+                  */
+                 URL url = new URL(host, m_endpoint + "/" + COMMAND_RECEIVE + "?" + TARGETID_KEY + "=" + l.getTargetID() + "&" + LOGID_KEY + "=" + l.getLogID() + "&" + RANGE_KEY + "=" + l.getRangeSet().toRepresentation());
+
+                 URLConnection receiveConnection = m_connectionFactory.createConnection(url);
+                 InputStream receiveInput = receiveConnection.getInputStream();
+
+                 reader = new BufferedReader(new InputStreamReader(receiveInput));
+                 readLogs(reader);
+
+                 if (receiveConnection instanceof HttpURLConnection) {
+                     HttpURLConnection conn = (HttpURLConnection) receiveConnection;
+                     conn.getContent();
+                     conn.disconnect();
+                 }
+             }
+             catch (IOException e) {
+                 m_log.log(LogService.LOG_ERROR, "Unable to connect to retrieve log events.", e);
+             }
+             finally {
+                 // release the response stream for every descriptor, whether or not reading succeeded
+                 if (reader != null) {
+                     try {
+                         reader.close();
+                     }
+                     catch (IOException ex) {
+                         // not much we can do
+                     }
+                 }
+             }
+         }
+         return result;
+     }
+
+     /**
+      * Reads one event representation per line from the reader and puts all successfully parsed
+      * events into the local store in a single call. Lines that cannot be parsed are skipped. An
+      * I/O failure (while reading or while storing) is logged and nothing is thrown.
+      *
+      * @param reader the reader to read event representations from; it is not closed here.
+      */
+     protected void readLogs(BufferedReader reader) {
+         try {
+             List<LogEvent> events = new ArrayList<LogEvent>();
+
+             String eventString = null;
+             while ((eventString = reader.readLine()) != null) {
+                 try {
+                     LogEvent event = new LogEvent(eventString);
+                     events.add(event);
+                 }
+                 catch (IllegalArgumentException e) {
+                     // Just skip this one.
+                 }
+             }
+             m_logStore.put(events);
+         }
+         catch (IOException e) {
+             m_log.log(LogService.LOG_DEBUG, "Error reading line from reader", e);
+         }
+
+     }
+
+     /**
+      * Calculates the difference between two lists of <code>LogDescriptor</code>. The result will contain whatever is
+      * not in <code>destination</code>, but is in <code>source</code>.
+      */
+     protected List<LogDescriptor> calculateDelta(List<LogDescriptor> source, List<LogDescriptor> destination) {
+         /*
+          * For each source descriptor, we try to find a matching destination one (same log ID and target ID). If
+          * so, only the ranges the destination does not have are reported. If we do not find a matching one at
+          * all, the complete source descriptor is reported.
+          */
+         List<LogDescriptor> result = new ArrayList<LogDescriptor>();
+         for (LogDescriptor s : source) {
+             LogDescriptor diffs = s;
+             for (LogDescriptor d : destination) {
+                 if ((s.getLogID() == d.getLogID()) && (s.getTargetID().equals(d.getTargetID()))) {
+                     SortedRangeSet rangeDiff = d.getRangeSet().diffDest(s.getRangeSet());
+                     if (!isEmptyRangeSet(rangeDiff)) {
+                         diffs = new LogDescriptor(s.getTargetID(), s.getLogID(), rangeDiff);
+                     }
+                     else {
+                         // the destination already has everything of this log
+                         diffs = null;
+                     }
+                 }
+             }
+             if (diffs != null) {
+                 result.add(diffs);
+             }
+         }
+         return result;
+     }
+
+     private boolean isEmptyRangeSet(SortedRangeSet set) {
+         return !set.iterator().hasNext();
+     }
+
+     /**
+      * Parses the descriptors, one per line, from the given stream. The stream is always closed
+      * before this method returns, also when parsing fails.
+      *
+      * @param stream the stream to read descriptor representations from.
+      * @return the parsed descriptors, in the order in which they were read.
+      * @throws java.io.IOException if reading fails or a line is not a valid descriptor; in the
+      *         latter case the parse failure is attached as the cause.
+      */
+     protected List<LogDescriptor> getRanges(InputStream stream) throws IOException {
+         List<LogDescriptor> result = new ArrayList<LogDescriptor>();
+         BufferedReader queryReader = null;
+         try {
+             queryReader = new BufferedReader(new InputStreamReader(stream));
+
+             for (String line = queryReader.readLine(); line != null; line = queryReader.readLine()) {
+                 try {
+                     result.add(new LogDescriptor(line));
+                 }
+                 catch (IllegalArgumentException iae) {
+                     IOException failure = new IOException("Could not determine highest remote event id, received malformed event range: " + line);
+                     // keep the parse failure available for diagnosis
+                     failure.initCause(iae);
+                     throw failure;
+                 }
+             }
+         }
+         finally {
+             if (queryReader != null) {
+                 try {
+                     queryReader.close();
+                 }
+                 catch (Exception ex) {
+                     // not much we can do
+                 }
+             }
+         }
+         return result;
+
+     }
+
+     public String getName() {
+         return m_name;
+     }
+ }
\ No newline at end of file
Added: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/Activator.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/Activator.java?rev=1465924&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/Activator.java (added)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/Activator.java Tue Apr 9 08:18:47 2013
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.log.target;
+
+import java.util.Properties;
+
+import org.apache.felix.dm.DependencyActivatorBase;
+import org.apache.felix.dm.DependencyManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.service.cm.ManagedServiceFactory;
+import org.osgi.service.log.LogService;
+
+ /**
+  * Bundle activator that registers the two managed service factories of the target log bundle:
+  * one backed by {@link LogConfigurator} and one backed by {@link LogSyncConfigurator}.
+  */
+ public class Activator extends DependencyActivatorBase {
+     public void init(BundleContext context, DependencyManager manager) throws Exception {
+         registerFactory(manager, "org.apache.ace.target.log.factory", LogConfigurator.class);
+         registerFactory(manager, "org.apache.ace.target.log.sync.factory", LogSyncConfigurator.class);
+     }
+
+     /**
+      * Adds a component that publishes the given implementation as a ManagedServiceFactory under
+      * the given service pid, with an optional dependency on the LogService.
+      */
+     private void registerFactory(DependencyManager manager, String pid, Class implementation) {
+         Properties serviceProps = new Properties();
+         serviceProps.put(Constants.SERVICE_PID, pid);
+         manager.add(createComponent()
+             .setInterface(ManagedServiceFactory.class.getName(), serviceProps)
+             .setImplementation(implementation)
+             .add(createServiceDependency().setService(LogService.class).setRequired(false)));
+     }
+
+     public void destroy(BundleContext context, DependencyManager manager) throws Exception {
+         // nothing to clean up
+     }
+ }
Added: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/LogConfigurator.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/LogConfigurator.java?rev=1465924&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/LogConfigurator.java (added)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/LogConfigurator.java Tue Apr 9 08:18:47 2013
@@ -0,0 +1,62 @@
+package org.apache.ace.log.target;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.ace.log.Log;
+import org.apache.ace.log.target.store.LogStore;
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyManager;
+import org.osgi.framework.Constants;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedServiceFactory;
+import org.osgi.service.log.LogService;
+
+ /**
+  * Managed service factory that publishes one {@link Log} service per factory configuration. Each
+  * log service is bound to the {@link LogStore} that is registered with the same name.
+  */
+ public class LogConfigurator implements ManagedServiceFactory {
+     private static final String LOG_NAME = "name";
+
+     private DependencyManager m_manager;
+     private final Map /*<String, Component>*/ m_logInstances = new HashMap();
+     private volatile LogService m_log;
+
+     public String getName() {
+         return "Log Factory";
+     }
+
+     /**
+      * Removes the log component that was created for the given pid, if any.
+      */
+     public synchronized void deleted(String pid) {
+         Component removed = (Component) m_logInstances.remove(pid);
+         if (removed == null) {
+             return;
+         }
+         m_manager.remove(removed);
+     }
+
+     /**
+      * Creates and registers a log component for the given pid. When a component already exists
+      * for this pid the update is ignored, which is reported at info level.
+      *
+      * @throws ConfigurationException if the configuration does not contain a non-empty log name.
+      */
+     public synchronized void updated(String pid, Dictionary dict) throws ConfigurationException {
+         String logName = (String) dict.get(LOG_NAME);
+         if (logName == null || logName.length() == 0) {
+             throw new ConfigurationException(LOG_NAME, "Log name has to be specified.");
+         }
+
+         if (m_logInstances.get(pid) != null) {
+             m_log.log(LogService.LOG_INFO, "Ignoring configuration update because factory instance was already configured: " + logName);
+             return;
+         }
+
+         // publish log service
+         Properties serviceProps = new Properties();
+         serviceProps.put(LOG_NAME, logName);
+         String storeFilter = "(&(" + Constants.OBJECTCLASS + "=" + LogStore.class.getName() + ")(name=" + logName + "))";
+
+         Component logComponent = m_manager.createComponent()
+             .setInterface(Log.class.getName(), serviceProps)
+             .setImplementation(LogImpl.class)
+             .add(m_manager.createServiceDependency().setService(LogStore.class, storeFilter).setRequired(true))
+             .add(m_manager.createServiceDependency().setService(LogService.class).setRequired(false));
+
+         m_logInstances.put(pid, logComponent);
+         m_manager.add(logComponent);
+     }
+ }
Added: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/LogImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/LogImpl.java?rev=1465924&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/LogImpl.java (added)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/LogImpl.java Tue Apr 9 08:18:47 2013
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.log.target;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import org.apache.ace.log.Log;
+import org.apache.ace.log.LogEvent;
+import org.apache.ace.log.target.store.LogStore;
+import org.osgi.service.log.LogService;
+
+public class LogImpl implements Log {
+ private volatile LogStore m_store;
+ private volatile LogService m_log;
+
+ public void log(int type, Dictionary properties) {
+ try {
+ m_store.put(type, properties);
+ }
+ catch (NullPointerException e) {
+ // if we cannot store the event, we log it to the normal log as extensively as possible
+ m_log.log(LogService.LOG_WARNING, "Could not store event: " + (new LogEvent("", 0, 0, 0, type, properties)).toRepresentation(), e);
+ }
+ catch (IOException e) {
+ // if we cannot store the event, we log it to the normal log as extensively as possible
+ m_log.log(LogService.LOG_WARNING, "Could not store event: " + (new LogEvent("", 0, 0, 0, type, properties)).toRepresentation(), e);
+ }
+ }
+}
\ No newline at end of file
Added: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/LogSyncConfigurator.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/LogSyncConfigurator.java?rev=1465924&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/LogSyncConfigurator.java (added)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/LogSyncConfigurator.java Tue Apr 9 08:18:47 2013
@@ -0,0 +1,94 @@
+package org.apache.ace.log.target;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.ace.connectionfactory.ConnectionFactory;
+import org.apache.ace.discovery.Discovery;
+import org.apache.ace.identification.Identification;
+import org.apache.ace.log.target.store.LogStore;
+import org.apache.ace.log.target.task.LogSyncTask;
+import org.apache.ace.scheduler.constants.SchedulerConstants;
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyManager;
+import org.osgi.framework.Constants;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedServiceFactory;
+import org.osgi.service.log.LogService;
+
+public class LogSyncConfigurator implements ManagedServiceFactory {
+ private static final String MA_NAME = "ma";
+ private static final String LOG_NAME = "name";
+
+ private DependencyManager m_manager;
+ private final Map /*<String, Component>*/ m_syncInstances = new HashMap();
+ private volatile LogService m_log;
+
+ public String getName() {
+ return "Log Sync Factory";
+ }
+
+ public synchronized void deleted(String pid) {
+ Component sync = (Component) m_syncInstances.remove(pid);
+ if (sync != null) {
+ m_manager.remove(sync);
+ }
+ }
+
+ public synchronized void updated(String pid, Dictionary dict) throws ConfigurationException {
+ String name = (String) dict.get(LOG_NAME);
+ String ma = (String) dict.get(MA_NAME);
+ if ((name == null) || "".equals(name)) {
+ throw new ConfigurationException(LOG_NAME, "Log name has to be specified.");
+ }
+
+ Component service = (Component) m_syncInstances.get(pid);
+ if (service == null) {
+ // publish log sync task service
+ Dictionary properties = new Properties();
+ String filterString;
+ String filterForDiscovery;
+ String filterForIdentification;
+ String schedulerName;
+ String description;
+ if (ma == null || "".equals(ma)) {
+ filterString = "(&(" + Constants.OBJECTCLASS + "=" + LogStore.class.getName() + ")(name=" + name + "))";
+ filterForDiscovery = "(&(" + Constants.OBJECTCLASS + "=" + Discovery.class.getName() + ")(!(ma=*)))";
+ filterForIdentification = "(&(" + Constants.OBJECTCLASS + "=" + Identification.class.getName() + ")(!(ma=*)))";
+ schedulerName = name;
+ description = "Task that synchronizes log store with id=" + name + " on the target and server";
+ }
+ else {
+ // if there is more than one management agent ("ma" is specified) then still it's very well possible that there's only
+ // one log, so either bind to this one global log (assuming ma is not specified for it) or a ma-specific log (ma is
+ // specified)
+ filterString = "(&(" + Constants.OBJECTCLASS + "=" + LogStore.class.getName() + ")(name=" + name + ")(|(ma=" + ma + ")(!(ma=*))))";
+ filterForDiscovery = "(&(" + Constants.OBJECTCLASS + "=" + Discovery.class.getName() + ")(ma=" + ma + "))";
+ filterForIdentification = "(&(" + Constants.OBJECTCLASS+"=" + Identification.class.getName() + ")(ma=" + ma + "))";
+ schedulerName = "ma=" + ma + ";name=" + name;
+ description = "Task that synchronizes log store with id=" + name + " and ma=" + ma + " on the target and server";
+ properties.put(MA_NAME, ma);
+ }
+
+ properties.put(SchedulerConstants.SCHEDULER_NAME_KEY, schedulerName);
+ properties.put(SchedulerConstants.SCHEDULER_DESCRIPTION_KEY, description);
+ properties.put(SchedulerConstants.SCHEDULER_RECIPE, "2000");
+ Component sync = m_manager.createComponent()
+ .setInterface(Runnable.class.getName(), properties)
+ .setImplementation(new LogSyncTask(name))
+ .add(m_manager.createServiceDependency().setService(ConnectionFactory.class).setRequired(true))
+ .add(m_manager.createServiceDependency().setService(LogStore.class, filterString).setRequired(true))
+ .add(m_manager.createServiceDependency().setService(Discovery.class, filterForDiscovery).setRequired(true))
+ .add(m_manager.createServiceDependency().setService(Identification.class, filterForIdentification).setRequired(true))
+ .add(m_manager.createServiceDependency().setService(LogService.class).setRequired(false));
+
+ m_syncInstances.put(pid, sync);
+ m_manager.add(sync);
+ }
+ else {
+ m_log.log(LogService.LOG_INFO, "Ignoring configuration update because factory instance was already configured: " + name);
+ }
+ }
+}
Added: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/store/LogStore.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/store/LogStore.java?rev=1465924&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/store/LogStore.java (added)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/store/LogStore.java Tue Apr 9 08:18:47 2013
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.log.target.store;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.List;
+
+import org.apache.ace.log.LogEvent;
+
+/**
+ * Server log store interface for the targets. Implementations of this service interface provide a persisted storage for
+ * log data.
+ */
+public interface LogStore
+{
+
+ /**
+ * Create a new event out of the given type and properties. Write it to the store and return it.
+ *
+ * @param type the type of the event.
+ * @param props the properties of the event.
+ * @return the created event that has been persisted.
+ * @throws java.io.IOException in case of any IO error.
+ */
+ public LogEvent put(int type, Dictionary props) throws IOException;
+
+ /**
+ * Get all events in the given log.
+ *
+ * @param logID the id of the log.
+ * @return a list of LogEvent's that are currently in the log of the given logID.
+ * @throws java.io.IOException in case of any IO error.
+ */
+ public List/*<LogEvent>*/get(long logID) throws IOException;
+
+ /**
+ * Get the events in the given log that are in the range of the given lower and upper bound.
+ *
+ * @param logID the id of the log.
+ * @param from the lower bound.
+ * @param to the upper bound.
+ * @return a list of LogEvent's that are currently in the log of the given logID and have an id in the range of the given
+ * bounds.
+ * @throws java.io.IOException in case of any IO error.
+ */
+ public List/*<LogEvent>*/get(long logID, long from, long to) throws IOException;
+
+ /**
+ * Get the the highest id of any LogEvent entry in the given log.
+ *
+ * @param logID the id of the log.
+ * @return the id of the highest LogEvent entry in the given log.
+ * @throws java.io.IOException in case of any IO error.
+ */
+ public long getHighestID(long logID) throws IOException;
+
+ /**
+ * Get the ids of all available logs in this store.
+ *
+ * @return an array of the ids of all available logs in this store.
+ * @throws java.io.IOException in case of any IO error.
+ */
+ public long[] getLogIDs() throws IOException;
+}
\ No newline at end of file
Added: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/store/impl/Activator.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/store/impl/Activator.java?rev=1465924&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/store/impl/Activator.java (added)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/store/impl/Activator.java Tue Apr 9 08:18:47 2013
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.log.target.store.impl;
+
+import java.io.File;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.ace.identification.Identification;
+import org.apache.ace.log.target.store.LogStore;
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyActivatorBase;
+import org.apache.felix.dm.DependencyManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedServiceFactory;
+import org.osgi.service.log.LogService;
+
+public class Activator extends DependencyActivatorBase implements ManagedServiceFactory {
+ private static final String MA_NAME = "ma";
+ private static final String LOG_NAME = "name";
+
+ private DependencyManager m_manager;
+ private BundleContext m_context;
+ private final Map /*<String, Component>*/ m_instances = new HashMap();
+ private volatile LogService m_log;
+
+ public void init(BundleContext context, DependencyManager manager) throws Exception {
+ m_context = context;
+ m_manager = manager;
+ Properties props = new Properties();
+ props.put(Constants.SERVICE_PID, "org.apache.ace.target.log.store.factory");
+ manager.add(createComponent()
+ .setInterface(ManagedServiceFactory.class.getName(), props)
+ .setImplementation(this)
+ .add(createServiceDependency()
+ .setService(LogService.class)
+ .setRequired(false)
+ )
+ );
+ }
+
+ public void destroy(BundleContext context, DependencyManager manager) throws Exception {
+ // Nothing we need to do
+ }
+
+ public void deleted(String pid) {
+ Component log;
+ synchronized (m_instances) {
+ log = (Component) m_instances.remove(pid);
+ }
+ if (log != null) {
+ m_manager.remove(log);
+ delete(new File(m_context.getDataFile(""), pid));
+ }
+ }
+
+ public String getName() {
+ return "Log Store Factory";
+ }
+
+ private void delete(File root) {
+ if (root.isDirectory()) {
+ File[] files = root.listFiles();
+ for (int i = 0; i < files.length; i++) {
+ delete(files[i]);
+ }
+ }
+ root.delete();
+ }
+
+ public void updated(String pid, Dictionary dict) throws ConfigurationException {
+ String ma = (String) dict.get(MA_NAME);
+ String name = (String) dict.get(LOG_NAME);
+ if ((name == null) || "".equals(name)) {
+ throw new ConfigurationException(LOG_NAME, "Log name has to be specified.");
+ }
+
+ boolean needToAddComponent = false;
+ Component component;
+ synchronized (m_instances) {
+ component = (Component) m_instances.get(pid);
+ if (component == null) {
+ Properties props = new Properties();
+ props.put(LOG_NAME, name);
+ if ((ma != null) && (ma.length() > 0)) {
+ props.put(MA_NAME, ma);
+ }
+ File baseDir = new File(m_context.getDataFile(""), pid);
+ component = m_manager.createComponent()
+ .setInterface(LogStore.class.getName(), props)
+ .setImplementation(new LogStoreImpl(baseDir))
+ .add(createServiceDependency()
+ .setService(Identification.class)
+ .setRequired(true)
+ )
+ .add(createServiceDependency()
+ .setService(LogService.class)
+ .setRequired(false)
+ );
+ m_instances.put(pid, component);
+ needToAddComponent = true;
+ }
+ }
+ if (needToAddComponent) {
+ m_manager.add(component);
+ }
+ else {
+ m_log.log(LogService.LOG_INFO, "Ignoring configuration update because factory instance was already configured: " + name);
+ }
+ }
+}
\ No newline at end of file
Added: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/store/impl/LogStoreImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/store/impl/LogStoreImpl.java?rev=1465924&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/store/impl/LogStoreImpl.java (added)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/store/impl/LogStoreImpl.java Tue Apr 9 08:18:47 2013
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.log.target.store.impl;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.List;
+
+import org.apache.ace.identification.Identification;
+import org.apache.ace.log.LogEvent;
+import org.apache.ace.log.target.store.LogStore;
+import org.osgi.service.log.LogService;
+
+/**
+ * This class provides an implementation of the LogStore service. Every log is
+ * kept in its own file, named after the log id, in the "store" directory below
+ * the base directory. A record in such a file consists of the event id (long),
+ * the length of the entry (int) and the entry itself.
+ * <p>
+ * It tries to repair broken log files to make them readable again. However,
+ * this might lead to loss of data. Additionally, a new file is used when an
+ * error is detected.
+ */
+public class LogStoreImpl implements LogStore {
+    // Charset used to convert event representations to and from bytes. A fixed
+    // charset keeps the files readable when the platform default changes.
+    private static final String CHARSET = "UTF-8";
+
+    // injected by dependencymanager
+    volatile Identification m_identification;
+    volatile LogService m_log;
+
+    // The current store; null before start(), after stop() and when start()
+    // could not create a store.
+    private Store m_store = null;
+    private final File m_baseDir;
+    // Cached highest event id of the current store; 0 means "not determined".
+    private long m_highest;
+
+    /**
+     * Create new instance using the specified directory as root directory.
+     *
+     * @param baseDir
+     *            Root directory to use for storage.
+     */
+    public LogStoreImpl(File baseDir) {
+        m_baseDir = new File(baseDir, "store");
+    }
+
+    /**
+     * Init the current store. The existing log with the highest id is used as
+     * the current store; when there is none a new log is created. Files whose
+     * name is not a number are ignored. IO errors while opening or checking the
+     * store are logged and not rethrown.
+     *
+     * @throws java.io.IOException
+     *             in case the store directory cannot be listed.
+     * @throws IllegalArgumentException
+     *             in case the store directory does not exist and cannot be
+     *             created.
+     */
+    protected synchronized void start() throws IOException {
+        if (!m_baseDir.isDirectory() && !m_baseDir.mkdirs()) {
+            throw new IllegalArgumentException("Need valid dir");
+        }
+        // The cache describes the store selected below, so start without one.
+        m_highest = 0;
+        long current = -1;
+        File[] files = (File[]) notNull(m_baseDir.listFiles());
+        for (int i = 0; i < files.length; i++) {
+            Long id = parseLogID(files[i]);
+            if (id != null) {
+                current = Math.max(id.longValue(), current);
+            }
+        }
+        try {
+            if (current == -1) {
+                m_store = newStore();
+            } else {
+                m_store = createStore(current);
+                try {
+                    m_store.init();
+                }
+                catch (IOException ex) {
+                    // Repairs the broken store, switches to a new one and
+                    // rethrows; the rethrown exception is logged below.
+                    handleException(m_store, ex);
+                }
+            }
+        }
+        catch (IOException ex) {
+            // We should be able to recover from the error.
+            m_log.log(LogService.LOG_ERROR, "Exception during log store init",
+                ex);
+        }
+    }
+
+    /**
+     * Close the current store. Does nothing when there is no current store.
+     *
+     * @throws java.io.IOException
+     *             in case of any IO error.
+     */
+    protected synchronized void stop() throws IOException {
+        Store store = m_store;
+        // Drop the reference first so it is cleared even when close() fails.
+        m_store = null;
+        if (store != null) {
+            store.close();
+        }
+    }
+
+    /**
+     * Create a store object for a new log. The current time is used as log id;
+     * the id is incremented until a file with that name could be created.
+     *
+     * @return a store object for a new log.
+     * @throws java.io.IOException
+     *             in case of any IO error.
+     */
+    protected Store newStore() throws IOException {
+        long id = System.currentTimeMillis();
+
+        while (!(new File(m_baseDir, String.valueOf(id))).createNewFile()) {
+            id++;
+        }
+
+        return new Store(new File(m_baseDir, String.valueOf(id)), id);
+    }
+
+    /**
+     * Create a store object for the given log. This should not be used to
+     * create a new log. Note that the file is opened for reading and writing,
+     * which creates it when it does not exist yet.
+     *
+     * @param id
+     *            the id of the log.
+     * @return a new store object for the given log.
+     * @throws java.io.IOException
+     *             in case of an IO error.
+     */
+    protected Store createStore(long id) throws IOException {
+        return new Store(new File(m_baseDir, String.valueOf(id)), id);
+    }
+
+    /**
+     * Get the entries in the given range from the given log.
+     *
+     * @param logID
+     *            the id of the log.
+     * @param from
+     *            the lower bound of the range (inclusive).
+     * @param to
+     *            the upper bound of the range (inclusive).
+     * @return a list of entries from the given log in the given range.
+     * @throws java.io.IOException
+     *             in case of any IO error.
+     */
+    public synchronized List get(long logID, long from, long to)
+        throws IOException {
+        Store store = getLog(logID);
+        List result = new ArrayList();
+        try {
+            // getCurrent() is the id of the record that was consumed last, so
+            // the store has to be rewound when that record is part of the
+            // requested range as well (hence >= and not >).
+            if (store.getCurrent() >= from) {
+                store.reset();
+            }
+
+            while (store.hasNext()) {
+                long eventID = store.readCurrentID();
+                if ((eventID >= from) && (eventID <= to)) {
+                    result.add(new LogEvent(new String(store.read(), CHARSET)));
+                } else {
+                    store.skip();
+                }
+            }
+        }
+        catch (Exception ex) {
+            handleException(store, ex);
+        }
+        finally {
+            closeIfNeeded(store);
+        }
+        return result;
+    }
+
+    /**
+     * Try to repair the given store, log the given exception and rethrow it. In
+     * case the store is the current log switch to a new one if possible.
+     *
+     * @param store
+     *            the store to repair/close.
+     * @param exception
+     *            the exception to log and rethrow.
+     * @throws java.io.IOException
+     *             the given exception if it is an IOException else an
+     *             IOException that has the given exception as its cause.
+     */
+    protected void handleException(Store store, Exception exception)
+        throws IOException {
+        m_log.log(LogService.LOG_WARNING, "Exception accessing the log: "
+            + store.getId(), exception);
+        if (store == m_store) {
+            m_store = newStore();
+            // The cached id belonged to the store that was just replaced.
+            m_highest = 0;
+        }
+
+        try {
+            store.truncate();
+        }
+        catch (IOException ex) {
+            m_log.log(LogService.LOG_WARNING, "Exception during truncate: "
+                + store.getId(), ex);
+        }
+        try {
+            store.close();
+        }
+        catch (IOException ex) {
+            // Not much we can do
+        }
+        if (exception instanceof IOException) {
+            throw (IOException) exception;
+        }
+        IOException wrapped = new IOException("Unable to read log entry: "
+            + exception.getMessage());
+        // Keep the original exception (and its stack trace) available.
+        wrapped.initCause(exception);
+        throw wrapped;
+    }
+
+    /**
+     * Get all entries of the given log.
+     *
+     * @param logID
+     *            the id of the log.
+     * @return a list of all entries in this log.
+     * @throws java.io.IOException
+     *             in case of any IO error.
+     */
+    public List get(long logID) throws IOException {
+        return get(logID, 0, Long.MAX_VALUE);
+    }
+
+    /**
+     * Get the current log ids. Files in the store directory whose name is not a
+     * number are ignored.
+     *
+     * @return the ids of the current logs.
+     * @throws java.io.IOException
+     *             in case the store directory cannot be listed.
+     */
+    public long[] getLogIDs() throws IOException {
+        File[] files = (File[]) notNull(m_baseDir.listFiles());
+        long[] ids = new long[files.length];
+        int count = 0;
+        for (int i = 0; i < files.length; i++) {
+            Long id = parseLogID(files[i]);
+            if (id != null) {
+                ids[count++] = id.longValue();
+            }
+        }
+        if (count == ids.length) {
+            return ids;
+        }
+        // Some files were skipped: shrink the result to the ids found.
+        long[] result = new long[count];
+        System.arraycopy(ids, 0, result, 0, count);
+        return result;
+    }
+
+    /**
+     * Create and add a LogEvent to the current log.
+     *
+     * @param type
+     *            the type the event.
+     * @param props
+     *            the properties of the event.
+     * @return the new event.
+     * @throws java.io.IOException
+     *             in case of any IO error or if there is no current store.
+     */
+    public synchronized LogEvent put(int type, Dictionary props) throws IOException {
+        Store store = currentStore();
+        try {
+            LogEvent result = new LogEvent(null, store.getId(), getNextID(), System.currentTimeMillis(), type, props);
+            store.append(result.getID(), result.toRepresentation().getBytes(CHARSET));
+            return result;
+        }
+        catch (IOException ex) {
+            if (store != m_store) {
+                // The store was already repaired and replaced while the next
+                // id was determined; handling it again would only replace the
+                // fresh store as well.
+                throw ex;
+            }
+            handleException(store, ex);
+        }
+        return null;
+    }
+
+    /**
+     * Get the highest entry id of the given log. For the current log the value
+     * is cached once it is known.
+     *
+     * @param logID
+     *            the id of the log.
+     * @return the id of the highest entry.
+     * @throws java.io.IOException
+     *             in case of any IO error.
+     */
+    public synchronized long getHighestID(long logID) throws IOException {
+        Store store = getLog(logID);
+        // The cache only describes the current store.
+        boolean isCurrent = (store == m_store);
+        try {
+            if (isCurrent && (m_highest != 0)) {
+                return m_highest;
+            }
+            store.init();
+            long highest = store.getCurrent();
+            if (isCurrent) {
+                m_highest = highest;
+            }
+            return highest;
+        }
+        catch (IOException ex) {
+            handleException(store, ex);
+        }
+        finally {
+            closeIfNeeded(store);
+        }
+        return -1;
+    }
+
+    /**
+     * Close the given store if it is not the current store. IO errors are
+     * ignored.
+     *
+     * @param store
+     *            the store to close.
+     */
+    protected void closeIfNeeded(Store store) {
+        if (store != m_store) {
+            try {
+                store.close();
+            }
+            catch (IOException ex) {
+                // Not much we can do;
+            }
+        }
+    }
+
+    /**
+     * Get a Store object for the log of the given logid.
+     *
+     * @param logID
+     *            the id for which to return (and possibly create) a store.
+     * @return either a new or the current Store object.
+     * @throws java.io.IOException
+     *             in case of any IO error.
+     */
+    protected Store getLog(long logID) throws IOException {
+        if ((m_store != null) && (m_store.getId() == logID)) {
+            return m_store;
+        }
+        return createStore(logID);
+    }
+
+    /**
+     * Get the next id for the current store and remember it as the highest id.
+     *
+     * @return the next free log id of the current store.
+     * @throws java.io.IOException
+     *             in case of any IO error or if there is no current store.
+     */
+    protected long getNextID() throws IOException {
+        return (m_highest = getHighestID(currentStore().getId()) + 1);
+    }
+
+    /*
+     * return the current store or throw an IOException in case there is none.
+     */
+    private Store currentStore() throws IOException {
+        Store store = m_store;
+        if (store == null) {
+            throw new IOException("Log store is not available.");
+        }
+        return store;
+    }
+
+    /*
+     * return the log id encoded in the name of the given file or null in case
+     * the name is not a number.
+     */
+    private static Long parseLogID(File file) {
+        try {
+            return Long.valueOf(file.getName());
+        }
+        catch (NumberFormatException ex) {
+            return null;
+        }
+    }
+
+    /*
+     * throw IOException in case the target is null else return the target.
+     */
+    private Object notNull(Object target) throws IOException {
+        if (target == null) {
+            throw new IOException(
+                "Unknown IO error while trying to access the store.");
+        }
+        return target;
+    }
+
+    /**
+     * The general idea is to provide easy access to a file of records. It
+     * supports iterating over records both by skipping and by reading.
+     * Furthermore, files can be truncated. Most methods will make an effort to
+     * reset to the last good record in case of an error -- hence, a call to
+     * truncate after an IOException might make the store readable again.
+     */
+    class Store {
+        private final RandomAccessFile m_store;
+        private final long m_id;
+        // File position of the record that was read or skipped last.
+        private long m_current;
+
+        /**
+         * Create a new File based Store.
+         *
+         * @param store
+         *            the file to use as backend.
+         * @param id
+         *            the log id of the store
+         * @throws java.io.IOException
+         *             in case the file is not rw.
+         */
+        Store(File store, long id) throws IOException {
+            m_store = new RandomAccessFile(store, "rwd");
+            m_id = id;
+        }
+
+        /**
+         * Get the id of the current record, i.e. the record that was read or
+         * skipped last. The file position is left unchanged.
+         *
+         * @return the id of the current record, 0 if the store is empty.
+         * @throws java.io.IOException
+         *             in case of an IO error.
+         */
+        public long getCurrent() throws IOException {
+            long pos = m_store.getFilePointer();
+            if (m_store.length() == 0) {
+                return 0;
+            }
+            long result = 0;
+            try {
+                m_store.seek(m_current);
+                result = readCurrentID();
+                m_store.seek(pos);
+            }
+            catch (IOException ex) {
+                handle(pos, ex);
+            }
+            return result;
+        }
+
+        /**
+         * Get the log id of this store.
+         *
+         * @return the log id of this store.
+         */
+        public long getId() {
+            return m_id;
+        }
+
+        /**
+         * Reset the store to the beginning of the records
+         *
+         * @throws java.io.IOException
+         *             in case of an IO error.
+         */
+        public void reset() throws IOException {
+            m_store.seek(0);
+            m_current = 0;
+        }
+
+        /**
+         * Determine whether there are any records left based on the current
+         * position.
+         *
+         * @return <code>true</code> if there are still records to be read.
+         * @throws java.io.IOException
+         *             in case of an IO error.
+         */
+        public boolean hasNext() throws IOException {
+            return m_store.getFilePointer() < m_store.length();
+        }
+
+        /**
+         * Read the next record and make it the current record. In case of an
+         * error the file position is reset to the start of the record.
+         *
+         * @return the entry of the next record or <code>null</code> if there
+         *         is no record left.
+         * @throws java.io.IOException
+         *             in case of any IO error or if the record is incomplete.
+         */
+        public byte[] read() throws IOException {
+            long pos = m_store.getFilePointer();
+            try {
+                if (pos < m_store.length()) {
+                    // The id is not needed here, just step over it.
+                    m_store.readLong();
+                    byte[] entry = new byte[readLength()];
+                    m_store.readFully(entry);
+                    m_current = pos;
+                    return entry;
+                }
+            }
+            catch (IOException ex) {
+                handle(pos, ex);
+            }
+            return null;
+        }
+
+        /**
+         * Read the id of the next record without changing the file position.
+         *
+         * @return the id of the next record or -1 if there is no record left.
+         * @throws java.io.IOException
+         *             in case of any IO error.
+         */
+        public long readCurrentID() throws IOException {
+            long pos = m_store.getFilePointer();
+            try {
+                if (pos < m_store.length()) {
+                    long id = m_store.readLong();
+                    m_store.seek(pos);
+                    return id;
+                }
+            }
+            catch (IOException ex) {
+                handle(pos, ex);
+            }
+            return -1;
+        }
+
+        /**
+         * Make sure the store is readable. As a result, the store is at the end
+         * of the records.
+         *
+         * @throws java.io.IOException
+         *             in case of any IO error, including an incomplete record
+         *             at the end of the file.
+         */
+        public void init() throws IOException {
+            reset();
+            // Looping on hasNext() instead of waiting for an EOFException makes
+            // sure a partially written record is reported and not mistaken for
+            // the regular end of the file.
+            while (hasNext()) {
+                skip();
+            }
+        }
+
+        /**
+         * Skip the next record if there is any and make it the current record.
+         * In case of an error the file position is reset to the start of the
+         * record.
+         *
+         * @throws java.io.IOException
+         *             in case of any IO error or if there is no record left.
+         */
+        public void skip() throws IOException {
+            long pos = m_store.getFilePointer();
+            try {
+                // The id is not needed here, just step over it.
+                m_store.readLong();
+                int length = readLength();
+                m_store.seek(m_store.getFilePointer() + length);
+                m_current = pos;
+            }
+            catch (IOException ex) {
+                handle(pos, ex);
+            }
+        }
+
+        /**
+         * Store the given record data as the next record at the end of the
+         * file. The file position used for reading is left unchanged. In case
+         * of an error the file position is set to the start of the record that
+         * could not be written, so truncate removes only that record.
+         *
+         * @param id
+         *            the id of the record to store.
+         * @param entry
+         *            the data of the record to store.
+         * @throws java.io.IOException
+         *             in case of any IO error.
+         */
+        public void append(long id, byte[] entry) throws IOException {
+            long pos = m_store.getFilePointer();
+            long end = pos;
+            try {
+                end = m_store.length();
+                m_store.seek(end);
+                m_store.writeLong(id);
+                m_store.writeInt(entry.length);
+                m_store.write(entry);
+                m_store.seek(pos);
+            }
+            catch (IOException ex) {
+                handle(end, ex);
+            }
+        }
+
+        /**
+         * Try to truncate the store at the current file position.
+         *
+         * @throws java.io.IOException
+         *             in case of any IO error.
+         */
+        public void truncate() throws IOException {
+            m_store.setLength(m_store.getFilePointer());
+        }
+
+        /**
+         * Release any resources.
+         *
+         * @throws java.io.IOException
+         *             in case of any IO error.
+         */
+        public void close() throws IOException {
+            m_store.close();
+        }
+
+        /*
+         * read the length of the entry of the record at the current position
+         * and make sure the entry fits into the rest of the file.
+         */
+        private int readLength() throws IOException {
+            int length = m_store.readInt();
+            if (length < 0) {
+                // Seeking by a negative length would move backwards.
+                throw new IOException("Corrupt record, negative length: " + length);
+            }
+            if (m_store.length() - m_store.getFilePointer() < length) {
+                throw new EOFException("Unexpected end of file");
+            }
+            return length;
+        }
+
+        /*
+         * try to move to the given position and rethrow the given exception.
+         */
+        private void handle(long pos, IOException exception) throws IOException {
+            try {
+                m_store.seek(pos);
+            }
+            catch (IOException ex) {
+                m_log.log(LogService.LOG_WARNING, "Exception during seek!", ex);
+            }
+            throw exception;
+        }
+    }
+}
\ No newline at end of file
Added: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/store/packageinfo
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/store/packageinfo?rev=1465924&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/store/packageinfo (added)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/store/packageinfo Tue Apr 9 08:18:47 2013
@@ -0,0 +1 @@
+version 1.0
\ No newline at end of file