You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/04/03 21:26:05 UTC

svn commit: r644447 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/region/cursors/AbstractStoreCursor.java store/memory/MemoryMessageStore.java store/memory/MemoryTopicSub.java

Author: chirino
Date: Thu Apr  3 12:26:04 2008
New Revision: 644447

URL: http://svn.apache.org/viewvc?rev=644447&view=rev
Log:
Do better reference count management of persistent messages when they are stored in a Memory store.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=644447&r1=644446&r2=644447&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Thu Apr  3 12:26:04 2008
@@ -77,7 +77,9 @@
         if (!isDuplicate(message.getMessageId())) {
             if (!cached) {
                 message.setRegionDestination(regionDestination);
-                message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
+                if( message.getMemoryUsage()==null ) {
+                    message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
+                }
             }
             message.incrementReferenceCount();
             batchList.put(message.getMessageId(), message);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?rev=644447&r1=644446&r2=644447&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Thu Apr  3 12:26:04 2008
@@ -58,6 +58,7 @@
         synchronized (messageTable) {
             messageTable.put(message.getMessageId(), message);
         }
+        message.incrementReferenceCount();
     }
 
     // public void addMessageReference(ConnectionContext context,MessageId
@@ -82,7 +83,10 @@
 
     public void removeMessage(MessageId msgId) throws IOException {
         synchronized (messageTable) {
-            messageTable.remove(msgId);
+            Message removed = messageTable.remove(msgId);
+            if( removed !=null ) {
+                removed.decrementReferenceCount();
+            }
             if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
                 lastBatchId = null;
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java?rev=644447&r1=644446&r2=644447&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java Thu Apr  3 12:26:04 2008
@@ -34,14 +34,23 @@
     private Map<MessageId, Message> map = new LinkedHashMap<MessageId, Message>();
     private MessageId lastBatch;
 
-    synchronized void addMessage(MessageId id, Message message) {
-        map.put(id, message);
+    void addMessage(MessageId id, Message message) {
+        synchronized(this) {
+            map.put(id, message);
+        }
+        message.incrementReferenceCount();
     }
 
-    synchronized void removeMessage(MessageId id) {
-        map.remove(id);
-        if ((lastBatch != null && lastBatch.equals(id)) || map.isEmpty()) {
-            resetBatching();
+    void removeMessage(MessageId id) {
+        Message removed;
+        synchronized(this) {
+            removed = map.remove(id);
+            if ((lastBatch != null && lastBatch.equals(id)) || map.isEmpty()) {
+                resetBatching();
+            }
+        }
+        if( removed!=null ) {
+            removed.decrementReferenceCount();
         }
     }