You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/12/08 11:03:27 UTC

svn commit: r602440 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq: AMQMessageStore.java AMQTopicMessageStore.java

Author: rajdavies
Date: Sat Dec  8 02:03:26 2007
New Revision: 602440

URL: http://svn.apache.org/viewvc?rev=602440&view=rev
Log:
Fix for excessive memory usage for durable consumers;
see https://issues.apache.org/activemq/browse/AMQ-1490

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=602440&r1=602439&r2=602440&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Sat Dec  8 02:03:26 2007
@@ -127,8 +127,8 @@
                     }
                     synchronized (AMQMessageStore.this) {
                         inFlightTxLocations.remove(location);
-                        addMessage(message, location);
                     }
+                    addMessage(message, location);
                 }
 
                 public void afterRollback() throws Exception {
@@ -153,10 +153,15 @@
             messages.put(message.getMessageId(), data);
             this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId());
         }
-        try {
-            asyncWriteTask.wakeup();
-        } catch (InterruptedException e) {
-            throw new InterruptedIOException();
+        if (messages.size() > this.peristenceAdapter
+                .getMaxCheckpointMessageAddSize()) {
+            flush();
+        } else {
+            try {
+                asyncWriteTask.wakeup();
+            } catch (InterruptedException e) {
+                throw new InterruptedIOException();
+            }
         }
     }
 
@@ -233,7 +238,10 @@
                 messageAcks.add(ack);
             }
         }
-        if (data == null) {
+        if (messageAcks.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
+            flush();
+        }
+        else if (data == null) {
             try {
                 asyncWriteTask.wakeup();
             } catch (InterruptedException e) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?rev=602440&r1=602439&r2=602440&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Sat Dec  8 02:03:26 2007
@@ -47,8 +47,6 @@
 
     private static final Log LOG = LogFactory.getLog(AMQTopicMessageStore.class);
     private TopicReferenceStore topicReferenceStore;
-    private Map<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
-
     public AMQTopicMessageStore(AMQPersistenceAdapter adapter, TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) {
         super(adapter, topicReferenceStore, destinationName);
         this.topicReferenceStore = topicReferenceStore;
@@ -158,12 +156,6 @@
             MessageAck ack = new MessageAck();
             ack.setLastMessageId(messageId);
             removeMessage(context, ack);
-
-        }
-        try {
-            asyncWriteTask.wakeup();
-        } catch (InterruptedException e) {
-            throw new InterruptedIOException();
         }
     }