You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/18 02:42:00 UTC
[11/65] [abbrv] activemq-artemis git commit: open wire changes
equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java
deleted file mode 100644
index b0e7bd3..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.File;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.RedeliveryPolicy;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
-import org.apache.activemq.broker.util.RedeliveryPlugin;
-import org.apache.activemq.util.IOHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Testing if the the broker "sends" the message as expected after the redeliveryPlugin has redelivered the
- * message previously.
- */
-
-public class RedeliveryPluginHeaderTest extends TestCase {
-
- private static final String TEST_QUEUE_ONE = "TEST_QUEUE_ONE";
- private static final String TEST_QUEUE_TWO = "TEST_QUEUE_TWO";
- private static final Logger LOG = LoggerFactory.getLogger(RedeliveryPluginHeaderTest.class);
- private String transportURL;
- private BrokerService broker;
-
- /**
- * Test
- * - consumes message from Queue1
- * - rolls back message to Queue1 and message is scheduled for redelivery to Queue1 by brokers plugin
- * - consumes message from Queue1 again
- * - sends same message to Queue2
- * - expects to consume message from Queue2 immediately
- */
-
- public void testSendAfterRedelivery() throws Exception {
- broker = this.createBroker(false);
- broker.start();
- broker.waitUntilStarted();
-
- LOG.info("***Broker started...");
-
- //pushed message to broker
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(transportURL + "?trace=true&jms.redeliveryPolicy.maximumRedeliveries=0");
-
- Connection connection = factory.createConnection();
- connection.start();
-
- try {
-
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-
- Destination destinationQ1 = session.createQueue(TEST_QUEUE_ONE);
- Destination destinationQ2 = session.createQueue(TEST_QUEUE_TWO);
-
- MessageProducer producerQ1 = session.createProducer(destinationQ1);
- producerQ1.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- Message m = session.createTextMessage("testMessage");
- LOG.info("*** send message to broker...");
- producerQ1.send(m);
- session.commit();
-
- //consume message from Q1 and rollback to get it redelivered
- MessageConsumer consumerQ1 = session.createConsumer(destinationQ1);
-
- LOG.info("*** consume message from Q1 and rolled back..");
-
- TextMessage textMessage = (TextMessage) consumerQ1.receive();
- LOG.info("got redelivered: " + textMessage);
- assertFalse("JMSRedelivered flag is not set", textMessage.getJMSRedelivered());
- session.rollback();
-
- LOG.info("*** consumed message from Q1 again and sending to Q2..");
- TextMessage textMessage2 = (TextMessage) consumerQ1.receive();
- LOG.info("got: " + textMessage2);
- session.commit();
- assertTrue("JMSRedelivered flag is set", textMessage2.getJMSRedelivered());
-
- //send message to Q2 and consume from Q2
- MessageConsumer consumerQ2 = session.createConsumer(destinationQ2);
- MessageProducer producer_two = session.createProducer(destinationQ2);
- producer_two.send(textMessage2);
- session.commit();
-
- //Message should be available straight away on the queue_two
- Message textMessage3 = consumerQ2.receive(1000);
- assertNotNull("should have consumed a message from TEST_QUEUE_TWO", textMessage3);
- assertFalse("JMSRedelivered flag is not set", textMessage3.getJMSRedelivered());
- session.commit();
-
- }
- finally {
-
- connection.close();
-
- if (broker != null) {
- broker.stop();
- }
-
- }
-
- }
-
- protected BrokerService createBroker(boolean withJMX) throws Exception {
- File schedulerDirectory = new File("target/scheduler");
- IOHelper.mkdirs(schedulerDirectory);
- IOHelper.deleteChildren(schedulerDirectory);
-
- BrokerService answer = new BrokerService();
- answer.setAdvisorySupport(false);
- answer.setDataDirectory("target");
- answer.setSchedulerDirectoryFile(schedulerDirectory);
- answer.setSchedulerSupport(true);
- answer.setPersistent(true);
- answer.setDeleteAllMessagesOnStartup(true);
- answer.setUseJmx(withJMX);
-
- RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin();
- RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
- RedeliveryPolicy defaultEntry = new RedeliveryPolicy();
- defaultEntry.setInitialRedeliveryDelay(5000);
- defaultEntry.setMaximumRedeliveries(5);
- redeliveryPolicyMap.setDefaultEntry(defaultEntry);
- redeliveryPlugin.setRedeliveryPolicyMap(redeliveryPolicyMap);
-
- answer.setPlugins(new BrokerPlugin[]{redeliveryPlugin});
- TransportConnector transportConnector = answer.addConnector("tcp://localhost:0");
-
- transportURL = transportConnector.getConnectUri().toASCIIString();
-
- return answer;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java
deleted file mode 100644
index b4858c1..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SlowConsumerTest extends TestCase {
-
- private static final Logger LOG = LoggerFactory.getLogger(SlowConsumerTest.class);
- private static final int MESSAGES_COUNT = 10000;
-
- private final int messageLogFrequency = 2500;
- private final long messageReceiveTimeout = 10000L;
-
- private Socket stompSocket;
- private ByteArrayOutputStream inputBuffer;
- private int messagesCount;
-
- /**
- * @param args
- * @throws Exception
- */
- public void testRemoveSubscriber() throws Exception {
- final BrokerService broker = new BrokerService();
- broker.setPersistent(true);
- broker.setUseJmx(true);
- broker.setDeleteAllMessagesOnStartup(true);
-
- broker.addConnector("tcp://localhost:0").setName("Default");
- broker.start();
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
- final Connection connection = factory.createConnection();
- connection.start();
-
- Thread producingThread = new Thread("Producing thread") {
- @Override
- public void run() {
- try {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(new ActiveMQQueue(getDestinationName()));
- for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
- Message message = session.createTextMessage("" + idx);
- producer.send(message);
- LOG.debug("Sending: " + idx);
- }
- producer.close();
- session.close();
- }
- catch (Throwable ex) {
- ex.printStackTrace();
- }
- }
- };
- producingThread.setPriority(Thread.MAX_PRIORITY);
- producingThread.start();
- Thread.sleep(1000);
-
- Thread consumingThread = new Thread("Consuming thread") {
-
- @Override
- public void run() {
- try {
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationName()));
- int diff = 0;
- while (messagesCount != MESSAGES_COUNT) {
- Message msg = consumer.receive(messageReceiveTimeout);
- if (msg == null) {
- LOG.warn("Got null message at count: " + messagesCount + ". Continuing...");
- break;
- }
- String text = ((TextMessage) msg).getText();
- int currentMsgIdx = Integer.parseInt(text);
- LOG.debug("Received: " + text + " messageCount: " + messagesCount);
- msg.acknowledge();
- if ((messagesCount + diff) != currentMsgIdx) {
- LOG.debug("Message(s) skipped!! Should be message no.: " + messagesCount + " but got: " + currentMsgIdx);
- diff = currentMsgIdx - messagesCount;
- }
- ++messagesCount;
- if (messagesCount % messageLogFrequency == 0) {
- LOG.info("Received: " + messagesCount + " messages so far");
- }
- // Thread.sleep(70);
- }
- }
- catch (Throwable ex) {
- ex.printStackTrace();
- }
- }
- };
- consumingThread.start();
- consumingThread.join();
-
- assertEquals(MESSAGES_COUNT, messagesCount);
-
- }
-
- public void sendFrame(String data) throws Exception {
- byte[] bytes = data.getBytes("UTF-8");
- OutputStream outputStream = stompSocket.getOutputStream();
- for (int i = 0; i < bytes.length; i++) {
- outputStream.write(bytes[i]);
- }
- outputStream.flush();
- }
-
- public String receiveFrame(long timeOut) throws Exception {
- stompSocket.setSoTimeout((int) timeOut);
- InputStream is = stompSocket.getInputStream();
- int c = 0;
- for (;;) {
- c = is.read();
- if (c < 0) {
- throw new IOException("socket closed.");
- }
- else if (c == 0) {
- c = is.read();
- byte[] ba = inputBuffer.toByteArray();
- inputBuffer.reset();
- return new String(ba, "UTF-8");
- }
- else {
- inputBuffer.write(c);
- }
- }
- }
-
- protected String getDestinationName() {
- return getClass().getName() + "." + getName();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupLevelDBStoreTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupLevelDBStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupLevelDBStoreTest.java
deleted file mode 100644
index 3e22dc2..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupLevelDBStoreTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.leveldb.LevelDBStore;
-
-public class SparseAckReplayAfterStoreCleanupLevelDBStoreTest extends AMQ2832Test {
-
- @Override
- protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
- LevelDBStore store = new LevelDBStore();
- store.setFlushDelay(0);
- brokerService.setPersistenceAdapter(store);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempQueueDeleteOnCloseTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempQueueDeleteOnCloseTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempQueueDeleteOnCloseTest.java
deleted file mode 100644
index f521d40..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempQueueDeleteOnCloseTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.junit.Test;
-
-/**
- * Demonstrates how unmarshalled VM advisory messages for temporary queues prevent other connections from being closed.
- */
-public class TempQueueDeleteOnCloseTest {
-
- @Test
- public void test() throws Exception {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
-
- // create a connection and session with a temporary queue
- Connection connectionA = connectionFactory.createConnection();
- connectionA.setClientID("ConnectionA");
- Session sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination tempQueueA = sessionA.createTemporaryQueue();
- MessageConsumer consumer = sessionA.createConsumer(tempQueueA);
- connectionA.start();
-
- // start and stop another connection
- Connection connectionB = connectionFactory.createConnection();
- connectionB.setClientID("ConnectionB");
- connectionB.start();
- connectionB.close();
-
- consumer.close();
- connectionA.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
deleted file mode 100644
index dc15f87..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.File;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.ResourceAllocationException;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
-import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.StoreUsage;
-import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.usage.TempUsage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TempStorageBlockedBrokerTest extends TestSupport {
-
- public int deliveryMode = DeliveryMode.PERSISTENT;
-
- private static final Logger LOG = LoggerFactory.getLogger(TempStorageBlockedBrokerTest.class);
- private static final int MESSAGES_COUNT = 1000;
- private static byte[] buf = new byte[4 * 1024];
- private BrokerService broker;
- AtomicInteger messagesSent = new AtomicInteger(0);
- AtomicInteger messagesConsumed = new AtomicInteger(0);
-
- protected long messageReceiveTimeout = 10000L;
-
- Destination destination = new ActiveMQTopic("FooTwo");
-
- private String connectionUri;
-
- public void testRunProducerWithHungConsumer() throws Exception {
-
- final long origTempUsage = broker.getSystemUsage().getTempUsage().getUsage();
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
- // ensure messages are spooled to disk for this consumer
- ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
- prefetch.setTopicPrefetch(10);
- factory.setPrefetchPolicy(prefetch);
- Connection consumerConnection = factory.createConnection();
- consumerConnection.start();
-
- Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = consumerSession.createConsumer(destination);
-
- final Connection producerConnection = factory.createConnection();
- producerConnection.start();
-
- final CountDownLatch producerHasSentTenMessages = new CountDownLatch(10);
- Thread producingThread = new Thread("Producing thread") {
- @Override
- public void run() {
- try {
- Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(deliveryMode);
- for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
- Message message = session.createTextMessage(new String(buf) + idx);
-
- producer.send(message);
- messagesSent.incrementAndGet();
- producerHasSentTenMessages.countDown();
- Thread.sleep(10);
- if (idx != 0 && idx % 100 == 0) {
- LOG.info("Sent Message " + idx);
- LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
- }
- }
- producer.close();
- session.close();
- }
- catch (Throwable ex) {
- ex.printStackTrace();
- }
- }
- };
- producingThread.start();
-
- assertTrue("producer has sent 10 in a reasonable time", producerHasSentTenMessages.await(30, TimeUnit.SECONDS));
-
- int count = 0;
-
- Message m = null;
- while ((m = consumer.receive(messageReceiveTimeout)) != null) {
- count++;
- if (count != 0 && count % 10 == 0) {
- LOG.info("Received Message (" + count + "):" + m);
- }
- messagesConsumed.incrementAndGet();
- try {
- Thread.sleep(100);
- }
- catch (Exception e) {
- LOG.info("error sleeping");
- }
- }
-
- LOG.info("Connection Timeout: Retrying.. count: " + count);
-
- while ((m = consumer.receive(messageReceiveTimeout)) != null) {
- count++;
- if (count != 0 && count % 100 == 0) {
- LOG.info("Received Message (" + count + "):" + m);
- }
- messagesConsumed.incrementAndGet();
- try {
- Thread.sleep(100);
- }
- catch (Exception e) {
- LOG.info("error sleeping");
- }
- }
-
- LOG.info("consumer session closing: consumed count: " + count);
-
- consumerSession.close();
-
- producingThread.join();
-
- final long tempUsageBySubscription = broker.getSystemUsage().getTempUsage().getUsage();
- LOG.info("Orig Usage: " + origTempUsage + ", currentUsage: " + tempUsageBySubscription);
-
- producerConnection.close();
- consumerConnection.close();
-
- LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: " + broker.getSystemUsage().getTempUsage().getUsage());
-
- // do a cleanup
- ((PListStoreImpl) broker.getTempDataStore()).run();
- LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: " + broker.getSystemUsage().getTempUsage().getUsage());
-
- assertEquals("Incorrect number of Messages Sent: " + messagesSent.get(), messagesSent.get(), MESSAGES_COUNT);
- assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), messagesConsumed.get(), MESSAGES_COUNT);
- }
-
- public void testFillTempAndConsume() throws Exception {
-
- broker.getSystemUsage().setSendFailIfNoSpace(true);
- destination = new ActiveMQQueue("Foo");
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
- final ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection();
- // so we can easily catch the ResourceAllocationException on send
- producerConnection.setAlwaysSyncSend(true);
- producerConnection.start();
-
- Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- try {
- while (true) {
- Message message = session.createTextMessage(new String(buf) + messagesSent.toString());
- producer.send(message);
- messagesSent.incrementAndGet();
- if (messagesSent.get() % 100 == 0) {
- LOG.info("Sent Message " + messagesSent.get());
- LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
- }
- }
- }
- catch (ResourceAllocationException ex) {
- LOG.info("Got resource exception : " + ex + ", after sent: " + messagesSent.get());
- }
-
- // consume all sent
- Connection consumerConnection = factory.createConnection();
- consumerConnection.start();
-
- Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = consumerSession.createConsumer(destination);
-
- while (consumer.receive(messageReceiveTimeout) != null) {
- messagesConsumed.incrementAndGet();
- if (messagesConsumed.get() % 1000 == 0) {
- LOG.info("received Message " + messagesConsumed.get());
- LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
- }
- }
-
- assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), messagesConsumed.get(), messagesSent.get());
- }
-
- @Override
- public void setUp() throws Exception {
-
- broker = new BrokerService();
- broker.setDataDirectory("target" + File.separator + "activemq-data");
- broker.setPersistent(true);
- broker.setUseJmx(true);
- broker.setAdvisorySupport(false);
- broker.setDeleteAllMessagesOnStartup(true);
-
- setDefaultPersistenceAdapter(broker);
- SystemUsage sysUsage = broker.getSystemUsage();
- MemoryUsage memUsage = new MemoryUsage();
- memUsage.setLimit((1024 * 1024));
- StoreUsage storeUsage = new StoreUsage();
- storeUsage.setLimit((1024 * 1024) * 38);
- TempUsage tmpUsage = new TempUsage();
- tmpUsage.setLimit((1024 * 1024) * 38);
-
- PolicyEntry defaultPolicy = new PolicyEntry();
- // defaultPolicy.setTopic("FooTwo");
- defaultPolicy.setProducerFlowControl(false);
- defaultPolicy.setMemoryLimit(10 * 1024);
-
- PolicyMap policyMap = new PolicyMap();
- policyMap.setDefaultEntry(defaultPolicy);
-
- sysUsage.setMemoryUsage(memUsage);
- sysUsage.setStoreUsage(storeUsage);
- sysUsage.setTempUsage(tmpUsage);
-
- broker.setDestinationPolicy(policyMap);
- broker.setSystemUsage(sysUsage);
-
- broker.addConnector("tcp://localhost:0").setName("Default");
- broker.start();
-
- connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
- }
-
- @Override
- public void tearDown() throws Exception {
- if (broker != null) {
- broker.stop();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java
deleted file mode 100644
index d04cc3f..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.ResourceAllocationException;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
-import org.junit.After;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test that when configuring small temp store limits the journal size must also
- * be smaller than the configured limit, but will still send a ResourceAllocationException
- * if its not when sendFailIfNoSpace is enabled.
- */
-public class TempStorageConfigBrokerTest {
-
- public int deliveryMode = DeliveryMode.PERSISTENT;
-
- private static final Logger LOG = LoggerFactory.getLogger(TempStorageConfigBrokerTest.class);
- private static byte[] buf = new byte[4 * 1024];
- private BrokerService broker;
- private AtomicInteger messagesSent = new AtomicInteger(0);
- private AtomicInteger messagesConsumed = new AtomicInteger(0);
-
- private String brokerUri;
- private long messageReceiveTimeout = 10000L;
- private Destination destination = new ActiveMQTopic("FooTwo");
-
- @Test(timeout = 360000)
- @Ignore("blocks in hudson, needs investigation")
- public void testFillTempAndConsumeWithBadTempStoreConfig() throws Exception {
-
- createBrokerWithInvalidTempStoreConfig();
-
- broker.getSystemUsage().setSendFailIfNoSpace(true);
- destination = new ActiveMQQueue("Foo");
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
- final ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection();
- // so we can easily catch the ResourceAllocationException on send
- producerConnection.setAlwaysSyncSend(true);
- producerConnection.start();
-
- Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- try {
- while (true) {
- Message message = session.createTextMessage(new String(buf) + messagesSent.toString());
- producer.send(message);
- messagesSent.incrementAndGet();
- if (messagesSent.get() % 100 == 0) {
- LOG.info("Sent Message " + messagesSent.get());
- LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
- }
- }
- }
- catch (ResourceAllocationException ex) {
- assertTrue("Should not be able to send 100 messages: ", messagesSent.get() < 100);
- LOG.info("Got resource exception : " + ex + ", after sent: " + messagesSent.get());
- }
- }
-
- @Test(timeout = 360000)
- @Ignore("blocks in hudson, needs investigation")
- public void testFillTempAndConsumeWithGoodTempStoreConfig() throws Exception {
-
- createBrokerWithValidTempStoreConfig();
-
- broker.getSystemUsage().setSendFailIfNoSpace(true);
- destination = new ActiveMQQueue("Foo");
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
- final ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection();
- // so we can easily catch the ResourceAllocationException on send
- producerConnection.setAlwaysSyncSend(true);
- producerConnection.start();
-
- Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- try {
- while (true) {
- Message message = session.createTextMessage(new String(buf) + messagesSent.toString());
- producer.send(message);
- messagesSent.incrementAndGet();
- if (messagesSent.get() % 100 == 0) {
- LOG.info("Sent Message " + messagesSent.get());
- LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
- }
- }
- }
- catch (ResourceAllocationException ex) {
- assertTrue("Should be able to send at least 200 messages but was: " + messagesSent.get(), messagesSent.get() > 200);
- LOG.info("Got resource exception : " + ex + ", after sent: " + messagesSent.get());
- }
-
- // consume all sent
- Connection consumerConnection = factory.createConnection();
- consumerConnection.start();
-
- Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = consumerSession.createConsumer(destination);
-
- while (consumer.receive(messageReceiveTimeout) != null) {
- messagesConsumed.incrementAndGet();
- if (messagesConsumed.get() % 1000 == 0) {
- LOG.info("received Message " + messagesConsumed.get());
- LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
- }
- }
-
- assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), messagesConsumed.get(), messagesSent.get());
- }
-
- private void createBrokerWithValidTempStoreConfig() throws Exception {
- broker = new BrokerService();
- broker.setDataDirectory("target" + File.separator + "activemq-data");
- broker.setPersistent(true);
- broker.setUseJmx(true);
- broker.setAdvisorySupport(false);
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setPersistenceAdapter(new KahaDBPersistenceAdapter());
-
- broker.getSystemUsage().setSendFailIfNoSpace(true);
- broker.getSystemUsage().getMemoryUsage().setLimit(1048576);
- broker.getSystemUsage().getTempUsage().setLimit(2 * 1048576);
- ((PListStoreImpl) broker.getSystemUsage().getTempUsage().getStore()).setJournalMaxFileLength(2 * 1048576);
- broker.getSystemUsage().getStoreUsage().setLimit(20 * 1048576);
-
- PolicyEntry defaultPolicy = new PolicyEntry();
- defaultPolicy.setProducerFlowControl(false);
- defaultPolicy.setMemoryLimit(10 * 1024);
-
- PolicyMap policyMap = new PolicyMap();
- policyMap.setDefaultEntry(defaultPolicy);
-
- broker.setDestinationPolicy(policyMap);
- broker.addConnector("tcp://localhost:0").setName("Default");
- broker.start();
-
- brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
- }
-
- private void createBrokerWithInvalidTempStoreConfig() throws Exception {
- broker = new BrokerService();
- broker.setDataDirectory("target" + File.separator + "activemq-data");
- broker.setPersistent(true);
- broker.setUseJmx(true);
- broker.setAdvisorySupport(false);
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setPersistenceAdapter(new KahaDBPersistenceAdapter());
-
- broker.getSystemUsage().setSendFailIfNoSpace(true);
- broker.getSystemUsage().getMemoryUsage().setLimit(1048576);
- broker.getSystemUsage().getTempUsage().setLimit(2 * 1048576);
- broker.getSystemUsage().getStoreUsage().setLimit(2 * 1048576);
-
- PolicyEntry defaultPolicy = new PolicyEntry();
- defaultPolicy.setProducerFlowControl(false);
- defaultPolicy.setMemoryLimit(10 * 1024);
-
- PolicyMap policyMap = new PolicyMap();
- policyMap.setDefaultEntry(defaultPolicy);
-
- broker.setDestinationPolicy(policyMap);
- broker.addConnector("tcp://localhost:0").setName("Default");
- broker.start();
-
- brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
- }
-
- @After
- public void tearDown() throws Exception {
- if (broker != null) {
- broker.stop();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
deleted file mode 100644
index 8051a59..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.Broker;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TempStoreDataCleanupTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(TempStoreDataCleanupTest.class);
- private static final String QUEUE_NAME = TempStoreDataCleanupTest.class.getName() + "Queue";
-
- private final String str = new String("QAa0bcLdUK2eHfJgTP8XhiFj61DOklNm9nBoI5pGqYVrs3CtSuMZvwWx4yE7zR");
-
- private BrokerService broker;
- private String connectionUri;
- private ExecutorService pool;
- private String queueName;
- private Random r = new Random();
-
- @Before
- public void setUp() throws Exception {
-
- broker = new BrokerService();
- broker.setDataDirectory("target" + File.separator + "activemq-data");
- broker.setPersistent(true);
- broker.setUseJmx(true);
- broker.setDedicatedTaskRunner(false);
- broker.setAdvisorySupport(false);
- broker.setDeleteAllMessagesOnStartup(true);
-
- SharedDeadLetterStrategy strategy = new SharedDeadLetterStrategy();
- strategy.setProcessExpired(false);
- strategy.setProcessNonPersistent(false);
-
- PolicyEntry defaultPolicy = new PolicyEntry();
- defaultPolicy.setQueue(">");
- defaultPolicy.setOptimizedDispatch(true);
- defaultPolicy.setDeadLetterStrategy(strategy);
- defaultPolicy.setMemoryLimit(9000000);
-
- PolicyMap policyMap = new PolicyMap();
- policyMap.setDefaultEntry(defaultPolicy);
-
- broker.setDestinationPolicy(policyMap);
-
- broker.getSystemUsage().getMemoryUsage().setLimit(300000000L);
-
- broker.addConnector("tcp://localhost:0").setName("Default");
- broker.start();
- broker.waitUntilStarted();
-
- connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
- pool = Executors.newFixedThreadPool(10);
- }
-
- @After
- public void tearDown() throws Exception {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- }
-
- if (pool != null) {
- pool.shutdown();
- }
- }
-
- @Test
- public void testIt() throws Exception {
-
- int startPercentage = broker.getAdminView().getMemoryPercentUsage();
- LOG.info("MemoryUsage at test start = " + startPercentage);
-
- for (int i = 0; i < 2; i++) {
- LOG.info("Started the test iteration: " + i + " using queueName = " + queueName);
- queueName = QUEUE_NAME + i;
- final CountDownLatch latch = new CountDownLatch(11);
-
- pool.execute(new Runnable() {
- @Override
- public void run() {
- receiveAndDiscard100messages(latch);
- }
- });
-
- for (int j = 0; j < 10; j++) {
- pool.execute(new Runnable() {
- @Override
- public void run() {
- send10000messages(latch);
- }
- });
- }
-
- LOG.info("Waiting on the send / receive latch");
- latch.await(5, TimeUnit.MINUTES);
- LOG.info("Resumed");
-
- destroyQueue();
- TimeUnit.SECONDS.sleep(2);
- }
-
- LOG.info("MemoryUsage before awaiting temp store cleanup = " + broker.getAdminView().getMemoryPercentUsage());
-
- final PListStoreImpl pa = (PListStoreImpl) broker.getTempDataStore();
- assertTrue("only one journal file should be left: " + pa.getJournal().getFileMap().size(), Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return pa.getJournal().getFileMap().size() == 1;
- }
- }, TimeUnit.MINUTES.toMillis(3)));
-
- int endPercentage = broker.getAdminView().getMemoryPercentUsage();
- LOG.info("MemoryUsage at test end = " + endPercentage);
-
- assertEquals(startPercentage, endPercentage);
- }
-
- public void destroyQueue() {
- try {
- Broker broker = this.broker.getBroker();
- if (!broker.isStopped()) {
- LOG.info("Removing: " + queueName);
- broker.removeDestination(this.broker.getAdminConnectionContext(), new ActiveMQQueue(queueName), 10);
- }
- }
- catch (Exception e) {
- LOG.warn("Got an error while removing the test queue", e);
- }
- }
-
- private void send10000messages(CountDownLatch latch) {
- ActiveMQConnection activeMQConnection = null;
- try {
- activeMQConnection = createConnection(null);
- Session session = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(session.createQueue(queueName));
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- activeMQConnection.start();
- for (int i = 0; i < 10000; i++) {
- TextMessage textMessage = session.createTextMessage();
- textMessage.setText(generateBody(1000));
- textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
- producer.send(textMessage);
- try {
- Thread.sleep(10);
- }
- catch (InterruptedException e) {
- }
- }
- producer.close();
- }
- catch (JMSException e) {
- LOG.warn("Got an error while sending the messages", e);
- }
- finally {
- if (activeMQConnection != null) {
- try {
- activeMQConnection.close();
- }
- catch (JMSException e) {
- }
- }
- }
- latch.countDown();
- }
-
- private void receiveAndDiscard100messages(CountDownLatch latch) {
- ActiveMQConnection activeMQConnection = null;
- try {
- activeMQConnection = createConnection(null);
- Session session = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer messageConsumer = session.createConsumer(session.createQueue(queueName));
- activeMQConnection.start();
- for (int i = 0; i < 100; i++) {
- messageConsumer.receive();
- }
- messageConsumer.close();
- LOG.info("Created and disconnected");
- }
- catch (JMSException e) {
- LOG.warn("Got an error while receiving the messages", e);
- }
- finally {
- if (activeMQConnection != null) {
- try {
- activeMQConnection.close();
- }
- catch (JMSException e) {
- }
- }
- }
- latch.countDown();
- }
-
- private ActiveMQConnection createConnection(String id) throws JMSException {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
- if (id != null) {
- factory.setClientID(id);
- }
-
- ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
- return connection;
- }
-
- private String generateBody(int length) {
-
- StringBuilder sb = new StringBuilder();
- int te = 0;
- for (int i = 1; i <= length; i++) {
- te = r.nextInt(62);
- sb.append(str.charAt(te));
- }
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java
deleted file mode 100644
index db3888a..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.AutoFailTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertTrue;
-
-// https://issues.apache.org/jira/browse/AMQ-4262
-public class TransactedStoreUsageSuspendResumeTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(TransactedStoreUsageSuspendResumeTest.class);
-
- private static final int MAX_MESSAGES = 10000;
-
- private static final String QUEUE_NAME = "test.queue";
-
- private BrokerService broker;
-
- private final CountDownLatch messagesReceivedCountDown = new CountDownLatch(MAX_MESSAGES);
- private final CountDownLatch messagesSentCountDown = new CountDownLatch(MAX_MESSAGES);
- private final CountDownLatch consumerStartLatch = new CountDownLatch(1);
-
- private class ConsumerThread extends Thread {
-
- @Override
- public void run() {
- try {
-
- consumerStartLatch.await(30, TimeUnit.SECONDS);
-
- ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
- Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-
- // wait for producer to stop
- long currentSendCount;
- do {
- currentSendCount = messagesSentCountDown.getCount();
- TimeUnit.SECONDS.sleep(5);
- } while (currentSendCount != messagesSentCountDown.getCount());
-
- LOG.info("Starting consumer at: " + currentSendCount);
-
- MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
-
- do {
- Message message = consumer.receive(5000);
- if (message != null) {
- session.commit();
- messagesReceivedCountDown.countDown();
- }
- if (messagesReceivedCountDown.getCount() % 500 == 0) {
- LOG.info("remaining to receive: " + messagesReceivedCountDown.getCount());
- }
- } while (messagesReceivedCountDown.getCount() != 0);
- consumer.close();
- session.close();
- connection.close();
- }
- catch (Exception e) {
- Assert.fail(e.getMessage());
- }
- }
- }
-
- @Before
- public void setup() throws Exception {
-
- broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setPersistent(true);
-
- KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter();
- kahaDB.setJournalMaxFileLength(500 * 1024);
- kahaDB.setCleanupInterval(10 * 1000);
- broker.setPersistenceAdapter(kahaDB);
-
- broker.getSystemUsage().getStoreUsage().setLimit(7 * 1024 * 1024);
-
- broker.start();
- broker.waitUntilStarted();
- }
-
- @After
- public void tearDown() throws Exception {
- broker.stop();
- }
-
- @Test
- public void testTransactedStoreUsageSuspendResume() throws Exception {
-
- ConsumerThread thread = new ConsumerThread();
- thread.start();
- ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
- sendExecutor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- sendMessages();
- }
- catch (Exception ignored) {
- }
- }
- });
- sendExecutor.shutdown();
- sendExecutor.awaitTermination(5, TimeUnit.MINUTES);
-
- boolean allMessagesReceived = messagesReceivedCountDown.await(10, TimeUnit.MINUTES);
- if (!allMessagesReceived) {
- AutoFailTestSupport.dumpAllThreads("StuckConsumer!");
- }
- assertTrue("Got all messages: " + messagesReceivedCountDown, allMessagesReceived);
-
- // give consumers a chance to exit gracefully
- TimeUnit.SECONDS.sleep(2);
- }
-
- private void sendMessages() throws Exception {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
- factory.setAlwaysSyncSend(true);
- Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Destination queue = session.createQueue(QUEUE_NAME);
- Destination retainQueue = session.createQueue(QUEUE_NAME + "-retain");
- MessageProducer producer = session.createProducer(null);
-
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- BytesMessage message = session.createBytesMessage();
- message.writeBytes(new byte[10]);
-
- for (int i = 0; i < 4240; i++) {
- // mostly fill the store with retained messages
- // so consumer only has a small bit of store usage to work with
- producer.send(retainQueue, message);
- session.commit();
- }
-
- consumerStartLatch.countDown();
- for (int i = 0; i < MAX_MESSAGES; i++) {
- producer.send(queue, message);
- if (i > 0 && i % 20 == 0) {
- session.commit();
- }
- messagesSentCountDown.countDown();
- if (i > 0 && i % 500 == 0) {
- LOG.info("Sent : " + i);
- }
-
- }
- session.commit();
- producer.close();
- session.close();
- connection.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java
deleted file mode 100644
index 2038279..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/*
- * simulate message flow which cause the following exception in the broker
- * (exception logged by client) <br> 2007-07-24 13:51:23,624
- * com.easynet.halo.Halo ERROR (LoggingErrorHandler.java: 23) JMS failure
- * javax.jms.JMSException: Transaction 'TX:ID:dmt-53625-1185281414694-1:0:344'
- * has not been started. at
- * org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:230)
- * This appears to be consistent in a MacBook. Haven't been able to replicate it
- * on Windows though
- */
-public class TransactionNotStartedErrorTest extends TestCase {
-
- private static final Logger LOG = LoggerFactory.getLogger(TransactionNotStartedErrorTest.class);
-
- private static final int counter = 500;
-
- private static int hectorToHaloCtr;
- private static int xenaToHaloCtr;
- private static int troyToHaloCtr;
-
- private static int haloToHectorCtr;
- private static int haloToXenaCtr;
- private static int haloToTroyCtr;
-
- private final String hectorToHalo = "hectorToHalo";
- private final String xenaToHalo = "xenaToHalo";
- private final String troyToHalo = "troyToHalo";
-
- private final String haloToHector = "haloToHector";
- private final String haloToXena = "haloToXena";
- private final String haloToTroy = "haloToTroy";
-
- private BrokerService broker;
-
- private Connection hectorConnection;
- private Connection xenaConnection;
- private Connection troyConnection;
- private Connection haloConnection;
-
- private final Object lock = new Object();
-
- public Connection createConnection() throws Exception {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
- return factory.createConnection();
- }
-
- public Session createSession(Connection connection, boolean transacted) throws JMSException {
- return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
- }
-
- public void startBroker() throws Exception {
- broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setPersistent(true);
- broker.setUseJmx(true);
- broker.addConnector("tcp://localhost:0").setName("Default");
- broker.start();
- LOG.info("Starting broker..");
- }
-
- @Override
- public void tearDown() throws Exception {
- hectorConnection.close();
- xenaConnection.close();
- troyConnection.close();
- haloConnection.close();
- broker.stop();
- }
-
- public void testTransactionNotStartedError() throws Exception {
- startBroker();
- hectorConnection = createConnection();
- Thread hectorThread = buildProducer(hectorConnection, hectorToHalo);
- Receiver hHectorReceiver = new Receiver() {
- @Override
- public void receive(String s) throws Exception {
- haloToHectorCtr++;
- if (haloToHectorCtr >= counter) {
- synchronized (lock) {
- lock.notifyAll();
- }
- }
- }
- };
- buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver);
-
- troyConnection = createConnection();
- Thread troyThread = buildProducer(troyConnection, troyToHalo);
- Receiver hTroyReceiver = new Receiver() {
- @Override
- public void receive(String s) throws Exception {
- haloToTroyCtr++;
- if (haloToTroyCtr >= counter) {
- synchronized (lock) {
- lock.notifyAll();
- }
- }
- }
- };
- buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver);
-
- xenaConnection = createConnection();
- Thread xenaThread = buildProducer(xenaConnection, xenaToHalo);
- Receiver hXenaReceiver = new Receiver() {
- @Override
- public void receive(String s) throws Exception {
- haloToXenaCtr++;
- if (haloToXenaCtr >= counter) {
- synchronized (lock) {
- lock.notifyAll();
- }
- }
- }
- };
- buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver);
-
- haloConnection = createConnection();
- final MessageSender hectorSender = buildTransactionalProducer(haloToHector, haloConnection);
- final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection);
- final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection);
- Receiver hectorReceiver = new Receiver() {
- @Override
- public void receive(String s) throws Exception {
- hectorToHaloCtr++;
- troySender.send("halo to troy because of hector");
- if (hectorToHaloCtr >= counter) {
- synchronized (lock) {
- lock.notifyAll();
- }
- }
- }
- };
- Receiver xenaReceiver = new Receiver() {
- @Override
- public void receive(String s) throws Exception {
- xenaToHaloCtr++;
- hectorSender.send("halo to hector because of xena");
- if (xenaToHaloCtr >= counter) {
- synchronized (lock) {
- lock.notifyAll();
- }
- }
- }
- };
- Receiver troyReceiver = new Receiver() {
- @Override
- public void receive(String s) throws Exception {
- troyToHaloCtr++;
- xenaSender.send("halo to xena because of troy");
- if (troyToHaloCtr >= counter) {
- synchronized (lock) {
- lock.notifyAll();
- }
- }
- }
- };
- buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver);
- buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver);
- buildReceiver(haloConnection, troyToHalo, true, troyReceiver);
-
- haloConnection.start();
-
- troyConnection.start();
- troyThread.start();
-
- xenaConnection.start();
- xenaThread.start();
-
- hectorConnection.start();
- hectorThread.start();
- waitForMessagesToBeDelivered();
- // number of messages received should match messages sent
- assertEquals(hectorToHaloCtr, counter);
- LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages");
- assertEquals(xenaToHaloCtr, counter);
- LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages");
- assertEquals(troyToHaloCtr, counter);
- LOG.info("troyToHalo received " + troyToHaloCtr + " messages");
- assertEquals(haloToHectorCtr, counter);
- LOG.info("haloToHector received " + haloToHectorCtr + " messages");
- assertEquals(haloToXenaCtr, counter);
- LOG.info("haloToXena received " + haloToXenaCtr + " messages");
- assertEquals(haloToTroyCtr, counter);
- LOG.info("haloToTroy received " + haloToTroyCtr + " messages");
-
- }
-
- protected void waitForMessagesToBeDelivered() {
- // let's give the listeners enough time to read all messages
- long maxWaitTime = counter * 3000;
- long waitTime = maxWaitTime;
- long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
-
- synchronized (lock) {
- boolean hasMessages = true;
- while (hasMessages && waitTime >= 0) {
- try {
- lock.wait(200);
- }
- catch (InterruptedException e) {
- LOG.error(e.toString());
- }
- // check if all messages have been received
- hasMessages = hectorToHaloCtr < counter || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter || haloToXenaCtr < counter || haloToTroyCtr < counter;
- waitTime = maxWaitTime - (System.currentTimeMillis() - start);
- }
- }
- }
-
- public MessageSender buildTransactionalProducer(String queueName, Connection connection) throws Exception {
- return new MessageSender(queueName, connection, true, false);
- }
-
- public Thread buildProducer(Connection connection, final String queueName) throws Exception {
- final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final MessageSender producer = new MessageSender(queueName, connection, false, false);
- Thread thread = new Thread() {
-
- @Override
- public synchronized void run() {
- for (int i = 0; i < counter; i++) {
- try {
- producer.send(queueName);
- if (session.getTransacted()) {
- session.commit();
- }
-
- }
- catch (Exception e) {
- throw new RuntimeException("on " + queueName + " send", e);
- }
- }
- }
- };
- return thread;
- }
-
- public void buildReceiver(Connection connection,
- final String queueName,
- boolean transacted,
- final Receiver receiver) throws Exception {
- final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer inputMessageConsumer = session.createConsumer(session.createQueue(queueName));
- MessageListener messageListener = new MessageListener() {
-
- @Override
- public void onMessage(Message message) {
- try {
- ObjectMessage objectMessage = (ObjectMessage) message;
- String s = (String) objectMessage.getObject();
- receiver.receive(s);
- if (session.getTransacted()) {
- session.commit();
- }
-
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- };
- inputMessageConsumer.setMessageListener(messageListener);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
deleted file mode 100644
index 67b284f..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.IOException;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
-import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
-import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
-import org.apache.activemq.store.jdbc.TransactionContext;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.LeaseLockerIOExceptionHandler;
-import org.apache.derby.jdbc.EmbeddedDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test to demostrate a message trapped in the JDBC store and not
- * delivered to consumer
- *
- * The test throws issues the commit to the DB but throws
- * an exception back to the broker. This scenario could happen when a network
- * cable is disconnected - message is committed to DB but broker does not know.
- */
-
-public class TrapMessageInJDBCStoreTest extends TestCase {
-
- private static final String MY_TEST_Q = "MY_TEST_Q";
- private static final Logger LOG = LoggerFactory.getLogger(TrapMessageInJDBCStoreTest.class);
- private String transportUrl = "tcp://127.0.0.1:0";
- private BrokerService broker;
- private TestTransactionContext testTransactionContext;
- private TestJDBCPersistenceAdapter jdbc;
-
- protected BrokerService createBroker(boolean withJMX) throws Exception {
- BrokerService broker = new BrokerService();
-
- broker.setUseJmx(withJMX);
-
- EmbeddedDataSource embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
- embeddedDataSource.setCreateDatabase("create");
-
- //wire in a TestTransactionContext (wrapper to TransactionContext) that has an executeBatch()
- // method that can be configured to throw a SQL exception on demand
- jdbc = new TestJDBCPersistenceAdapter();
- jdbc.setDataSource(embeddedDataSource);
- jdbc.setCleanupPeriod(0);
- testTransactionContext = new TestTransactionContext(jdbc);
-
- jdbc.setLockKeepAlivePeriod(1000L);
- LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
- leaseDatabaseLocker.setLockAcquireSleepInterval(2000L);
- jdbc.setLocker(leaseDatabaseLocker);
-
- broker.setPersistenceAdapter(jdbc);
-
- broker.setIoExceptionHandler(new LeaseLockerIOExceptionHandler());
-
- transportUrl = broker.addConnector(transportUrl).getPublishableConnectString();
- return broker;
- }
-
- /**
- * sends 3 messages to the queue. When the second message is being committed to the JDBCStore, $
- * it throws a dummy SQL exception - the message has been committed to the embedded DB before the exception
- * is thrown
- *
- * Excepted correct outcome: receive 3 messages and the DB should contain no messages
- *
- * @throws Exception
- */
-
- public void testDBCommitException() throws Exception {
-
- broker = this.createBroker(false);
- broker.deleteAllMessages();
- broker.start();
- broker.waitUntilStarted();
-
- LOG.info("***Broker started...");
-
- // failover but timeout in 5 seconds so the test does not hang
- String failoverTransportURL = "failover:(" + transportUrl + ")?timeout=5000";
-
- sendMessage(MY_TEST_Q, failoverTransportURL);
-
- //check db contents
- ArrayList<Long> dbSeq = dbMessageCount();
- LOG.info("*** after send: db contains message seq " + dbSeq);
-
- List<TextMessage> consumedMessages = consumeMessages(MY_TEST_Q, failoverTransportURL);
-
- assertEquals("number of consumed messages", 3, consumedMessages.size());
-
- //check db contents
- dbSeq = dbMessageCount();
- LOG.info("*** after consume - db contains message seq " + dbSeq);
-
- assertEquals("number of messages in DB after test", 0, dbSeq.size());
-
- broker.stop();
- broker.waitUntilStopped();
- }
-
- public List<TextMessage> consumeMessages(String queue, String transportURL) throws JMSException {
- Connection connection = null;
- LOG.debug("*** consumeMessages() called ...");
-
- try {
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(transportURL);
-
- connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue(queue);
-
- ArrayList<TextMessage> consumedMessages = new ArrayList<>();
-
- MessageConsumer messageConsumer = session.createConsumer(destination);
-
- while (true) {
- TextMessage textMessage = (TextMessage) messageConsumer.receive(4000);
- LOG.debug("*** consumed Messages :" + textMessage);
-
- if (textMessage == null) {
- return consumedMessages;
- }
- consumedMessages.add(textMessage);
- }
-
- }
- finally {
- if (connection != null) {
- connection.close();
- }
- }
- }
-
- public void sendMessage(String queue, String transportURL) throws Exception {
- Connection connection = null;
-
- try {
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(transportURL);
-
- connection = factory.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue(queue);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- TextMessage m = session.createTextMessage("1");
-
- LOG.debug("*** send message 1 to broker...");
- producer.send(m);
-
- // trigger SQL exception in transactionContext
- LOG.debug("*** send message 2 to broker");
- m.setText("2");
- producer.send(m);
-
- //check db contents
- ArrayList<Long> dbSeq = dbMessageCount();
- LOG.info("*** after send 2 - db contains message seq " + dbSeq);
- assertEquals("number of messages in DB after send 2", 2, dbSeq.size());
-
- LOG.debug("*** send message 3 to broker");
- m.setText("3");
- producer.send(m);
- LOG.debug("*** Finished sending messages to broker");
-
- }
- finally {
- if (connection != null) {
- connection.close();
- }
- }
- }
-
- /**
- * query the DB to see what messages are left in the store
- *
- * @return
- * @throws SQLException
- * @throws IOException
- */
- private ArrayList<Long> dbMessageCount() throws SQLException, IOException {
- java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection();
- PreparedStatement statement = conn.prepareStatement("SELECT MSGID_SEQ FROM ACTIVEMQ_MSGS");
-
- try {
-
- ResultSet result = statement.executeQuery();
- ArrayList<Long> dbSeq = new ArrayList<>();
-
- while (result.next()) {
- dbSeq.add(result.getLong(1));
- }
-
- return dbSeq;
-
- }
- finally {
- statement.close();
- conn.close();
-
- }
-
- }
-
- /*
- * Mock classes used for testing
- */
-
- public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter {
-
- @Override
- public TransactionContext getTransactionContext() throws IOException {
- return testTransactionContext;
- }
- }
-
- public class TestTransactionContext extends TransactionContext {
-
- private int count;
-
- public TestTransactionContext(JDBCPersistenceAdapter jdbcPersistenceAdapter) throws IOException {
- super(jdbcPersistenceAdapter);
- }
-
- @Override
- public void executeBatch() throws SQLException {
- super.executeBatch();
- count++;
- LOG.debug("ExecuteBatchOverride: count:" + count, new RuntimeException("executeBatch"));
-
- // throw on second add message
- if (count == 16) {
- throw new SQLException("TEST SQL EXCEPTION from executeBatch after super.execution: count:" + count);
- }
- }
-
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VMTransportClosureTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VMTransportClosureTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VMTransportClosureTest.java
deleted file mode 100644
index 84c1765..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VMTransportClosureTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import java.io.IOException;
-
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.transport.TransportListener;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class VMTransportClosureTest extends EmbeddedBrokerTestSupport {
-
- private static final Log LOG = LogFactory.getLog(VMTransportClosureTest.class);
- private static final long MAX_TEST_TIME_MILLIS = 300000; // 5min
- private static final int NUM_ATTEMPTS = 100000;
-
- @Override
- public void setUp() throws Exception {
- setAutoFail(true);
- setMaxTestTime(MAX_TEST_TIME_MILLIS);
- super.setUp();
- }
-
- /**
- * EmbeddedBrokerTestSupport.createBroker() binds the broker to a VM
- * transport address, which results in a call to
- * VMTransportFactory.doBind(location):
- * <p>
- * <code>
- * public TransportServer doBind(URI location) throws IOException {
- * return bind(location, false);
- * }
- * </code>
- * </p>
- * As a result, VMTransportServer.disposeOnDisconnect is <code>false</code>.
- * To expose the bug, we need to have VMTransportServer.disposeOnDisconnect
- * <code>true</code>, which is the case when the VMTransportServer is not
- * already bound when the first connection is made.
- */
- @Override
- protected BrokerService createBroker() throws Exception {
- BrokerService answer = new BrokerService();
- answer.setPersistent(isPersistent());
- // answer.addConnector(bindAddress);
- return answer;
- }
-
- /**
- * This test demonstrates how the "disposeOnDisonnect" feature of
- * VMTransportServer can incorrectly close all VM connections to the local
- * broker.
- */
- public void testPrematureClosure() throws Exception {
-
- // Open a persistent connection to the local broker. The persistent
- // connection is maintained through the test and should prevent the
- // VMTransportServer from stopping itself when the local transport is
- // closed.
- ActiveMQConnection persistentConn = (ActiveMQConnection) createConnection();
- persistentConn.start();
- Session session = persistentConn.createSession(true, Session.SESSION_TRANSACTED);
- MessageProducer producer = session.createProducer(destination);
-
- for (int i = 0; i < NUM_ATTEMPTS; i++) {
- LOG.info("Attempt: " + i);
-
- // Open and close a local transport connection. As is done by by
- // most users of the transport, ensure that the transport is stopped
- // when closed by the peer (via ShutdownInfo). Closing the local
- // transport should not affect the persistent connection.
- final Transport localTransport = TransportFactory.connect(broker.getVmConnectorURI());
- localTransport.setTransportListener(new TransportListener() {
- @Override
- public void onCommand(Object command) {
- if (command instanceof ShutdownInfo) {
- try {
- localTransport.stop();
- }
- catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- }
-
- @Override
- public void onException(IOException error) {
- // ignore
- }
-
- @Override
- public void transportInterupted() {
- // ignore
- }
-
- @Override
- public void transportResumed() {
- // ignore
- }
- });
-
- localTransport.start();
- localTransport.stop();
-
- // Ensure that the persistent connection is still usable.
- producer.send(session.createMessage());
- session.rollback();
- }
-
- persistentConn.close();
- }
-}