You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/05/21 13:27:12 UTC
activemq git commit: AMQ-6967 - ensure there are some messages paged
in for periodic expiry check if non are in memory
Repository: activemq
Updated Branches:
refs/heads/master 01384c714 -> 026c6f440
AMQ-6967 - ensure there are some messages paged in for periodic expiry check if non are in memory
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/026c6f44
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/026c6f44
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/026c6f44
Branch: refs/heads/master
Commit: 026c6f4403ea2a53426b507c6d991672942046b7
Parents: 01384c7
Author: gtully <ga...@gmail.com>
Authored: Mon May 21 14:26:45 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Mon May 21 14:26:45 2018 +0100
----------------------------------------------------------------------
.../apache/activemq/broker/region/Queue.java | 4 +--
...DBCPersistenceAdapterExpiredMessageTest.java | 34 ++++++++++++++++++++
2 files changed, 36 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/026c6f44/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 2101523..183ecd3 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1238,9 +1238,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
}
LOG.trace("max {}, alreadyPagedIn {}, messagesCount {}, memoryUsage {}%", new Object[]{max, alreadyPagedIn, messagesInQueue, memoryUsage.getPercentUsage()});
- return (alreadyPagedIn < max)
+ return (alreadyPagedIn == 0 || (alreadyPagedIn < max)
&& (alreadyPagedIn < messagesInQueue)
- && messages.hasSpace();
+ && messages.hasSpace());
}
private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int max,
http://git-wip-us.apache.org/repos/asf/activemq/blob/026c6f44/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java
index e8e819c..29fae95 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java
@@ -31,6 +31,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
@@ -44,8 +45,11 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class JDBCPersistenceAdapterExpiredMessageTest {
+ private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapterExpiredMessageTest.class);
@Rule
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
@@ -110,6 +114,7 @@ public class JDBCPersistenceAdapterExpiredMessageTest {
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setExpireMessagesPeriod(5000);
defaultEntry.setMaxExpirePageSize(expireSize);
+ defaultEntry.setMemoryLimit(100*16*1024);
policyMap.setDefaultEntry(defaultEntry);
brokerService.setDestinationPolicy(policyMap);
@@ -154,4 +159,33 @@ public class JDBCPersistenceAdapterExpiredMessageTest {
}
}, 15000, 1000));
}
+
+ @Test
+ public void testExpiredAfterCacheExhausted() throws Exception {
+ final ActiveMQQueue queue = new ActiveMQQueue("test.q");
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+ factory.setWatchTopicAdvisories(false);
+ Connection conn = factory.createConnection();
+ conn.start();
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = sess.createProducer(queue);
+ producer.setTimeToLive(1000);
+ String payLoad = new String(new byte[16*1024]);
+
+ final int numMessages = 500;
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(sess.createTextMessage("test message: " + payLoad));
+ }
+
+ assertTrue(Wait.waitFor(new Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ long expired = brokerService.getDestination(queue).getDestinationStatistics().getExpired().getCount();
+ LOG.info("Expired: " + expired);
+ return expired == numMessages;
+ }
+ }, 15000, 1000));
+ }
}