You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/12/08 11:03:27 UTC
svn commit: r602440 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq: AMQMessageStore.java AMQTopicMessageStore.java
Author: rajdavies
Date: Sat Dec 8 02:03:26 2007
New Revision: 602440
URL: http://svn.apache.org/viewvc?rev=602440&view=rev
Log:
Fix for excessive memory usage for durable consumers -
see https://issues.apache.org/activemq/browse/AMQ-1490
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=602440&r1=602439&r2=602440&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Sat Dec 8 02:03:26 2007
@@ -127,8 +127,8 @@
}
synchronized (AMQMessageStore.this) {
inFlightTxLocations.remove(location);
- addMessage(message, location);
}
+ addMessage(message, location);
}
public void afterRollback() throws Exception {
@@ -153,10 +153,15 @@
messages.put(message.getMessageId(), data);
this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId());
}
- try {
- asyncWriteTask.wakeup();
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
+ if (messages.size() > this.peristenceAdapter
+ .getMaxCheckpointMessageAddSize()) {
+ flush();
+ } else {
+ try {
+ asyncWriteTask.wakeup();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
}
}
@@ -233,7 +238,10 @@
messageAcks.add(ack);
}
}
- if (data == null) {
+ if (messageAcks.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
+ flush();
+ }
+ else if (data == null) {
try {
asyncWriteTask.wakeup();
} catch (InterruptedException e) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?rev=602440&r1=602439&r2=602440&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Sat Dec 8 02:03:26 2007
@@ -47,8 +47,6 @@
private static final Log LOG = LogFactory.getLog(AMQTopicMessageStore.class);
private TopicReferenceStore topicReferenceStore;
- private Map<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
-
public AMQTopicMessageStore(AMQPersistenceAdapter adapter, TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) {
super(adapter, topicReferenceStore, destinationName);
this.topicReferenceStore = topicReferenceStore;
@@ -158,12 +156,6 @@
MessageAck ack = new MessageAck();
ack.setLastMessageId(messageId);
removeMessage(context, ack);
-
- }
- try {
- asyncWriteTask.wakeup();
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
}
}