You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/04/03 21:26:05 UTC
svn commit: r644447 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
broker/region/cursors/AbstractStoreCursor.java
store/memory/MemoryMessageStore.java store/memory/MemoryTopicSub.java
Author: chirino
Date: Thu Apr 3 12:26:04 2008
New Revision: 644447
URL: http://svn.apache.org/viewvc?rev=644447&view=rev
Log:
Do better reference count management of persistent messages when they are stored in a Memory store.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=644447&r1=644446&r2=644447&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Thu Apr 3 12:26:04 2008
@@ -77,7 +77,9 @@
if (!isDuplicate(message.getMessageId())) {
if (!cached) {
message.setRegionDestination(regionDestination);
- message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
+ if( message.getMemoryUsage()==null ) {
+ message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
+ }
}
message.incrementReferenceCount();
batchList.put(message.getMessageId(), message);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?rev=644447&r1=644446&r2=644447&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Thu Apr 3 12:26:04 2008
@@ -58,6 +58,7 @@
synchronized (messageTable) {
messageTable.put(message.getMessageId(), message);
}
+ message.incrementReferenceCount();
}
// public void addMessageReference(ConnectionContext context,MessageId
@@ -82,7 +83,10 @@
public void removeMessage(MessageId msgId) throws IOException {
synchronized (messageTable) {
- messageTable.remove(msgId);
+ Message removed = messageTable.remove(msgId);
+ if( removed !=null ) {
+ removed.decrementReferenceCount();
+ }
if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
lastBatchId = null;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java?rev=644447&r1=644446&r2=644447&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java Thu Apr 3 12:26:04 2008
@@ -34,14 +34,23 @@
private Map<MessageId, Message> map = new LinkedHashMap<MessageId, Message>();
private MessageId lastBatch;
- synchronized void addMessage(MessageId id, Message message) {
- map.put(id, message);
+ void addMessage(MessageId id, Message message) {
+ synchronized(this) {
+ map.put(id, message);
+ }
+ message.incrementReferenceCount();
}
- synchronized void removeMessage(MessageId id) {
- map.remove(id);
- if ((lastBatch != null && lastBatch.equals(id)) || map.isEmpty()) {
- resetBatching();
+ void removeMessage(MessageId id) {
+ Message removed;
+ synchronized(this) {
+ removed = map.remove(id);
+ if ((lastBatch != null && lastBatch.equals(id)) || map.isEmpty()) {
+ resetBatching();
+ }
+ }
+ if( removed!=null ) {
+ removed.decrementReferenceCount();
}
}