You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/09/11 18:08:05 UTC

[1/3] git commit: https://issues.apache.org/jira/browse/AMQ-5274 - we now only check expiry on non-inflight messages so there is no contention on ack with the periodic expiry check thread - related https://issues.apache.org/jira/browse/AMQ-2876

Repository: activemq
Updated Branches:
  refs/heads/trunk b1ede0559 -> 8cdb5c2c1


https://issues.apache.org/jira/browse/AMQ-5274 - we now only check expiry on non-inflight messages so there is no contention on ack with the periodic expiry check thread - related https://issues.apache.org/jira/browse/AMQ-2876


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/26807cd4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/26807cd4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/26807cd4

Branch: refs/heads/trunk
Commit: 26807cd4524460e22844d16136f03bc96fa9b4c8
Parents: b1ede05
Author: gtully <ga...@gmail.com>
Authored: Thu Sep 11 16:13:43 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Sep 11 16:13:43 2014 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |   2 +-
 .../broker/region/QueueSubscription.java        |   8 --
 ...JmsSendReceiveWithMessageExpirationTest.java |  21 ++-
 .../org/apache/activemq/bugs/AMQ5274Test.java   | 133 +++++++++++++++++++
 .../activemq/usecases/ExpiredMessagesTest.java  |   2 +-
 .../ExpiredMessagesWithNoConsumerTest.java      |  21 ++-
 6 files changed, 170 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/26807cd4/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index e9f2180..ff16dfc 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1169,7 +1169,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
             List<MessageReference> toExpire) throws Exception {
         for (Iterator<? extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < max;) {
             QueueMessageReference ref = (QueueMessageReference) i.next();
-            if (ref.isExpired()) {
+            if (ref.isExpired() && (ref.getLockOwner() == null)) {
                 toExpire.add(ref);
             } else if (l.contains(ref.getMessage()) == false) {
                 l.add(ref.getMessage());

http://git-wip-us.apache.org/repos/asf/activemq/blob/26807cd4/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
index 7c7027f..358f946 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
@@ -52,14 +52,6 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
         final Destination q = (Destination) n.getRegionDestination();
         final QueueMessageReference node = (QueueMessageReference)n;
         final Queue queue = (Queue)q;
-
-        if (n.isExpired()) {
-            // sync with message expiry processing
-            if (!broker.isExpired(n)) {
-                LOG.debug("ignoring ack {}, for already expired message: {}", ack, n);
-                return;
-            }
-        }
         queue.removeMessage(context, this, node, ack);
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/26807cd4/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
index 956fa40..391253e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
@@ -30,6 +30,10 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.Topic;
 
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -151,7 +155,7 @@ public class JmsSendReceiveWithMessageExpirationTest extends TestSupport {
              received.acknowledge();
          };
 
-         assertEquals("got messages", messageCount + 1, messages.size());
+         assertEquals("got all (normal plus one with ttl) messages", messageCount + 1, messages.size());
 
          Vector<Message> dlqMessages = new Vector<Message>();
          while ((received = dlqConsumer.receive(1000)) != null) {
@@ -159,6 +163,21 @@ public class JmsSendReceiveWithMessageExpirationTest extends TestSupport {
          };
 
          assertEquals("got dlq messages", data.length - 1, dlqMessages.size());
+
+         final DestinationStatistics view = getDestinationStatistics(BrokerRegistry.getInstance().findFirst(), ActiveMQDestination.transform(consumerDestination));
+
+         // wait for all to inflight to expire
+         assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() {
+             @Override
+             public boolean isSatisified() throws Exception {
+                 return view.getInflight().getCount() == 0;
+             }
+         }));
+         assertEquals("Wrong inFlightCount: ", 0, view.getInflight().getCount());
+
+         LOG.info("Stats: received: "  + messages.size() + ", messages: " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount()
+                 + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expired: " + view.getExpired().getCount());
+
     }
     
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/26807cd4/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java
new file mode 100644
index 0000000..4ba6526
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class AMQ5274Test {
+    static Logger LOG = LoggerFactory.getLogger(AMQ5274Test.class);
+    String activemqURL;
+    BrokerService brokerService;
+    ActiveMQQueue dest = new ActiveMQQueue("TestQ");
+
+
+    @Before
+    public void startBroker() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setExpireMessagesPeriod(1000);
+        policyMap.setDefaultEntry(defaultPolicy);
+        brokerService.setDestinationPolicy(policyMap);
+        activemqURL = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
+        brokerService.start();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+    }
+
+    @Test
+    public void test() throws Exception {
+        LOG.info("Starting Test");
+        assertTrue(brokerService.isStarted());
+
+        produce();
+        consumeAndRollback();
+
+        // check reported queue size using JMX
+        long queueSize = getQueueSize();
+        assertEquals("Queue " + dest.getPhysicalName() + " not empty, reporting " + queueSize + " messages.", 0, queueSize);
+    }
+
+    private void consumeAndRollback() throws JMSException, InterruptedException {
+        ActiveMQConnection connection = createConnection();
+        RedeliveryPolicy noRedelivery = new RedeliveryPolicy();
+        noRedelivery.setMaximumRedeliveries(0);
+        connection.setRedeliveryPolicy(noRedelivery);
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer consumer = session.createConsumer(dest);
+        Message m;
+        while ( (m = consumer.receive(4000)) != null) {
+            LOG.info("Got:" + m);
+            TimeUnit.SECONDS.sleep(1);
+            session.rollback();
+        }
+        connection.close();
+    }
+
+    private void produce() throws Exception {
+        Connection connection = createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(dest);
+        producer.setTimeToLive(10000);
+        for (int i=0;i<20;i++) {
+            producer.send(session.createTextMessage("i="+i));
+        }
+       connection.close();
+    }
+
+    private ActiveMQConnection createConnection() throws JMSException {
+        return (ActiveMQConnection) new ActiveMQConnectionFactory(activemqURL).createConnection();
+    }
+
+
+    public long getQueueSize() throws Exception {
+        long queueSize = 0;
+        try {
+            QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(BrokerMBeanSupport.createDestinationName(brokerService.getBrokerObjectName(), dest), QueueViewMBean.class, false);
+            queueSize = queueViewMBean.getQueueSize();
+            LOG.info("QueueSize for destination {} is {}", dest, queueSize);
+        } catch (Exception ex) {
+           LOG.error("Error retrieving QueueSize from JMX ", ex);
+           throw ex;
+        }
+        return queueSize;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/26807cd4/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
index 4c97972..0205599 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
@@ -190,7 +190,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
 
         // memory check
         assertEquals("memory usage is back to duck egg", 0, getDestination(broker, destination).getMemoryUsage().getPercentUsage());
-        assertTrue("memory usage is increased ", 0 < getDestination(broker, dlqDestination).getMemoryUsage().getPercentUsage());
+        assertTrue("memory usage is increased ", 0 < getDestination(broker, dlqDestination).getMemoryUsage().getUsage());
 
         // verify DLQ
         MessageConsumer dlqConsumer = createDlqConsumer(connection);

http://git-wip-us.apache.org/repos/asf/activemq/blob/26807cd4/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
index ebdb5bc..e2ad7f6 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
@@ -257,7 +257,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
     // first ack delivered after expiry
     public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
         createBroker();
-        final long queuePrefetch = 600;
+        final long queuePrefetch = 5;
         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                 connectionUri + "?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
         connection = factory.createConnection();
@@ -266,7 +266,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
         final int ttl = 4000;
         producer.setTimeToLive(ttl);
 
-        final long sendCount = 1500;
+        final long sendCount = 10;
         final CountDownLatch receivedOneCondition = new CountDownLatch(1);
         final CountDownLatch waitCondition = new CountDownLatch(1);
 
@@ -328,10 +328,14 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
                 return queuePrefetch == view.getDispatchCount();
             }
         }));
