You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/03/01 12:55:22 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-4495 -
revisit. Reinstate check for space on pagein,
so that highWaterMark is respected and full state is not reached,
hence producer flow control (pfc) is not triggered in error
Repository: activemq
Updated Branches:
refs/heads/master 473b3284d -> d8cf54b0a
https://issues.apache.org/jira/browse/AMQ-4495 - revisit. Reinstate check for space on pagein, so that highWaterMark is respected and full state is not reached, hence producer flow control (pfc) is not triggered in error
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d8cf54b0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d8cf54b0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d8cf54b0
Branch: refs/heads/master
Commit: d8cf54b0a9eee4b86db1ffef2cb3dd1171067307
Parents: 473b328
Author: gtully <ga...@gmail.com>
Authored: Tue Mar 1 11:41:37 2016 +0000
Committer: gtully <ga...@gmail.com>
Committed: Tue Mar 1 11:44:41 2016 +0000
----------------------------------------------------------------------
.../apache/activemq/broker/region/Queue.java | 4 +-
.../region/cursors/AbstractStoreCursor.java | 4 +-
.../region/cursors/QueueStorePrefetch.java | 11 +-
.../broker/region/cursors/StoreQueueCursor.java | 2 +-
.../region/cursors/TopicStorePrefetch.java | 15 +-
.../store/jdbc/JDBCMessageRecoveryListener.java | 1 +
.../activemq/store/jdbc/JDBCMessageStore.java | 37 ++--
.../store/jdbc/JDBCTopicMessageStore.java | 8 +
.../store/jdbc/adapter/DefaultJDBCAdapter.java | 13 +-
.../activemq/store/kahadb/KahaDBStore.java | 2 +-
.../org/apache/activemq/leveldb/DBManager.scala | 2 +-
.../StoreQueueCursorNoDuplicateTest.java | 14 +-
.../cursors/StoreQueueCursorOrderTest.java | 10 +-
.../org/apache/activemq/bugs/AMQ4930Test.java | 2 +-
.../activemq/usecases/MemoryLimitPfcTest.java | 213 +++++++++++++++++++
.../activemq/usecases/MemoryLimitTest.java | 13 +-
.../activemq/usecases/QueueBrowsingTest.java | 4 +-
17 files changed, 280 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 34817a0..96a22ec 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -636,8 +636,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
if (isProducerFlowControl() && context.isProducerFlowControl()) {
if (warnOnProducerFlowControl) {
warnOnProducerFlowControl = false;
- LOG.info("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
- memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount());
+ LOG.info("Usage Manager Memory Limit ({}) reached (%{}) on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
+ memoryUsage.getLimit(), memoryUsage.getPercentUsage(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount());
}
if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index 06bae97..d84379d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -48,8 +48,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
private static int SYNC_ADD = 0;
private static int ASYNC_ADD = 1;
final MessageId[] lastCachedIds = new MessageId[2];
- protected boolean hadSpace = false;
-
protected AbstractStoreCursor(Destination destination) {
@@ -401,7 +399,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
resetBatch();
this.batchResetNeeded = false;
}
- if (this.batchList.isEmpty() && this.size >0) {
+ if (this.batchList.isEmpty() && this.size >0 && hasSpace()) {
try {
doFillBatch();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
index b10b2e2..dacae78 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
@@ -38,16 +38,14 @@ import org.slf4j.LoggerFactory;
class QueueStorePrefetch extends AbstractStoreCursor {
private static final Logger LOG = LoggerFactory.getLogger(QueueStorePrefetch.class);
private final MessageStore store;
- private final Broker broker;
/**
* Construct it
* @param queue
*/
- public QueueStorePrefetch(Queue queue, Broker broker) {
+ public QueueStorePrefetch(Queue queue) {
super(queue);
this.store = queue.getMessageStore();
- this.broker = broker;
}
@@ -115,11 +113,8 @@ class QueueStorePrefetch extends AbstractStoreCursor {
@Override
protected void doFillBatch() throws Exception {
- hadSpace = this.hasSpace();
- if (!broker.getBrokerService().isPersistent() || hadSpace) {
- this.store.recoverNextMessages(this.maxBatchSize, this);
- dealWithDuplicates(); // without the index lock
- }
+ this.store.recoverNextMessages(this.maxBatchSize, this);
+ dealWithDuplicates(); // without the index lock
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
index 7f26b43..e6de82e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
@@ -47,7 +47,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
super((queue != null ? queue.isPrioritizedMessages():false));
this.broker=broker;
this.queue = queue;
- this.persistent = new QueueStorePrefetch(queue, broker);
+ this.persistent = new QueueStorePrefetch(queue);
currentCursor = persistent;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
index 35ec3ed..1a6a851 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
@@ -40,7 +40,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
private final String subscriberName;
private final Subscription subscription;
private byte lastRecoveredPriority = 9;
- private boolean storeHasMessages = false;
/**
* @param topic
@@ -56,7 +55,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
this.maxProducersToAudit=32;
this.maxAuditDepth=10000;
resetSize();
- this.storeHasMessages=this.size > 0;
}
@Override
@@ -73,11 +71,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
//this.messageSize.addSize(node.getMessage().getSize());
}
- @Override
- public final synchronized boolean addMessageLast(MessageReference node) throws Exception {
- this.storeHasMessages = super.addMessageLast(node);
- return this.storeHasMessages;
- }
@Override
public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
@@ -90,7 +83,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
if (recovered && !cached) {
lastRecoveredPriority = message.getPriority();
}
- storeHasMessages = true;
}
return recovered;
}
@@ -134,13 +126,8 @@ class TopicStorePrefetch extends AbstractStoreCursor {
@Override
protected void doFillBatch() throws Exception {
- // avoid repeated trips to the store if there is nothing of interest
- this.storeHasMessages = false;
this.store.recoverNextMessages(clientId, subscriberName,
maxBatchSize, this);
- if (!this.storeHasMessages && (!this.batchList.isEmpty() || !hadSpace)) {
- this.storeHasMessages = true;
- }
}
public byte getLastRecoveredPriority() {
@@ -158,6 +145,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
@Override
public String toString() {
- return "TopicStorePrefetch(" + clientId + "," + subscriberName + ",storeHasMessages=" + this.storeHasMessages +") " + this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString();
+ return "TopicStorePrefetch(" + clientId + "," + subscriberName + ") " + this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString();
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java
index 07f4816..5ade773 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java
@@ -24,4 +24,5 @@ package org.apache.activemq.store.jdbc;
public interface JDBCMessageRecoveryListener {
boolean recoverMessage(long sequenceId, byte[] message) throws Exception;
boolean recoverMessageReference(String reference) throws Exception;
+ boolean hasSpace();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 175002a..27313f4 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -279,6 +279,10 @@ public class JDBCMessageStore extends AbstractMessageStore {
public boolean recoverMessageReference(String reference) throws Exception {
return listener.recoverMessageReference(new MessageId(reference));
}
+
+ public boolean hasSpace() {
+ return listener.hasSpace();
+ }
});
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
@@ -337,24 +341,25 @@ public class JDBCMessageStore extends AbstractMessageStore {
adapter.doRecoverNextMessages(c, destination, perPriorityLastRecovered, minPendingSequeunceId(),
maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
- public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
- Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
- msg.getMessageId().setBrokerSequenceId(sequenceId);
- msg.getMessageId().setFutureOrSequenceLong(sequenceId);
- listener.recoverMessage(msg);
- trackLastRecovered(sequenceId, msg.getPriority());
- return true;
- }
+ public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
+ Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
+ msg.getMessageId().setBrokerSequenceId(sequenceId);
+ msg.getMessageId().setFutureOrSequenceLong(sequenceId);
+ listener.recoverMessage(msg);
+ trackLastRecovered(sequenceId, msg.getPriority());
+ return true;
+ }
- public boolean recoverMessageReference(String reference) throws Exception {
- if (listener.hasSpace()) {
- listener.recoverMessageReference(new MessageId(reference));
- return true;
- }
- return false;
- }
+ public boolean recoverMessageReference(String reference) throws Exception {
+ listener.recoverMessageReference(new MessageId(reference));
+ return true;
+ }
- });
+ public boolean hasSpace() {
+ return listener.hasSpace();
+ }
+
+ });
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
} finally {
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
index 3bff9b2..7203f92 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
@@ -129,6 +129,10 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
return listener.recoverMessageReference(new MessageId(reference));
}
+ public boolean hasSpace() {
+ return listener.hasSpace();
+ }
+
});
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
@@ -238,6 +242,10 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
return false;
}
+ public boolean hasSpace() {
+ return delegate.hasSpace();
+ }
+
@Override
public boolean recoverMessageReference(String reference) throws Exception {
return delegate.recoverMessageReference(new MessageId(reference));
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
index facf969..6fe83c8 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
@@ -37,7 +37,6 @@ import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.jdbc.JDBCAdapter;
import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
-import org.apache.activemq.store.jdbc.JDBCMessageStore;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore;
import org.apache.activemq.store.jdbc.Statements;
@@ -633,13 +632,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
rs = s.executeQuery();
int count = 0;
if (this.statements.isUseExternalMessageReferences()) {
- while (rs.next() && count < maxReturned) {
+ while (rs.next() && count < maxReturned && listener.hasSpace()) {
if (listener.recoverMessageReference(rs.getString(1))) {
count++;
}
}
} else {
- while (rs.next() && count < maxReturned) {
+ while (rs.next() && count < maxReturned && listener.hasSpace()) {
if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
count++;
}
@@ -670,13 +669,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
rs = s.executeQuery();
int count = 0;
if (this.statements.isUseExternalMessageReferences()) {
- while (rs.next() && count < maxReturned) {
+ while (rs.next() && count < maxReturned && listener.hasSpace() ) {
if (listener.recoverMessageReference(rs.getString(1))) {
count++;
}
}
} else {
- while (rs.next() && count < maxReturned) {
+ while (rs.next() && count < maxReturned && listener.hasSpace()) {
if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
count++;
}
@@ -1144,7 +1143,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
rs = s.executeQuery();
int count = 0;
if (this.statements.isUseExternalMessageReferences()) {
- while (rs.next() && count < maxReturned) {
+ while (rs.next() && count < maxReturned && listener.hasSpace()) {
if (listener.recoverMessageReference(rs.getString(1))) {
count++;
} else {
@@ -1153,7 +1152,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
} else {
- while (rs.next() && count < maxReturned) {
+ while (rs.next() && count < maxReturned && listener.hasSpace()) {
if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
count++;
} else {
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index e1c1df4..69319a0 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -585,7 +585,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
listener.recoverMessage(msg);
counter++;
- if (counter >= maxReturned) {
+ if (counter >= maxReturned || listener.hasSpace() == false) {
break;
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
index b0051cc..09fd350 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
@@ -737,7 +737,7 @@ class DBManager(val parent:LevelDBStore) {
lastmsgid = msg.getMessageId
count += 1
}
- count < max
+ count < max && listener.hasSpace
}
if( lastmsgid==null ) {
startPos
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
index 2406e88..7680ca9 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
@@ -27,6 +27,7 @@ import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
@@ -82,10 +83,14 @@ public class StoreQueueCursorNoDuplicateTest extends TestCase {
queueMessageStore.start();
queueMessageStore.registerIndexListener(null);
- QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+ QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
SystemUsage systemUsage = new SystemUsage();
+
+ ActiveMQTextMessage sampleMessage = getMessage(0);
+ int unitSize = sampleMessage.getSize();
+
// ensure memory limit is reached
- systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 2));
+ systemUsage.getMemoryUsage().setLimit(unitSize * count);
underTest.setSystemUsage(systemUsage);
underTest.setEnableAudit(false);
underTest.start();
@@ -110,8 +115,11 @@ public class StoreQueueCursorNoDuplicateTest extends TestCase {
ref.decrementReferenceCount();
underTest.remove();
LOG.info("Received message: {} with body: {}",
- ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
+ ref.getMessageId(), ((ActiveMQTextMessage) ref.getMessage()).getText());
assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
+
+ // memory store keeps a message ref that needs releasing to free usage
+ queueMessageStore.removeMessage(contextNotInTx, new MessageAck(ref.getMessage(), MessageAck.STANDARD_ACK_TYPE, 1));
}
underTest.release();
assertEquals(count, dequeueCount);
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
index 90b8428..92c646b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
@@ -89,7 +89,7 @@ public class StoreQueueCursorOrderTest {
queueMessageStore.start();
queueMessageStore.registerIndexListener(null);
- QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+ QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
SystemUsage systemUsage = new SystemUsage();
// ensure memory limit is reached
systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
@@ -154,7 +154,7 @@ public class StoreQueueCursorOrderTest {
queueMessageStore.start();
queueMessageStore.registerIndexListener(null);
- QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+ QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
SystemUsage systemUsage = new SystemUsage();
// ensure memory limit is reached
systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
@@ -222,7 +222,7 @@ public class StoreQueueCursorOrderTest {
queueMessageStore.start();
queueMessageStore.registerIndexListener(null);
- QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+ QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
SystemUsage systemUsage = new SystemUsage();
// ensure memory limit is reached
systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
@@ -299,7 +299,7 @@ public class StoreQueueCursorOrderTest {
queueMessageStore.start();
queueMessageStore.registerIndexListener(null);
- QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+ QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
SystemUsage systemUsage = new SystemUsage();
// ensure memory limit is reached
systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 6));
@@ -392,7 +392,7 @@ public class StoreQueueCursorOrderTest {
queueMessageStore.start();
queueMessageStore.registerIndexListener(null);
- QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+ QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
SystemUsage systemUsage = new SystemUsage();
// ensure memory limit is reached
systemUsage.getMemoryUsage().setLimit(messageBytesSize * 5);
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
index e65ad91..8f6fbb2 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
@@ -46,7 +46,7 @@ public class AMQ4930Test extends TestCase {
protected void configureBroker() throws Exception {
broker.setDeleteAllMessagesOnStartup(true);
broker.setAdvisorySupport(false);
- broker.getSystemUsage().getMemoryUsage().setLimit(1*1024*1024);
+ broker.getSystemUsage().getMemoryUsage().setLimit(100*1024*1024);
PolicyMap pMap = new PolicyMap();
PolicyEntry policy = new PolicyEntry();
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitPfcTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitPfcTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitPfcTest.java
new file mode 100644
index 0000000..5b2dc23
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitPfcTest.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.ProducerThread;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(value = Parameterized.class)
+public class MemoryLimitPfcTest extends TestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(MemoryLimitPfcTest.class);
+ final String payload = new String(new byte[100 * 1024]);
+ protected BrokerService broker;
+
+ @Parameterized.Parameter
+ public PersistenceAdapterChoice persistenceAdapterChoice;
+
+ @Parameterized.Parameters(name="store={0}")
+ public static Iterable<Object[]> getTestParameters() {
+ return Arrays.asList(new Object[][]{{PersistenceAdapterChoice.KahaDB}, {PersistenceAdapterChoice.LevelDB}, {PersistenceAdapterChoice.JDBC}});
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.getSystemUsage().getMemoryUsage().setLimit(1 * 1024 * 1024); //1MB
+ broker.setDeleteAllMessagesOnStartup(true);
+
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry policyEntry = new PolicyEntry();
+ policyEntry.setExpireMessagesPeriod(0); // when this fires it will consume 2*pageSize mem which will throw the test
+ policyMap.put(new ActiveMQQueue(">"), policyEntry);
+ broker.setDestinationPolicy(policyMap);
+
+ LOG.info("Starting broker with persistenceAdapterChoice " + persistenceAdapterChoice.toString());
+ setPersistenceAdapter(broker, persistenceAdapterChoice);
+
+ return broker;
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ if (broker == null) {
+ broker = createBroker();
+ }
+ broker.start();
+ broker.waitUntilStarted();
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+ }
+
+
+ @Test(timeout = 120000)
+ public void testStopCachingDispatchNoPfc() throws Exception {
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10");
+ factory.setOptimizeAcknowledge(true);
+ Connection conn = factory.createConnection();
+ conn.start();
+ Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = sess.createQueue("STORE");
+ final ProducerThread producer = new ProducerThread(sess, queue) {
+ @Override
+ protected Message createMessage(int i) throws Exception {
+ BytesMessage bytesMessage = session.createBytesMessage();
+ bytesMessage.writeBytes(payload.getBytes());
+ return bytesMessage;
+ }
+ };
+ producer.setMessageCount(200);
+ producer.start();
+ producer.join();
+
+ Thread.sleep(1000);
+
+ // assert we didn't break high watermark (70%) usage
+ final Destination dest = broker.getDestination((ActiveMQQueue) queue);
+ LOG.info("Destination usage: " + dest.getMemoryUsage());
+ int percentUsage = dest.getMemoryUsage().getPercentUsage();
+ assertTrue("Should be less than 70% of limit but was: " + percentUsage, percentUsage <= 80);
+ LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage());
+ assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 80);
+
+ assertFalse("cache disabled", ((org.apache.activemq.broker.region.Queue) dest).getMessages().isCacheEnabled());
+
+ // consume one message
+ MessageConsumer consumer = sess.createConsumer(queue);
+ Message msg = consumer.receive(5000);
+ msg.acknowledge();
+
+ LOG.info("Destination usage after consume one: " + dest.getMemoryUsage());
+
+ // ensure we can send more messages
+ final ProducerThread secondProducer = new ProducerThread(sess, queue) {
+ @Override
+ protected Message createMessage(int i) throws Exception {
+ BytesMessage bytesMessage = session.createBytesMessage();
+ bytesMessage.writeBytes(payload.getBytes());
+ return bytesMessage;
+ }
+ };
+ secondProducer.setMessageCount(100);
+ secondProducer.start();
+ secondProducer.join();
+
+ LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage());
+ assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 100);
+
+ // let's make sure we can consume all messages
+ for (int i = 1; i < 300; i++) {
+ msg = consumer.receive(5000);
+ if (msg == null) {
+ dumpAllThreads("NoMessage");
+ }
+ assertNotNull("Didn't receive message " + i, msg);
+ msg.acknowledge();
+ }
+ }
+
+ @Test(timeout = 120000)
+ public void testConsumeFromTwoAfterPageInToOne() throws Exception {
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10");
+ factory.setOptimizeAcknowledge(true);
+ Connection conn = factory.createConnection();
+ conn.start();
+ Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ final ProducerThread producer = new ProducerThread(sess, sess.createQueue("STORE.1")) {
+ @Override
+ protected Message createMessage(int i) throws Exception {
+ return session.createTextMessage(payload + "::" + i);
+ }
+ };
+ producer.setMessageCount(20);
+
+ final ProducerThread producer2 = new ProducerThread(sess, sess.createQueue("STORE.2")) {
+ @Override
+ protected Message createMessage(int i) throws Exception {
+ return session.createTextMessage(payload + "::" + i);
+ }
+ };
+ producer2.setMessageCount(20);
+
+ producer.start();
+ producer2.start();
+
+ producer.join();
+ producer2.join();
+
+ LOG.info("before consumer1, broker % mem usage: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
+
+ MessageConsumer consumer = sess.createConsumer(sess.createQueue("STORE.1"));
+ Message msg = null;
+ for (int i=0; i<10; i++) {
+ msg = consumer.receive(5000);
+ LOG.info("% mem usage: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
+ msg.acknowledge();
+ }
+
+ TimeUnit.SECONDS.sleep(2);
+ LOG.info("Before consumer2, Broker % mem usage: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
+
+ MessageConsumer consumer2 = sess.createConsumer(sess.createQueue("STORE.2"));
+ for (int i=0; i<10; i++) {
+ msg = consumer2.receive(5000);
+ LOG.info("% mem usage: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
+ msg.acknowledge();
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
index 760876c..d3af604 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
@@ -133,18 +133,9 @@ public class MemoryLimitTest extends TestSupport {
Message msg = consumer.receive(5000);
msg.acknowledge();
- // this should free some space and allow us to get new batch of messages in the memory
- // exceeding the limit
- assertTrue("Limit is exceeded", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- LOG.info("Destination usage: " + dest.getMemoryUsage());
- return dest.getMemoryUsage().getPercentUsage() >= 200;
- }
- }));
-
+ assertTrue("Should be less than 70% of limit but was: " + percentUsage, percentUsage <= 71);
LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage());
- assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() >= 200);
+ assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 71);
// let's make sure we can consume all messages
for (int i = 1; i < 2000; i++) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/d8cf54b0/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
index 29b6e72..05540a5 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
@@ -182,7 +182,7 @@ public class QueueBrowsingTest {
@Test
public void testMemoryLimit() throws Exception {
- broker.getSystemUsage().getMemoryUsage().setLimit(16 * 1024);
+ broker.getSystemUsage().getMemoryUsage().setLimit((maxPageSize + 10) * 4 * 1024);
int messageToSend = 370;
@@ -211,6 +211,6 @@ public class QueueBrowsingTest {
}
browser.close();
- assertTrue("got at least maxPageSize", received >= maxPageSize);
+ assertTrue("got at least maxPageSize, received: " + received, received >= maxPageSize);
}
}