You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:24 UTC
[15/42] activemq-artemis git commit: ARTEMIS-463 Improvement to the
openwire testsuite https://issues.apache.org/jira/browse/ARTEMIS-463
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java
deleted file mode 100644
index 7e46df4..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.File;
-import java.util.Vector;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.runners.BlockJUnit4ClassRunner;
-import org.junit.runner.RunWith;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@RunWith(BlockJUnit4ClassRunner.class)
-public class MemoryUsageBlockResumeTest extends TestSupport implements Thread.UncaughtExceptionHandler {
-
- public int deliveryMode = DeliveryMode.PERSISTENT;
-
- private static final Logger LOG = LoggerFactory.getLogger(MemoryUsageBlockResumeTest.class);
- private static byte[] buf = new byte[4 * 1024];
- private static byte[] bigBuf = new byte[48 * 1024];
-
- private BrokerService broker;
- AtomicInteger messagesSent = new AtomicInteger(0);
- AtomicInteger messagesConsumed = new AtomicInteger(0);
-
- protected long messageReceiveTimeout = 10000L;
-
- Destination destination = new ActiveMQQueue("FooTwo");
- Destination bigDestination = new ActiveMQQueue("FooTwoBig");
-
- private String connectionUri;
- private final Vector<Throwable> exceptions = new Vector<>();
-
- @Test(timeout = 60 * 1000)
- public void testBlockByOtherResumeNoException() throws Exception {
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
-
- // ensure more than on message can be pending when full
- factory.setProducerWindowSize(48 * 1024);
- // ensure messages are spooled to disk for this consumer
- ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
- prefetch.setTopicPrefetch(10);
- factory.setPrefetchPolicy(prefetch);
- Connection consumerConnection = factory.createConnection();
- consumerConnection.start();
-
- Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = consumerSession.createConsumer(bigDestination);
-
- final Connection producerConnection = factory.createConnection();
- producerConnection.start();
-
- final int fillWithBigCount = 10;
- Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
- producer.setDeliveryMode(deliveryMode);
- for (int idx = 0; idx < fillWithBigCount; ++idx) {
- Message message = session.createTextMessage(new String(bigBuf) + idx);
- producer.send(bigDestination, message);
- messagesSent.incrementAndGet();
- LOG.info("After big: " + idx + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
- }
-
- // will block on pfc
- final int toSend = 20;
- Thread producingThread = new Thread("Producing thread") {
- @Override
- public void run() {
- try {
- Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(deliveryMode);
- for (int idx = 0; idx < toSend; ++idx) {
- Message message = session.createTextMessage(new String(buf) + idx);
- producer.send(destination, message);
- messagesSent.incrementAndGet();
- LOG.info("After little:" + idx + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
- }
- }
- catch (Throwable ex) {
- ex.printStackTrace();
- }
- }
- };
- producingThread.start();
-
- Thread producingThreadTwo = new Thread("Producing thread") {
- @Override
- public void run() {
- try {
- Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(deliveryMode);
- for (int idx = 0; idx < toSend; ++idx) {
- Message message = session.createTextMessage(new String(buf) + idx);
- producer.send(destination, message);
- messagesSent.incrementAndGet();
- LOG.info("After little:" + idx + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
- }
- }
- catch (Throwable ex) {
- ex.printStackTrace();
- }
- }
- };
- producingThreadTwo.start();
-
- assertTrue("producer has sent x in a reasonable time", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- LOG.info("Checking for : X sent, System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() + ", sent: " + messagesSent);
- return messagesSent.get() > 20;
- }
- }));
-
- LOG.info("Consuming from big q to allow delivery to smaller q from pending");
- int count = 0;
-
- Message m = null;
-
- for (; count < 10; count++) {
- assertTrue((m = consumer.receive(messageReceiveTimeout)) != null);
- LOG.info("Received Message (" + count + "):" + m + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
- messagesConsumed.incrementAndGet();
- }
- consumer.close();
-
- producingThread.join();
- producingThreadTwo.join();
-
- assertEquals("Incorrect number of Messages Sent: " + messagesSent.get(), messagesSent.get(), fillWithBigCount + toSend * 2);
-
- // consume all little messages
- consumer = consumerSession.createConsumer(destination);
- for (count = 0; count < toSend * 2; count++) {
- assertTrue((m = consumer.receive(messageReceiveTimeout)) != null);
- LOG.info("Received Message (" + count + "):" + m + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
- messagesConsumed.incrementAndGet();
- }
-
- assertEquals("Incorrect number of Messages consumed: " + messagesConsumed.get(), messagesSent.get(), messagesConsumed.get());
-
- //assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
- }
-
- @Override
- @Before
- public void setUp() throws Exception {
-
- Thread.setDefaultUncaughtExceptionHandler(this);
- broker = new BrokerService();
- broker.setDataDirectory("target" + File.separator + "activemq-data");
- broker.setPersistent(true);
- broker.setUseJmx(false);
- broker.setAdvisorySupport(false);
- broker.setDeleteAllMessagesOnStartup(true);
-
- setDefaultPersistenceAdapter(broker);
- broker.getSystemUsage().getMemoryUsage().setLimit((30 * 16 * 1024));
-
- PolicyEntry defaultPolicy = new PolicyEntry();
- defaultPolicy.setOptimizedDispatch(true);
- PolicyMap policyMap = new PolicyMap();
- policyMap.setDefaultEntry(defaultPolicy);
- broker.setDestinationPolicy(policyMap);
-
- broker.addConnector("tcp://localhost:0");
- broker.start();
-
- connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
- }
-
- @Override
- @After
- public void tearDown() throws Exception {
- if (broker != null) {
- broker.stop();
- }
- }
-
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- LOG.error("Unexpected Unhandeled ex on: " + t, e);
- exceptions.add(e);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBrokerTest.java
deleted file mode 100644
index 4653ea6..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBrokerTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.BrokerTestSupport;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.store.kahadb.KahaDBStore;
-import org.apache.activemq.util.IOHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.*;
-import java.io.File;
-
-public class MemoryUsageBrokerTest extends BrokerTestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(MemoryUsageBrokerTest.class);
-
- @Override
- protected void setUp() throws Exception {
- this.setAutoFail(true);
- super.setUp();
- }
-
- @Override
- protected PolicyEntry getDefaultPolicy() {
- PolicyEntry policy = super.getDefaultPolicy();
- // Disable PFC and assign a large memory limit that's larger than the default broker memory limit for queues
- policy.setProducerFlowControl(false);
- policy.setQueue(">");
- policy.setMemoryLimit(128 * 1024 * 1024);
- return policy;
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- BrokerService broker = new BrokerService();
- KahaDBStore kaha = new KahaDBStore();
- File directory = new File("target/activemq-data/kahadb");
- IOHelper.deleteChildren(directory);
- kaha.setDirectory(directory);
- kaha.deleteAllMessages();
- broker.setPersistenceAdapter(kaha);
- return broker;
- }
-
- protected ConnectionFactory createConnectionFactory() {
- return new ActiveMQConnectionFactory(broker.getVmConnectorURI());
- }
-
- protected Connection createJmsConnection() throws JMSException {
- return createConnectionFactory().createConnection();
- }
-
- public void testMemoryUsage() throws Exception {
- Connection conn = createJmsConnection();
- Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue("queue.a.b");
- MessageProducer producer = session.createProducer(queue);
- for (int i = 0; i < 100000; i++) {
- BytesMessage bm = session.createBytesMessage();
- bm.writeBytes(new byte[1024]);
- producer.send(bm);
- if ((i + 1) % 100 == 0) {
- session.commit();
- int memoryUsagePercent = broker.getSystemUsage().getMemoryUsage().getPercentUsage();
- LOG.info((i + 1) + " messages have been sent; broker memory usage " + memoryUsagePercent + "%");
- assertTrue("Used more than available broker memory", memoryUsagePercent <= 100);
- }
- }
- session.commit();
- producer.close();
- session.close();
- conn.close();
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageCleanupTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageCleanupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageCleanupTest.java
deleted file mode 100644
index e89c93f..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageCleanupTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.Broker;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MemoryUsageCleanupTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(MemoryUsageCleanupTest.class);
- private static final String QUEUE_NAME = MemoryUsageCleanupTest.class.getName() + "Queue";
-
- private final String str = new String("QAa0bcLdUK2eHfJgTP8XhiFj61DOklNm9nBoI5pGqYVrs3CtSuMZvwWx4yE7zR");
-
- private BrokerService broker;
- private String connectionUri;
- private ExecutorService pool;
- private String queueName;
- private Random r = new Random();
-
- @Before
- public void setUp() throws Exception {
-
- broker = new BrokerService();
- broker.setDataDirectory("target" + File.separator + "activemq-data");
- broker.setPersistent(true);
- broker.setUseJmx(true);
- broker.setDedicatedTaskRunner(false);
- broker.setAdvisorySupport(false);
- broker.setDeleteAllMessagesOnStartup(true);
-
- SharedDeadLetterStrategy strategy = new SharedDeadLetterStrategy();
- strategy.setProcessExpired(false);
- strategy.setProcessNonPersistent(false);
-
- PolicyEntry defaultPolicy = new PolicyEntry();
- defaultPolicy.setQueue(">");
- defaultPolicy.setOptimizedDispatch(true);
- defaultPolicy.setDeadLetterStrategy(strategy);
- defaultPolicy.setMemoryLimit(300000000);
-
- PolicyMap policyMap = new PolicyMap();
- policyMap.setDefaultEntry(defaultPolicy);
-
- broker.setDestinationPolicy(policyMap);
-
- broker.getSystemUsage().getMemoryUsage().setLimit(300000000L);
-
- broker.addConnector("tcp://localhost:0").setName("Default");
- broker.start();
- broker.waitUntilStarted();
-
- connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
- pool = Executors.newFixedThreadPool(10);
- }
-
- @After
- public void tearDown() throws Exception {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- }
-
- if (pool != null) {
- pool.shutdown();
- }
- }
-
- @Test
- public void testIt() throws Exception {
-
- final int startPercentage = broker.getAdminView().getMemoryPercentUsage();
- LOG.info("MemoryUsage at test start = " + startPercentage);
-
- for (int i = 0; i < 2; i++) {
- LOG.info("Started the test iteration: " + i + " using queueName = " + queueName);
- queueName = QUEUE_NAME + i;
- final CountDownLatch latch = new CountDownLatch(11);
-
- pool.execute(new Runnable() {
- @Override
- public void run() {
- receiveAndDiscard100messages(latch);
- }
- });
-
- for (int j = 0; j < 10; j++) {
- pool.execute(new Runnable() {
- @Override
- public void run() {
- send10000messages(latch);
- }
- });
- }
-
- LOG.info("Waiting on the send / receive latch");
- latch.await(5, TimeUnit.MINUTES);
- LOG.info("Resumed");
-
- destroyQueue();
- TimeUnit.SECONDS.sleep(2);
- }
-
- LOG.info("MemoryUsage before awaiting temp store cleanup = " + broker.getAdminView().getMemoryPercentUsage());
-
- assertTrue("MemoryUsage should return to: " + startPercentage +
- "% but was " + broker.getAdminView().getMemoryPercentUsage() + "%", Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return broker.getAdminView().getMemoryPercentUsage() <= startPercentage + 1;
- }
- }));
-
- int endPercentage = broker.getAdminView().getMemoryPercentUsage();
- LOG.info("MemoryUsage at test end = " + endPercentage);
- }
-
- public void destroyQueue() {
- try {
- Broker broker = this.broker.getBroker();
- if (!broker.isStopped()) {
- LOG.info("Removing: " + queueName);
- broker.removeDestination(this.broker.getAdminConnectionContext(), new ActiveMQQueue(queueName), 10);
- }
- }
- catch (Exception e) {
- LOG.warn("Got an error while removing the test queue", e);
- }
- }
-
- private void send10000messages(CountDownLatch latch) {
- ActiveMQConnection activeMQConnection = null;
- try {
- activeMQConnection = createConnection(null);
- Session session = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(session.createQueue(queueName));
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- activeMQConnection.start();
- for (int i = 0; i < 10000; i++) {
- TextMessage textMessage = session.createTextMessage();
- textMessage.setText(generateBody(1000));
- textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
- producer.send(textMessage);
- try {
- Thread.sleep(10);
- }
- catch (InterruptedException e) {
- }
- }
- producer.close();
- }
- catch (JMSException e) {
- LOG.warn("Got an error while sending the messages", e);
- }
- finally {
- if (activeMQConnection != null) {
- try {
- activeMQConnection.close();
- }
- catch (JMSException e) {
- }
- }
- }
- latch.countDown();
- }
-
- private void receiveAndDiscard100messages(CountDownLatch latch) {
- ActiveMQConnection activeMQConnection = null;
- try {
- activeMQConnection = createConnection(null);
- Session session = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer messageConsumer = session.createConsumer(session.createQueue(queueName));
- activeMQConnection.start();
- for (int i = 0; i < 100; i++) {
- messageConsumer.receive();
- }
- messageConsumer.close();
- LOG.info("Created and disconnected");
- }
- catch (JMSException e) {
- LOG.warn("Got an error while receiving the messages", e);
- }
- finally {
- if (activeMQConnection != null) {
- try {
- activeMQConnection.close();
- }
- catch (JMSException e) {
- }
- }
- }
- latch.countDown();
- }
-
- private ActiveMQConnection createConnection(String id) throws JMSException {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
- if (id != null) {
- factory.setClientID(id);
- }
-
- ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
- return connection;
- }
-
- private String generateBody(int length) {
-
- StringBuilder sb = new StringBuilder();
- int te = 0;
- for (int i = 1; i <= length; i++) {
- te = r.nextInt(62);
- sb.append(str.charAt(te));
- }
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
deleted file mode 100644
index 3cdd0d6..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import javax.jms.*;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.DestinationViewMBean;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test to determine if expired messages are being reaped if there is
- * no active consumer connected to the broker.
- */
-public class MessageExpirationReaperTest {
-
- private BrokerService broker;
- private ConnectionFactory factory;
- private ActiveMQConnection connection;
- private final String destinationName = "TEST.Q";
- private final String brokerUrl = "tcp://localhost:0";
- private final String brokerName = "testBroker";
- private String connectionUri;
-
- @Before
- public void init() throws Exception {
- createBroker();
-
- connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
-
- factory = createConnectionFactory();
- connection = (ActiveMQConnection) factory.createConnection();
- connection.setClientID("test-connection");
- connection.start();
- }
-
- @After
- public void cleanUp() throws Exception {
- connection.close();
- broker.stop();
- }
-
- protected void createBroker() throws Exception {
- broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setBrokerName(brokerName);
- broker.addConnector(brokerUrl);
-
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry defaultEntry = new PolicyEntry();
- defaultEntry.setExpireMessagesPeriod(500);
- policyMap.setDefaultEntry(defaultEntry);
- broker.setDestinationPolicy(policyMap);
-
- broker.start();
- }
-
- protected ConnectionFactory createConnectionFactory() throws Exception {
- return new ActiveMQConnectionFactory(connectionUri);
- }
-
- protected Session createSession() throws Exception {
- return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- @Test
- public void testExpiredMessageReaping() throws Exception {
-
- Session producerSession = createSession();
- ActiveMQDestination destination = (ActiveMQDestination) producerSession.createQueue(destinationName);
- MessageProducer producer = producerSession.createProducer(destination);
- producer.setTimeToLive(1000);
-
- final int count = 3;
- // Send some messages with an expiration
- for (int i = 0; i < count; i++) {
- TextMessage message = producerSession.createTextMessage("" + i);
- producer.send(message);
- }
-
- // Let the messages expire
- Thread.sleep(2000);
-
- DestinationViewMBean view = createView(destination);
-
- assertEquals("Incorrect inflight count: " + view.getInFlightCount(), 0, view.getInFlightCount());
- assertEquals("Incorrect queue size count", 0, view.getQueueSize());
- assertEquals("Incorrect expired size count", view.getEnqueueCount(), view.getExpiredCount());
-
- // Send more messages with an expiration
- for (int i = 0; i < count; i++) {
- TextMessage message = producerSession.createTextMessage("" + i);
- producer.send(message);
- }
-
- // Let the messages expire
- Thread.sleep(2000);
-
- // Simply browse the queue
- Session browserSession = createSession();
- QueueBrowser browser = browserSession.createBrowser((Queue) destination);
- assertFalse("no message in the browser", browser.getEnumeration().hasMoreElements());
-
- // The messages expire and should be reaped because of the presence of
- // the queue browser
- assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0, view.getInFlightCount());
- }
-
- @Test
- public void testExpiredMessagesOnTopic() throws Exception {
- Session session = createSession();
-
- // use a zero prefetch so messages don't go inflight
- ActiveMQTopic destination = new ActiveMQTopic(destinationName + "?consumer.prefetchSize=0");
-
- MessageProducer producer = session.createProducer(destination);
-
- // should have a durable sub because it's a little tricky to get messages to expire in
- // non-durable subs.. with durable subs, we can just expire in the topic using the expire
- // period.. also.. durable sub has to be "inactive" for the expire checker to actually
- // expire the messages
- MessageConsumer consumer = session.createDurableSubscriber(destination, "test-durable");
-
- producer.setTimeToLive(500);
-
- final int count = 3;
- // Send some messages with an expiration
- for (int i = 0; i < count; i++) {
- TextMessage message = session.createTextMessage("" + i);
- producer.send(message);
- }
-
- DestinationViewMBean view = createView(destination);
- // not expired yet...
- assertEquals("Incorrect enqueue count", 3, view.getEnqueueCount());
-
- // close consumer so topic thinks consumer is inactive
- consumer.close();
-
- // Let the messages reach an expiry time
- Thread.sleep(2000);
-
- assertEquals("Incorrect inflight count: " + view.getInFlightCount(), 0, view.getInFlightCount());
- assertEquals("Incorrect queue size count", 0, view.getQueueSize());
- assertEquals("Incorrect expired size count", view.getEnqueueCount(), view.getExpiredCount());
- }
-
- protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
- String domain = "org.apache.activemq";
- ObjectName name;
- if (destination.isQueue()) {
- name = new ObjectName(domain + ":type=Broker,brokerName=" + brokerName + ",destinationType=Queue,destinationName=" + destinationName);
- }
- else {
- name = new ObjectName(domain + ":type=Broker,brokerName=" + brokerName + ",destinationType=Topic,destinationName=" + destinationName);
- }
- return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageSender.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageSender.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageSender.java
deleted file mode 100644
index e7d22b1..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageSender.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-
-public class MessageSender {
-
- private MessageProducer producer;
- private Session session;
-
- public MessageSender(String queueName,
- Connection connection,
- boolean useTransactedSession,
- boolean topic) throws Exception {
- session = useTransactedSession ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(topic ? session.createTopic(queueName) : session.createQueue(queueName));
- }
-
- public void send(String payload) throws Exception {
- ObjectMessage message = session.createObjectMessage();
- message.setObject(payload);
- producer.send(message);
- if (session.getTransacted()) {
- session.commit();
- }
- }
-
- public MessageProducer getProducer() {
- return producer;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java
deleted file mode 100644
index b278dc9..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.usage.SystemUsage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/*
- * Try and replicate:
- * Caused by: java.io.IOException: Could not locate data file data--188
- * at org.apache.activemq.kaha.impl.async.AsyncDataManager.getDataFile(AsyncDataManager.java:302)
- * at org.apache.activemq.kaha.impl.async.AsyncDataManager.read(AsyncDataManager.java:614)
- * at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:523)
- */
-
-public class MissingDataFileTest extends TestCase {
-
- private static final Logger LOG = LoggerFactory.getLogger(MissingDataFileTest.class);
-
- private static int counter = 500;
-
- private static int hectorToHaloCtr;
- private static int xenaToHaloCtr;
- private static int troyToHaloCtr;
-
- private static int haloToHectorCtr;
- private static int haloToXenaCtr;
- private static int haloToTroyCtr;
-
- private final String hectorToHalo = "hectorToHalo";
- private final String xenaToHalo = "xenaToHalo";
- private final String troyToHalo = "troyToHalo";
-
- private final String haloToHector = "haloToHector";
- private final String haloToXena = "haloToXena";
- private final String haloToTroy = "haloToTroy";
-
- private BrokerService broker;
-
- private Connection hectorConnection;
- private Connection xenaConnection;
- private Connection troyConnection;
- private Connection haloConnection;
-
- private final Object lock = new Object();
- final boolean useTopic = false;
- final boolean useSleep = true;
-
- protected static final String payload = new String(new byte[500]);
-
- public Connection createConnection() throws JMSException {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
- return factory.createConnection();
- }
-
- public Session createSession(Connection connection, boolean transacted) throws JMSException {
- return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
- }
-
- public void startBroker() throws Exception {
- broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setPersistent(true);
- broker.setUseJmx(true);
- broker.addConnector("tcp://localhost:61616").setName("Default");
-
- SystemUsage systemUsage;
- systemUsage = new SystemUsage();
- systemUsage.getMemoryUsage().setLimit(10 * 1024 * 1024); // Just a few messags
- broker.setSystemUsage(systemUsage);
-
- KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
- kahaDBPersistenceAdapter.setJournalMaxFileLength(16 * 1024);
- kahaDBPersistenceAdapter.setCleanupInterval(500);
- broker.setPersistenceAdapter(kahaDBPersistenceAdapter);
-
- broker.start();
- LOG.info("Starting broker..");
- }
-
- @Override
- public void tearDown() throws Exception {
- hectorConnection.close();
- xenaConnection.close();
- troyConnection.close();
- haloConnection.close();
- broker.stop();
- }
-
- public void testForNoDataFoundError() throws Exception {
-
- startBroker();
- hectorConnection = createConnection();
- Thread hectorThread = buildProducer(hectorConnection, hectorToHalo, false, useTopic);
- Receiver hHectorReceiver = new Receiver() {
- @Override
- public void receive(String s) throws Exception {
- haloToHectorCtr++;
- if (haloToHectorCtr >= counter) {
- synchronized (lock) {
- lock.notifyAll();
- }
- }
- possiblySleep(haloToHectorCtr);
- }
- };
- buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver, useTopic);
-
- troyConnection = createConnection();
- Thread troyThread = buildProducer(troyConnection, troyToHalo);
- Receiver hTroyReceiver = new Receiver() {
- @Override
- public void receive(String s) throws Exception {
- haloToTroyCtr++;
- if (haloToTroyCtr >= counter) {
- synchronized (lock) {
- lock.notifyAll();
- }
- }
- possiblySleep(haloToTroyCtr);
- }
- };
- buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver, false);
-
- xenaConnection = createConnection();
- Thread xenaThread = buildProducer(xenaConnection, xenaToHalo);
- Receiver hXenaReceiver = new Receiver() {
- @Override
- public void receive(String s) throws Exception {
- haloToXenaCtr++;
- if (haloToXenaCtr >= counter) {
- synchronized (lock) {
- lock.notifyAll();
- }
- }
- possiblySleep(haloToXenaCtr);
- }
- };
- buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver, false);
-
- haloConnection = createConnection();
- final MessageSender hectorSender = buildTransactionalProducer(haloToHector, haloConnection, false);
- final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection, false);
- final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection, false);
- Receiver hectorReceiver = new Receiver() {
- @Override
- public void receive(String s) throws Exception {
- hectorToHaloCtr++;
- troySender.send(payload);
- if (hectorToHaloCtr >= counter) {
- synchronized (lock) {
- lock.notifyAll();
- }
- possiblySleep(hectorToHaloCtr);
- }
- }
- };
- Receiver xenaReceiver = new Receiver() {
- @Override
- public void receive(String s) throws Exception {
- xenaToHaloCtr++;
- hectorSender.send(payload);
- if (xenaToHaloCtr >= counter) {
- synchronized (lock) {
- lock.notifyAll();
- }
- }
- possiblySleep(xenaToHaloCtr);
- }
- };
- Receiver troyReceiver = new Receiver() {
- @Override
- public void receive(String s) throws Exception {
- troyToHaloCtr++;
- xenaSender.send(payload);
- if (troyToHaloCtr >= counter) {
- synchronized (lock) {
- lock.notifyAll();
- }
- }
- }
- };
- buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver, false);
- buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver, false);
- buildReceiver(haloConnection, troyToHalo, true, troyReceiver, false);
-
- haloConnection.start();
-
- troyConnection.start();
- troyThread.start();
-
- xenaConnection.start();
- xenaThread.start();
-
- hectorConnection.start();
- hectorThread.start();
- waitForMessagesToBeDelivered();
- // number of messages received should match messages sent
- assertEquals(hectorToHaloCtr, counter);
- LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages");
- assertEquals(xenaToHaloCtr, counter);
- LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages");
- assertEquals(troyToHaloCtr, counter);
- LOG.info("troyToHalo received " + troyToHaloCtr + " messages");
- assertEquals(haloToHectorCtr, counter);
- LOG.info("haloToHector received " + haloToHectorCtr + " messages");
- assertEquals(haloToXenaCtr, counter);
- LOG.info("haloToXena received " + haloToXenaCtr + " messages");
- assertEquals(haloToTroyCtr, counter);
- LOG.info("haloToTroy received " + haloToTroyCtr + " messages");
-
- }
-
- protected void possiblySleep(int count) throws InterruptedException {
- if (useSleep) {
- if (count % 100 == 0) {
- Thread.sleep(5000);
- }
- }
-
- }
-
- protected void waitForMessagesToBeDelivered() {
- // let's give the listeners enough time to read all messages
- long maxWaitTime = counter * 1000;
- long waitTime = maxWaitTime;
- long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
-
- synchronized (lock) {
- boolean hasMessages = true;
- while (hasMessages && waitTime >= 0) {
- try {
- lock.wait(200);
- }
- catch (InterruptedException e) {
- LOG.error(e.toString());
- }
- // check if all messages have been received
- hasMessages = hectorToHaloCtr < counter || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter || haloToXenaCtr < counter || haloToTroyCtr < counter;
- waitTime = maxWaitTime - (System.currentTimeMillis() - start);
- }
- }
- }
-
- public MessageSender buildTransactionalProducer(String queueName,
- Connection connection,
- boolean isTopic) throws Exception {
-
- return new MessageSender(queueName, connection, true, isTopic);
- }
-
- public Thread buildProducer(Connection connection, final String queueName) throws Exception {
- return buildProducer(connection, queueName, false, false);
- }
-
- public Thread buildProducer(Connection connection,
- final String queueName,
- boolean transacted,
- boolean isTopic) throws Exception {
- final MessageSender producer = new MessageSender(queueName, connection, transacted, isTopic);
- Thread thread = new Thread() {
- @Override
- public synchronized void run() {
- for (int i = 0; i < counter; i++) {
- try {
- producer.send(payload);
- }
- catch (Exception e) {
- throw new RuntimeException("on " + queueName + " send", e);
- }
- }
- }
- };
- return thread;
- }
-
- public void buildReceiver(Connection connection,
- final String queueName,
- boolean transacted,
- final Receiver receiver,
- boolean isTopic) throws Exception {
- final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer inputMessageConsumer = session.createConsumer(isTopic ? session.createTopic(queueName) : session.createQueue(queueName));
- MessageListener messageListener = new MessageListener() {
-
- @Override
- public void onMessage(Message message) {
- try {
- ObjectMessage objectMessage = (ObjectMessage) message;
- String s = (String) objectMessage.getObject();
- receiver.receive(s);
- if (session.getTransacted()) {
- session.commit();
- }
-
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- };
- inputMessageConsumer.setMessageListener(messageListener);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java
deleted file mode 100644
index 4bc92ad..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test for AMQ-3965.
- * A consumer may be stalled in case it uses optimizeAcknowledge and receives
- * a number of messages that expire before being dispatched to application code.
- * See for more details.
- */
-public class OptimizeAcknowledgeWithExpiredMsgsTest {
-
- private final static Logger LOG = LoggerFactory.getLogger(OptimizeAcknowledgeWithExpiredMsgsTest.class);
-
- private BrokerService broker = null;
-
- private String connectionUri;
-
- /**
- * Creates a broker instance but does not start it.
- *
- * @param brokerUri - transport uri of broker
- * @param brokerName - name for the broker
- * @return a BrokerService instance with transport uri and broker name set
- * @throws Exception
- */
- protected BrokerService createBroker() throws Exception {
- BrokerService broker = new BrokerService();
- broker.setPersistent(false);
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setUseJmx(false);
- connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
- return broker;
- }
-
- @Before
- public void setUp() throws Exception {
- broker = createBroker();
- broker.start();
- broker.waitUntilStarted();
- }
-
- @After
- public void tearDown() throws Exception {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- broker = null;
- }
- }
-
- /**
- * Tests for AMQ-3965
- * Creates connection into broker using optimzeAcknowledge and prefetch=100
- * Creates producer and consumer. Producer sends 45 msgs that will expire
- * at consumer (but before being dispatched to app code).
- * Producer then sends 60 msgs without expiry.
- *
- * Consumer receives msgs using a MessageListener and increments a counter.
- * Main thread sleeps for 5 seconds and checks the counter value.
- * If counter != 60 msgs (the number of msgs that should get dispatched
- * to consumer) the test fails.
- */
- @Test
- public void testOptimizedAckWithExpiredMsgs() throws Exception {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
-
- // Create JMS resources
- Connection connection = connectionFactory.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue("TEST.FOO");
-
- // ***** Consumer code *****
- MessageConsumer consumer = session.createConsumer(destination);
-
- final MyMessageListener listener = new MyMessageListener();
- connection.setExceptionListener(listener);
-
- // ***** Producer Code *****
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
- TextMessage message;
-
- // Produce msgs that will expire quickly
- for (int i = 0; i < 45; i++) {
- message = session.createTextMessage(text);
- producer.send(message, 1, 1, 100);
- LOG.trace("Sent message: " + message.getJMSMessageID() +
- " with expiry 10 msec");
- }
- // Produce msgs that don't expire
- for (int i = 0; i < 60; i++) {
- message = session.createTextMessage(text);
- producer.send(message, 1, 1, 60000);
- // producer.send(message);
- LOG.trace("Sent message: " + message.getJMSMessageID() +
- " with expiry 30 sec");
- }
- consumer.setMessageListener(listener);
-
- sleep(1000); // let the batch of 45 expire.
-
- connection.start();
-
- assertTrue("Should receive all expected messages, counter at " + listener.getCounter(), Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return listener.getCounter() == 60;
- }
- }));
-
- LOG.info("Received all expected messages with counter at: " + listener.getCounter());
-
- // Cleanup
- producer.close();
- consumer.close();
- session.close();
- connection.close();
- }
-
- @Test
- public void testOptimizedAckWithExpiredMsgsSync() throws Exception {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
-
- // Create JMS resources
- Connection connection = connectionFactory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue("TEST.FOO");
-
- // ***** Consumer code *****
- MessageConsumer consumer = session.createConsumer(destination);
-
- // ***** Producer Code *****
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
- TextMessage message;
-
- // Produce msgs that will expire quickly
- for (int i = 0; i < 45; i++) {
- message = session.createTextMessage(text);
- producer.send(message, 1, 1, 10);
- LOG.trace("Sent message: " + message.getJMSMessageID() +
- " with expiry 10 msec");
- }
- // Produce msgs that don't expire
- for (int i = 0; i < 60; i++) {
- message = session.createTextMessage(text);
- producer.send(message, 1, 1, 30000);
- // producer.send(message);
- LOG.trace("Sent message: " + message.getJMSMessageID() +
- " with expiry 30 sec");
- }
- sleep(200);
-
- int counter = 1;
- for (; counter <= 60; ++counter) {
- assertNotNull(consumer.receive(2000));
- LOG.info("counter at " + counter);
- }
- LOG.info("Received all expected messages with counter at: " + counter);
-
- // Cleanup
- producer.close();
- consumer.close();
- session.close();
- connection.close();
- }
-
- @Test
- public void testOptimizedAckWithExpiredMsgsSync2() throws Exception {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
-
- // Create JMS resources
- Connection connection = connectionFactory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue("TEST.FOO");
-
- // ***** Consumer code *****
- MessageConsumer consumer = session.createConsumer(destination);
-
- // ***** Producer Code *****
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
- TextMessage message;
-
- // Produce msgs that don't expire
- for (int i = 0; i < 56; i++) {
- message = session.createTextMessage(text);
- producer.send(message, 1, 1, 30000);
- // producer.send(message);
- LOG.trace("Sent message: " + message.getJMSMessageID() +
- " with expiry 30 sec");
- }
- // Produce msgs that will expire quickly
- for (int i = 0; i < 44; i++) {
- message = session.createTextMessage(text);
- producer.send(message, 1, 1, 10);
- LOG.trace("Sent message: " + message.getJMSMessageID() +
- " with expiry 10 msec");
- }
- // Produce some moremsgs that don't expire
- for (int i = 0; i < 4; i++) {
- message = session.createTextMessage(text);
- producer.send(message, 1, 1, 30000);
- // producer.send(message);
- LOG.trace("Sent message: " + message.getJMSMessageID() +
- " with expiry 30 sec");
- }
-
- sleep(200);
-
- int counter = 1;
- for (; counter <= 60; ++counter) {
- assertNotNull(consumer.receive(2000));
- LOG.info("counter at " + counter);
- }
- LOG.info("Received all expected messages with counter at: " + counter);
-
- // Cleanup
- producer.close();
- consumer.close();
- session.close();
- connection.close();
- }
-
- private void sleep(int milliSecondTime) {
- try {
- Thread.sleep(milliSecondTime);
- }
- catch (InterruptedException igonred) {
- }
- }
-
- /**
- * Standard JMS MessageListener
- */
- private class MyMessageListener implements MessageListener, ExceptionListener {
-
- private AtomicInteger counter = new AtomicInteger(0);
-
- @Override
- public void onMessage(final Message message) {
- try {
- LOG.trace("Got Message " + message.getJMSMessageID());
- LOG.info("counter at " + counter.incrementAndGet());
- }
- catch (final Exception e) {
- }
- }
-
- public int getCounter() {
- return counter.get();
- }
-
- @Override
- public synchronized void onException(JMSException ex) {
- LOG.error("JMS Exception occurred. Shutting down client.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java
deleted file mode 100644
index 2b84862..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class OutOfOrderTestCase extends TestCase {
-
- private static final Logger log = LoggerFactory.getLogger(OutOfOrderTestCase.class);
-
- private static final String BROKER_URL = "tcp://localhost:0";
- private static final int PREFETCH = 10;
- private static final String CONNECTION_URL_OPTIONS = "?jms.prefetchPolicy.all=" + PREFETCH;
-
- private static final String DESTINATION = "QUEUE?consumer.exclusive=true";
-
- private BrokerService brokerService;
- private Session session;
- private Connection connection;
- private String connectionUri;
-
- private int seq = 0;
-
- @Override
- public void setUp() throws Exception {
- brokerService = new BrokerService();
- brokerService.setUseJmx(true);
- brokerService.addConnector(BROKER_URL);
- brokerService.deleteAllMessages();
- brokerService.start();
- brokerService.waitUntilStarted();
-
- connectionUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
-
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri + CONNECTION_URL_OPTIONS);
- connection = connectionFactory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- }
-
- @Override
- protected void tearDown() throws Exception {
- session.close();
- connection.close();
- brokerService.stop();
- }
-
- public void testOrder() throws Exception {
-
- log.info("Producing messages 0-29 . . .");
- Destination destination = session.createQueue(DESTINATION);
- final MessageProducer messageProducer = session.createProducer(destination);
- try {
- for (int i = 0; i < 30; ++i) {
- final Message message = session.createTextMessage(createMessageText(i));
- message.setStringProperty("JMSXGroupID", "FOO");
-
- messageProducer.send(message);
- log.info("sent " + toString(message));
- }
- }
- finally {
- messageProducer.close();
- }
-
- log.info("Consuming messages 0-9 . . .");
- consumeBatch();
-
- log.info("Consuming messages 10-19 . . .");
- consumeBatch();
-
- log.info("Consuming messages 20-29 . . .");
- consumeBatch();
- }
-
- protected void consumeBatch() throws Exception {
- Destination destination = session.createQueue(DESTINATION);
- final MessageConsumer messageConsumer = session.createConsumer(destination);
- try {
- for (int i = 0; i < 10; ++i) {
- final Message message = messageConsumer.receive(1000L);
- log.info("received " + toString(message));
- assertEquals("Message out of order", createMessageText(seq++), ((TextMessage) message).getText());
- message.acknowledge();
- }
- }
- finally {
- messageConsumer.close();
- }
- }
-
- private String toString(final Message message) throws JMSException {
- String ret = "received message '" + ((TextMessage) message).getText() + "' - " + message.getJMSMessageID();
- if (message.getJMSRedelivered())
- ret += " (redelivered)";
- return ret;
-
- }
-
- private static String createMessageText(final int index) {
- return "message #" + index;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
deleted file mode 100644
index 95057b9..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import java.io.Serializable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
-import org.apache.activemq.broker.BrokerService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test case demonstrating situation where messages are not delivered to
- * consumers.
- */
-public class QueueWorkerPrefetchTest extends TestCase implements MessageListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(QueueWorkerPrefetchTest.class);
- private static final int BATCH_SIZE = 10;
- private static final long WAIT_TIMEOUT = 1000 * 10;
-
- /**
- * The connection URL.
- */
- private static final String BROKER_BIND_ADDRESS = "tcp://localhost:0";
-
- /**
- * The queue prefetch size to use. A value greater than 1 seems to make
- * things work.
- */
- private static final int QUEUE_PREFETCH_SIZE = 1;
-
- /**
- * The number of workers to use. A single worker with a prefetch of 1 works.
- */
- private static final int NUM_WORKERS = 2;
-
- /**
- * Embedded JMS broker.
- */
- private BrokerService broker;
-
- /**
- * The master's producer object for creating work items.
- */
- private MessageProducer workItemProducer;
-
- /**
- * The master's consumer object for consuming ack messages from workers.
- */
- private MessageConsumer masterItemConsumer;
-
- /**
- * The number of acks received by the master.
- */
- private final AtomicLong acksReceived = new AtomicLong(0);
-
- private final AtomicReference<CountDownLatch> latch = new AtomicReference<>();
-
- private String connectionUri;
-
- /**
- * Messages sent to the work-item queue.
- */
- private static class WorkMessage implements Serializable {
-
- private static final long serialVersionUID = 1L;
- private final int id;
-
- public WorkMessage(int id) {
- this.id = id;
- }
-
- @Override
- public String toString() {
- return "Work: " + id;
- }
- }
-
- /**
- * The worker process. Consume messages from the work-item queue, possibly
- * creating more messages to submit to the work-item queue. For each work
- * item, send an ack to the master.
- */
- private static class Worker implements MessageListener {
-
- /**
- * Counter shared between workers to decided when new work-item messages
- * are created.
- */
- private static AtomicInteger counter = new AtomicInteger(0);
-
- /**
- * Session to use.
- */
- private Session session;
-
- /**
- * Producer for sending ack messages to the master.
- */
- private MessageProducer masterItemProducer;
-
- /**
- * Producer for sending new work items to the work-items queue.
- */
- private MessageProducer workItemProducer;
-
- public Worker(Session session) throws JMSException {
- this.session = session;
- masterItemProducer = session.createProducer(session.createQueue("master-item"));
- Queue workItemQueue = session.createQueue("work-item");
- workItemProducer = session.createProducer(workItemQueue);
- MessageConsumer workItemConsumer = session.createConsumer(workItemQueue);
- workItemConsumer.setMessageListener(this);
- }
-
- @Override
- public void onMessage(javax.jms.Message message) {
- try {
- WorkMessage work = (WorkMessage) ((ObjectMessage) message).getObject();
-
- long c = counter.incrementAndGet();
-
- // Don't create a new work item for every BATCH_SIZE message. */
- if (c % BATCH_SIZE != 0) {
- // Send new work item to work-item queue.
- workItemProducer.send(session.createObjectMessage(new WorkMessage(work.id + 1)));
- }
-
- // Send ack to master.
- masterItemProducer.send(session.createObjectMessage(work));
- }
- catch (JMSException e) {
- throw new IllegalStateException("Something has gone wrong", e);
- }
- }
-
- /**
- * Close of JMS resources used by worker.
- */
- public void close() throws JMSException {
- masterItemProducer.close();
- workItemProducer.close();
- session.close();
- }
- }
-
- /**
- * Master message handler. Process ack messages.
- */
- @Override
- public void onMessage(javax.jms.Message message) {
- long acks = acksReceived.incrementAndGet();
- latch.get().countDown();
- if (acks % 1 == 0) {
- LOG.info("Master now has ack count of: " + acksReceived);
- }
- }
-
- @Override
- protected void setUp() throws Exception {
- // Create the message broker.
- super.setUp();
- broker = new BrokerService();
- broker.setPersistent(false);
- broker.setUseJmx(true);
- broker.addConnector(BROKER_BIND_ADDRESS);
- broker.start();
- broker.waitUntilStarted();
-
- connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
- }
-
- @Override
- protected void tearDown() throws Exception {
- // Shut down the message broker.
- broker.deleteAllMessages();
- broker.stop();
- super.tearDown();
- }
-
- public void testActiveMQ() throws Exception {
- // Create the connection to the broker.
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
- ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
- prefetchPolicy.setQueuePrefetch(QUEUE_PREFETCH_SIZE);
- connectionFactory.setPrefetchPolicy(prefetchPolicy);
- Connection connection = connectionFactory.createConnection();
- connection.start();
-
- Session masterSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- workItemProducer = masterSession.createProducer(masterSession.createQueue("work-item"));
- masterItemConsumer = masterSession.createConsumer(masterSession.createQueue("master-item"));
- masterItemConsumer.setMessageListener(this);
-
- // Create the workers.
- Worker[] workers = new Worker[NUM_WORKERS];
- for (int i = 0; i < NUM_WORKERS; i++) {
- workers[i] = new Worker(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
- }
-
- // Send a message to the work queue, and wait for the BATCH_SIZE acks
- // from the workers.
- acksReceived.set(0);
- latch.set(new CountDownLatch(BATCH_SIZE));
- workItemProducer.send(masterSession.createObjectMessage(new WorkMessage(1)));
-
- if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
- fail("First batch only received " + acksReceived + " messages");
- }
-
- LOG.info("First batch received");
-
- // Send another message to the work queue, and wait for the next 1000 acks. It is
- // at this point where the workers never get notified of this message, as they
- // have a large pending queue. Creating a new worker at this point however will
- // receive this new message.
- acksReceived.set(0);
- latch.set(new CountDownLatch(BATCH_SIZE));
- workItemProducer.send(masterSession.createObjectMessage(new WorkMessage(1)));
-
- if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
- fail("Second batch only received " + acksReceived + " messages");
- }
-
- LOG.info("Second batch received");
-
- // Cleanup all JMS resources.
- for (int i = 0; i < NUM_WORKERS; i++) {
- workers[i].close();
- }
- masterSession.close();
- connection.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackSharedConsumerTests.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackSharedConsumerTests.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackSharedConsumerTests.java
deleted file mode 100644
index 549922d..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackSharedConsumerTests.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class RawRollbackSharedConsumerTests {
-
- private static ConnectionFactory connectionFactory;
- private static Destination queue;
- private static BrokerService broker;
-
- @BeforeClass
- public static void clean() throws Exception {
- broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setUseJmx(true);
- broker.start();
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
- connectionFactory.setBrokerURL("vm://localhost?async=false");
- RawRollbackSharedConsumerTests.connectionFactory = connectionFactory;
- queue = new ActiveMQQueue("queue");
- }
-
- @AfterClass
- public static void close() throws Exception {
- broker.stop();
- }
-
- @Before
- public void clearData() throws Exception {
- getMessages(false); // drain queue
- convertAndSend("foo");
- convertAndSend("bar");
- }
-
- @After
- public void checkPostConditions() throws Exception {
-
- Thread.sleep(1000L);
- List<String> list = getMessages(false);
- assertEquals(2, list.size());
-
- }
-
- @Test
- public void testReceiveMessages() throws Exception {
-
- List<String> list = getMessages(true);
- assertEquals(2, list.size());
- assertTrue(list.contains("foo"));
-
- }
-
- private void convertAndSend(String msg) throws Exception {
- Connection connection = connectionFactory.createConnection();
- connection.start();
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(queue);
- producer.send(session.createTextMessage(msg));
- producer.close();
- session.commit();
- session.close();
- connection.close();
- }
-
- private List<String> getMessages(boolean rollback) throws Exception {
- Connection connection = connectionFactory.createConnection();
- connection.start();
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- String next = "";
- List<String> msgs = new ArrayList<>();
- MessageConsumer consumer = session.createConsumer(queue);
- while (next != null) {
- next = receiveAndConvert(consumer);
- if (next != null)
- msgs.add(next);
- }
- consumer.close();
- if (rollback) {
- session.rollback();
- }
- else {
- session.commit();
- }
- session.close();
- connection.close();
- return msgs;
- }
-
- private String receiveAndConvert(MessageConsumer consumer) throws Exception {
- Message message = consumer.receive(100L);
- if (message == null) {
- return null;
- }
- return ((TextMessage) message).getText();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackTests.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackTests.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackTests.java
deleted file mode 100644
index 74437b7..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackTests.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class RawRollbackTests {
-
- private static ConnectionFactory connectionFactory;
- private static Destination queue;
- private static BrokerService broker;
-
- @BeforeClass
- public static void clean() throws Exception {
- broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setUseJmx(true);
- broker.start();
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
- connectionFactory.setBrokerURL("vm://localhost?async=false&waitForStart=5000&jms.prefetchPolicy.all=0");
- RawRollbackTests.connectionFactory = connectionFactory;
- queue = new ActiveMQQueue("queue");
- }
-
- @AfterClass
- public static void close() throws Exception {
- broker.stop();
- }
-
- @Before
- public void clearData() throws Exception {
- getMessages(false); // drain queue
- convertAndSend("foo");
- convertAndSend("bar");
- }
-
- @After
- public void checkPostConditions() throws Exception {
-
- Thread.sleep(1000L);
- List<String> list = getMessages(false);
- assertEquals(2, list.size());
-
- }
-
- @Test
- public void testReceiveMessages() throws Exception {
-
- List<String> list = getMessages(true);
- assertEquals(2, list.size());
- assertTrue(list.contains("foo"));
-
- }
-
- private void convertAndSend(String msg) throws Exception {
- Connection connection = connectionFactory.createConnection();
- connection.start();
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(queue);
- producer.send(session.createTextMessage(msg));
- producer.close();
- session.commit();
- session.close();
- connection.close();
- }
-
- private List<String> getMessages(boolean rollback) throws Exception {
- Connection connection = connectionFactory.createConnection();
- connection.start();
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- String next = "";
- List<String> msgs = new ArrayList<>();
- while (next != null) {
- next = receiveAndConvert(session);
- if (next != null)
- msgs.add(next);
- }
- if (rollback) {
- session.rollback();
- }
- else {
- session.commit();
- }
- session.close();
- connection.close();
- return msgs;
- }
-
- private String receiveAndConvert(Session session) throws Exception {
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(100L);
- consumer.close();
- if (message == null) {
- return null;
- }
- return ((TextMessage) message).getText();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java
deleted file mode 100644
index e6d1d40..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-public interface Receiver {
-
- void receive(String s) throws Exception;
-}