You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2017/05/03 15:17:48 UTC
activemq git commit: modify test to validate setting isDLQ flag via
startup destination query options
Repository: activemq
Updated Branches:
refs/heads/master 35f30102a -> 18d05ba5e
modify test to validate setting isDLQ flag via startup destination query options
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/18d05ba5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/18d05ba5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/18d05ba5
Branch: refs/heads/master
Commit: 18d05ba5e0a96d231a357eb0a20b1f6388cb6c49
Parents: 35f3010
Author: gtully <ga...@gmail.com>
Authored: Wed May 3 16:17:29 2017 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed May 3 16:17:29 2017 +0100
----------------------------------------------------------------------
.../org/apache/activemq/bugs/AMQ6059Test.java | 391 +++++++++----------
1 file changed, 192 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/18d05ba5/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java
index 049d683..44053d0 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java
@@ -1,199 +1,192 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.util.Enumeration;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.Session;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.leveldb.LevelDBStore;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Once the wire format is completed we can test against real persistence storage.
- */
-public class AMQ6059Test {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ6059Test.class);
-
- private BrokerService broker;
-
- @Before
- public void setUp() throws Exception {
- broker = createBroker();
- broker.start();
- broker.waitUntilStarted();
- }
-
- @After
- public void tearDown() throws Exception {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- }
- }
-
- @Test
- public void testDLQRecovery() throws Exception {
-
- sendMessage(new ActiveMQQueue("leveldbQueue"));
- TimeUnit.SECONDS.sleep(3);
-
- LOG.info("### Check for expired message moving to DLQ.");
-
- Queue dlqQueue = (Queue) createDlqDestination();
- verifyIsDlq(dlqQueue);
-
- final QueueViewMBean queueViewMBean = getProxyToQueue(dlqQueue.getQueueName());
-
- assertTrue("The message expired", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- LOG.info("DLQ stats: Enqueues {}, Dispatches {}, Expired {}, Inflight {}",
- new Object[] { queueViewMBean.getEnqueueCount(),
- queueViewMBean.getDispatchCount(),
- queueViewMBean.getExpiredCount(),
- queueViewMBean.getInFlightCount()});
- return queueViewMBean.getEnqueueCount() == 1;
- }
- }));
-
- verifyMessageIsRecovered(dlqQueue);
- restartBroker(broker);
- verifyMessageIsRecovered(dlqQueue);
- }
-
- protected BrokerService createBroker() throws Exception {
- return createBrokerWithDLQ(true);
- }
-
- private BrokerService createBrokerWithDLQ(boolean purge) throws Exception {
- BrokerService broker = new BrokerService();
-
- File directory = new File("target/activemq-data/leveldb");
- if (purge) {
- IOHelper.deleteChildren(directory);
- }
-
- LevelDBStore levelDBStore = new LevelDBStore();
- levelDBStore.setDirectory(directory);
- if (purge) {
- levelDBStore.deleteAllMessages();
- }
-
- PolicyMap pMap = new PolicyMap();
-
- SharedDeadLetterStrategy sharedDLQStrategy = new SharedDeadLetterStrategy();
- sharedDLQStrategy.setProcessNonPersistent(true);
- sharedDLQStrategy.setProcessExpired(true);
- sharedDLQStrategy.setDeadLetterQueue(new ActiveMQQueue("ActiveMQ.DLQ"));
- sharedDLQStrategy.setExpiration(10000);
-
- PolicyEntry defaultPolicy = new PolicyEntry();
- defaultPolicy.setDeadLetterStrategy(sharedDLQStrategy);
- defaultPolicy.setExpireMessagesPeriod(2000);
- defaultPolicy.setUseCache(false);
-
- pMap.put(new ActiveMQQueue(">"), defaultPolicy);
- broker.setDestinationPolicy(pMap);
- broker.setPersistenceAdapter(levelDBStore);
- if (purge) {
- broker.setDeleteAllMessagesOnStartup(true);
- }
-
- return broker;
- }
-
- private void restartBroker(BrokerService broker) throws Exception {
- broker.stop();
- broker.waitUntilStopped();
- broker = createBrokerWithDLQ(false);
- broker.start();
- broker.waitUntilStarted();
- }
-
- private void verifyMessageIsRecovered(final Queue dlqQueue) throws Exception, JMSException {
- Connection connection = createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- QueueBrowser browser = session.createBrowser(dlqQueue);
- Enumeration<?> elements = browser.getEnumeration();
- assertTrue(elements.hasMoreElements());
- Message browsed = (Message) elements.nextElement();
- assertNotNull("Recover message after broker restarts", browsed);
- }
-
- private void sendMessage(Destination destination) throws Exception {
- Connection connection = createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- producer.send(destination, session.createTextMessage("DLQ message"), DeliveryMode.PERSISTENT, 4, 1000);
- connection.stop();
- LOG.info("### Send message that will expire.");
- }
-
- private Connection createConnection() throws Exception {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
- return factory.createConnection();
- }
-
- private Destination createDlqDestination() {
- return new ActiveMQQueue("ActiveMQ.DLQ");
- }
-
- private void verifyIsDlq(Queue dlqQ) throws Exception {
- final QueueViewMBean queueViewMBean = getProxyToQueue(dlqQ.getQueueName());
- assertTrue("is dlq", queueViewMBean.isDLQ());
- }
-
- private QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
- ObjectName queueViewMBeanName = new ObjectName(
- "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + name);
- QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(
- queueViewMBeanName, QueueViewMBean.class, true);
- return proxy;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Enumeration;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.leveldb.LevelDBStore;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Once the wire format is completed we can test against real persistence storage.
+ */
+public class AMQ6059Test {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AMQ6059Test.class);
+
+ private BrokerService broker;
+
+ @Before
+ public void setUp() throws Exception {
+ broker = createBroker();
+ broker.start();
+ broker.waitUntilStarted();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+ }
+
+ @Test
+ public void testDLQRecovery() throws Exception {
+
+ sendMessage(new ActiveMQQueue("QName"));
+ TimeUnit.SECONDS.sleep(3);
+
+ LOG.info("### Check for expired message moving to DLQ.");
+
+ Queue dlqQueue = (Queue) createDlqDestination();
+ verifyIsDlq(dlqQueue);
+
+ final QueueViewMBean queueViewMBean = getProxyToQueue(dlqQueue.getQueueName());
+
+ assertTrue("The message expired", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("DLQ stats: Enqueues {}, Dispatches {}, Expired {}, Inflight {}",
+ new Object[] { queueViewMBean.getEnqueueCount(),
+ queueViewMBean.getDispatchCount(),
+ queueViewMBean.getExpiredCount(),
+ queueViewMBean.getInFlightCount()});
+ return queueViewMBean.getEnqueueCount() == 1;
+ }
+ }));
+
+ verifyMessageIsRecovered(dlqQueue);
+ restartBroker();
+ verifyIsDlq(dlqQueue);
+ verifyMessageIsRecovered(dlqQueue);
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ return createBrokerWithDLQ(true);
+ }
+
+ private BrokerService createBrokerWithDLQ(boolean purge) throws Exception {
+ BrokerService broker = new BrokerService();
+ ActiveMQQueue dlq = new ActiveMQQueue("ActiveMQ.DLQ?isDLQ=true");
+
+ broker.setDestinations(new ActiveMQDestination[]{dlq});
+
+ PolicyMap pMap = new PolicyMap();
+
+ SharedDeadLetterStrategy sharedDLQStrategy = new SharedDeadLetterStrategy();
+ sharedDLQStrategy.setProcessNonPersistent(true);
+ sharedDLQStrategy.setProcessExpired(true);
+ sharedDLQStrategy.setDeadLetterQueue(dlq);
+ sharedDLQStrategy.setExpiration(10000);
+
+ PolicyEntry defaultPolicy = new PolicyEntry();
+ defaultPolicy.setDeadLetterStrategy(sharedDLQStrategy);
+ defaultPolicy.setExpireMessagesPeriod(2000);
+ defaultPolicy.setUseCache(false);
+
+ pMap.put(new ActiveMQQueue(">"), defaultPolicy);
+ broker.setDestinationPolicy(pMap);
+ if (purge) {
+ broker.setDeleteAllMessagesOnStartup(true);
+ }
+
+ return broker;
+ }
+
+ private void restartBroker() throws Exception {
+ broker.stop();
+ broker.waitUntilStopped();
+ broker = createBrokerWithDLQ(false);
+ broker.start();
+ broker.waitUntilStarted();
+ }
+
+ private void verifyMessageIsRecovered(final Queue dlqQueue) throws Exception, JMSException {
+ Connection connection = createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueBrowser browser = session.createBrowser(dlqQueue);
+ Enumeration<?> elements = browser.getEnumeration();
+ assertTrue(elements.hasMoreElements());
+ Message browsed = (Message) elements.nextElement();
+ assertNotNull("Recover message after broker restarts", browsed);
+ }
+
+ private void sendMessage(Destination destination) throws Exception {
+ Connection connection = createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
+ producer.send(destination, session.createTextMessage("DLQ message"), DeliveryMode.PERSISTENT, 4, 1000);
+ connection.stop();
+ LOG.info("### Send message that will expire.");
+ }
+
+ private Connection createConnection() throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
+ return factory.createConnection();
+ }
+
+ private Destination createDlqDestination() {
+ return new ActiveMQQueue("ActiveMQ.DLQ");
+ }
+
+ private void verifyIsDlq(Queue dlqQ) throws Exception {
+ final QueueViewMBean queueViewMBean = getProxyToQueue(dlqQ.getQueueName());
+ assertTrue("is dlq", queueViewMBean.isDLQ());
+ }
+
+ private QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
+ ObjectName queueViewMBeanName = new ObjectName(
+ "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + name);
+ QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(
+ queueViewMBeanName, QueueViewMBean.class, true);
+ return proxy;
+ }
+}