-        assertTrue("Not all sent have expired ", Wait.waitFor(new Wait.Condition() {
+        assertTrue("all non inflight have expired ", Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
-                return sendCount == view.getExpiredCount();
+                LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+                        + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+                        + ", size= " + view.getQueueSize());
+
+                return view.getExpiredCount() > 0 && (view.getEnqueueCount() - view.getInFlightCount()) == view.getExpiredCount();
             }
         }));
 
@@ -448,10 +452,15 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
                 return queuePrefetch == view.getDispatchCount();
             }
         }));
-        assertTrue("All have not sent have expired ", Wait.waitFor(new Wait.Condition() {
+
+        assertTrue("all non inflight have expired ", Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
-                return sendCount == view.getExpiredCount();
+                LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+                        + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+                        + ", size= " + view.getQueueSize());
+
+                return view.getExpiredCount() > 0 && (view.getEnqueueCount() - view.getInFlightCount()) == view.getExpiredCount();
             }
         }));
 


[3/3] git commit: https://issues.apache.org/jira/browse/AMQ-5266 - remove err message print on iterator limit

Posted by gt...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5266 - remove err message print on iterator limit


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8cdb5c2c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8cdb5c2c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8cdb5c2c

Branch: refs/heads/trunk
Commit: 8cdb5c2c1d36416a827470661679087bc9e9a108
Parents: 5861d86
Author: gtully <ga...@gmail.com>
Authored: Thu Sep 11 17:07:35 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Sep 11 17:07:35 2014 +0100

