You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/05/21 13:27:12 UTC

activemq git commit: AMQ-6967 - ensure there are some messages paged in for periodic expiry check if none are in memory

Repository: activemq
Updated Branches:
  refs/heads/master 01384c714 -> 026c6f440


AMQ-6967 - ensure there are some messages paged in for periodic expiry check if none are in memory


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/026c6f44
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/026c6f44
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/026c6f44

Branch: refs/heads/master
Commit: 026c6f4403ea2a53426b507c6d991672942046b7
Parents: 01384c7
Author: gtully <ga...@gmail.com>
Authored: Mon May 21 14:26:45 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Mon May 21 14:26:45 2018 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |  4 +--
 ...DBCPersistenceAdapterExpiredMessageTest.java | 34 ++++++++++++++++++++
 2 files changed, 36 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/026c6f44/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 2101523..183ecd3 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1238,9 +1238,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
         }
 
         LOG.trace("max {}, alreadyPagedIn {}, messagesCount {}, memoryUsage {}%", new Object[]{max, alreadyPagedIn, messagesInQueue, memoryUsage.getPercentUsage()});
-        return (alreadyPagedIn < max)
+        return (alreadyPagedIn == 0 || (alreadyPagedIn < max)
                 && (alreadyPagedIn < messagesInQueue)
-                && messages.hasSpace();
+                && messages.hasSpace());
     }
 
     private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int max,

http://git-wip-us.apache.org/repos/asf/activemq/blob/026c6f44/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java
index e8e819c..29fae95 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java
@@ -31,6 +31,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
@@ -44,8 +45,11 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class JDBCPersistenceAdapterExpiredMessageTest {
+    private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapterExpiredMessageTest.class);
 
     @Rule
     public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
@@ -110,6 +114,7 @@ public class JDBCPersistenceAdapterExpiredMessageTest {
         PolicyEntry defaultEntry = new PolicyEntry();
         defaultEntry.setExpireMessagesPeriod(5000);
         defaultEntry.setMaxExpirePageSize(expireSize);
+        defaultEntry.setMemoryLimit(100*16*1024);
         policyMap.setDefaultEntry(defaultEntry);
         brokerService.setDestinationPolicy(policyMap);
 
@@ -154,4 +159,33 @@ public class JDBCPersistenceAdapterExpiredMessageTest {
             }
         }, 15000, 1000));
     }
+
+    @Test
+    public void testExpiredAfterCacheExhausted() throws Exception {
+        final ActiveMQQueue queue = new ActiveMQQueue("test.q");
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        factory.setWatchTopicAdvisories(false);
+        Connection conn = factory.createConnection();
+        conn.start();
+        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = sess.createProducer(queue);
+        producer.setTimeToLive(1000);
+        String payLoad = new String(new byte[16*1024]);
+
+        final int numMessages = 500;
+        for (int i = 0; i < numMessages; i++) {
+            producer.send(sess.createTextMessage("test message: " + payLoad));
+        }
+
+        assertTrue(Wait.waitFor(new Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                long expired = brokerService.getDestination(queue).getDestinationStatistics().getExpired().getCount();
+                LOG.info("Expired: " + expired);
+                return expired == numMessages;
+            }
+        }, 15000, 1000));
+    }
 }