You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:21 UTC
[12/42] activemq-artemis git commit: ARTEMIS-463 Improvement to the
openwire testsuite https://issues.apache.org/jira/browse/ARTEMIS-463
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
deleted file mode 100644
index a70ef67..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
+++ /dev/null
@@ -1,584 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.store;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
-import org.apache.activemq.CombinationTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
-import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-abstract public class MessagePriorityTest extends CombinationTestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(MessagePriorityTest.class);
-
- BrokerService broker;
- PersistenceAdapter adapter;
-
- protected ActiveMQConnectionFactory factory;
- protected Connection conn;
- protected Session sess;
-
- public boolean useCache = true;
- public int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
- public boolean dispatchAsync = true;
- public boolean prioritizeMessages = true;
- public boolean immediatePriorityDispatch = true;
- public int prefetchVal = 500;
- public int expireMessagePeriod = 30000;
-
- public int MSG_NUM = 600;
- public int HIGH_PRI = 7;
- public int LOW_PRI = 3;
-
- abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;
-
- @Override
- protected void setUp() throws Exception {
- broker = new BrokerService();
- broker.setBrokerName("priorityTest");
- broker.setAdvisorySupport(false);
- adapter = createPersistenceAdapter(true);
- broker.setPersistenceAdapter(adapter);
- PolicyEntry policy = new PolicyEntry();
- policy.setPrioritizedMessages(prioritizeMessages);
- policy.setUseCache(useCache);
- policy.setExpireMessagesPeriod(expireMessagePeriod);
- StorePendingDurableSubscriberMessageStoragePolicy durableSubPending = new StorePendingDurableSubscriberMessageStoragePolicy();
- durableSubPending.setImmediatePriorityDispatch(immediatePriorityDispatch);
- durableSubPending.setUseCache(useCache);
- policy.setPendingDurableSubscriberPolicy(durableSubPending);
- PolicyMap policyMap = new PolicyMap();
- policyMap.put(new ActiveMQQueue("TEST"), policy);
- policyMap.put(new ActiveMQTopic("TEST"), policy);
-
- // do not process expired for one test
- PolicyEntry ignoreExpired = new PolicyEntry();
- SharedDeadLetterStrategy ignoreExpiredStrategy = new SharedDeadLetterStrategy();
- ignoreExpiredStrategy.setProcessExpired(false);
- ignoreExpired.setDeadLetterStrategy(ignoreExpiredStrategy);
- policyMap.put(new ActiveMQTopic("TEST_CLEANUP_NO_PRIORITY"), ignoreExpired);
-
- broker.setDestinationPolicy(policyMap);
- broker.start();
- broker.waitUntilStarted();
-
- factory = new ActiveMQConnectionFactory("vm://priorityTest");
- ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
- prefetch.setAll(prefetchVal);
- factory.setPrefetchPolicy(prefetch);
- factory.setWatchTopicAdvisories(false);
- factory.setDispatchAsync(dispatchAsync);
- conn = factory.createConnection();
- conn.setClientID("priority");
- conn.start();
- sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- /**
-  * Closes the shared session and connection on a best-effort basis and then
-  * stops the broker.
-  * <p>
-  * Each client resource is closed independently so that a failure closing the
-  * session no longer prevents the connection from being closed, and the broker
-  * is only stopped when {@code setUp} got far enough to create it.
-  */
- @Override
- protected void tearDown() throws Exception {
-    try {
-       if (sess != null) {
-          try {
-             sess.close();
-          }
-          catch (Exception ignored) {
-             // best effort: the broker is about to be stopped anyway
-             LOG.debug("Ignoring failure closing session", ignored);
-          }
-       }
-       if (conn != null) {
-          try {
-             conn.close();
-          }
-          catch (Exception ignored) {
-             // best effort: the broker is about to be stopped anyway
-             LOG.debug("Ignoring failure closing connection", ignored);
-          }
-       }
-    }
-    finally {
-       if (broker != null) {
-          broker.stop();
-          broker.waitUntilStopped();
-       }
-    }
- }
-
- public void testStoreConfigured() throws Exception {
- final Queue queue = sess.createQueue("TEST");
- final Topic topic = sess.createTopic("TEST");
-
- MessageProducer queueProducer = sess.createProducer(queue);
- MessageProducer topicProducer = sess.createProducer(topic);
-
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return broker.getRegionBroker().getDestinationMap().get(queue) != null;
- }
- });
- assertTrue(broker.getRegionBroker().getDestinationMap().get(queue).getMessageStore().isPrioritizedMessages());
-
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return broker.getRegionBroker().getDestinationMap().get(topic) != null;
- }
- });
- assertTrue(broker.getRegionBroker().getDestinationMap().get(topic).getMessageStore().isPrioritizedMessages());
-
- queueProducer.close();
- topicProducer.close();
-
- }
-
- protected class ProducerThread extends Thread {
-
- int priority;
- int messageCount;
- ActiveMQDestination dest;
-
- public ProducerThread(ActiveMQDestination dest, int messageCount, int priority) {
- this.messageCount = messageCount;
- this.priority = priority;
- this.dest = dest;
- }
-
- @Override
- public void run() {
- try {
- MessageProducer producer = sess.createProducer(dest);
- producer.setPriority(priority);
- producer.setDeliveryMode(deliveryMode);
- for (int i = 0; i < messageCount; i++) {
- producer.send(sess.createTextMessage("message priority: " + priority));
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public void setMessagePriority(int priority) {
- this.priority = priority;
- }
-
- public void setMessageCount(int messageCount) {
- this.messageCount = messageCount;
- }
-
- }
-
- /**
-  * Registers the combinations for {@code testQueues}: cache on/off crossed with
-  * non-persistent/persistent delivery. Uses the cached boxed values instead of
-  * the deprecated {@code Boolean}/{@code Integer} constructors, matching the
-  * other combination initialisers in this class.
-  */
- public void initCombosForTestQueues() {
-    addCombinationValues("useCache", new Object[]{Boolean.TRUE, Boolean.FALSE});
-    addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
- }
-
- /**
-  * Fills the "TEST" queue concurrently with low and high priority messages and
-  * expects a consumer created afterwards to see every high priority message
-  * before any low priority one.
-  */
- public void testQueues() throws Exception {
-    final ActiveMQQueue destination = (ActiveMQQueue) sess.createQueue("TEST");
-
-    final ProducerThread lowSender = new ProducerThread(destination, MSG_NUM, LOW_PRI);
-    final ProducerThread highSender = new ProducerThread(destination, MSG_NUM, HIGH_PRI);
-
-    lowSender.start();
-    highSender.start();
-
-    lowSender.join();
-    highSender.join();
-
-    final MessageConsumer consumer = sess.createConsumer(destination);
-    for (int index = 0; index < MSG_NUM * 2; index++) {
-       final Message received = consumer.receive(5000);
-       final String messageId = received != null ? received.getJMSMessageID() : null;
-       LOG.debug("received i=" + index + ", " + messageId);
-       assertNotNull("Message " + index + " was null", received);
-
-       // first half of the deliveries must be the high priority batch
-       final int expectedPriority = index < MSG_NUM ? HIGH_PRI : LOW_PRI;
-       assertEquals("Message " + index + " has wrong priority", expectedPriority, received.getJMSPriority());
-    }
- }
-
- /**
-  * Builds a text message whose body is {@code "priority " + priority} and logs
-  * it. The priority is only written into the body here; it is not set on the
-  * message.
-  *
-  * @param priority value embedded in the message text
-  * @return the newly created text message
-  */
- protected Message createMessage(int priority) throws Exception {
-    final String body = "priority " + priority;
-    final Message created = sess.createTextMessage(body);
-    LOG.info("Sending " + body);
-    return created;
- }
-
- /**
-  * Registers the prefetch sizes for {@code testDurableSubs}: 1000 and a quarter
-  * of {@code MSG_NUM}. Uses {@code Integer.valueOf} instead of the deprecated
-  * {@code Integer} constructor.
-  */
- public void initCombosForTestDurableSubs() {
-    addCombinationValues("prefetchVal", new Object[]{Integer.valueOf(1000), Integer.valueOf(MSG_NUM / 4)});
- }
-
- // Durable subscriber variant of the priority test: messages are produced while the
- // subscription "priority" is offline, then consumed and checked for high-before-low
- // ordering. Afterwards the same broker is exercised with a topic that has no policy
- // entry in setUp ("HAS_NO_PRIORITY"), where only the message count is checked.
- public void testDurableSubs() throws Exception {
- ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
- // register the durable subscription, then go offline so messages accumulate
- TopicSubscriber sub = sess.createDurableSubscriber(topic, "priority");
- sub.close();
-
- ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
- ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);
-
- lowPri.start();
- highPri.start();
-
- lowPri.join();
- highPri.join();
-
- // reactivate the subscription: the first MSG_NUM deliveries must be high priority
- sub = sess.createDurableSubscriber(topic, "priority");
- for (int i = 0; i < MSG_NUM * 2; i++) {
- Message msg = sub.receive(5000);
- assertNotNull("Message " + i + " was null", msg);
- assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
- }
-
- // verify that same broker/store can deal with non priority dest also
- topic = (ActiveMQTopic) sess.createTopic("HAS_NO_PRIORITY");
- sub = sess.createDurableSubscriber(topic, "no_priority");
- sub.close();
-
- lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
- highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);
-
- lowPri.start();
- highPri.start();
-
- lowPri.join();
- highPri.join();
-
- sub = sess.createDurableSubscriber(topic, "no_priority");
- // verify we got them all
- // (no ordering assertion here: only that every message arrives)
- for (int i = 0; i < MSG_NUM * 2; i++) {
- Message msg = sub.receive(5000);
- assertNotNull("Message " + i + " was null", msg);
- }
-
- }
-
- /**
-  * Registers the combinations for {@code testDurableSubsReconnect}: two
-  * prefetch sizes, synchronous dispatch only, and cache on/off. Uses
-  * {@code Integer.valueOf} instead of the deprecated {@code Integer} constructor.
-  */
- public void initCombosForTestDurableSubsReconnect() {
-    addCombinationValues("prefetchVal", new Object[]{Integer.valueOf(1000), Integer.valueOf(MSG_NUM / 2)});
-    // REVISIT = is dispatchAsync = true a problem or is it just the test?
-    addCombinationValues("dispatchAsync", new Object[]{Boolean.FALSE});
-    addCombinationValues("useCache", new Object[]{Boolean.TRUE, Boolean.FALSE});
- }
-
- /**
-  * Consumes a mixed-priority backlog through a durable subscription that is
-  * closed and re-created every {@code MSG_NUM / 4} messages, and expects the
-  * high-before-low ordering to survive each reconnect.
-  */
- public void testDurableSubsReconnect() throws Exception {
-    final ActiveMQTopic destination = (ActiveMQTopic) sess.createTopic("TEST");
-    final String subName = "priorityDisconnect";
-
-    // create the subscription and go offline before anything is produced
-    TopicSubscriber subscriber = sess.createDurableSubscriber(destination, subName);
-    subscriber.close();
-
-    final ProducerThread lowSender = new ProducerThread(destination, MSG_NUM, LOW_PRI);
-    final ProducerThread highSender = new ProducerThread(destination, MSG_NUM, HIGH_PRI);
-
-    lowSender.start();
-    highSender.start();
-
-    lowSender.join();
-    highSender.join();
-
-    final int closeFrequency = MSG_NUM / 4;
-    subscriber = sess.createDurableSubscriber(destination, subName);
-    for (int index = 0; index < MSG_NUM * 2; index++) {
-       final Message received = subscriber.receive(15000);
-       final String messageId = received != null ? received.getJMSMessageID() : null;
-       LOG.debug("received i=" + index + ", " + messageId);
-       assertNotNull("Message " + index + " was null", received);
-
-       final int expectedPriority = index < MSG_NUM ? HIGH_PRI : LOW_PRI;
-       assertEquals("Message " + index + " has wrong priority", expectedPriority, received.getJMSPriority());
-
-       final boolean reconnectNow = index > 0 && index % closeFrequency == 0;
-       if (reconnectNow) {
-          LOG.info("Closing durable sub.. on: " + index);
-          subscriber.close();
-          subscriber = sess.createDurableSubscriber(destination, subName);
-       }
-    }
- }
-
- /**
-  * With zero prefetch, drains a backlog of {@code LOW_PRI + 1} messages from a
-  * durable subscription and, at the start of every batch of 250, injects higher
-  * priority messages that must be delivered next, plus occasional {@code LOW_PRI}
-  * messages that must only arrive once the backlog is exhausted.
-  * <p>
-  * The "every second batch" and "every fourth batch" conditions are written as
-  * {@code i % (batchSize * n) == 0}. The previous form {@code i % batchSize * n == 0}
-  * parsed as {@code (i % batchSize) * n == 0}, which is always true inside the
-  * {@code i % batchSize == 0} branch, so all three kinds were sent on every batch.
-  */
- public void testHighPriorityDelivery() throws Exception {
-
-    // get zero prefetch
-    ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
-    prefetch.setAll(0);
-    factory.setPrefetchPolicy(prefetch);
-    conn.close();
-    conn = factory.createConnection();
-    conn.setClientID("priority");
-    conn.start();
-    sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-    ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
-    final String subName = "priorityDisconnect";
-    TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
-    sub.close();
-
-    final int numToProduce = 2000;
-    // NOTE(review): dups is indexed by the loop counter, not by anything taken from
-    // the message, so the "not duplicate" assertion below cannot actually fail.
-    final int[] dups = new int[numToProduce * 2];
-    ProducerThread producerThread = new ProducerThread(topic, numToProduce, LOW_PRI + 1);
-    producerThread.run();
-    LOG.info("Low priority messages sent");
-
-    sub = sess.createDurableSubscriber(topic, subName);
-    final int batchSize = 250;
-    int lowLowCount = 0;
-    for (int i = 0; i < numToProduce; i++) {
-       Message msg = sub.receive(15000);
-       LOG.info("received i=" + i + ", " + (msg != null ? msg.getJMSMessageID() + ", priority:" + msg.getJMSPriority() : null));
-       assertNotNull("Message " + i + " was null", msg);
-       assertEquals("Message " + i + " has wrong priority", LOW_PRI + 1, msg.getJMSPriority());
-       assertTrue("not duplicate ", dups[i] == 0);
-       dups[i] = 1;
-
-       if (i % batchSize == 0) {
-          final boolean everySecondBatch = i % (batchSize * 2) == 0;
-          final boolean everyFourthBatch = i % (batchSize * 4) == 0;
-
-          producerThread.setMessagePriority(HIGH_PRI);
-          producerThread.setMessageCount(1);
-          producerThread.run();
-          LOG.info("High priority message sent, should be able to receive immediately");
-
-          if (everySecondBatch) {
-             producerThread.setMessagePriority(HIGH_PRI - 1);
-             producerThread.setMessageCount(1);
-             producerThread.run();
-             LOG.info("High -1 priority message sent, should be able to receive immediately");
-          }
-
-          if (everyFourthBatch) {
-             producerThread.setMessagePriority(LOW_PRI);
-             producerThread.setMessageCount(1);
-             producerThread.run();
-             // remembered so the tail loop knows how many LOW_PRI messages to expect
-             lowLowCount++;
-             LOG.info("Low low priority message sent, should not be able to receive immediately");
-          }
-
-          msg = sub.receive(15000);
-          assertNotNull("Message was null", msg);
-          LOG.info("received hi? : " + msg);
-          assertEquals("high priority", HIGH_PRI, msg.getJMSPriority());
-
-          if (everySecondBatch) {
-             msg = sub.receive(15000);
-             assertNotNull("Message was null", msg);
-             LOG.info("received hi -1 ? i=" + i + ", " + msg);
-             assertEquals("high priority", HIGH_PRI - 1, msg.getJMSPriority());
-          }
-       }
-    }
-    // the LOW_PRI messages rank below the whole backlog, so they come last
-    for (int i = 0; i < lowLowCount; i++) {
-       Message msg = sub.receive(15000);
-       LOG.debug("received i=" + i + ", " + (msg != null ? msg.getJMSMessageID() : null));
-       assertNotNull("Message " + i + " was null", msg);
-       assertEquals("Message " + i + " has wrong priority", LOW_PRI, msg.getJMSPriority());
-    }
- }
-
- /** Runs {@code testHighPriorityDeliveryInterleaved} with the cache enabled and disabled. */
- public void initCombosForTestHighPriorityDeliveryInterleaved() {
-    final Object[] cacheSettings = {Boolean.TRUE, Boolean.FALSE};
-    addCombinationValues("useCache", cacheSettings);
- }
-
- /**
-  * With zero prefetch, sends one message each at {@code HIGH_PRI},
-  * {@code HIGH_PRI - 1} and {@code LOW_PRI} to an offline durable subscription,
-  * then adds a {@code LOW_PRI + 1} message after the first receive and expects
-  * strict priority order: high, high-1, low+1, low, and nothing more.
-  * <p>
-  * Assertion messages describe the expectation actually being checked; the
-  * final check previously reported "Message was null" when a message was
-  * unexpectedly present.
-  */
- public void testHighPriorityDeliveryInterleaved() throws Exception {
-
-    // get zero prefetch
-    ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
-    prefetch.setAll(0);
-    factory.setPrefetchPolicy(prefetch);
-    conn.close();
-    conn = factory.createConnection();
-    conn.setClientID("priority");
-    conn.start();
-    sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-    ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
-    final String subName = "priorityDisconnect";
-    TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
-    sub.close();
-
-    ProducerThread producerThread = new ProducerThread(topic, 1, HIGH_PRI);
-    producerThread.run();
-
-    producerThread.setMessagePriority(HIGH_PRI - 1);
-    producerThread.setMessageCount(1);
-    producerThread.run();
-
-    producerThread.setMessagePriority(LOW_PRI);
-    producerThread.setMessageCount(1);
-    producerThread.run();
-    LOG.info("Ordered priority messages sent");
-
-    sub = sess.createDurableSubscriber(topic, subName);
-
-    Message msg = sub.receive(15000);
-    assertNotNull("Message was null", msg);
-    LOG.info("received " + msg.getJMSMessageID() + ", priority:" + msg.getJMSPriority());
-    assertEquals("Message has wrong priority", HIGH_PRI, msg.getJMSPriority());
-
-    // interleave a message that ranks between the two still pending
-    producerThread.setMessagePriority(LOW_PRI + 1);
-    producerThread.setMessageCount(1);
-    producerThread.run();
-
-    msg = sub.receive(15000);
-    assertNotNull("Message was null", msg);
-    LOG.info("received " + msg.getJMSMessageID() + ", priority:" + msg.getJMSPriority());
-    assertEquals("second message should have high -1 priority", HIGH_PRI - 1, msg.getJMSPriority());
-
-    msg = sub.receive(15000);
-    assertNotNull("Message was null", msg);
-    LOG.info("received hi? : " + msg);
-    assertEquals("third message should have low +1 priority", LOW_PRI + 1, msg.getJMSPriority());
-
-    msg = sub.receive(15000);
-    assertNotNull("Message was null", msg);
-    LOG.info("received hi? : " + msg);
-    assertEquals("fourth message should have low priority", LOW_PRI, msg.getJMSPriority());
-
-    msg = sub.receive(4000);
-    assertNull("Expected no further messages", msg);
- }
-
- /**
-  * Runs {@code testHighPriorityDeliveryThroughBackLog} without the cache and with
-  * immediate priority dispatch enabled.
-  */
- // immediatePriorityDispatch is only relevant when cache is exhausted
- public void initCombosForTestHighPriorityDeliveryThroughBackLog() {
-    final Object[] cacheOff = {Boolean.FALSE};
-    final Object[] immediateOn = {Boolean.TRUE};
-    addCombinationValues("useCache", cacheOff);
-    addCombinationValues("immediatePriorityDispatch", immediateOn);
- }
-
- /**
-  * With zero prefetch, consumes half of a 600-message {@code LOW_PRI} backlog,
-  * sends a single {@code HIGH_PRI} message and expects it to be the very next
-  * delivery, followed by the remaining low priority backlog.
-  * <p>
-  * The backlog assertions now say "backlog message should have low priority"
-  * instead of the misleading "high priority".
-  */
- public void testHighPriorityDeliveryThroughBackLog() throws Exception {
-
-    // get zero prefetch
-    ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
-    prefetch.setAll(0);
-    factory.setPrefetchPolicy(prefetch);
-    conn.close();
-    conn = factory.createConnection();
-    conn.setClientID("priority");
-    conn.start();
-    sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-    ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
-    final String subName = "priorityDisconnect";
-    TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
-    sub.close();
-
-    final int backlogSize = 600;
-    final int consumedBeforeHigh = 300;
-    ProducerThread producerThread = new ProducerThread(topic, backlogSize, LOW_PRI);
-    producerThread.run();
-
-    sub = sess.createDurableSubscriber(topic, subName);
-    int count = 0;
-
-    for (; count < consumedBeforeHigh; count++) {
-       Message msg = sub.receive(15000);
-       assertNotNull("Message was null", msg);
-       assertEquals("backlog message should have low priority", LOW_PRI, msg.getJMSPriority());
-    }
-
-    producerThread.setMessagePriority(HIGH_PRI);
-    producerThread.setMessageCount(1);
-    producerThread.run();
-
-    // the high priority message must jump ahead of the remaining backlog
-    Message msg = sub.receive(15000);
-    assertNotNull("Message was null", msg);
-    assertEquals("high priority", HIGH_PRI, msg.getJMSPriority());
-
-    for (; count < backlogSize; count++) {
-       msg = sub.receive(15000);
-       assertNotNull("Message was null", msg);
-       assertEquals("backlog message should have low priority", LOW_PRI, msg.getJMSPriority());
-    }
- }
-
- /**
-  * Runs {@code testHighPriorityNonDeliveryThroughBackLog} without the cache and
-  * with immediate priority dispatch disabled.
-  */
- public void initCombosForTestHighPriorityNonDeliveryThroughBackLog() {
-    final Object[] disabled = {Boolean.FALSE};
-    addCombinationValues("useCache", disabled);
-    addCombinationValues("immediatePriorityDispatch", new Object[]{Boolean.FALSE});
- }
-
- /**
-  * Counterpart of the immediate-dispatch test: with zero prefetch, a single
-  * {@code HIGH_PRI} message sent after 300 of 600 {@code LOW_PRI} messages were
-  * consumed is expected only after 100 further backlog messages, and the rest
-  * of the backlog follows it.
-  * <p>
-  * The backlog assertions now say "backlog message should have low priority"
-  * instead of the misleading "high priority".
-  */
- public void testHighPriorityNonDeliveryThroughBackLog() throws Exception {
-
-    // get zero prefetch
-    ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
-    prefetch.setAll(0);
-    factory.setPrefetchPolicy(prefetch);
-    conn.close();
-    conn = factory.createConnection();
-    conn.setClientID("priority");
-    conn.start();
-    sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-    ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
-    final String subName = "priorityDisconnect";
-    TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
-    sub.close();
-
-    final int backlogSize = 600;
-    final int consumedBeforeHigh = 300;
-    final int consumedWhenHighArrives = 400;
-    ProducerThread producerThread = new ProducerThread(topic, backlogSize, LOW_PRI);
-    producerThread.run();
-
-    sub = sess.createDurableSubscriber(topic, subName);
-    int count = 0;
-
-    for (; count < consumedBeforeHigh; count++) {
-       Message msg = sub.receive(15000);
-       assertNotNull("Message was null", msg);
-       assertEquals("backlog message should have low priority", LOW_PRI, msg.getJMSPriority());
-    }
-
-    producerThread.setMessagePriority(HIGH_PRI);
-    producerThread.setMessageCount(1);
-    producerThread.run();
-
-    // the next 100 deliveries are still backlog: the high priority message does not jump the queue yet
-    for (; count < consumedWhenHighArrives; count++) {
-       Message msg = sub.receive(15000);
-       assertNotNull("Message was null", msg);
-       assertEquals("backlog message should have low priority", LOW_PRI, msg.getJMSPriority());
-    }
-
-    Message msg = sub.receive(15000);
-    assertNotNull("Message was null", msg);
-    assertEquals("high priority", HIGH_PRI, msg.getJMSPriority());
-
-    for (; count < backlogSize; count++) {
-       msg = sub.receive(15000);
-       assertNotNull("Message was null", msg);
-       assertEquals("backlog message should have low priority", LOW_PRI, msg.getJMSPriority());
-    }
- }
-
- /**
-  * Registers the single combination for {@code testQueueBacklog}: cache off and
-  * an expire-message period of 0. Uses {@code Boolean.FALSE} and
-  * {@code Integer.valueOf} instead of the deprecated boxing constructors.
-  */
- public void initCombosForTestQueueBacklog() {
-    // the cache limits the priority ordering to available memory
-    addCombinationValues("useCache", new Object[]{Boolean.FALSE});
-    // expiry processing can fill the cursor with a snapshot of the producer
-    // priority, before producers are complete
-    addCombinationValues("expireMessagePeriod", new Object[]{Integer.valueOf(0)});
- }
-
- /**
-  * Builds a 180000-message low priority backlog on the "TEST" queue, then adds
-  * 10 high priority messages and expects those 10 to be the first deliveries of
-  * the 500 messages consumed.
-  */
- public void testQueueBacklog() throws Exception {
-    final int backlog = 180000;
-    final ActiveMQQueue destination = (ActiveMQQueue) sess.createQueue("TEST");
-
-    final ProducerThread backlogSender = new ProducerThread(destination, backlog, LOW_PRI);
-    final ProducerThread highSender = new ProducerThread(destination, 10, HIGH_PRI);
-
-    // strictly sequential: the backlog is complete before any high priority message is sent
-    backlogSender.start();
-    backlogSender.join();
-    highSender.start();
-    highSender.join();
-
-    LOG.info("Starting consumer...");
-    final MessageConsumer consumer = sess.createConsumer(destination);
-    for (int index = 0; index < 500; index++) {
-       final Message received = consumer.receive(20000);
-       final String messageId = received != null ? received.getJMSMessageID() : null;
-       LOG.debug("received i=" + index + ", " + messageId);
-       if (received == null) {
-          // capture thread state before the assertion below fails the test
-          dumpAllThreads("backlog");
-       }
-       assertNotNull("Message " + index + " was null", received);
-
-       final int expectedPriority = index < 10 ? HIGH_PRI : LOW_PRI;
-       assertEquals("Message " + index + " has wrong priority", expectedPriority, received.getJMSPriority());
-    }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java
deleted file mode 100644
index f7ab98d..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-// https://issues.apache.org/activemq/browse/AMQ-2594
-public abstract class StoreOrderTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(StoreOrderTest.class);
-
- protected BrokerService broker;
- private ActiveMQConnection connection;
- public Destination destination = new ActiveMQQueue("StoreOrderTest?consumer.prefetchSize=0");
-
- protected abstract void setPersistentAdapter(BrokerService brokerService) throws Exception;
-
- // Hook called by the tests after sends and broker restarts; this base
- // implementation intentionally does nothing.
- // NOTE(review): presumably store-specific subclasses override it to log the
- // persisted messages - confirm against the subclasses.
- protected void dumpMessages() throws Exception {
- }
-
- /**
-  * Sends a single text message ("first" or "second") in its own transacted
-  * session and leaves the transaction open until {@link #commit()} is called,
-  * so the test can choose the commit order independently of the send order.
-  */
- public class TransactedSend implements Runnable {
-
-    private final CountDownLatch readyForCommit;
-    private final CountDownLatch firstDone;
-    private final boolean first;
-    private final Session session;
-    private final MessageProducer producer;
-
-    /**
-     * @param readyForCommit counted down once this sender has sent its message
-     * @param firstDone      released by the "first" sender; awaited by the other one
-     * @param b              {@code true} if this instance is the "first" sender
-     */
-    public TransactedSend(CountDownLatch readyForCommit, CountDownLatch firstDone, boolean b) throws Exception {
-       this.readyForCommit = readyForCommit;
-       this.firstDone = firstDone;
-       this.first = b;
-       this.session = connection.createSession(true, Session.SESSION_TRANSACTED);
-       this.producer = this.session.createProducer(destination);
-    }
-
-    /** Sends this sender's message, the second sender only after the first has sent. */
-    @Override
-    public void run() {
-       try {
-          if (first) {
-             producer.send(session.createTextMessage("first"));
-             firstDone.countDown();
-          }
-          else {
-             // wait (bounded) for the first sender so the sends are ordered
-             firstDone.await(30, TimeUnit.SECONDS);
-             producer.send(session.createTextMessage("second"));
-          }
-          readyForCommit.countDown();
-
-       }
-       catch (Exception e) {
-          e.printStackTrace();
-          fail("unexpected ex on run " + e);
-       }
-    }
-
-    /** Commits the pending send and closes the session. */
-    public void commit() throws Exception {
-       session.commit();
-       session.close();
-    }
- }
-
- /** Starts a fresh broker and then connects the test client to it. */
- @Before
- public void setup() throws Exception {
-    this.broker = createBroker();
-    // the connection uses create=false, so the broker has to be running first
-    initConnection();
- }
-
- /**
-  * Opens and starts the shared connection to the already running in-VM broker
-  * ({@code create=false}), with topic advisory watching switched off.
-  */
- public void initConnection() throws Exception {
-    final ConnectionFactory vmFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
-    final ActiveMQConnection opened = (ActiveMQConnection) vmFactory.createConnection();
-    connection = opened;
-    opened.setWatchTopicAdvisories(false);
-    opened.start();
- }
-
- @After
- public void stopBroker() throws Exception {
- if (connection != null) {
- connection.close();
- }
- if (broker != null) {
- broker.stop();
- }
- }
-
- /**
-  * Sends one message to the composite destination
-  * "StoreOrderTest,SecondStoreOrderTest", restarts the broker without deleting
-  * messages, and expects to receive one message from each of the two queues.
-  */
- @Test
- public void testCompositeSendReceiveAfterRestart() throws Exception {
-    destination = new ActiveMQQueue("StoreOrderTest,SecondStoreOrderTest");
-    enqueueOneMessage();
-
-    LOG.info("restart broker");
-    stopBroker();
-    broker = createRestartedBroker();
-    dumpMessages();
-    initConnection();
-
-    // the helpers read the 'destination' field, so it is re-pointed per queue
-    destination = new ActiveMQQueue("StoreOrderTest");
-    final String fromFirstQueue = receiveOne();
-    assertNotNull("got one message from first dest", fromFirstQueue);
-    dumpMessages();
-
-    destination = new ActiveMQQueue("SecondStoreOrderTest");
-    final String fromSecondQueue = receiveOne();
-    assertNotNull("got one message from second dest", fromSecondQueue);
- }
-
- @Test
- public void validateUnorderedTxCommit() throws Exception {
-
- Executor executor = Executors.newCachedThreadPool();
- CountDownLatch readyForCommit = new CountDownLatch(2);
- CountDownLatch firstDone = new CountDownLatch(1);
-
- TransactedSend first = new TransactedSend(readyForCommit, firstDone, true);
- TransactedSend second = new TransactedSend(readyForCommit, firstDone, false);
- executor.execute(first);
- executor.execute(second);
-
- assertTrue("both started", readyForCommit.await(20, TimeUnit.SECONDS));
-
- LOG.info("commit out of order");
- // send interleaved so sequence id at time of commit could be reversed
- second.commit();
-
- // force usage over the limit before second commit to flush cache
- enqueueOneMessage();
-
- // can get lost in the cursor as it is behind the last sequenceId that was cached
- first.commit();
-
- LOG.info("send/commit done..");
-
- dumpMessages();
-
- String received1, received2, received3 = null;
- if (true) {
- LOG.info("receive and rollback...");
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- received1 = receive(session);
- received2 = receive(session);
- received3 = receive(session);
-
- assertEquals("second", received1);
- assertEquals("middle", received2);
- assertEquals("first", received3);
-
- session.rollback();
- session.close();
- }
-
- LOG.info("restart broker");
- stopBroker();
- broker = createRestartedBroker();
- initConnection();
-
- if (true) {
- LOG.info("receive and rollback after restart...");
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- received1 = receive(session);
- received2 = receive(session);
- received3 = receive(session);
- assertEquals("second", received1);
- assertEquals("middle", received2);
- assertEquals("first", received3);
- session.rollback();
- session.close();
- }
-
- LOG.info("receive and ack each message");
- received1 = receiveOne();
- received2 = receiveOne();
- received3 = receiveOne();
-
- assertEquals("second", received1);
- assertEquals("middle", received2);
- assertEquals("first", received3);
- }
-
- private void enqueueOneMessage() throws Exception {
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageProducer producer = session.createProducer(destination);
- producer.send(session.createTextMessage("middle"));
- session.commit();
- session.close();
- }
-
- private String receiveOne() throws Exception {
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- String received = receive(session);
- session.commit();
- session.close();
- return received;
- }
-
- /**
-  * Receives at most one text message from the current {@code destination}
-  * on the given session, waiting up to five seconds.
-  * <p>
-  * The temporary consumer is closed in a {@code finally} block so it is not
-  * leaked when receiving or reading the text fails.
-  *
-  * @param session session to create the consumer on; the caller commits or rolls back
-  * @return the message text, or {@code null} if nothing arrived in time
-  */
- private String receive(Session session) throws Exception {
-    MessageConsumer consumer = session.createConsumer(destination);
-    try {
-       String result = null;
-       TextMessage message = (TextMessage) consumer.receive(5000);
-       if (message != null) {
-          LOG.info("got message: " + message);
-          result = message.getText();
-       }
-       return result;
-    }
-    finally {
-       consumer.close();
-    }
- }
-
- /** Starts a broker that deletes all stored messages on startup. */
- protected BrokerService createBroker() throws Exception {
-    // true = delete all messages on startup
-    return startBroker(true);
- }
-
- /** Starts a broker that keeps the messages already in the store. */
- protected BrokerService createRestartedBroker() throws Exception {
-    // false = keep the messages persisted by the previous broker instance
-    return startBroker(false);
- }
-
- protected BrokerService startBroker(boolean deleteMessagesOnStartup) throws Exception {
- BrokerService newBroker = new BrokerService();
- configureBroker(newBroker);
- newBroker.setDeleteAllMessagesOnStartup(deleteMessagesOnStartup);
- newBroker.start();
- return newBroker;
- }
-
- protected void configureBroker(BrokerService brokerService) throws Exception {
- setPersistentAdapter(brokerService);
- brokerService.setAdvisorySupport(false);
-
- PolicyMap map = new PolicyMap();
- PolicyEntry defaultEntry = new PolicyEntry();
- defaultEntry.setMemoryLimit(1024 * 3);
- defaultEntry.setCursorMemoryHighWaterMark(68);
- defaultEntry.setExpireMessagesPeriod(0);
- map.setDefaultEntry(defaultEntry);
- brokerService.setDestinationPolicy(map);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java
deleted file mode 100644
index cc144d0..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Vector;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore;
-import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class StorePerDestinationTest {
-
- static final Logger LOG = LoggerFactory.getLogger(StorePerDestinationTest.class);
- final static int maxFileLength = 1024 * 100;
- final static int numToSend = 5000;
- final Vector<Throwable> exceptions = new Vector<>();
- BrokerService brokerService;
-
- protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
-
- BrokerService broker = new BrokerService();
- broker.setUseJmx(false);
- broker.setPersistenceAdapter(kaha);
- return broker;
- }
-
- protected PersistenceAdapter createStore(boolean delete) throws IOException {
- KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
- kaha.setJournalMaxFileLength(maxFileLength);
- kaha.setCleanupInterval(5000);
- if (delete) {
- kaha.deleteAllMessages();
- }
- return kaha;
- }
-
- @Before
- public void prepareCleanBrokerWithMultiStore() throws Exception {
- prepareBrokerWithMultiStore(true);
- }
-
- public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
-
- MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
- if (deleteAllMessages) {
- multiKahaDBPersistenceAdapter.deleteAllMessages();
- }
- ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<>();
-
- FilteredKahaDBPersistenceAdapter theRest = new FilteredKahaDBPersistenceAdapter();
- theRest.setPersistenceAdapter(createStore(deleteAllMessages));
- // default destination when not set is a match for all
- adapters.add(theRest);
-
- // separate store for FastQ
- FilteredKahaDBPersistenceAdapter fastQStore = new FilteredKahaDBPersistenceAdapter();
- fastQStore.setPersistenceAdapter(createStore(deleteAllMessages));
- fastQStore.setDestination(new ActiveMQQueue("FastQ"));
- adapters.add(fastQStore);
-
- multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
- brokerService = createBroker(multiKahaDBPersistenceAdapter);
- }
-
- @After
- public void tearDown() throws Exception {
- brokerService.stop();
- }
-
- @Test
- public void testTransactedSendReceive() throws Exception {
- brokerService.start();
- sendMessages(true, "SlowQ", 1, 0);
- assertEquals("got one", 1, receiveMessages(true, "SlowQ", 1));
- }
-
- @Test
- public void testTransactedSendReceiveAcrossStores() throws Exception {
- brokerService.start();
- sendMessages(true, "SlowQ,FastQ", 1, 0);
- assertEquals("got one", 2, receiveMessages(true, "SlowQ,FastQ", 2));
- }
-
- @Test
- public void testCommitRecovery() throws Exception {
- doTestRecovery(true);
- }
-
- @Test
- public void testRollbackRecovery() throws Exception {
- doTestRecovery(false);
- }
-
- public void doTestRecovery(final boolean haveOutcome) throws Exception {
- final MultiKahaDBPersistenceAdapter persistenceAdapter = (MultiKahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
- MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(persistenceAdapter) {
- @Override
- public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
- if (haveOutcome) {
- super.persistOutcome(tx, txid);
- }
- try {
- // IOExceptions will stop the broker
- persistenceAdapter.stop();
- }
- catch (Exception e) {
- LOG.error("ex on stop ", e);
- exceptions.add(e);
- }
- }
- };
- persistenceAdapter.setTransactionStore(transactionStore);
- brokerService.start();
-
- ExecutorService executorService = Executors.newCachedThreadPool();
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- try {
- // commit will block
- sendMessages(true, "SlowQ,FastQ", 1, 0);
- }
- catch (Exception expected) {
- LOG.info("expected", expected);
- }
- }
- });
-
- brokerService.waitUntilStopped();
- // interrupt the send thread
- executorService.shutdownNow();
-
- // verify auto recovery
- prepareBrokerWithMultiStore(false);
- brokerService.start();
-
- assertEquals("expect to get the recovered message", haveOutcome ? 2 : 0, receiveMessages(false, "SlowQ,FastQ", 2));
- assertEquals("all transactions are complete", 0, brokerService.getBroker().getPreparedTransactions(null).length);
- }
-
- @Test
- public void testDirectoryDefault() throws Exception {
- MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
- ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<>();
-
- FilteredKahaDBPersistenceAdapter otherFilteredKahaDBPersistenceAdapter = new FilteredKahaDBPersistenceAdapter();
- PersistenceAdapter otherStore = createStore(false);
- File someOtherDisk = new File("target" + File.separator + "someOtherDisk");
- otherStore.setDirectory(someOtherDisk);
- otherFilteredKahaDBPersistenceAdapter.setPersistenceAdapter(otherStore);
- otherFilteredKahaDBPersistenceAdapter.setDestination(new ActiveMQQueue("Other"));
- adapters.add(otherFilteredKahaDBPersistenceAdapter);
-
- FilteredKahaDBPersistenceAdapter filteredKahaDBPersistenceAdapterDefault = new FilteredKahaDBPersistenceAdapter();
- PersistenceAdapter storeDefault = createStore(false);
- filteredKahaDBPersistenceAdapterDefault.setPersistenceAdapter(storeDefault);
- adapters.add(filteredKahaDBPersistenceAdapterDefault);
-
- multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
-
- assertEquals(multiKahaDBPersistenceAdapter.getDirectory(), storeDefault.getDirectory().getParentFile());
- assertEquals(someOtherDisk, otherStore.getDirectory().getParentFile());
- }
-
- @Test
- public void testSlowFastDestinationsStoreUsage() throws Exception {
- brokerService.start();
- ExecutorService executorService = Executors.newCachedThreadPool();
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- try {
- sendMessages(false, "SlowQ", 50, 500);
- }
- catch (Exception e) {
- exceptions.add(e);
- }
- }
- });
-
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- try {
- sendMessages(false, "FastQ", numToSend, 0);
- }
- catch (Exception e) {
- exceptions.add(e);
- }
- }
- });
-
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- try {
- assertEquals("Got all sent", numToSend, receiveMessages(false, "FastQ", numToSend));
- }
- catch (Exception e) {
- exceptions.add(e);
- }
- }
- });
-
- executorService.shutdown();
- assertTrue("consumers executor finished on time", executorService.awaitTermination(5 * 60, TimeUnit.SECONDS));
- final SystemUsage usage = brokerService.getSystemUsage();
- assertTrue("Store is not hogged", Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- long storeUsage = usage.getStoreUsage().getUsage();
- LOG.info("Store Usage: " + storeUsage);
- return storeUsage < 5 * maxFileLength;
- }
- }));
- assertTrue("no exceptions", exceptions.isEmpty());
- }
-
- private void sendMessages(boolean transacted, String destName, int count, long sleep) throws Exception {
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
- Connection connection = cf.createConnection();
- try {
- Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(new ActiveMQQueue(destName));
- for (int i = 0; i < count; i++) {
- if (sleep > 0) {
- TimeUnit.MILLISECONDS.sleep(sleep);
- }
- producer.send(session.createTextMessage(createContent(i)));
- }
- if (transacted) {
- session.commit();
- }
- }
- finally {
- connection.close();
- }
- }
-
- private int receiveMessages(boolean transacted, String destName, int max) throws JMSException {
- int rc = 0;
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
- Connection connection = cf.createConnection();
- try {
- connection.start();
- Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer messageConsumer = session.createConsumer(new ActiveMQQueue(destName));
- while (rc < max && messageConsumer.receive(4000) != null) {
- rc++;
-
- if (transacted && rc % 200 == 0) {
- session.commit();
- }
- }
- if (transacted) {
- session.commit();
- }
- return rc;
- }
- finally {
- connection.close();
- }
- }
-
- private String createContent(int i) {
- StringBuilder sb = new StringBuilder(i + ":");
- while (sb.length() < 1024) {
- sb.append("*");
- }
- return sb.toString();
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/BrokenPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/BrokenPersistenceAdapter.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/BrokenPersistenceAdapter.java
deleted file mode 100644
index a484c64..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/BrokenPersistenceAdapter.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.store.jdbc;
-
-import java.io.IOException;
-
-import org.apache.activemq.broker.ConnectionContext;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class BrokenPersistenceAdapter extends JDBCPersistenceAdapter {
-
- private final Logger LOG = LoggerFactory.getLogger(BrokenPersistenceAdapter.class);
-
- private boolean shouldBreak = false;
-
- @Override
- public void commitTransaction(ConnectionContext context) throws IOException {
- if (shouldBreak) {
- LOG.warn("Throwing exception on purpose");
- throw new IOException("Breaking on purpose");
- }
- LOG.debug("in commitTransaction");
- super.commitTransaction(context);
- }
-
- public void setShouldBreak(boolean shouldBreak) {
- this.shouldBreak = shouldBreak;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/DatabaseLockerConfigTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/DatabaseLockerConfigTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/DatabaseLockerConfigTest.java
deleted file mode 100644
index 5a4aae8..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/DatabaseLockerConfigTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.jdbc;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.activemq.broker.AbstractLocker;
-import org.junit.Test;
-
-public class DatabaseLockerConfigTest {
-
- @Test
- public void testSleepConfig() throws Exception {
- LeaseDatabaseLocker underTest = new LeaseDatabaseLocker();
- underTest.setLockAcquireSleepInterval(50);
- underTest.configure(null);
- assertEquals("configured sleep value retained", 50, underTest.getLockAcquireSleepInterval());
- }
-
- @Test
- public void testDefaultSleepConfig() throws Exception {
- LeaseDatabaseLocker underTest = new LeaseDatabaseLocker();
- underTest.configure(null);
- assertEquals("configured sleep value retained", AbstractLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL, underTest.getLockAcquireSleepInterval());
- }
-
- @Test
- public void testSleepConfigOrig() throws Exception {
- DefaultDatabaseLocker underTest = new DefaultDatabaseLocker();
- underTest.setLockAcquireSleepInterval(50);
- underTest.configure(null);
- assertEquals("configured sleep value retained", 50, underTest.getLockAcquireSleepInterval());
- }
-
- @Test
- public void testDefaultSleepConfigOrig() throws Exception {
- DefaultDatabaseLocker underTest = new DefaultDatabaseLocker();
- underTest.configure(null);
- assertEquals("configured sleep value retained", AbstractLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL, underTest.getLockAcquireSleepInterval());
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
deleted file mode 100644
index 00c501f..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.store.jdbc;
-
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.derby.jdbc.EmbeddedDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-// https://issues.apache.org/activemq/browse/AMQ-2880
-public class JDBCCommitExceptionTest extends TestCase {
-
- private static final Logger LOG = LoggerFactory.getLogger(JDBCCommitExceptionTest.class);
-
- protected static final int messagesExpected = 10;
- protected ActiveMQConnectionFactory factory;
- protected BrokerService broker;
- protected String connectionUri;
- protected EmbeddedDataSource dataSource;
- protected java.sql.Connection dbConnection;
- protected BrokenPersistenceAdapter jdbc;
-
- @Override
- public void setUp() throws Exception {
- broker = createBroker();
- broker.start();
-
- factory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=" + messagesExpected);
- }
-
- @Override
- public void tearDown() throws Exception {
- broker.stop();
- }
-
- public void testSqlException() throws Exception {
- doTestSqlException();
- }
-
- public void doTestSqlException() throws Exception {
- sendMessages(messagesExpected);
- int messagesReceived = receiveMessages(messagesExpected);
-
- dumpMessages();
- assertEquals("Messages expected doesn't equal messages received", messagesExpected, messagesReceived);
- broker.stop();
- }
-
- protected void dumpMessages() throws Exception {
- WireFormat wireFormat = new OpenWireFormat();
- java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection();
- PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG FROM ACTIVEMQ_MSGS");
- ResultSet result = statement.executeQuery();
- LOG.info("Messages left in broker after test");
- while (result.next()) {
- long id = result.getLong(1);
- org.apache.activemq.command.Message message = (org.apache.activemq.command.Message) wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
- LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
- }
- statement.close();
- conn.close();
- }
-
- protected int receiveMessages(int messagesExpected) throws Exception {
- javax.jms.Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-
- jdbc.setShouldBreak(true);
-
- // first try and receive these messages, they'll continually fail
- receiveMessages(messagesExpected, session);
-
- jdbc.setShouldBreak(false);
-
- // now that the store is sane, try and get all the messages sent
- return receiveMessages(messagesExpected, session);
- }
-
- protected int receiveMessages(int messagesExpected, Session session) throws Exception {
- int messagesReceived = 0;
-
- for (int i = 0; i < messagesExpected; i++) {
- Destination destination = session.createQueue("TEST");
- MessageConsumer consumer = session.createConsumer(destination);
- Message message = null;
- try {
- LOG.debug("Receiving message " + (messagesReceived + 1) + " of " + messagesExpected);
- message = consumer.receive(2000);
- LOG.info("Received : " + message);
- if (message != null) {
- session.commit();
- messagesReceived++;
- }
- }
- catch (Exception e) {
- LOG.debug("Caught exception " + e);
- session.rollback();
- }
- finally {
- if (consumer != null) {
- consumer.close();
- }
- }
- }
- return messagesReceived;
- }
-
- protected void sendMessages(int messagesExpected) throws Exception {
- javax.jms.Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue("TEST");
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- for (int i = 0; i < messagesExpected; i++) {
- LOG.debug("Sending message " + (i + 1) + " of " + messagesExpected);
- producer.send(session.createTextMessage("test message " + (i + 1)));
- }
- }
-
- protected BrokerService createBroker() throws Exception {
-
- BrokerService broker = new BrokerService();
- jdbc = new BrokenPersistenceAdapter();
-
- dataSource = new EmbeddedDataSource();
- dataSource.setDatabaseName("target/derbyDb");
- dataSource.setCreateDatabase("create");
-
- jdbc.setDataSource(dataSource);
- jdbc.setUseLock(false);
- jdbc.deleteAllMessages();
-
- broker.setPersistenceAdapter(jdbc);
- broker.setPersistent(true);
- connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
-
- return broker;
- }
-}
-
-
-
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java
deleted file mode 100644
index fa7b848..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.jdbc;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.Locker;
-import org.apache.activemq.broker.SuppressReplyException;
-import org.apache.activemq.util.LeaseLockerIOExceptionHandler;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.Wait;
-import org.jmock.Expectations;
-import org.jmock.Mockery;
-import org.jmock.States;
-import org.jmock.lib.legacy.ClassImposteriser;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class JDBCIOExceptionHandlerMockeryTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(JDBCIOExceptionHandlerMockeryTest.class);
- private HashMap<Thread, Throwable> exceptions = new HashMap<>();
-
- @Test
- public void testShutdownWithoutTransportRestart() throws Exception {
-
- Mockery context = new Mockery() {{
- setImposteriser(ClassImposteriser.INSTANCE);
- }};
-
- Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- LOG.error("unexpected exception {} on thread {}", e, t);
- exceptions.put(t, e);
- }
- });
-
- final BrokerService brokerService = context.mock(BrokerService.class);
- final JDBCPersistenceAdapter jdbcPersistenceAdapter = context.mock(JDBCPersistenceAdapter.class);
- final Locker locker = context.mock(Locker.class);
-
- final States jdbcConn = context.states("jdbc").startsAs("down");
- final States broker = context.states("broker").startsAs("started");
-
- // simulate jdbc up between hasLock and checkpoint, so hasLock fails to verify
- context.checking(new Expectations() {{
- allowing(brokerService).isRestartAllowed();
- will(returnValue(false));
- allowing(brokerService).stopAllConnectors(with(any(ServiceStopper.class)));
- allowing(brokerService).getPersistenceAdapter();
- will(returnValue(jdbcPersistenceAdapter));
- allowing(jdbcPersistenceAdapter).getLocker();
- will(returnValue(locker));
- allowing(locker).keepAlive();
- when(jdbcConn.is("down"));
- will(returnValue(true));
- allowing(locker).keepAlive();
- when(jdbcConn.is("up"));
- will(returnValue(false));
-
- allowing(jdbcPersistenceAdapter).checkpoint(with(true));
- then(jdbcConn.is("up"));
- allowing(brokerService).stop();
- then(broker.is("stopped"));
-
- }});
-
- LeaseLockerIOExceptionHandler underTest = new LeaseLockerIOExceptionHandler();
- underTest.setBrokerService(brokerService);
-
- try {
- underTest.handle(new IOException());
- fail("except suppress reply ex");
- }
- catch (SuppressReplyException expected) {
- }
-
- assertTrue("broker stopped state triggered", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- LOG.info("broker state {}", broker);
- return broker.is("stopped").isActive();
- }
- }));
- context.assertIsSatisfied();
-
- assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java
deleted file mode 100644
index 6c43646..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.jdbc;
-
-import java.io.PrintWriter;
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.jms.Connection;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.LeaseLockerIOExceptionHandler;
-import org.apache.activemq.util.Wait;
-import org.apache.derby.jdbc.EmbeddedDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test to see if the JDBCExceptionIOHandler will restart the transport connectors correctly after
- * the underlying DB has been stopped and restarted
- *
- * see AMQ-4575
- */
-public class JDBCIOExceptionHandlerTest extends TestCase {
-
- private static final Logger LOG = LoggerFactory.getLogger(JDBCIOExceptionHandlerTest.class);
- private static final String TRANSPORT_URL = "tcp://0.0.0.0:0";
-
- private static final String DATABASE_NAME = "DERBY_OVERRIDE";
- private ActiveMQConnectionFactory factory;
- private ReconnectingEmbeddedDataSource dataSource;
- private BrokerService broker;
-
- protected BrokerService createBroker(boolean withJMX) throws Exception {
- return createBroker("localhost", withJMX, true, true);
- }
-
- protected BrokerService createBroker(String name,
- boolean withJMX,
- boolean leaseLocker,
- boolean startStopConnectors) throws Exception {
- BrokerService broker = new BrokerService();
- broker.setBrokerName(name);
-
- broker.setUseJmx(withJMX);
-
- EmbeddedDataSource embeddedDataSource = new EmbeddedDataSource();
- embeddedDataSource.setDatabaseName(DATABASE_NAME);
- embeddedDataSource.setCreateDatabase("create");
-
- // create a wrapper to EmbeddedDataSource to allow the connection be
- // reestablished to derby db
- dataSource = new ReconnectingEmbeddedDataSource(embeddedDataSource);
-
- JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
- jdbc.setDataSource(dataSource);
-
- jdbc.setLockKeepAlivePeriod(1000L);
- if (leaseLocker) {
- LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
- leaseDatabaseLocker.setHandleStartException(true);
- leaseDatabaseLocker.setLockAcquireSleepInterval(2000L);
- jdbc.setLocker(leaseDatabaseLocker);
- }
-
- broker.setPersistenceAdapter(jdbc);
- LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler();
- ioExceptionHandler.setResumeCheckSleepPeriod(1000L);
- ioExceptionHandler.setStopStartConnectors(startStopConnectors);
- broker.setIoExceptionHandler(ioExceptionHandler);
- String connectionUri = broker.addConnector(TRANSPORT_URL).getPublishableConnectString();
-
- factory = new ActiveMQConnectionFactory(connectionUri);
-
- return broker;
- }
-
- /*
- * run test without JMX enabled
- */
- public void testRecoverWithOutJMX() throws Exception {
- recoverFromDisconnectDB(false);
- }
-
- /*
- * run test with JMX enabled
- */
- public void testRecoverWithJMX() throws Exception {
- recoverFromDisconnectDB(true);
- }
-
- public void testSlaveStoppedLease() throws Exception {
- testSlaveStopped(true);
- }
-
- public void testSlaveStoppedDefault() throws Exception {
- testSlaveStopped(false);
- }
-
- public void testSlaveStopped(final boolean lease) throws Exception {
- final BrokerService master = createBroker("master", true, lease, false);
- master.start();
- master.waitUntilStarted();
-
- final AtomicReference<BrokerService> slave = new AtomicReference<>();
-
- Thread slaveThread = new Thread() {
- @Override
- public void run() {
- try {
- BrokerService broker = new BrokerService();
- broker.setBrokerName("slave");
-
- JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
- jdbc.setDataSource(dataSource);
-
- jdbc.setLockKeepAlivePeriod(1000L);
-
- if (lease) {
- LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
- leaseDatabaseLocker.setHandleStartException(true);
- leaseDatabaseLocker.setLockAcquireSleepInterval(2000L);
- jdbc.setLocker(leaseDatabaseLocker);
- }
-
- broker.setPersistenceAdapter(jdbc);
- LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler();
- ioExceptionHandler.setResumeCheckSleepPeriod(1000L);
- ioExceptionHandler.setStopStartConnectors(false);
- broker.setIoExceptionHandler(ioExceptionHandler);
- slave.set(broker);
- broker.start();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- };
-
- slaveThread.start();
-
- Thread.sleep(5000);
-
- dataSource.stopDB();
-
- assertTrue("Master hasn't been stopped", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return master.isStopped();
- }
- }));
-
- assertTrue("Slave hasn't been stopped", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return slave.get().isStopped();
- }
- }));
-
- }
-
- public void recoverFromDisconnectDB(boolean withJMX) throws Exception {
- try {
- broker = createBroker(withJMX);
- broker.start();
- broker.waitUntilStarted();
-
- // broker started - stop db underneath it
- dataSource.stopDB();
-
- // wait - allow the leaselocker to kick the JDBCIOExceptionHandler
- TimeUnit.SECONDS.sleep(3);
-
- // check connector has shutdown
- checkTransportConnectorStopped();
-
- // restart db underneath
- dataSource.restartDB();
-
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- LOG.debug("*** checking connector to start...");
- try {
- checkTransportConnectorStarted();
- return true;
- }
- catch (Throwable t) {
- LOG.debug(t.toString());
- }
- return false;
- }
- });
-
- }
- finally {
- LOG.debug("*** broker is stopping...");
- broker.stop();
- }
- }
-
- private void checkTransportConnectorStopped() {
- // connection is expected to fail
- try {
- factory.createConnection();
- fail("Transport connector should be stopped");
- }
- catch (Exception ex) {
- // expected an exception
- LOG.debug(" checkTransportConnectorStopped() threw", ex);
- }
- }
-
- private void checkTransportConnectorStarted() {
- // connection is expected to succeed
- try {
- Connection conn = factory.createConnection();
- conn.close();
- }
- catch (Exception ex) {
- LOG.debug("checkTransportConnectorStarted() threw", ex);
- fail("Transport connector should have been started");
- }
- }
-
- /*
- * Wrapped the derby datasource object to get DB reconnect functionality as I not
- * manage to get that working directly on the EmbeddedDataSource
- *
- * NOTE: Not a thread Safe but for this unit test it should be fine
- */
- public class ReconnectingEmbeddedDataSource implements javax.sql.DataSource {
-
- private EmbeddedDataSource realDatasource;
-
- public ReconnectingEmbeddedDataSource(EmbeddedDataSource datasource) {
- this.realDatasource = datasource;
- }
-
- @Override
- public PrintWriter getLogWriter() throws SQLException {
- return this.realDatasource.getLogWriter();
- }
-
- @Override
- public void setLogWriter(PrintWriter out) throws SQLException {
- this.realDatasource.setLogWriter(out);
-
- }
-
- @Override
- public void setLoginTimeout(int seconds) throws SQLException {
- this.realDatasource.setLoginTimeout(seconds);
- }
-
- @Override
- public int getLoginTimeout() throws SQLException {
- return this.realDatasource.getLoginTimeout();
- }
-
- @Override
- public <T> T unwrap(Class<T> iface) throws SQLException {
- return this.unwrap(iface);
- }
-
- @Override
- public boolean isWrapperFor(Class<?> iface) throws SQLException {
- return this.isWrapperFor(iface);
- }
-
- @Override
- public java.sql.Connection getConnection() throws SQLException {
- return this.realDatasource.getConnection();
- }
-
- @Override
- public java.sql.Connection getConnection(String username, String password) throws SQLException {
- return this.getConnection(username, password);
- }
-
- /**
- * To simulate a db reconnect I just create a new EmbeddedDataSource .
- *
- * @throws SQLException
- */
- public void restartDB() throws SQLException {
- EmbeddedDataSource newDatasource = new EmbeddedDataSource();
- newDatasource.setDatabaseName(DATABASE_NAME);
- newDatasource.getConnection();
- LOG.info("*** DB restarted now...");
- this.realDatasource = newDatasource;
- }
-
- public void stopDB() {
- try {
- realDatasource.setShutdownDatabase("shutdown");
- LOG.info("***DB is being shutdown...");
- dataSource.getConnection();
- fail("should have thrown a db closed exception");
- }
- catch (Exception ex) {
- ex.printStackTrace(System.out);
- }
- }
-
- @Override
- public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml
deleted file mode 100644
index ac70fa7..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml
+++ /dev/null
@@ -1,58 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<beans
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:amq="http://activemq.apache.org/schema/core"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
- http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
- <!-- normal ActiveMQ XML config which is less verbose & can be validated -->
- <amq:broker brokerName="brokerConfigTest" populateJMSXUserID="false"
- useLoggingForShutdownErrors="true" useJmx="false"
- persistent="true" vmConnectorURI="vm://javacoola"
- useShutdownHook="false" deleteAllMessagesOnStartup="true">
-
- <amq:persistenceAdapter>
- <amq:jdbcPersistenceAdapter dataDirectory="target/activemq-data" dataSource="#derby-ds" lockKeepAlivePeriod="5000" createTablesOnStartup="true">
- <!-- test that we can define the locker before th statements,
- but the locker will still pickup the statements -->
- <amq:locker>
- <amq:lease-database-locker lockAcquireSleepInterval="10000"/>
- </amq:locker>
- <amq:statements>
- <amq:statements tablePrefix="TTT_" messageTableName="AMQ_MSGS2" durableSubAcksTableName="AMQ_ACKS2" lockTableName="AMQ_LOCK2"/>
- </amq:statements>
- <amq:adapter>
- <amq:defaultJDBCAdapter/>
- </amq:adapter>
- </amq:jdbcPersistenceAdapter>
- </amq:persistenceAdapter>
-
- <amq:transportConnectors>
- <amq:transportConnector uri="vm://brokerConfigTest"/>
- </amq:transportConnectors>
-
- </amq:broker>
-
- <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
- <property name="databaseName" value="target/derbyDb"/>
- <property name="connectionAttributes" value=";create=true"/>
- </bean>
-
-</beans>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefixTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefixTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefixTest.java
deleted file mode 100644
index 854dd7a..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefixTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.jdbc;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.store.PersistenceAdapter;
-
-public class JDBCLockTablePrefixTest extends TestCase {
-
- public void testLockTable() throws Exception {
- BrokerService broker = BrokerFactory.createBroker("xbean:org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml");
- broker.waitUntilStarted();
-
- PersistenceAdapter pa = broker.getPersistenceAdapter();
- assertNotNull(pa);
-
- JDBCPersistenceAdapter jpa = (JDBCPersistenceAdapter) pa;
- assertEquals("TTT_", jpa.getStatements().getTablePrefix());
- assertEquals("AMQ_MSGS2", jpa.getStatements().getMessageTableName());
- assertEquals("AMQ_LOCK2", jpa.getStatements().getLockTableName());
-
- broker.stop();
- broker.waitUntilStopped();
- }
-
-}