----------------------------------------------------------------------
 .../java/org/apache/activemq/store/kahadb/disk/index/BTreeNode.java | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8cdb5c2c/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/BTreeNode.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/BTreeNode.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/BTreeNode.java
index a53d5bf..b91a9fc 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/BTreeNode.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/BTreeNode.java
@@ -113,7 +113,6 @@ public final class BTreeNode<Key,Value> {
                         }
                     }  else {
                         if (endKey != null && current.keys[nextIndex].equals(endKey)) {
-                            System.err.println("Stopping iterator on reaching: " + endKey);
                             break;
                         }
                         nextEntry = new KeyValueEntry(current.keys[nextIndex], current.values[nextIndex]);


[2/3] git commit: https://issues.apache.org/jira/browse/AMQ-5266 - fix edge case with optimizedDispatch=true where a single message could be pending till the next page in event

Posted by gt...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5266 - fix edge case with optimizedDispatch=true where a single message could be pending till the next page in event


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5861d86a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5861d86a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5861d86a

Branch: refs/heads/trunk
Commit: 5861d86ad39cac1644b1a48157bd6c799a586ac4
Parents: 26807cd
Author: gtully <ga...@gmail.com>
Authored: Thu Sep 11 16:59:50 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Sep 11 16:59:50 2014 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |  6 ++--
 .../org/apache/activemq/bugs/AMQ5266Test.java   | 29 ++++++++++++--------
 2 files changed, 20 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/5861d86a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index ff16dfc..c7f768e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -781,12 +781,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
             sendLock.unlock();
         }
         for (MessageContext messageContext : orderedUpdates) {
-            if (!messageContext.duplicate) {
-                messageSent(messageContext.context, messageContext.message);
-            }
             if (messageContext.onCompletion != null) {
                 messageContext.onCompletion.run();
             }
+            if (!messageContext.duplicate) {
+                messageSent(messageContext.context, messageContext.message);
+            }
         }
         orderedUpdates.clear();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/5861d86a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
index 626fe6e..efccefa 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
@@ -87,22 +87,26 @@ public class AMQ5266Test {
     @Parameterized.Parameter(5)
     public boolean useDefaultStore = false;
 
-    @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5}")
+    @Parameterized.Parameter(6)
+    public boolean optimizeDispatch = false;
+
+    @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
     public static Iterable<Object[]> parameters() {
         return Arrays.asList(new Object[][]{
                 // jdbc
-                {1000, 20,  5,   50*1024,   true,  false},
-                {100,  20,  5,   50*1024,   false, false},
-                {1000, 5,   20,  50*1024,   true,  false},
-                {1000, 20,  20,  1024*1024, true,  false},
-                {1000, 100, 100, 1024*1024, true,  false},
+                {1,    1,   1,   50*1024,   false, false, true},
+                {1000, 20,  5,   50*1024,   true,  false, false},
+                {100,  20,  5,   50*1024,   false, false, false},
+                {1000, 5,   20,  50*1024,   true,  false, false},
+                {1000, 20,  20,  1024*1024, true,  false, false},
 
                 // default store
-                {1000, 20,  5,   50*1024,   true,  true},
-                {100,  20,  5,   50*1024,   false, true},
-                {1000, 5,   20,  50*1024,   true,  true},
-                {1000, 20,  20,  1024*1024, true,  true},
-                {1000, 100, 100, 1024*1024, true,  true}
+                {1,    1,   1,   50*1024,   false, true, true},
+                {100,  5,   5,   50*1024,   false, true, false},
+                {1000, 20,  5,   50*1024,   true,  true, false},
+                {100,  20,  5,   50*1024,   false, true, false},
+                {1000, 5,   20,  50*1024,   true,  true, false},
+                {1000, 20,  20,  1024*1024, true,  true, false},
         });
     }
 
@@ -127,6 +131,7 @@ public class AMQ5266Test {
             kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true);
         }
         brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setUseJmx(false);
 
 
         PolicyMap policyMap = new PolicyMap();
@@ -136,7 +141,7 @@ public class AMQ5266Test {
         defaultEntry.setEnableAudit(true);
         defaultEntry.setUseCache(useCache);
         defaultEntry.setMaxPageSize(1000);
-        defaultEntry.setOptimizedDispatch(false);
+        defaultEntry.setOptimizedDispatch(optimizeDispatch);
         defaultEntry.setMemoryLimit(destMemoryLimit);
         defaultEntry.setExpireMessagesPeriod(0);
         policyMap.setDefaultEntry(defaultEntry);