You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/09/11 18:08:05 UTC
[1/3] git commit: https://issues.apache.org/jira/browse/AMQ-5274 - we
now only check expiry on non inflight messages so there is no contention on
ack with the periodic expiry check thread - related
https://issues.apache.org/jira/browse/AMQ-2876
Repository: activemq
Updated Branches:
refs/heads/trunk b1ede0559 -> 8cdb5c2c1
https://issues.apache.org/jira/browse/AMQ-5274 - we now only check expiry on non inflight messages so there is no contention on ack with the periodic expiry check thread - related https://issues.apache.org/jira/browse/AMQ-2876
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/26807cd4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/26807cd4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/26807cd4
Branch: refs/heads/trunk
Commit: 26807cd4524460e22844d16136f03bc96fa9b4c8
Parents: b1ede05
Author: gtully <ga...@gmail.com>
Authored: Thu Sep 11 16:13:43 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Sep 11 16:13:43 2014 +0100
----------------------------------------------------------------------
.../apache/activemq/broker/region/Queue.java | 2 +-
.../broker/region/QueueSubscription.java | 8 --
...JmsSendReceiveWithMessageExpirationTest.java | 21 ++-
.../org/apache/activemq/bugs/AMQ5274Test.java | 133 +++++++++++++++++++
.../activemq/usecases/ExpiredMessagesTest.java | 2 +-
.../ExpiredMessagesWithNoConsumerTest.java | 21 ++-
6 files changed, 170 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/26807cd4/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index e9f2180..ff16dfc 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1169,7 +1169,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
List<MessageReference> toExpire) throws Exception {
for (Iterator<? extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < max;) {
QueueMessageReference ref = (QueueMessageReference) i.next();
- if (ref.isExpired()) {
+ if (ref.isExpired() && (ref.getLockOwner() == null)) {
toExpire.add(ref);
} else if (l.contains(ref.getMessage()) == false) {
l.add(ref.getMessage());
http://git-wip-us.apache.org/repos/asf/activemq/blob/26807cd4/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
index 7c7027f..358f946 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
@@ -52,14 +52,6 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
final Destination q = (Destination) n.getRegionDestination();
final QueueMessageReference node = (QueueMessageReference)n;
final Queue queue = (Queue)q;
-
- if (n.isExpired()) {
- // sync with message expiry processing
- if (!broker.isExpired(n)) {
- LOG.debug("ignoring ack {}, for already expired message: {}", ack, n);
- return;
- }
- }
queue.removeMessage(context, this, node, ack);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/26807cd4/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
index 956fa40..391253e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
@@ -30,6 +30,10 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -151,7 +155,7 @@ public class JmsSendReceiveWithMessageExpirationTest extends TestSupport {
received.acknowledge();
};
- assertEquals("got messages", messageCount + 1, messages.size());
+ assertEquals("got all (normal plus one with ttl) messages", messageCount + 1, messages.size());
Vector<Message> dlqMessages = new Vector<Message>();
while ((received = dlqConsumer.receive(1000)) != null) {
@@ -159,6 +163,21 @@ public class JmsSendReceiveWithMessageExpirationTest extends TestSupport {
};
assertEquals("got dlq messages", data.length - 1, dlqMessages.size());
+
+ final DestinationStatistics view = getDestinationStatistics(BrokerRegistry.getInstance().findFirst(), ActiveMQDestination.transform(consumerDestination));
+
+ // wait for all to inflight to expire
+ assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return view.getInflight().getCount() == 0;
+ }
+ }));
+ assertEquals("Wrong inFlightCount: ", 0, view.getInflight().getCount());
+
+ LOG.info("Stats: received: " + messages.size() + ", messages: " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount()
+ + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expired: " + view.getExpired().getCount());
+
}
/**
http://git-wip-us.apache.org/repos/asf/activemq/blob/26807cd4/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java
new file mode 100644
index 0000000..4ba6526
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class AMQ5274Test {
+ static Logger LOG = LoggerFactory.getLogger(AMQ5274Test.class);
+ String activemqURL;
+ BrokerService brokerService;
+ ActiveMQQueue dest = new ActiveMQQueue("TestQ");
+
+
+ @Before
+ public void startBroker() throws Exception {
+ brokerService = new BrokerService();
+ brokerService.setPersistent(false);
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry defaultPolicy = new PolicyEntry();
+ defaultPolicy.setExpireMessagesPeriod(1000);
+ policyMap.setDefaultEntry(defaultPolicy);
+ brokerService.setDestinationPolicy(policyMap);
+ activemqURL = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
+ brokerService.start();
+ }
+
+ @After
+ public void stopBroker() throws Exception {
+ if (brokerService != null) {
+ brokerService.stop();
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ LOG.info("Starting Test");
+ assertTrue(brokerService.isStarted());
+
+ produce();
+ consumeAndRollback();
+
+ // check reported queue size using JMX
+ long queueSize = getQueueSize();
+ assertEquals("Queue " + dest.getPhysicalName() + " not empty, reporting " + queueSize + " messages.", 0, queueSize);
+ }
+
+ private void consumeAndRollback() throws JMSException, InterruptedException {
+ ActiveMQConnection connection = createConnection();
+ RedeliveryPolicy noRedelivery = new RedeliveryPolicy();
+ noRedelivery.setMaximumRedeliveries(0);
+ connection.setRedeliveryPolicy(noRedelivery);
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = session.createConsumer(dest);
+ Message m;
+ while ( (m = consumer.receive(4000)) != null) {
+ LOG.info("Got:" + m);
+ TimeUnit.SECONDS.sleep(1);
+ session.rollback();
+ }
+ connection.close();
+ }
+
+ private void produce() throws Exception {
+ Connection connection = createConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(dest);
+ producer.setTimeToLive(10000);
+ for (int i=0;i<20;i++) {
+ producer.send(session.createTextMessage("i="+i));
+ }
+ connection.close();
+ }
+
+ private ActiveMQConnection createConnection() throws JMSException {
+ return (ActiveMQConnection) new ActiveMQConnectionFactory(activemqURL).createConnection();
+ }
+
+
+ public long getQueueSize() throws Exception {
+ long queueSize = 0;
+ try {
+ QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(BrokerMBeanSupport.createDestinationName(brokerService.getBrokerObjectName(), dest), QueueViewMBean.class, false);
+ queueSize = queueViewMBean.getQueueSize();
+ LOG.info("QueueSize for destination {} is {}", dest, queueSize);
+ } catch (Exception ex) {
+ LOG.error("Error retrieving QueueSize from JMX ", ex);
+ throw ex;
+ }
+ return queueSize;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/26807cd4/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
index 4c97972..0205599 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
@@ -190,7 +190,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
// memory check
assertEquals("memory usage is back to duck egg", 0, getDestination(broker, destination).getMemoryUsage().getPercentUsage());
- assertTrue("memory usage is increased ", 0 < getDestination(broker, dlqDestination).getMemoryUsage().getPercentUsage());
+ assertTrue("memory usage is increased ", 0 < getDestination(broker, dlqDestination).getMemoryUsage().getUsage());
// verify DLQ
MessageConsumer dlqConsumer = createDlqConsumer(connection);
http://git-wip-us.apache.org/repos/asf/activemq/blob/26807cd4/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
index ebdb5bc..e2ad7f6 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
@@ -257,7 +257,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
// first ack delivered after expiry
public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
createBroker();
- final long queuePrefetch = 600;
+ final long queuePrefetch = 5;
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
connectionUri + "?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
connection = factory.createConnection();
@@ -266,7 +266,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
final int ttl = 4000;
producer.setTimeToLive(ttl);
- final long sendCount = 1500;
+ final long sendCount = 10;
final CountDownLatch receivedOneCondition = new CountDownLatch(1);
final CountDownLatch waitCondition = new CountDownLatch(1);
@@ -328,10 +328,14 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
return queuePrefetch == view.getDispatchCount();
}
}));
- assertTrue("Not all sent have expired ", Wait.waitFor(new Wait.Condition() {
+ assertTrue("all non inflight have expired ", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
- return sendCount == view.getExpiredCount();
+ LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+ + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+ + ", size= " + view.getQueueSize());
+
+ return view.getExpiredCount() > 0 && (view.getEnqueueCount() - view.getInFlightCount()) == view.getExpiredCount();
}
}));
@@ -448,10 +452,15 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
return queuePrefetch == view.getDispatchCount();
}
}));
- assertTrue("All have not sent have expired ", Wait.waitFor(new Wait.Condition() {
+
+ assertTrue("all non inflight have expired ", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
- return sendCount == view.getExpiredCount();
+ LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+ + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+ + ", size= " + view.getQueueSize());
+
+ return view.getExpiredCount() > 0 && (view.getEnqueueCount() - view.getInFlightCount()) == view.getExpiredCount();
}
}));
[3/3] git commit: https://issues.apache.org/jira/browse/AMQ-5266 -
remove err message print on iterator limit
Posted by gt...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5266 - remove err message print on iterator limit
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8cdb5c2c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8cdb5c2c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8cdb5c2c
Branch: refs/heads/trunk
Commit: 8cdb5c2c1d36416a827470661679087bc9e9a108
Parents: 5861d86
Author: gtully <ga...@gmail.com>
Authored: Thu Sep 11 17:07:35 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Sep 11 17:07:35 2014 +0100
----------------------------------------------------------------------
.../java/org/apache/activemq/store/kahadb/disk/index/BTreeNode.java | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/8cdb5c2c/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/BTreeNode.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/BTreeNode.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/BTreeNode.java
index a53d5bf..b91a9fc 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/BTreeNode.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/BTreeNode.java
@@ -113,7 +113,6 @@ public final class BTreeNode<Key,Value> {
}
} else {
if (endKey != null && current.keys[nextIndex].equals(endKey)) {
- System.err.println("Stopping iterator on reaching: " + endKey);
break;
}
nextEntry = new KeyValueEntry(current.keys[nextIndex], current.values[nextIndex]);
[2/3] git commit: https://issues.apache.org/jira/browse/AMQ-5266 -
fix edge case with optimizedDispatch=true where a single message could be
pending till the next page in event
Posted by gt...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5266 - fix edge case with optimizedDispatch=true where a single message could be pending till the next page in event
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5861d86a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5861d86a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5861d86a
Branch: refs/heads/trunk
Commit: 5861d86ad39cac1644b1a48157bd6c799a586ac4
Parents: 26807cd
Author: gtully <ga...@gmail.com>
Authored: Thu Sep 11 16:59:50 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Sep 11 16:59:50 2014 +0100
----------------------------------------------------------------------
.../apache/activemq/broker/region/Queue.java | 6 ++--
.../org/apache/activemq/bugs/AMQ5266Test.java | 29 ++++++++++++--------
2 files changed, 20 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/5861d86a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index ff16dfc..c7f768e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -781,12 +781,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
sendLock.unlock();
}
for (MessageContext messageContext : orderedUpdates) {
- if (!messageContext.duplicate) {
- messageSent(messageContext.context, messageContext.message);
- }
if (messageContext.onCompletion != null) {
messageContext.onCompletion.run();
}
+ if (!messageContext.duplicate) {
+ messageSent(messageContext.context, messageContext.message);
+ }
}
orderedUpdates.clear();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/5861d86a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
index 626fe6e..efccefa 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
@@ -87,22 +87,26 @@ public class AMQ5266Test {
@Parameterized.Parameter(5)
public boolean useDefaultStore = false;
- @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5}")
+ @Parameterized.Parameter(6)
+ public boolean optimizeDispatch = false;
+
+ @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
public static Iterable<Object[]> parameters() {
return Arrays.asList(new Object[][]{
// jdbc
- {1000, 20, 5, 50*1024, true, false},
- {100, 20, 5, 50*1024, false, false},
- {1000, 5, 20, 50*1024, true, false},
- {1000, 20, 20, 1024*1024, true, false},
- {1000, 100, 100, 1024*1024, true, false},
+ {1, 1, 1, 50*1024, false, false, true},
+ {1000, 20, 5, 50*1024, true, false, false},
+ {100, 20, 5, 50*1024, false, false, false},
+ {1000, 5, 20, 50*1024, true, false, false},
+ {1000, 20, 20, 1024*1024, true, false, false},
// default store
- {1000, 20, 5, 50*1024, true, true},
- {100, 20, 5, 50*1024, false, true},
- {1000, 5, 20, 50*1024, true, true},
- {1000, 20, 20, 1024*1024, true, true},
- {1000, 100, 100, 1024*1024, true, true}
+ {1, 1, 1, 50*1024, false, true, true},
+ {100, 5, 5, 50*1024, false, true, false},
+ {1000, 20, 5, 50*1024, true, true, false},
+ {100, 20, 5, 50*1024, false, true, false},
+ {1000, 5, 20, 50*1024, true, true, false},
+ {1000, 20, 20, 1024*1024, true, true, false},
});
}
@@ -127,6 +131,7 @@ public class AMQ5266Test {
kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true);
}
brokerService.setDeleteAllMessagesOnStartup(true);
+ brokerService.setUseJmx(false);
PolicyMap policyMap = new PolicyMap();
@@ -136,7 +141,7 @@ public class AMQ5266Test {
defaultEntry.setEnableAudit(true);
defaultEntry.setUseCache(useCache);
defaultEntry.setMaxPageSize(1000);
- defaultEntry.setOptimizedDispatch(false);
+ defaultEntry.setOptimizedDispatch(optimizeDispatch);
defaultEntry.setMemoryLimit(destMemoryLimit);
defaultEntry.setExpireMessagesPeriod(0);
policyMap.setDefaultEntry(defaultEntry);