You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/08/20 12:37:34 UTC

svn commit: r567647 [2/3] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/...

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java Mon Aug 20 03:37:29 2007
@@ -17,6 +17,7 @@
 package org.apache.activemq.kaha;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.kaha.impl.KahaStore;
 
@@ -39,7 +40,19 @@
      * @throws IOException
      */
     public static Store open(String name, String mode) throws IOException {
-        return new KahaStore(name, mode);
+        return new KahaStore(name, mode,new AtomicLong());
+    }
+    
+    /**
+     * open or create a Store
+     * @param name
+     * @param mode
+     * @param size
+     * @return the opened/created store
+     * @throws IOException
+     */
+    public static Store open(String name, String mode, AtomicLong size) throws IOException {
+        return new KahaStore(name, mode,size);
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Mon Aug 20 03:37:29 2007
@@ -26,6 +26,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.kaha.ContainerId;
 import org.apache.activemq.kaha.ListContainer;
@@ -79,9 +80,16 @@
     private FileLock lock;
     private boolean persistentIndex = true;
     private RandomAccessFile lockFile;
+    private final AtomicLong storeSize;
 
+    
     public KahaStore(String name, String mode) throws IOException {
+        this(name,mode,new AtomicLong());
+    }
+    
+    public KahaStore(String name, String mode,AtomicLong storeSize) throws IOException {
         this.mode = mode;
+        this.storeSize = storeSize;
         directory = new File(name);
         directory.mkdirs();
     }
@@ -337,14 +345,14 @@
         DataManager dm = dataManagers.get(name);
         if (dm == null) {
             if (isUseAsyncDataManager()) {
-                AsyncDataManager t = new AsyncDataManager();
+                AsyncDataManager t = new AsyncDataManager(storeSize);
                 t.setDirectory(directory);
                 t.setFilePrefix("async-data-" + name + "-");
                 t.setMaxFileLength((int)maxDataFileLength);
                 t.start();
                 dm = new DataManagerFacade(t, name);
             } else {
-                DataManagerImpl t = new DataManagerImpl(directory, name);
+                DataManagerImpl t = new DataManagerImpl(directory, name,storeSize);
                 t.setMaxFileLength(maxDataFileLength);
                 dm = t;
             }
@@ -359,7 +367,7 @@
     public synchronized IndexManager getIndexManager(DataManager dm, String name) throws IOException {
         IndexManager im = indexManagers.get(name);
         if (im == null) {
-            im = new IndexManager(directory, name, mode, logIndexChanges ? dm : null);
+            im = new IndexManager(directory, name, mode, logIndexChanges ? dm : null,storeSize);
             indexManagers.put(name, im);
         }
         return im;
@@ -546,6 +554,14 @@
 
     public synchronized void setUseAsyncDataManager(boolean useAsyncWriter) {
         this.useAsyncDataManager = useAsyncWriter;
+    }
+
+    /**
+     * @return
+     * @see org.apache.activemq.kaha.Store#size()
+     */
+    public long size(){
+        return storeSize.get();
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Mon Aug 20 03:37:29 2007
@@ -32,6 +32,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
@@ -89,6 +90,15 @@
     private Location mark;
     private final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
     private Runnable cleanupTask;
+    private final AtomicLong storeSize;
+    
+    public AsyncDataManager(AtomicLong storeSize) {
+        this.storeSize=storeSize;
+    }
+    
+    public AsyncDataManager() {
+        this(new AtomicLong());
+    }
 
     @SuppressWarnings("unchecked")
     public synchronized void start() throws IOException {
@@ -128,6 +138,7 @@
                     int num = Integer.parseInt(numStr);
                     DataFile dataFile = new DataFile(file, num, preferedFileLength);
                     fileMap.put(dataFile.getDataFileId(), dataFile);
+                    storeSize.addAndGet(dataFile.getLength());
                 } catch (NumberFormatException e) {
                     // Ignore file that do not match the pattern.
                 }
@@ -249,8 +260,10 @@
         }
         location.setOffset(currentWriteFile.getLength());
         location.setDataFileId(currentWriteFile.getDataFileId().intValue());
-        currentWriteFile.incrementLength(location.getSize());
+        int size = location.getSize();
+        currentWriteFile.incrementLength(size);
         currentWriteFile.increment();
+        storeSize.addAndGet(size);
         return currentWriteFile;
     }
 
@@ -297,6 +310,7 @@
         boolean result = true;
         for (Iterator i = fileMap.values().iterator(); i.hasNext();) {
             DataFile dataFile = (DataFile)i.next();
+            storeSize.addAndGet(-dataFile.getLength());
             result &= dataFile.delete();
         }
         fileMap.clear();
@@ -387,6 +401,7 @@
         accessorPool.disposeDataFileAccessors(dataFile);
 
         fileMap.remove(dataFile.getDataFileId());
+        storeSize.addAndGet(-dataFile.getLength());
         dataFile.unlink();
         boolean result = dataFile.delete();
         LOG.debug("discarding data file " + dataFile + (result ? "successful " : "failed"));

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java Mon Aug 20 03:37:29 2007
@@ -24,6 +24,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.StoreLocation;
@@ -57,10 +58,12 @@
     private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
     private Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
     private String dataFilePrefix;
+    private final AtomicLong storeSize;
 
-    public DataManagerImpl(File dir, final String name) {
+    public DataManagerImpl(File dir, final String name,AtomicLong storeSize) {
         this.directory = dir;
         this.name = name;
+        this.storeSize=storeSize;
 
         dataFilePrefix = NAME_PREFIX + name + "-";
         // build up list of current dataFiles
@@ -76,6 +79,7 @@
                 String numStr = n.substring(dataFilePrefix.length(), n.length());
                 int num = Integer.parseInt(numStr);
                 DataFile dataFile = new DataFile(file, num);
+                storeSize.addAndGet(dataFile.getLength());
                 fileMap.put(dataFile.getNumber(), dataFile);
                 if (currentWriteFile == null || currentWriteFile.getNumber().intValue() < num) {
                     currentWriteFile = dataFile;
@@ -111,7 +115,9 @@
         }
         item.setOffset(currentWriteFile.getLength());
         item.setFile(currentWriteFile.getNumber().intValue());
-        currentWriteFile.incrementLength(item.getSize() + ITEM_HEAD_SIZE);
+        int len = item.getSize() + ITEM_HEAD_SIZE;
+        currentWriteFile.incrementLength(len);
+        storeSize.addAndGet(len);
         return currentWriteFile;
     }
 
@@ -250,6 +256,7 @@
         boolean result = true;
         for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
             DataFile dataFile = i.next();
+            storeSize.addAndGet(-dataFile.getLength());
             result &= dataFile.delete();
         }
         fileMap.clear();
@@ -325,6 +332,7 @@
         if (writer != null) {
             writer.force(dataFile);
         }
+        storeSize.addAndGet(-dataFile.getLength());
         boolean result = dataFile.delete();
         LOG.debug("discarding data file " + dataFile + (result ? "successful " : "failed"));
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java Mon Aug 20 03:37:29 2007
@@ -19,6 +19,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.kaha.impl.DataManager;
 import org.apache.commons.logging.Log;
@@ -45,12 +46,14 @@
     private IndexItem firstFree;
     private IndexItem lastFree;
     private boolean dirty;
+    private final AtomicLong storeSize;
 
-    public IndexManager(File directory, String name, String mode, DataManager redoLog) throws IOException {
+    public IndexManager(File directory, String name, String mode, DataManager redoLog, AtomicLong storeSize) throws IOException {
         this.directory = directory;
         this.name = name;
         this.mode = mode;
         this.redoLog = redoLog;
+        this.storeSize=storeSize;
         initialize();
     }
 
@@ -106,6 +109,7 @@
             result = new IndexItem();
             result.setOffset(length);
             length += IndexItem.INDEX_SIZE;
+            storeSize.addAndGet(IndexItem.INDEX_SIZE);
         }
         return result;
     }
@@ -156,9 +160,14 @@
     synchronized long getLength() {
         return length;
     }
+    
+    public final long size() {
+        return length;
+    }
 
     public synchronized void setLength(long value) {
         this.length = value;
+        storeSize.addAndGet(length);
     }
 
     public String toString() {
@@ -187,5 +196,6 @@
             offset += IndexItem.INDEX_SIZE;
         }
         length = offset;
+        storeSize.addAndGet(length);
     }
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/Cache.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/Cache.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/Cache.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/Cache.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory;
+
+/**
+ * Defines the interface used to cache messages.
+ * 
+ * @version $Revision$
+ */
+public interface Cache {
+
+    /**
+     * Gets an object that was previously <code>put</code> into this object.
+     * 
+     * @param msgid
+     * @return null if the object was not previously put or if the object has
+     *         expired out of the cache.
+     */
+    Object get(Object key);
+
+    /**
+     * Puts an object into the cache.
+     * 
+     * @param messageID
+     * @param message
+     */
+    Object put(Object key, Object value);
+
+    /**
+     * Removes an object from the cache.
+     * 
+     * @param messageID
+     * @return the object associated with the key if it was still in the cache.
+     */
+    Object remove(Object key);
+
+    /**
+     * Lets a cache know it will not be used any further and that it can release
+     * acquired resources
+     */
+    void close();
+
+    /**
+     * How big is the cache right now?
+     * 
+     * @return
+     */
+    int size();
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/Cache.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/Cache.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntry.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntry.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntry.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory;
+
+public class CacheEntry {
+
+    public final Object key;
+    public final Object value;
+
+    public CacheEntry next;
+    public CacheEntry previous;
+    public CacheEntryList owner;
+
+    public CacheEntry(Object key, Object value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    /**
+     * 
+     * @param entry
+     * @return false if you are trying to remove the tail pointer.
+     */
+    public boolean remove() {
+
+        // Cannot remove if this is a tail pointer.
+        // Or not linked.
+        if (owner == null || this.key == null || this.next == null) {
+            return false;
+        }
+
+        synchronized (owner.tail) {
+            this.next.previous = this.previous;
+            this.previous.next = this.next;
+            this.owner = null;
+            this.next = null;
+            this.previous = null;
+        }
+
+        return true;
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntry.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntryList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntryList.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntryList.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntryList.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory;
+
+/**
+ * Maintains a simple linked list of CacheEntry objects. It is thread safe.
+ * 
+ * @version $Revision$
+ */
+public class CacheEntryList {
+
+    // Points at the tail of the CacheEntry list
+    public final CacheEntry tail = new CacheEntry(null, null);
+
+    public CacheEntryList() {
+        tail.next = tail;
+        tail.previous = tail;
+    }
+
+    public void add(CacheEntry ce) {
+        addEntryBefore(tail, ce);
+    }
+
+    private void addEntryBefore(CacheEntry position, CacheEntry ce) {
+        assert ce.key != null && ce.next == null && ce.owner == null;
+
+        synchronized (tail) {
+            ce.owner = this;
+            ce.next = position;
+            ce.previous = position.previous;
+            ce.previous.next = ce;
+            ce.next.previous = ce;
+        }
+    }
+
+    public void clear() {
+        synchronized (tail) {
+            tail.next = tail;
+            tail.previous = tail;
+        }
+    }
+
+    public CacheEvictor createFIFOCacheEvictor() {
+        return new CacheEvictor() {
+            public CacheEntry evictCacheEntry() {
+                CacheEntry rc;
+                synchronized (tail) {
+                    rc = tail.next;
+                }
+                return rc.remove() ? rc : null;
+            }
+        };
+    }
+
+    public CacheEvictor createLIFOCacheEvictor() {
+        return new CacheEvictor() {
+            public CacheEntry evictCacheEntry() {
+                CacheEntry rc;
+                synchronized (tail) {
+                    rc = tail.previous;
+                }
+                return rc.remove() ? rc : null;
+            }
+        };
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntryList.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntryList.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.Usage;
+import org.apache.activemq.usage.UsageListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class CacheEvictionUsageListener implements UsageListener {
+
+    private static final Log LOG = LogFactory.getLog(CacheEvictionUsageListener.class);
+
+    private final List<CacheEvictor> evictors = new CopyOnWriteArrayList<CacheEvictor>();
+    private final int usageHighMark;
+    private final int usageLowMark;
+
+    private final TaskRunner evictionTask;
+    private final Usage usage;
+
+    public CacheEvictionUsageListener(Usage usage, int usageHighMark, int usageLowMark, TaskRunnerFactory taskRunnerFactory) {
+        this.usage = usage;
+        this.usageHighMark = usageHighMark;
+        this.usageLowMark = usageLowMark;
+        evictionTask = taskRunnerFactory.createTaskRunner(new Task() {
+            public boolean iterate() {
+                return evictMessages();
+            }
+        }, "Cache Evictor: " + System.identityHashCode(this));
+    }
+
+    boolean evictMessages() {
+        // Try to take the memory usage down below the low mark.
+        LOG.debug("Evicting cache memory usage: " + usage.getPercentUsage());
+
+        List<CacheEvictor> list = new LinkedList<CacheEvictor>(evictors);
+        while (list.size() > 0 && usage.getPercentUsage() > usageLowMark) {
+
+            // Evenly evict messages from all evictors
+            for (Iterator<CacheEvictor> iter = list.iterator(); iter.hasNext();) {
+                CacheEvictor evictor = iter.next();
+                if (evictor.evictCacheEntry() == null) {
+                    iter.remove();
+                }
+            }
+        }
+        return false;
+    }
+
+    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
+        // Do we need to start evicting cache entries? Usage > than the
+        // high mark
+        if (oldPercentUsage < newPercentUsage && usage.getPercentUsage() >= usageHighMark) {
+            try {
+                evictionTask.wakeup();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    public void add(CacheEvictor evictor) {
+        evictors.add(evictor);
+    }
+
+    public void remove(CacheEvictor evictor) {
+        evictors.remove(evictor);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictor.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictor.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictor.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory;
+
+public interface CacheEvictor {
+    
+    CacheEntry evictCacheEntry();
+    
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictor.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheFilter.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheFilter.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheFilter.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory;
+
+
+/**
+ * Filters another Cache implementation.
+ * 
+ * @version $Revision$
+ */
+public class CacheFilter implements Cache {
+    
+    protected final Cache next;
+    
+    public CacheFilter(Cache next) {
+        this.next = next;
+    }
+
+    public Object put(Object key, Object value) {
+        return next.put(key, value);
+    }
+
+    public Object get(Object key) {
+        return next.get(key);
+    }
+
+    public Object remove(Object key) {
+        return next.remove(key);
+    }
+    
+    public void close() {
+        next.close();
+    }
+
+    public int size() {
+        return next.size();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheFilter.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/LRUMap.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/LRUMap.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/LRUMap.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/LRUMap.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A simple least-recently-used cache of a fixed size.
+ * 
+ * @version $Revision:$
+ */
+public class LRUMap extends LinkedHashMap {
+
+    protected static final float DEFAULT_LOAD_FACTOR = (float) 0.75;
+    protected static final int DEFAULT_INITIAL_CAPACITY = 5000;
+    private static final long serialVersionUID = -9179676638408888162L;
+
+    private int maximumSize;
+
+    public LRUMap(int maximumSize) {
+        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, true, maximumSize);
+    }
+
+    public LRUMap(int maximumSize, boolean accessOrder) {
+        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, accessOrder, maximumSize);
+    }
+
+    public LRUMap(int initialCapacity, float loadFactor, boolean accessOrder, int maximumSize) {
+        super(initialCapacity, loadFactor, accessOrder);
+        this.maximumSize = maximumSize;
+    }
+
+    protected boolean removeEldestEntry(Map.Entry eldest) {
+        return size() > maximumSize;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/LRUMap.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/LRUMap.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/MapCache.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/MapCache.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/MapCache.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/MapCache.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory;
+
+import java.util.Map;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Use any Map to implement the Cache.  No cache eviction going on here.  Just gives
+ * a Map a Cache interface.
+ * 
+ * @version $Revision$
+ */
+public class MapCache implements Cache {
+    
+    protected final Map<Object, Object> map;
+    
+    public MapCache() {
+        this(new ConcurrentHashMap<Object, Object>());
+    }
+    
+    public MapCache(Map<Object, Object> map) {
+        this.map = map;
+    }
+
+    public Object put(Object key, Object value) {
+        return map.put(key, value);
+    }
+
+    public Object get(Object key) {
+        return map.get(key);
+    }
+
+    public Object remove(Object key) {
+        return map.remove(key);
+    }
+    
+    public void close() {
+        map.clear();
+    }
+
+    public int size() {
+        return map.size();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/MapCache.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/MapCache.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManagerCacheFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManagerCacheFilter.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManagerCacheFilter.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManagerCacheFilter.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.usage.MemoryUsage;
+
+/**
+ * Simple CacheFilter that increases/decreases usage on a UsageManager as
+ * objects are added/removed from the Cache.
+ * 
+ * @version $Revision$
+ */
+public class UsageManagerCacheFilter extends CacheFilter {
+
+    private final AtomicLong totalUsage = new AtomicLong(0);
+    private final MemoryUsage usage;
+
+    public UsageManagerCacheFilter(Cache next, MemoryUsage um) {
+        super(next);
+        this.usage = um;
+    }
+
+    public Object put(Object key, Object value) {
+        long usageValue = getUsageOfAddedObject(value);
+        Object rc = super.put(key, value);
+        if (rc != null) {
+            usageValue -= getUsageOfRemovedObject(rc);
+        }
+        totalUsage.addAndGet(usageValue);
+        usage.increaseUsage(usageValue);
+        return rc;
+    }
+
+    public Object remove(Object key) {
+        Object rc = super.remove(key);
+        if (rc != null) {
+            long usageValue = getUsageOfRemovedObject(rc);
+            totalUsage.addAndGet(-usageValue);
+            usage.decreaseUsage(usageValue);
+        }
+        return rc;
+    }
+
+    protected long getUsageOfAddedObject(Object value) {
+        return 1;
+    }
+
+    protected long getUsageOfRemovedObject(Object value) {
+        return 1;
+    }
+
+    public void close() {
+        usage.decreaseUsage(totalUsage.get());
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManagerCacheFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManagerCacheFilter.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Mon Aug 20 03:37:29 2007
@@ -24,7 +24,8 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
 
 /**
  * Represents a message store which is used by the persistent implementations
@@ -88,10 +89,10 @@
     ActiveMQDestination getDestination();
 
     /**
-     * @param usageManager The UsageManager that is controlling the
+     * @param memoeyUSage The SystemUsage that is controlling the
      *                destination's memory usage.
      */
-    void setUsageManager(UsageManager usageManager);
+    void setMemoryUsage(MemoryUsage memoeyUSage);
 
     /**
      * @return the number of messages ready to deliver

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java Mon Aug 20 03:37:29 2007
@@ -25,7 +25,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.SystemUsage;
 
 /**
  * Adapter to the actual persistence mechanism used with ActiveMQ
@@ -115,7 +115,7 @@
     /**
      * @param usageManager The UsageManager that is controlling the broker's memory usage.
      */
-    void setUsageManager(UsageManager usageManager);
+    void setUsageManager(SystemUsage usageManager);
     
     /**
      * Set the name of the broker using the adapter
@@ -136,4 +136,10 @@
      *
      */
     void checkpoint(boolean sync) throws IOException;
+    
+    /**
+     * A hint to return the size of the store on disk
+     * @return disk space used in bytes of 0 if not implemented
+     */
+    long size();
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java Mon Aug 20 03:37:29 2007
@@ -17,13 +17,12 @@
 package org.apache.activemq.store;
 
 import java.io.IOException;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.MemoryUsage;
 
 /**
  * A simple proxy that delegates to another MessageStore.
@@ -72,8 +71,8 @@
         return delegate.getDestination();
     }
 
-    public void setUsageManager(UsageManager usageManager) {
-        delegate.setUsageManager(usageManager);
+    public void setMemoryUsage(MemoryUsage memoryUsage) {
+        delegate.setMemoryUsage(memoryUsage);
     }
 
     public int getMessageCount() throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Mon Aug 20 03:37:29 2007
@@ -24,7 +24,8 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
 
 /**
  * A simple proxy that delegates to another MessageStore.
@@ -108,8 +109,8 @@
         return delegate.getAllSubscriptions();
     }
 
-    public void setUsageManager(UsageManager usageManager) {
-        delegate.setUsageManager(usageManager);
+    public void setMemoryUsage(MemoryUsage memoryUsage) {
+        delegate.setMemoryUsage(memoryUsage);
     }
 
     public int getMessageCount(String clientId, String subscriberName) throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Mon Aug 20 03:37:29 2007
@@ -38,7 +38,6 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.kaha.impl.async.Location;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -47,6 +46,8 @@
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.Callback;
 import org.apache.activemq.util.TransactionTemplate;
 import org.apache.commons.logging.Log;
@@ -94,15 +95,15 @@
         }, "Checkpoint: " + destination);
     }
 
-    public void setUsageManager(UsageManager usageManager) {
-        referenceStore.setUsageManager(usageManager);
+    public void setMemoryUsage(MemoryUsage memoryUsage) {
+        referenceStore.setMemoryUsage(memoryUsage);
     }
 
     /**
      * Not synchronized since the Journal has better throughput if you increase
      * the number of concurrent writes that it is doing.
      */
-    public void addMessage(ConnectionContext context, final Message message) throws IOException {
+    public final void addMessage(ConnectionContext context, final Message message) throws IOException {
         final MessageId id = message.getMessageId();
         final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
         if (!context.isInTransaction()) {
@@ -142,7 +143,7 @@
         }
     }
 
-    void addMessage(final Message message, final Location location) throws InterruptedIOException {
+    final void addMessage(final Message message, final Location location) throws InterruptedIOException {
         ReferenceData data = new ReferenceData();
         data.setExpiration(message.getExpiration());
         data.setFileId(location.getDataFileId());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Mon Aug 20 03:37:29 2007
@@ -25,6 +25,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
@@ -40,8 +41,6 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.kaha.impl.async.AsyncDataManager;
 import org.apache.activemq.kaha.impl.async.Location;
-import org.apache.activemq.memory.UsageListener;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -56,6 +55,9 @@
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.Usage;
+import org.apache.activemq.usage.UsageListener;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IOHelper;
@@ -80,7 +82,7 @@
     private ReferenceStoreAdapter referenceStoreAdapter;
     private TaskRunnerFactory taskRunnerFactory;
     private WireFormat wireFormat = new OpenWireFormat();
-    private UsageManager usageManager;
+    private SystemUsage usageManager;
     private long cleanupInterval = 1000 * 60;
     private long checkpointInterval = 1000 * 10;
     private int maxCheckpointWorkers = 1;
@@ -96,6 +98,7 @@
     private String brokerName = "";
     private File directory;
     private BrokerService brokerService;
+    private AtomicLong storeSize = new AtomicLong();
 
     public String getBrokerName() {
         return this.brokerName;
@@ -132,7 +135,7 @@
         this.directory.mkdirs();
 
         if (this.usageManager != null) {
-            this.usageManager.addUsageListener(this);
+            this.usageManager.getMemoryUsage().addUsageListener(this);
         }
         if (asyncDataManager == null) {
             asyncDataManager = createAsyncDataManager();
@@ -217,7 +220,7 @@
         if (!started.compareAndSet(true, false)) {
             return;
         }
-        this.usageManager.removeUsageListener(this);
+        this.usageManager.getMemoryUsage().removeUsageListener(this);
         synchronized (this) {
             Scheduler.cancel(periodicCheckpointTask);
             Scheduler.cancel(periodicCleanupTask);
@@ -571,7 +574,7 @@
         return writeCommand(trace, sync);
     }
 
-    public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
+    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
         newPercentUsage = (newPercentUsage / 10) * 10;
         oldPercentUsage = (oldPercentUsage / 10) * 10;
         if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
@@ -595,13 +598,13 @@
     // Subclass overridables
     // /////////////////////////////////////////////////////////////////
     protected AsyncDataManager createAsyncDataManager() {
-        AsyncDataManager manager = new AsyncDataManager();
+        AsyncDataManager manager = new AsyncDataManager(storeSize);
         manager.setDirectory(new File(directory, "journal"));
         return manager;
     }
 
     protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
-        KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter();
+        KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize);
         return adaptor;
     }
 
@@ -643,11 +646,11 @@
         this.wireFormat = wireFormat;
     }
 
-    public UsageManager getUsageManager() {
+    public SystemUsage getUsageManager() {
         return usageManager;
     }
 
-    public void setUsageManager(UsageManager usageManager) {
+    public void setUsageManager(SystemUsage usageManager) {
         this.usageManager = usageManager;
     }
 
@@ -688,5 +691,9 @@
      */
     public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
         this.referenceStoreAdapter = referenceStoreAdapter;
+    }
+    
+    public long size(){
+        return storeSize.get();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Mon Aug 20 03:37:29 2007
@@ -25,9 +25,10 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.ByteSequenceData;
 import org.apache.activemq.util.IOExceptionSupport;
@@ -194,9 +195,10 @@
         return destination;
     }
 
-    public void setUsageManager(UsageManager usageManager) {
-        // we can ignore since we don't buffer up messages.
+    public void setMemoryUsage(MemoryUsage memoryUsage) {
+       //can ignore as messages aren't buffered
     }
+   
 
     public int getMessageCount() throws IOException {
         int result = 0;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Mon Aug 20 03:37:29 2007
@@ -34,7 +34,6 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -42,6 +41,7 @@
 import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
 import org.apache.activemq.store.memory.MemoryTransactionStore;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.FactoryFinder;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
@@ -447,7 +447,7 @@
      * @param usageManager The UsageManager that is controlling the
      *                destination's memory usage.
      */
-    public void setUsageManager(UsageManager usageManager) {
+    public void setUsageManager(SystemUsage usageManager) {
     }
 
     protected void databaseLockKeepAlive() {
@@ -492,5 +492,9 @@
     }
 
     public void checkpoint(boolean sync) throws IOException {
+    }
+
+    public long size(){
+        return 0;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java Mon Aug 20 03:37:29 2007
@@ -33,11 +33,12 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.Callback;
 import org.apache.activemq.util.TransactionTemplate;
 import org.apache.commons.logging.Log;
@@ -67,7 +68,7 @@
     private Map<MessageId, Message> cpAddedMessageIds;
 
 
-    private UsageManager usageManager;
+    private MemoryUsage memoryUsage;
 
     public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
         this.peristenceAdapter = adapter;
@@ -77,9 +78,10 @@
         this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
     }
 
-    public void setUsageManager(UsageManager usageManager) {
-        this.usageManager = usageManager;
-        longTermStore.setUsageManager(usageManager);
+    
+    public void setMemoryUsage(MemoryUsage memoryUsage) {
+        this.memoryUsage=memoryUsage;
+        longTermStore.setMemoryUsage(memoryUsage);
     }
 
     /**
@@ -351,16 +353,16 @@
     }
 
     public void start() throws Exception {
-        if (this.usageManager != null) {
-            this.usageManager.addUsageListener(peristenceAdapter);
+        if (this.memoryUsage != null) {
+            this.memoryUsage.addUsageListener(peristenceAdapter);
         }
         longTermStore.start();
     }
 
     public void stop() throws Exception {
         longTermStore.stop();
-        if (this.usageManager != null) {
-            this.usageManager.removeUsageListener(peristenceAdapter);
+        if (this.memoryUsage != null) {
+            this.memoryUsage.removeUsageListener(peristenceAdapter);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Mon Aug 20 03:37:29 2007
@@ -49,8 +49,6 @@
 import org.apache.activemq.command.JournalTransaction;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.memory.UsageListener;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -63,6 +61,9 @@
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.Usage;
+import org.apache.activemq.usage.UsageListener;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
@@ -89,7 +90,7 @@
     private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
     private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
 
-    private UsageManager usageManager;
+    private SystemUsage usageManager;
     private long checkpointInterval = 1000 * 60 * 5;
     private long lastCheckpointRequest = System.currentTimeMillis();
     private long lastCleanup = System.currentTimeMillis();
@@ -139,7 +140,7 @@
      * @param usageManager The UsageManager that is controlling the
      *                destination's memory usage.
      */
-    public void setUsageManager(UsageManager usageManager) {
+    public void setUsageManager(SystemUsage usageManager) {
         this.usageManager = usageManager;
         longTermPersistence.setUsageManager(usageManager);
     }
@@ -213,7 +214,7 @@
         });
         // checkpointExecutor.allowCoreThreadTimeOut(true);
 
-        this.usageManager.addUsageListener(this);
+        this.usageManager.getMemoryUsage().addUsageListener(this);
 
         if (longTermPersistence instanceof JDBCPersistenceAdapter) {
             // Disabled periodic clean up as it deadlocks with the checkpoint
@@ -232,7 +233,7 @@
 
     public void stop() throws Exception {
 
-        this.usageManager.removeUsageListener(this);
+        this.usageManager.getMemoryUsage().removeUsageListener(this);
         if (!started.compareAndSet(true, false)) {
             return;
         }
@@ -605,7 +606,7 @@
         return writeCommand(trace, sync);
     }
 
-    public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
+    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
         newPercentUsage = (newPercentUsage / 10) * 10;
         oldPercentUsage = (oldPercentUsage / 10) * 10;
         if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
@@ -633,7 +634,7 @@
         longTermPersistence.deleteAllMessages();
     }
 
-    public UsageManager getUsageManager() {
+    public SystemUsage getUsageManager() {
         return usageManager;
     }
 
@@ -681,6 +682,10 @@
     }
 
     public void setDirectory(File dir) {
+    }
+    
+    public long size(){
+        return 0;
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Mon Aug 20 03:37:29 2007
@@ -24,9 +24,10 @@
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.kaha.MapContainer;
 import org.apache.activemq.kaha.StoreEntry;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
 
 /**
  * An implementation of {@link org.apache.activemq.store.MessageStore} which
@@ -121,11 +122,7 @@
         messageContainer.clear();
     }
 
-    /**
-     * @param usageManager The UsageManager that is controlling the
-     *                destination's memory usage.
-     */
-    public void setUsageManager(UsageManager usageManager) {
+    public void setMemoryUsage(MemoryUsage memoryUsage) {
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Mon Aug 20 03:37:29 2007
@@ -22,6 +22,7 @@
 import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -38,12 +39,12 @@
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.kaha.StoreFactory;
 import org.apache.activemq.kaha.impl.StoreLockedExcpetion;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.IOHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -69,7 +70,16 @@
     private String brokerName;
     private Store theStore;
     private boolean initialized;
+    private final AtomicLong storeSize;
 
+    
+    public KahaPersistenceAdapter(AtomicLong size) {
+        this.storeSize=size;
+    }
+    
+    public KahaPersistenceAdapter() {
+        this(new AtomicLong());
+    }
     public Set<ActiveMQDestination> getDestinations() {
         Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
         try {
@@ -225,7 +235,7 @@
      * @param usageManager The UsageManager that is controlling the broker's
      *                memory usage.
      */
-    public void setUsageManager(UsageManager usageManager) {
+    public void setUsageManager(SystemUsage usageManager) {
     }
 
     /**
@@ -245,7 +255,7 @@
 
     protected synchronized Store getStore() throws IOException {
         if (theStore == null) {
-            theStore = StoreFactory.open(getStoreName(), "rw");
+            theStore = StoreFactory.open(getStoreName(), "rw",storeSize);
             theStore.setMaxDataFileLength(maxDataFileLength);
         }
         return theStore;
@@ -281,6 +291,10 @@
             getStore().force();
         }
     }
+   
+    public long size(){
+       return storeSize.get();
+    }
 
     private void initialize() {
         if (!initialized) {
@@ -295,5 +309,6 @@
             wireFormat.setTightEncodingEnabled(true);
         }
     }
+  
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Mon Aug 20 03:37:29 2007
@@ -24,9 +24,10 @@
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.kaha.MapContainer;
 import org.apache.activemq.kaha.StoreEntry;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
 
 public class KahaReferenceStore implements ReferenceStore {
 
@@ -171,9 +172,9 @@
         return messageContainer.size();
     }
 
-    public void setUsageManager(UsageManager usageManager) {
+    public void setMemoryUsage(MemoryUsage memoryUsage) {
     }
-
+    
     public boolean isSupportForCursors() {
         return true;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Mon Aug 20 03:37:29 2007
@@ -24,6 +24,7 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -48,6 +49,8 @@
 
 public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter {
 
+    
+
     private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class);
     private static final String STORE_STATE = "store-state";
     private static final String RECORD_REFERENCES = "record-references";
@@ -59,6 +62,10 @@
     private boolean storeValid;
     private Store stateStore;
 
+    public KahaReferenceStoreAdapter(AtomicLong size){
+        super(size);
+    }
+    
     public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
         throw new RuntimeException("Use createQueueReferenceStore instead");
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Mon Aug 20 03:37:29 2007
@@ -28,9 +28,10 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
 
 /**
  * An implementation of {@link org.apache.activemq.store.MessageStore} which
@@ -125,13 +126,7 @@
         }
     }
 
-    /**
-     * @param usageManager The UsageManager that is controlling the
-     *                destination's memory usage.
-     */
-    public void setUsageManager(UsageManager usageManager) {
-    }
-
+    
     public int getMessageCount() {
         return messageTable.size();
     }
@@ -160,5 +155,14 @@
 
     public void resetBatching() {
         lastBatchId = null;
+    }
+
+    /**
+     * @param memoeyUSage
+     * @see org.apache.activemq.store.MessageStore#setMemoryUsage(org.apache.activemq.usage.MemoryUsage)
+     */
+    public void setMemoryUsage(MemoryUsage memoeyUSage){
+        // TODO Auto-generated method stub
+        
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java Mon Aug 20 03:37:29 2007
@@ -27,11 +27,11 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -151,7 +151,7 @@
      * @param usageManager The UsageManager that is controlling the broker's
      *                memory usage.
      */
-    public void setUsageManager(UsageManager usageManager) {
+    public void setUsageManager(SystemUsage usageManager) {
     }
 
     public String toString() {
@@ -165,5 +165,9 @@
     }
 
     public void checkpoint(boolean sync) throws IOException {
+    }
+    
+    public long size(){
+        return 0;
     }
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+
+
+/**
+ Identify if a limit has been reached
+ * 
+ * @org.apache.xbean.XBean
+ * 
+ * @version $Revision: 1.3 $
+ */
+public class DefaultUsageCapacity implements UsageCapacity{
+
+    private long limit;
+    
+    /**
+     * @param size
+     * @return true if the limit is reached
+     * @see org.apache.activemq.usage.UsageCapacity#isLimit(long)
+     */
+    public boolean isLimit(long size) {
+        return size >= limit;
+    }
+
+    
+    /**
+     * @return the limit
+     */
+    public final long getLimit(){
+        return this.limit;
+    }
+
+    
+    /**
+     * @param limit the limit to set
+     */
+    public final void setLimit(long limit){
+        this.limit=limit;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+
+/**
+ * Used to keep track of how much of something is being used so that a
+ * productive working set usage can be controlled.
+ * 
+ * Main use case is manage memory usage.
+ * 
+ * @org.apache.xbean.XBean
+ * 
+ * @version $Revision: 1.3 $
+ */
+public class MemoryUsage extends Usage{
+
+    private MemoryUsage parent;
+    private long usage;
+
+    public MemoryUsage(){
+        this(null,"default");
+    }
+
+    /**
+     * Create the memory manager linked to a parent. When the memory manager is
+     * linked to a parent then when usage increased or decreased, the parent's
+     * usage is also increased or decreased.
+     * 
+     * @param parent
+     */
+    public MemoryUsage(MemoryUsage parent){
+        this(parent,"default");
+    }
+
+    public MemoryUsage(String name){
+        this(null,name);
+    }
+
+    public MemoryUsage(MemoryUsage parent,String name){
+        this(parent,name,1.0f);
+    }
+
+    public MemoryUsage(MemoryUsage parent,String name,float portion){
+        super(parent,name,portion);
+    }
+    
+    /**
+     * @throws InterruptedException
+     */
+    public void waitForSpace() throws InterruptedException{
+        if(parent!=null){
+            parent.waitForSpace();
+        }
+        synchronized(usageMutex){
+            for(int i=0;percentUsage>=100;i++){
+                usageMutex.wait();
+            }
+        }
+    }
+
+    /**
+     * @param timeout 
+     * @throws InterruptedException
+     * 
+     * @return true if space
+     */
+    public boolean waitForSpace(long timeout) throws InterruptedException{
+        if(parent!=null){
+            if(!parent.waitForSpace(timeout)){
+                return false;
+            }
+        }
+        synchronized(usageMutex){
+            if(percentUsage>=100){
+                usageMutex.wait(timeout);
+            }
+            return percentUsage<100;
+        }
+    }
+    
+    public boolean isFull(){
+        if(parent!=null&&parent.isFull()){
+            return true;
+        }
+        synchronized(usageMutex){
+            return percentUsage>=100;
+        }
+    }
+
+    /**
+     * Tries to increase the usage by value amount but blocks if this object is
+     * currently full.
+     * @param value 
+     * 
+     * @throws InterruptedException
+     */
+    public void enqueueUsage(long value) throws InterruptedException{
+        waitForSpace();
+        increaseUsage(value);
+    }
+
+    /**
+     * Increases the usage by the value amount.
+     * 
+     * @param value
+     */
+    public void increaseUsage(long value){
+        if(value==0){
+            return;
+        }
+        if(parent!=null){
+            parent.increaseUsage(value);
+        }
+        int percentUsage;
+        synchronized(usageMutex){
+            usage+=value;
+            percentUsage=caclPercentUsage();
+        }
+        setPercentUsage(percentUsage);
+    }
+
+    /**
+     * Decreases the usage by the value amount.
+     * 
+     * @param value
+     */
+    public void decreaseUsage(long value){
+        if(value==0){
+            return;
+        }
+        if(parent!=null){
+            parent.decreaseUsage(value);
+        }
+        int percentUsage;
+        synchronized(usageMutex){
+            usage-=value;
+            percentUsage=caclPercentUsage();
+        }
+        setPercentUsage(percentUsage);
+    }
+
+    protected long retrieveUsage(){
+        return usage;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+import org.apache.activemq.store.PersistenceAdapter;
+
+/**
+ * Used to keep track of how much of something is being used so that a
+ * productive working set usage can be controlled.
+ * 
+ * Main use case is manage memory usage.
+ * 
+ * @org.apache.xbean.XBean
+ * 
+ * @version $Revision: 1.3 $
+ */
+public class StoreUsage extends Usage{
+
+    final private PersistenceAdapter store;
+
+    public StoreUsage(String name,PersistenceAdapter store){
+        super(null,name,1.0f);
+        this.store=store;
+    }
+    
+    public StoreUsage(StoreUsage parent,String name){
+        super(parent,name,1.0f);
+        this.store=parent.store;
+    }
+
+    protected long retrieveUsage(){
+        return store.size();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.activemq.Service;
+import org.apache.activemq.kaha.Store;
+import org.apache.activemq.store.PersistenceAdapter;
+
+
+/**
+ * Holder for Usage instances for memory, store and temp files
+ * 
+ * Main use case is manage memory usage.
+ * 
+ * @org.apache.xbean.XBean
+ * 
+ * @version $Revision: 1.3 $
+ */
+public class SystemUsage implements Service{
+
+    private final SystemUsage parent;
+    private final String name;
+    private final MemoryUsage memoryUsage;
+    private final StoreUsage storeUsage;
+    private final TempUsage tempDiskUsage;
+    /**
+     * True if someone called setSendFailIfNoSpace() on this particular usage
+     * manager
+     */
+    private boolean sendFailIfNoSpaceExplicitySet;
+    private boolean sendFailIfNoSpace;
+    private List<SystemUsage> children=new CopyOnWriteArrayList<SystemUsage>();
+
+    public SystemUsage(){
+        this.parent=null;
+        this.name="default";
+        this.memoryUsage=new MemoryUsage(name+":memory");
+        this.storeUsage=null;
+        this.tempDiskUsage=null;
+    }
+
+    public SystemUsage(String name,PersistenceAdapter adapter,Store tempStore){
+        this.parent=null;
+        this.name=name;
+        this.memoryUsage=new MemoryUsage(name+":memory");
+        this.storeUsage=new StoreUsage(name+":store",adapter);
+        this.tempDiskUsage=new TempUsage(name+":temp",tempStore);
+    }
+
+    public SystemUsage(SystemUsage parent,String name){
+        this.parent=parent;
+        this.name=name;
+        this.memoryUsage=new MemoryUsage(parent.memoryUsage,name+":memory");
+        this.storeUsage=new StoreUsage(parent.storeUsage,name+":store");
+        this.tempDiskUsage=new TempUsage(parent!=null?parent.tempDiskUsage:null,name+":temp");
+    }
+
+    public String getName(){
+        return name;
+    }
+
+    /**
+     * @return the memoryUsage
+     */
+    public MemoryUsage getMemoryUsage(){
+        return this.memoryUsage;
+    }
+
+    /**
+     * @return the storeUsage
+     */
+    public StoreUsage getStoreUsage(){
+        return this.storeUsage;
+    }
+
+    /**
+     * @return the tempDiskUsage
+     */
+    public TempUsage getTempDiskUsage(){
+        return this.tempDiskUsage;
+    }
+
+    public String toString(){
+        return "UsageManager("+getName()+")";
+    }
+
+    public void start(){
+        if(parent!=null){
+            parent.addChild(this);
+        }
+        this.memoryUsage.start();
+        this.storeUsage.start();
+        this.tempDiskUsage.start();
+    }
+
+    public void stop(){
+        if(parent!=null){
+            parent.removeChild(this);
+        }
+        this.memoryUsage.stop();
+        this.storeUsage.stop();
+        this.tempDiskUsage.stop();
+    }
+    
+    /**
+     * Sets whether or not a send() should fail if there is no space free. The
+     * default value is false which means to block the send() method until space
+     * becomes available
+     */
+    public void setSendFailIfNoSpace(boolean failProducerIfNoSpace) {
+        sendFailIfNoSpaceExplicitySet = true;
+        this.sendFailIfNoSpace = failProducerIfNoSpace;
+    }
+
+    public boolean isSendFailIfNoSpace() {
+        if (sendFailIfNoSpaceExplicitySet || parent == null) {
+            return sendFailIfNoSpace;
+        } else {
+            return parent.isSendFailIfNoSpace();
+        }
+    }
+
+    private void addChild(SystemUsage child){
+        children.add(child);
+    }
+
+    private void removeChild(SystemUsage child){
+        children.remove(child);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
------------------------------------------------------------------------------
    svn:executable = *