You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/08/20 12:37:34 UTC
svn commit: r567647 [2/3] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/jmx/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/...
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java Mon Aug 20 03:37:29 2007
@@ -17,6 +17,7 @@
package org.apache.activemq.kaha;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.kaha.impl.KahaStore;
@@ -39,7 +40,19 @@
* @throws IOException
*/
public static Store open(String name, String mode) throws IOException {
- return new KahaStore(name, mode);
+ return new KahaStore(name, mode,new AtomicLong());
+ }
+
+ /**
+ * open or create a Store
+ * @param name
+ * @param mode
+ * @param size
+ * @return the opened/created store
+ * @throws IOException
+ */
+ public static Store open(String name, String mode, AtomicLong size) throws IOException {
+ return new KahaStore(name, mode,size);
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Mon Aug 20 03:37:29 2007
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
@@ -79,9 +80,16 @@
private FileLock lock;
private boolean persistentIndex = true;
private RandomAccessFile lockFile;
+ private final AtomicLong storeSize;
+
public KahaStore(String name, String mode) throws IOException {
+ this(name,mode,new AtomicLong());
+ }
+
+ public KahaStore(String name, String mode,AtomicLong storeSize) throws IOException {
this.mode = mode;
+ this.storeSize = storeSize;
directory = new File(name);
directory.mkdirs();
}
@@ -337,14 +345,14 @@
DataManager dm = dataManagers.get(name);
if (dm == null) {
if (isUseAsyncDataManager()) {
- AsyncDataManager t = new AsyncDataManager();
+ AsyncDataManager t = new AsyncDataManager(storeSize);
t.setDirectory(directory);
t.setFilePrefix("async-data-" + name + "-");
t.setMaxFileLength((int)maxDataFileLength);
t.start();
dm = new DataManagerFacade(t, name);
} else {
- DataManagerImpl t = new DataManagerImpl(directory, name);
+ DataManagerImpl t = new DataManagerImpl(directory, name,storeSize);
t.setMaxFileLength(maxDataFileLength);
dm = t;
}
@@ -359,7 +367,7 @@
public synchronized IndexManager getIndexManager(DataManager dm, String name) throws IOException {
IndexManager im = indexManagers.get(name);
if (im == null) {
- im = new IndexManager(directory, name, mode, logIndexChanges ? dm : null);
+ im = new IndexManager(directory, name, mode, logIndexChanges ? dm : null,storeSize);
indexManagers.put(name, im);
}
return im;
@@ -546,6 +554,14 @@
public synchronized void setUseAsyncDataManager(boolean useAsyncWriter) {
this.useAsyncDataManager = useAsyncWriter;
+ }
+
+ /**
+ * @return
+ * @see org.apache.activemq.kaha.Store#size()
+ */
+ public long size(){
+ return storeSize.get();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Mon Aug 20 03:37:29 2007
@@ -32,6 +32,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
@@ -89,6 +90,15 @@
private Location mark;
private final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
private Runnable cleanupTask;
+ private final AtomicLong storeSize;
+
+ public AsyncDataManager(AtomicLong storeSize) {
+ this.storeSize=storeSize;
+ }
+
+ public AsyncDataManager() {
+ this(new AtomicLong());
+ }
@SuppressWarnings("unchecked")
public synchronized void start() throws IOException {
@@ -128,6 +138,7 @@
int num = Integer.parseInt(numStr);
DataFile dataFile = new DataFile(file, num, preferedFileLength);
fileMap.put(dataFile.getDataFileId(), dataFile);
+ storeSize.addAndGet(dataFile.getLength());
} catch (NumberFormatException e) {
// Ignore file that do not match the pattern.
}
@@ -249,8 +260,10 @@
}
location.setOffset(currentWriteFile.getLength());
location.setDataFileId(currentWriteFile.getDataFileId().intValue());
- currentWriteFile.incrementLength(location.getSize());
+ int size = location.getSize();
+ currentWriteFile.incrementLength(size);
currentWriteFile.increment();
+ storeSize.addAndGet(size);
return currentWriteFile;
}
@@ -297,6 +310,7 @@
boolean result = true;
for (Iterator i = fileMap.values().iterator(); i.hasNext();) {
DataFile dataFile = (DataFile)i.next();
+ storeSize.addAndGet(-dataFile.getLength());
result &= dataFile.delete();
}
fileMap.clear();
@@ -387,6 +401,7 @@
accessorPool.disposeDataFileAccessors(dataFile);
fileMap.remove(dataFile.getDataFileId());
+ storeSize.addAndGet(-dataFile.getLength());
dataFile.unlink();
boolean result = dataFile.delete();
LOG.debug("discarding data file " + dataFile + (result ? "successful " : "failed"));
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java Mon Aug 20 03:37:29 2007
@@ -24,6 +24,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
@@ -57,10 +58,12 @@
private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
private Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
private String dataFilePrefix;
+ private final AtomicLong storeSize;
- public DataManagerImpl(File dir, final String name) {
+ public DataManagerImpl(File dir, final String name,AtomicLong storeSize) {
this.directory = dir;
this.name = name;
+ this.storeSize=storeSize;
dataFilePrefix = NAME_PREFIX + name + "-";
// build up list of current dataFiles
@@ -76,6 +79,7 @@
String numStr = n.substring(dataFilePrefix.length(), n.length());
int num = Integer.parseInt(numStr);
DataFile dataFile = new DataFile(file, num);
+ storeSize.addAndGet(dataFile.getLength());
fileMap.put(dataFile.getNumber(), dataFile);
if (currentWriteFile == null || currentWriteFile.getNumber().intValue() < num) {
currentWriteFile = dataFile;
@@ -111,7 +115,9 @@
}
item.setOffset(currentWriteFile.getLength());
item.setFile(currentWriteFile.getNumber().intValue());
- currentWriteFile.incrementLength(item.getSize() + ITEM_HEAD_SIZE);
+ int len = item.getSize() + ITEM_HEAD_SIZE;
+ currentWriteFile.incrementLength(len);
+ storeSize.addAndGet(len);
return currentWriteFile;
}
@@ -250,6 +256,7 @@
boolean result = true;
for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
DataFile dataFile = i.next();
+ storeSize.addAndGet(-dataFile.getLength());
result &= dataFile.delete();
}
fileMap.clear();
@@ -325,6 +332,7 @@
if (writer != null) {
writer.force(dataFile);
}
+ storeSize.addAndGet(-dataFile.getLength());
boolean result = dataFile.delete();
LOG.debug("discarding data file " + dataFile + (result ? "successful " : "failed"));
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java Mon Aug 20 03:37:29 2007
@@ -19,6 +19,7 @@
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.kaha.impl.DataManager;
import org.apache.commons.logging.Log;
@@ -45,12 +46,14 @@
private IndexItem firstFree;
private IndexItem lastFree;
private boolean dirty;
+ private final AtomicLong storeSize;
- public IndexManager(File directory, String name, String mode, DataManager redoLog) throws IOException {
+ public IndexManager(File directory, String name, String mode, DataManager redoLog, AtomicLong storeSize) throws IOException {
this.directory = directory;
this.name = name;
this.mode = mode;
this.redoLog = redoLog;
+ this.storeSize=storeSize;
initialize();
}
@@ -106,6 +109,7 @@
result = new IndexItem();
result.setOffset(length);
length += IndexItem.INDEX_SIZE;
+ storeSize.addAndGet(IndexItem.INDEX_SIZE);
}
return result;
}
@@ -156,9 +160,14 @@
synchronized long getLength() {
return length;
}
+
+ public final long size() {
+ return length;
+ }
public synchronized void setLength(long value) {
this.length = value;
+ storeSize.addAndGet(length);
}
public String toString() {
@@ -187,5 +196,6 @@
offset += IndexItem.INDEX_SIZE;
}
length = offset;
+ storeSize.addAndGet(length);
}
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/Cache.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/Cache.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/Cache.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/Cache.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory;
+
+/**
+ * Defines the interface used to cache messages.
+ *
+ * @version $Revision$
+ */
+public interface Cache {
+
+ /**
+ * Gets an object that was previously <code>put</code> into this object.
+ *
+ * @param msgid
+ * @return null if the object was not previously put or if the object has
+ * expired out of the cache.
+ */
+ Object get(Object key);
+
+ /**
+ * Puts an object into the cache.
+ *
+ * @param messageID
+ * @param message
+ */
+ Object put(Object key, Object value);
+
+ /**
+ * Removes an object from the cache.
+ *
+ * @param messageID
+ * @return the object associated with the key if it was still in the cache.
+ */
+ Object remove(Object key);
+
+ /**
+ * Lets a cache know it will not be used any further and that it can release
+ * acquired resources
+ */
+ void close();
+
+ /**
+ * How big is the cache right now?
+ *
+ * @return
+ */
+ int size();
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/Cache.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/Cache.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntry.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntry.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntry.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory;
+
+public class CacheEntry {
+
+ public final Object key;
+ public final Object value;
+
+ public CacheEntry next;
+ public CacheEntry previous;
+ public CacheEntryList owner;
+
+ public CacheEntry(Object key, Object value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ /**
+ *
+ * @param entry
+ * @return false if you are trying to remove the tail pointer.
+ */
+ public boolean remove() {
+
+ // Cannot remove if this is a tail pointer.
+ // Or not linked.
+ if (owner == null || this.key == null || this.next == null) {
+ return false;
+ }
+
+ synchronized (owner.tail) {
+ this.next.previous = this.previous;
+ this.previous.next = this.next;
+ this.owner = null;
+ this.next = null;
+ this.previous = null;
+ }
+
+ return true;
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntry.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntry.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntryList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntryList.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntryList.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntryList.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory;
+
+/**
+ * Maintains a simple linked list of CacheEntry objects. It is thread safe.
+ *
+ * @version $Revision$
+ */
+public class CacheEntryList {
+
+ // Points at the tail of the CacheEntry list
+ public final CacheEntry tail = new CacheEntry(null, null);
+
+ public CacheEntryList() {
+ tail.next = tail;
+ tail.previous = tail;
+ }
+
+ public void add(CacheEntry ce) {
+ addEntryBefore(tail, ce);
+ }
+
+ private void addEntryBefore(CacheEntry position, CacheEntry ce) {
+ assert ce.key != null && ce.next == null && ce.owner == null;
+
+ synchronized (tail) {
+ ce.owner = this;
+ ce.next = position;
+ ce.previous = position.previous;
+ ce.previous.next = ce;
+ ce.next.previous = ce;
+ }
+ }
+
+ public void clear() {
+ synchronized (tail) {
+ tail.next = tail;
+ tail.previous = tail;
+ }
+ }
+
+ public CacheEvictor createFIFOCacheEvictor() {
+ return new CacheEvictor() {
+ public CacheEntry evictCacheEntry() {
+ CacheEntry rc;
+ synchronized (tail) {
+ rc = tail.next;
+ }
+ return rc.remove() ? rc : null;
+ }
+ };
+ }
+
+ public CacheEvictor createLIFOCacheEvictor() {
+ return new CacheEvictor() {
+ public CacheEntry evictCacheEntry() {
+ CacheEntry rc;
+ synchronized (tail) {
+ rc = tail.previous;
+ }
+ return rc.remove() ? rc : null;
+ }
+ };
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntryList.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntryList.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.Usage;
+import org.apache.activemq.usage.UsageListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class CacheEvictionUsageListener implements UsageListener {
+
+ private static final Log LOG = LogFactory.getLog(CacheEvictionUsageListener.class);
+
+ private final List<CacheEvictor> evictors = new CopyOnWriteArrayList<CacheEvictor>();
+ private final int usageHighMark;
+ private final int usageLowMark;
+
+ private final TaskRunner evictionTask;
+ private final Usage usage;
+
+ public CacheEvictionUsageListener(Usage usage, int usageHighMark, int usageLowMark, TaskRunnerFactory taskRunnerFactory) {
+ this.usage = usage;
+ this.usageHighMark = usageHighMark;
+ this.usageLowMark = usageLowMark;
+ evictionTask = taskRunnerFactory.createTaskRunner(new Task() {
+ public boolean iterate() {
+ return evictMessages();
+ }
+ }, "Cache Evictor: " + System.identityHashCode(this));
+ }
+
+ boolean evictMessages() {
+ // Try to take the memory usage down below the low mark.
+ LOG.debug("Evicting cache memory usage: " + usage.getPercentUsage());
+
+ List<CacheEvictor> list = new LinkedList<CacheEvictor>(evictors);
+ while (list.size() > 0 && usage.getPercentUsage() > usageLowMark) {
+
+ // Evenly evict messages from all evictors
+ for (Iterator<CacheEvictor> iter = list.iterator(); iter.hasNext();) {
+ CacheEvictor evictor = iter.next();
+ if (evictor.evictCacheEntry() == null) {
+ iter.remove();
+ }
+ }
+ }
+ return false;
+ }
+
+ public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
+ // Do we need to start evicting cache entries? Usage > than the
+ // high mark
+ if (oldPercentUsage < newPercentUsage && usage.getPercentUsage() >= usageHighMark) {
+ try {
+ evictionTask.wakeup();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public void add(CacheEvictor evictor) {
+ evictors.add(evictor);
+ }
+
+ public void remove(CacheEvictor evictor) {
+ evictors.remove(evictor);
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictor.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictor.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictor.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory;
+
+public interface CacheEvictor {
+
+ CacheEntry evictCacheEntry();
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictor.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheFilter.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheFilter.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheFilter.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory;
+
+
+/**
+ * Filters another Cache implementation.
+ *
+ * @version $Revision$
+ */
+public class CacheFilter implements Cache {
+
+ protected final Cache next;
+
+ public CacheFilter(Cache next) {
+ this.next = next;
+ }
+
+ public Object put(Object key, Object value) {
+ return next.put(key, value);
+ }
+
+ public Object get(Object key) {
+ return next.get(key);
+ }
+
+ public Object remove(Object key) {
+ return next.remove(key);
+ }
+
+ public void close() {
+ next.close();
+ }
+
+ public int size() {
+ return next.size();
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheFilter.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/LRUMap.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/LRUMap.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/LRUMap.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/LRUMap.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A simple least-recently-used cache of a fixed size.
+ *
+ * @version $Revision:$
+ */
+public class LRUMap extends LinkedHashMap {
+
+ protected static final float DEFAULT_LOAD_FACTOR = (float) 0.75;
+ protected static final int DEFAULT_INITIAL_CAPACITY = 5000;
+ private static final long serialVersionUID = -9179676638408888162L;
+
+ private int maximumSize;
+
+ public LRUMap(int maximumSize) {
+ this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, true, maximumSize);
+ }
+
+ public LRUMap(int maximumSize, boolean accessOrder) {
+ this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, accessOrder, maximumSize);
+ }
+
+ public LRUMap(int initialCapacity, float loadFactor, boolean accessOrder, int maximumSize) {
+ super(initialCapacity, loadFactor, accessOrder);
+ this.maximumSize = maximumSize;
+ }
+
+ protected boolean removeEldestEntry(Map.Entry eldest) {
+ return size() > maximumSize;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/LRUMap.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/LRUMap.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/MapCache.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/MapCache.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/MapCache.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/MapCache.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory;
+
+import java.util.Map;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Use any Map to implement the Cache. No cache eviction going on here. Just gives
+ * a Map a Cache interface.
+ *
+ * @version $Revision$
+ */
+public class MapCache implements Cache {
+
+ protected final Map<Object, Object> map;
+
+ public MapCache() {
+ this(new ConcurrentHashMap<Object, Object>());
+ }
+
+ public MapCache(Map<Object, Object> map) {
+ this.map = map;
+ }
+
+ public Object put(Object key, Object value) {
+ return map.put(key, value);
+ }
+
+ public Object get(Object key) {
+ return map.get(key);
+ }
+
+ public Object remove(Object key) {
+ return map.remove(key);
+ }
+
+ public void close() {
+ map.clear();
+ }
+
+ public int size() {
+ return map.size();
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/MapCache.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/MapCache.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManagerCacheFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManagerCacheFilter.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManagerCacheFilter.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManagerCacheFilter.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.memory;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.usage.MemoryUsage;
+
+/**
+ * Simple CacheFilter that increases/decreases usage on a UsageManager as
+ * objects are added/removed from the Cache.
+ *
+ * @version $Revision$
+ */
+public class UsageManagerCacheFilter extends CacheFilter {
+
+ private final AtomicLong totalUsage = new AtomicLong(0);
+ private final MemoryUsage usage;
+
+ public UsageManagerCacheFilter(Cache next, MemoryUsage um) {
+ super(next);
+ this.usage = um;
+ }
+
+ public Object put(Object key, Object value) {
+ long usageValue = getUsageOfAddedObject(value);
+ Object rc = super.put(key, value);
+ if (rc != null) {
+ usageValue -= getUsageOfRemovedObject(rc);
+ }
+ totalUsage.addAndGet(usageValue);
+ usage.increaseUsage(usageValue);
+ return rc;
+ }
+
+ public Object remove(Object key) {
+ Object rc = super.remove(key);
+ if (rc != null) {
+ long usageValue = getUsageOfRemovedObject(rc);
+ totalUsage.addAndGet(-usageValue);
+ usage.decreaseUsage(usageValue);
+ }
+ return rc;
+ }
+
+ protected long getUsageOfAddedObject(Object value) {
+ return 1;
+ }
+
+ protected long getUsageOfRemovedObject(Object value) {
+ return 1;
+ }
+
+ public void close() {
+ usage.decreaseUsage(totalUsage.get());
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManagerCacheFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManagerCacheFilter.java
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Mon Aug 20 03:37:29 2007
@@ -24,7 +24,8 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
/**
* Represents a message store which is used by the persistent implementations
@@ -88,10 +89,10 @@
ActiveMQDestination getDestination();
/**
- * @param usageManager The UsageManager that is controlling the
+ * @param memoeyUSage The SystemUsage that is controlling the
* destination's memory usage.
*/
- void setUsageManager(UsageManager usageManager);
+ void setMemoryUsage(MemoryUsage memoeyUSage);
/**
* @return the number of messages ready to deliver
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java Mon Aug 20 03:37:29 2007
@@ -25,7 +25,7 @@
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.SystemUsage;
/**
* Adapter to the actual persistence mechanism used with ActiveMQ
@@ -115,7 +115,7 @@
/**
* @param usageManager The UsageManager that is controlling the broker's memory usage.
*/
- void setUsageManager(UsageManager usageManager);
+ void setUsageManager(SystemUsage usageManager);
/**
* Set the name of the broker using the adapter
@@ -136,4 +136,10 @@
*
*/
void checkpoint(boolean sync) throws IOException;
+
+ /**
+ * A hint to return the size of the store on disk
+ * @return disk space used in bytes of 0 if not implemented
+ */
+ long size();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java Mon Aug 20 03:37:29 2007
@@ -17,13 +17,12 @@
package org.apache.activemq.store;
import java.io.IOException;
-
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.MemoryUsage;
/**
* A simple proxy that delegates to another MessageStore.
@@ -72,8 +71,8 @@
return delegate.getDestination();
}
- public void setUsageManager(UsageManager usageManager) {
- delegate.setUsageManager(usageManager);
+ public void setMemoryUsage(MemoryUsage memoryUsage) {
+ delegate.setMemoryUsage(memoryUsage);
}
public int getMessageCount() throws IOException {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Mon Aug 20 03:37:29 2007
@@ -24,7 +24,8 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
/**
* A simple proxy that delegates to another MessageStore.
@@ -108,8 +109,8 @@
return delegate.getAllSubscriptions();
}
- public void setUsageManager(UsageManager usageManager) {
- delegate.setUsageManager(usageManager);
+ public void setMemoryUsage(MemoryUsage memoryUsage) {
+ delegate.setMemoryUsage(memoryUsage);
}
public int getMessageCount(String clientId, String subscriberName) throws IOException {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Mon Aug 20 03:37:29 2007
@@ -38,7 +38,6 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.impl.async.Location;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
@@ -47,6 +46,8 @@
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log;
@@ -94,15 +95,15 @@
}, "Checkpoint: " + destination);
}
- public void setUsageManager(UsageManager usageManager) {
- referenceStore.setUsageManager(usageManager);
+ public void setMemoryUsage(MemoryUsage memoryUsage) {
+ referenceStore.setMemoryUsage(memoryUsage);
}
/**
* Not synchronized since the Journal has better throughput if you increase
* the number of concurrent writes that it is doing.
*/
- public void addMessage(ConnectionContext context, final Message message) throws IOException {
+ public final void addMessage(ConnectionContext context, final Message message) throws IOException {
final MessageId id = message.getMessageId();
final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
if (!context.isInTransaction()) {
@@ -142,7 +143,7 @@
}
}
- void addMessage(final Message message, final Location location) throws InterruptedIOException {
+ final void addMessage(final Message message, final Location location) throws InterruptedIOException {
ReferenceData data = new ReferenceData();
data.setExpiration(message.getExpiration());
data.setFileId(location.getDataFileId());
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Mon Aug 20 03:37:29 2007
@@ -25,6 +25,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
@@ -40,8 +41,6 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
-import org.apache.activemq.memory.UsageListener;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
@@ -56,6 +55,9 @@
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.Usage;
+import org.apache.activemq.usage.UsageListener;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
@@ -80,7 +82,7 @@
private ReferenceStoreAdapter referenceStoreAdapter;
private TaskRunnerFactory taskRunnerFactory;
private WireFormat wireFormat = new OpenWireFormat();
- private UsageManager usageManager;
+ private SystemUsage usageManager;
private long cleanupInterval = 1000 * 60;
private long checkpointInterval = 1000 * 10;
private int maxCheckpointWorkers = 1;
@@ -96,6 +98,7 @@
private String brokerName = "";
private File directory;
private BrokerService brokerService;
+ private AtomicLong storeSize = new AtomicLong();
public String getBrokerName() {
return this.brokerName;
@@ -132,7 +135,7 @@
this.directory.mkdirs();
if (this.usageManager != null) {
- this.usageManager.addUsageListener(this);
+ this.usageManager.getMemoryUsage().addUsageListener(this);
}
if (asyncDataManager == null) {
asyncDataManager = createAsyncDataManager();
@@ -217,7 +220,7 @@
if (!started.compareAndSet(true, false)) {
return;
}
- this.usageManager.removeUsageListener(this);
+ this.usageManager.getMemoryUsage().removeUsageListener(this);
synchronized (this) {
Scheduler.cancel(periodicCheckpointTask);
Scheduler.cancel(periodicCleanupTask);
@@ -571,7 +574,7 @@
return writeCommand(trace, sync);
}
- public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
+ public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
newPercentUsage = (newPercentUsage / 10) * 10;
oldPercentUsage = (oldPercentUsage / 10) * 10;
if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
@@ -595,13 +598,13 @@
// Subclass overridables
// /////////////////////////////////////////////////////////////////
protected AsyncDataManager createAsyncDataManager() {
- AsyncDataManager manager = new AsyncDataManager();
+ AsyncDataManager manager = new AsyncDataManager(storeSize);
manager.setDirectory(new File(directory, "journal"));
return manager;
}
protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
- KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter();
+ KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize);
return adaptor;
}
@@ -643,11 +646,11 @@
this.wireFormat = wireFormat;
}
- public UsageManager getUsageManager() {
+ public SystemUsage getUsageManager() {
return usageManager;
}
- public void setUsageManager(UsageManager usageManager) {
+ public void setUsageManager(SystemUsage usageManager) {
this.usageManager = usageManager;
}
@@ -688,5 +691,9 @@
*/
public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
this.referenceStoreAdapter = referenceStoreAdapter;
+ }
+
+ public long size(){
+ return storeSize.get();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Mon Aug 20 03:37:29 2007
@@ -25,9 +25,10 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.IOExceptionSupport;
@@ -194,9 +195,10 @@
return destination;
}
- public void setUsageManager(UsageManager usageManager) {
- // we can ignore since we don't buffer up messages.
+ public void setMemoryUsage(MemoryUsage memoryUsage) {
+ //can ignore as messages aren't buffered
}
+
public int getMessageCount() throws IOException {
int result = 0;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Mon Aug 20 03:37:29 2007
@@ -34,7 +34,6 @@
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
@@ -42,6 +41,7 @@
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.apache.activemq.store.memory.MemoryTransactionStore;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
@@ -447,7 +447,7 @@
* @param usageManager The UsageManager that is controlling the
* destination's memory usage.
*/
- public void setUsageManager(UsageManager usageManager) {
+ public void setUsageManager(SystemUsage usageManager) {
}
protected void databaseLockKeepAlive() {
@@ -492,5 +492,9 @@
}
public void checkpoint(boolean sync) throws IOException {
+ }
+
+ public long size(){
+ return 0;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java Mon Aug 20 03:37:29 2007
@@ -33,11 +33,12 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log;
@@ -67,7 +68,7 @@
private Map<MessageId, Message> cpAddedMessageIds;
- private UsageManager usageManager;
+ private MemoryUsage memoryUsage;
public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
this.peristenceAdapter = adapter;
@@ -77,9 +78,10 @@
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
}
- public void setUsageManager(UsageManager usageManager) {
- this.usageManager = usageManager;
- longTermStore.setUsageManager(usageManager);
+
+ public void setMemoryUsage(MemoryUsage memoryUsage) {
+ this.memoryUsage=memoryUsage;
+ longTermStore.setMemoryUsage(memoryUsage);
}
/**
@@ -351,16 +353,16 @@
}
public void start() throws Exception {
- if (this.usageManager != null) {
- this.usageManager.addUsageListener(peristenceAdapter);
+ if (this.memoryUsage != null) {
+ this.memoryUsage.addUsageListener(peristenceAdapter);
}
longTermStore.start();
}
public void stop() throws Exception {
longTermStore.stop();
- if (this.usageManager != null) {
- this.usageManager.removeUsageListener(peristenceAdapter);
+ if (this.memoryUsage != null) {
+ this.memoryUsage.removeUsageListener(peristenceAdapter);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Mon Aug 20 03:37:29 2007
@@ -49,8 +49,6 @@
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.memory.UsageListener;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
@@ -63,6 +61,9 @@
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.Usage;
+import org.apache.activemq.usage.UsageListener;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
@@ -89,7 +90,7 @@
private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
- private UsageManager usageManager;
+ private SystemUsage usageManager;
private long checkpointInterval = 1000 * 60 * 5;
private long lastCheckpointRequest = System.currentTimeMillis();
private long lastCleanup = System.currentTimeMillis();
@@ -139,7 +140,7 @@
* @param usageManager The UsageManager that is controlling the
* destination's memory usage.
*/
- public void setUsageManager(UsageManager usageManager) {
+ public void setUsageManager(SystemUsage usageManager) {
this.usageManager = usageManager;
longTermPersistence.setUsageManager(usageManager);
}
@@ -213,7 +214,7 @@
});
// checkpointExecutor.allowCoreThreadTimeOut(true);
- this.usageManager.addUsageListener(this);
+ this.usageManager.getMemoryUsage().addUsageListener(this);
if (longTermPersistence instanceof JDBCPersistenceAdapter) {
// Disabled periodic clean up as it deadlocks with the checkpoint
@@ -232,7 +233,7 @@
public void stop() throws Exception {
- this.usageManager.removeUsageListener(this);
+ this.usageManager.getMemoryUsage().removeUsageListener(this);
if (!started.compareAndSet(true, false)) {
return;
}
@@ -605,7 +606,7 @@
return writeCommand(trace, sync);
}
- public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
+ public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
newPercentUsage = (newPercentUsage / 10) * 10;
oldPercentUsage = (oldPercentUsage / 10) * 10;
if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
@@ -633,7 +634,7 @@
longTermPersistence.deleteAllMessages();
}
- public UsageManager getUsageManager() {
+ public SystemUsage getUsageManager() {
return usageManager;
}
@@ -681,6 +682,10 @@
}
public void setDirectory(File dir) {
+ }
+
+ public long size(){
+ return 0;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Mon Aug 20 03:37:29 2007
@@ -24,9 +24,10 @@
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.StoreEntry;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
/**
* An implementation of {@link org.apache.activemq.store.MessageStore} which
@@ -121,11 +122,7 @@
messageContainer.clear();
}
- /**
- * @param usageManager The UsageManager that is controlling the
- * destination's memory usage.
- */
- public void setUsageManager(UsageManager usageManager) {
+ public void setMemoryUsage(MemoryUsage memoryUsage) {
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Mon Aug 20 03:37:29 2007
@@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -38,12 +39,12 @@
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.kaha.impl.StoreLockedExcpetion;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.IOHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -69,7 +70,16 @@
private String brokerName;
private Store theStore;
private boolean initialized;
+ private final AtomicLong storeSize;
+
+ public KahaPersistenceAdapter(AtomicLong size) {
+ this.storeSize=size;
+ }
+
+ public KahaPersistenceAdapter() {
+ this(new AtomicLong());
+ }
public Set<ActiveMQDestination> getDestinations() {
Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
try {
@@ -225,7 +235,7 @@
* @param usageManager The UsageManager that is controlling the broker's
* memory usage.
*/
- public void setUsageManager(UsageManager usageManager) {
+ public void setUsageManager(SystemUsage usageManager) {
}
/**
@@ -245,7 +255,7 @@
protected synchronized Store getStore() throws IOException {
if (theStore == null) {
- theStore = StoreFactory.open(getStoreName(), "rw");
+ theStore = StoreFactory.open(getStoreName(), "rw",storeSize);
theStore.setMaxDataFileLength(maxDataFileLength);
}
return theStore;
@@ -281,6 +291,10 @@
getStore().force();
}
}
+
+ public long size(){
+ return storeSize.get();
+ }
private void initialize() {
if (!initialized) {
@@ -295,5 +309,6 @@
wireFormat.setTightEncodingEnabled(true);
}
}
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Mon Aug 20 03:37:29 2007
@@ -24,9 +24,10 @@
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.StoreEntry;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
public class KahaReferenceStore implements ReferenceStore {
@@ -171,9 +172,9 @@
return messageContainer.size();
}
- public void setUsageManager(UsageManager usageManager) {
+ public void setMemoryUsage(MemoryUsage memoryUsage) {
}
-
+
public boolean isSupportForCursors() {
return true;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Mon Aug 20 03:37:29 2007
@@ -24,6 +24,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -48,6 +49,8 @@
public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter {
+
+
private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class);
private static final String STORE_STATE = "store-state";
private static final String RECORD_REFERENCES = "record-references";
@@ -59,6 +62,10 @@
private boolean storeValid;
private Store stateStore;
+ public KahaReferenceStoreAdapter(AtomicLong size){
+ super(size);
+ }
+
public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
throw new RuntimeException("Use createQueueReferenceStore instead");
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Mon Aug 20 03:37:29 2007
@@ -28,9 +28,10 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
/**
* An implementation of {@link org.apache.activemq.store.MessageStore} which
@@ -125,13 +126,7 @@
}
}
- /**
- * @param usageManager The UsageManager that is controlling the
- * destination's memory usage.
- */
- public void setUsageManager(UsageManager usageManager) {
- }
-
+
public int getMessageCount() {
return messageTable.size();
}
@@ -160,5 +155,14 @@
public void resetBatching() {
lastBatchId = null;
+ }
+
+ /**
+ * @param memoeyUSage
+ * @see org.apache.activemq.store.MessageStore#setMemoryUsage(org.apache.activemq.usage.MemoryUsage)
+ */
+ public void setMemoryUsage(MemoryUsage memoeyUSage){
+ // TODO Auto-generated method stub
+
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java Mon Aug 20 03:37:29 2007
@@ -27,11 +27,11 @@
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -151,7 +151,7 @@
* @param usageManager The UsageManager that is controlling the broker's
* memory usage.
*/
- public void setUsageManager(UsageManager usageManager) {
+ public void setUsageManager(SystemUsage usageManager) {
}
public String toString() {
@@ -165,5 +165,9 @@
}
public void checkpoint(boolean sync) throws IOException {
+ }
+
+ public long size(){
+ return 0;
}
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+
+
+/**
+ Identify if a limit has been reached
+ *
+ * @org.apache.xbean.XBean
+ *
+ * @version $Revision: 1.3 $
+ */
+public class DefaultUsageCapacity implements UsageCapacity{
+
+ private long limit;
+
+ /**
+ * @param size
+ * @return true if the limit is reached
+ * @see org.apache.activemq.usage.UsageCapacity#isLimit(long)
+ */
+ public boolean isLimit(long size) {
+ return size >= limit;
+ }
+
+
+ /**
+ * @return the limit
+ */
+ public final long getLimit(){
+ return this.limit;
+ }
+
+
+ /**
+ * @param limit the limit to set
+ */
+ public final void setLimit(long limit){
+ this.limit=limit;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+
+/**
+ * Used to keep track of how much of something is being used so that a
+ * productive working set usage can be controlled.
+ *
+ * Main use case is manage memory usage.
+ *
+ * @org.apache.xbean.XBean
+ *
+ * @version $Revision: 1.3 $
+ */
+public class MemoryUsage extends Usage{
+
+ private MemoryUsage parent;
+ private long usage;
+
+ public MemoryUsage(){
+ this(null,"default");
+ }
+
+ /**
+ * Create the memory manager linked to a parent. When the memory manager is
+ * linked to a parent then when usage increased or decreased, the parent's
+ * usage is also increased or decreased.
+ *
+ * @param parent
+ */
+ public MemoryUsage(MemoryUsage parent){
+ this(parent,"default");
+ }
+
+ public MemoryUsage(String name){
+ this(null,name);
+ }
+
+ public MemoryUsage(MemoryUsage parent,String name){
+ this(parent,name,1.0f);
+ }
+
+ public MemoryUsage(MemoryUsage parent,String name,float portion){
+ super(parent,name,portion);
+ }
+
+ /**
+ * @throws InterruptedException
+ */
+ public void waitForSpace() throws InterruptedException{
+ if(parent!=null){
+ parent.waitForSpace();
+ }
+ synchronized(usageMutex){
+ for(int i=0;percentUsage>=100;i++){
+ usageMutex.wait();
+ }
+ }
+ }
+
+ /**
+ * @param timeout
+ * @throws InterruptedException
+ *
+ * @return true if space
+ */
+ public boolean waitForSpace(long timeout) throws InterruptedException{
+ if(parent!=null){
+ if(!parent.waitForSpace(timeout)){
+ return false;
+ }
+ }
+ synchronized(usageMutex){
+ if(percentUsage>=100){
+ usageMutex.wait(timeout);
+ }
+ return percentUsage<100;
+ }
+ }
+
+ public boolean isFull(){
+ if(parent!=null&&parent.isFull()){
+ return true;
+ }
+ synchronized(usageMutex){
+ return percentUsage>=100;
+ }
+ }
+
+ /**
+ * Tries to increase the usage by value amount but blocks if this object is
+ * currently full.
+ * @param value
+ *
+ * @throws InterruptedException
+ */
+ public void enqueueUsage(long value) throws InterruptedException{
+ waitForSpace();
+ increaseUsage(value);
+ }
+
+ /**
+ * Increases the usage by the value amount.
+ *
+ * @param value
+ */
+ public void increaseUsage(long value){
+ if(value==0){
+ return;
+ }
+ if(parent!=null){
+ parent.increaseUsage(value);
+ }
+ int percentUsage;
+ synchronized(usageMutex){
+ usage+=value;
+ percentUsage=caclPercentUsage();
+ }
+ setPercentUsage(percentUsage);
+ }
+
+ /**
+ * Decreases the usage by the value amount.
+ *
+ * @param value
+ */
+ public void decreaseUsage(long value){
+ if(value==0){
+ return;
+ }
+ if(parent!=null){
+ parent.decreaseUsage(value);
+ }
+ int percentUsage;
+ synchronized(usageMutex){
+ usage-=value;
+ percentUsage=caclPercentUsage();
+ }
+ setPercentUsage(percentUsage);
+ }
+
+ protected long retrieveUsage(){
+ return usage;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+import org.apache.activemq.store.PersistenceAdapter;
+
+/**
+ * Used to keep track of how much of something is being used so that a
+ * productive working set usage can be controlled.
+ *
+ * Main use case is manage memory usage.
+ *
+ * @org.apache.xbean.XBean
+ *
+ * @version $Revision: 1.3 $
+ */
+public class StoreUsage extends Usage{
+
+ final private PersistenceAdapter store;
+
+ public StoreUsage(String name,PersistenceAdapter store){
+ super(null,name,1.0f);
+ this.store=store;
+ }
+
+ public StoreUsage(StoreUsage parent,String name){
+ super(parent,name,1.0f);
+ this.store=parent.store;
+ }
+
+ protected long retrieveUsage(){
+ return store.size();
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java?rev=567647&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java Mon Aug 20 03:37:29 2007
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usage;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.activemq.Service;
+import org.apache.activemq.kaha.Store;
+import org.apache.activemq.store.PersistenceAdapter;
+
+
+/**
+ * Holder for Usage instances for memory, store and temp files
+ *
+ * Main use case is manage memory usage.
+ *
+ * @org.apache.xbean.XBean
+ *
+ * @version $Revision: 1.3 $
+ */
+public class SystemUsage implements Service{
+
+ private final SystemUsage parent;
+ private final String name;
+ private final MemoryUsage memoryUsage;
+ private final StoreUsage storeUsage;
+ private final TempUsage tempDiskUsage;
+ /**
+ * True if someone called setSendFailIfNoSpace() on this particular usage
+ * manager
+ */
+ private boolean sendFailIfNoSpaceExplicitySet;
+ private boolean sendFailIfNoSpace;
+ private List<SystemUsage> children=new CopyOnWriteArrayList<SystemUsage>();
+
+ public SystemUsage(){
+ this.parent=null;
+ this.name="default";
+ this.memoryUsage=new MemoryUsage(name+":memory");
+ this.storeUsage=null;
+ this.tempDiskUsage=null;
+ }
+
+ public SystemUsage(String name,PersistenceAdapter adapter,Store tempStore){
+ this.parent=null;
+ this.name=name;
+ this.memoryUsage=new MemoryUsage(name+":memory");
+ this.storeUsage=new StoreUsage(name+":store",adapter);
+ this.tempDiskUsage=new TempUsage(name+":temp",tempStore);
+ }
+
+ public SystemUsage(SystemUsage parent,String name){
+ this.parent=parent;
+ this.name=name;
+ this.memoryUsage=new MemoryUsage(parent.memoryUsage,name+":memory");
+ this.storeUsage=new StoreUsage(parent.storeUsage,name+":store");
+ this.tempDiskUsage=new TempUsage(parent!=null?parent.tempDiskUsage:null,name+":temp");
+ }
+
+ public String getName(){
+ return name;
+ }
+
+ /**
+ * @return the memoryUsage
+ */
+ public MemoryUsage getMemoryUsage(){
+ return this.memoryUsage;
+ }
+
+ /**
+ * @return the storeUsage
+ */
+ public StoreUsage getStoreUsage(){
+ return this.storeUsage;
+ }
+
+ /**
+ * @return the tempDiskUsage
+ */
+ public TempUsage getTempDiskUsage(){
+ return this.tempDiskUsage;
+ }
+
+ public String toString(){
+ return "UsageManager("+getName()+")";
+ }
+
+ public void start(){
+ if(parent!=null){
+ parent.addChild(this);
+ }
+ this.memoryUsage.start();
+ this.storeUsage.start();
+ this.tempDiskUsage.start();
+ }
+
+ public void stop(){
+ if(parent!=null){
+ parent.removeChild(this);
+ }
+ this.memoryUsage.stop();
+ this.storeUsage.stop();
+ this.tempDiskUsage.stop();
+ }
+
+ /**
+ * Sets whether or not a send() should fail if there is no space free. The
+ * default value is false which means to block the send() method until space
+ * becomes available
+ */
+ public void setSendFailIfNoSpace(boolean failProducerIfNoSpace) {
+ sendFailIfNoSpaceExplicitySet = true;
+ this.sendFailIfNoSpace = failProducerIfNoSpace;
+ }
+
+ public boolean isSendFailIfNoSpace() {
+ if (sendFailIfNoSpaceExplicitySet || parent == null) {
+ return sendFailIfNoSpace;
+ } else {
+ return parent.isSendFailIfNoSpace();
+ }
+ }
+
+ private void addChild(SystemUsage child){
+ children.add(child);
+ }
+
+ private void removeChild(SystemUsage child){
+ children.remove(child);
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
------------------------------------------------------------------------------
svn:executable = *