You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:25 UTC
[16/42] activemq-artemis git commit: ARTEMIS-463 Improvement to the
openwire testsuite https://issues.apache.org/jira/browse/ARTEMIS-463
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java
deleted file mode 100644
index d0066a3..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
-import org.junit.After;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.*;
-
-public class AMQ5450Test {
-
- static final Logger LOG = LoggerFactory.getLogger(AMQ5450Test.class);
- private final static int maxFileLength = 1024 * 1024 * 32;
-
- private final static String POSTFIX_DESTINATION_NAME = ".dlq";
-
- private final static String DESTINATION_NAME = "test" + POSTFIX_DESTINATION_NAME;
- private final static String DESTINATION_NAME_2 = "2.test" + POSTFIX_DESTINATION_NAME;
- private final static String DESTINATION_NAME_3 = "3.2.test" + POSTFIX_DESTINATION_NAME;
-
- private final static String[] DESTS = new String[]{DESTINATION_NAME, DESTINATION_NAME_2, DESTINATION_NAME_3, DESTINATION_NAME, DESTINATION_NAME};
-
- BrokerService broker;
- private HashMap<Object, PersistenceAdapter> adapters = new HashMap<>();
-
- @After
- public void tearDown() throws Exception {
- broker.stop();
- }
-
- protected BrokerService createAndStartBroker(PersistenceAdapter persistenceAdapter) throws Exception {
- BrokerService broker = new BrokerService();
- broker.setUseJmx(false);
- broker.setBrokerName("localhost");
- broker.setPersistenceAdapter(persistenceAdapter);
- broker.setDeleteAllMessagesOnStartup(true);
- broker.start();
- broker.waitUntilStarted();
- return broker;
- }
-
- @Test
- public void testPostFixMatch() throws Exception {
- doTestPostFixMatch(false);
- }
-
- @Test
- public void testPostFixCompositeMatch() throws Exception {
- doTestPostFixMatch(true);
- }
-
- private void doTestPostFixMatch(boolean useComposite) throws Exception {
- prepareBrokerWithMultiStore(useComposite);
-
- sendMessage(DESTINATION_NAME, "test 1");
- sendMessage(DESTINATION_NAME_2, "test 1");
- sendMessage(DESTINATION_NAME_3, "test 1");
-
- assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
- assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)));
- assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_3)));
-
- for (String dest : DESTS) {
- Destination destination2 = broker.getDestination(new ActiveMQQueue(dest));
- assertNotNull(destination2);
- assertEquals(1, destination2.getMessageStore().getMessageCount());
- }
-
- HashMap<Integer, PersistenceAdapter> numDests = new HashMap<>();
- for (PersistenceAdapter pa : adapters.values()) {
- numDests.put(pa.getDestinations().size(), pa);
- }
-
- // ensure wildcard does not match any
- assertTrue("0 in wildcard matcher", adapters.get(null).getDestinations().isEmpty());
-
- assertEquals("only two values", 2, numDests.size());
- assertTrue("0 in others", numDests.containsKey(0));
-
- if (useComposite) {
- assertTrue("3 in one", numDests.containsKey(3));
- }
- else {
- assertTrue("1 in some", numDests.containsKey(1));
- }
-
- }
-
- protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
- KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
- kaha.setJournalMaxFileLength(maxFileLength);
- kaha.setCleanupInterval(5000);
- if (delete) {
- kaha.deleteAllMessages();
- }
- return kaha;
- }
-
- public void prepareBrokerWithMultiStore(boolean compositeMatch) throws Exception {
-
- MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
- multiKahaDBPersistenceAdapter.deleteAllMessages();
- ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<>();
-
- if (compositeMatch) {
- StringBuffer compositeDestBuf = new StringBuffer();
- for (int i = 1; i <= DESTS.length; i++) {
- for (int j = 0; j < i; j++) {
- compositeDestBuf.append("*");
- if ((j + 1 == i)) {
- compositeDestBuf.append(POSTFIX_DESTINATION_NAME);
- }
- else {
- compositeDestBuf.append(".");
- }
- }
- if (!(i + 1 > DESTS.length)) {
- compositeDestBuf.append(",");
- }
- }
- adapters.add(createFilteredKahaDBByDestinationPrefix(compositeDestBuf.toString(), true));
-
- }
- else {
- // destination map does not do post fix wild card matches on paths, so we need to cover
- // each path length
- adapters.add(createFilteredKahaDBByDestinationPrefix("*" + POSTFIX_DESTINATION_NAME, true));
- adapters.add(createFilteredKahaDBByDestinationPrefix("*.*" + POSTFIX_DESTINATION_NAME, true));
- adapters.add(createFilteredKahaDBByDestinationPrefix("*.*.*" + POSTFIX_DESTINATION_NAME, true));
- adapters.add(createFilteredKahaDBByDestinationPrefix("*.*.*.*" + POSTFIX_DESTINATION_NAME, true));
- }
-
- // ensure wildcard matcher is there for other dests
- adapters.add(createFilteredKahaDBByDestinationPrefix(null, true));
-
- multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
- broker = createAndStartBroker(multiKahaDBPersistenceAdapter);
- }
-
- private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String destinationPrefix,
- boolean deleteAllMessages) throws IOException {
- FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter();
- template.setPersistenceAdapter(createStore(deleteAllMessages));
- if (destinationPrefix != null) {
- template.setQueue(destinationPrefix);
- }
- adapters.put(destinationPrefix, template.getPersistenceAdapter());
- return template;
- }
-
- private void sendMessage(String destinationName, String message) throws JMSException {
- ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost");
- f.setAlwaysSyncSend(true);
- Connection c = f.createConnection();
- c.start();
- Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = s.createProducer(new ActiveMQQueue(destinationName));
- producer.send(s.createTextMessage(message));
- producer.close();
- s.close();
- c.stop();
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java
deleted file mode 100644
index 5ed211b..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.File;
-import java.util.concurrent.TimeUnit;
-import javax.jms.JMSException;
-import javax.jms.TextMessage;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import junit.framework.Test;
-
-import org.apache.activemq.broker.BrokerRestartTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.StubConnection;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
-import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter;
-import org.apache.activemq.util.IOHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ5567Test extends BrokerRestartTestSupport {
-
- protected static final Logger LOG = LoggerFactory.getLogger(AMQ5567Test.class);
- ActiveMQQueue destination = new ActiveMQQueue("Q");
-
- @Override
- protected void configureBroker(BrokerService broker) throws Exception {
- super.configureBroker(broker);
- broker.setPersistenceAdapter(persistenceAdapter);
- }
-
- @Override
- protected PolicyEntry getDefaultPolicy() {
- PolicyEntry policy = new PolicyEntry();
- policy.setMemoryLimit(60 * 1024);
- return policy;
- }
-
- public void initCombosForTestPreparedTransactionNotDispatched() throws Exception {
- PersistenceAdapter[] persistenceAdapters = new PersistenceAdapter[]{new KahaDBPersistenceAdapter(), new LevelDBPersistenceAdapter(), new JDBCPersistenceAdapter(DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat())};
- for (PersistenceAdapter adapter : persistenceAdapters) {
- adapter.setDirectory(new File(IOHelper.getDefaultDataDirectory()));
- }
- addCombinationValues("persistenceAdapter", persistenceAdapters);
- }
-
- public void testPreparedTransactionNotDispatched() throws Exception {
-
- ActiveMQDestination destination = new ActiveMQQueue("Q");
-
- StubConnection connection = createConnection();
- ConnectionInfo connectionInfo = createConnectionInfo();
- SessionInfo sessionInfo = createSessionInfo(connectionInfo);
- ProducerInfo producerInfo = createProducerInfo(sessionInfo);
- connection.send(connectionInfo);
- connection.send(sessionInfo);
- connection.send(producerInfo);
-
- XATransactionId txid = createXATransaction(sessionInfo);
- connection.send(createBeginTransaction(connectionInfo, txid));
- Message message = createMessage(producerInfo, destination);
- message.setPersistent(true);
- message.setTransactionId(txid);
- connection.send(message);
-
- connection.send(createPrepareTransaction(connectionInfo, txid));
-
- // send another non tx, will poke dispatch
- message = createMessage(producerInfo, destination);
- message.setPersistent(true);
- connection.send(message);
-
- // Since prepared but not committed.. only one should get delivered
- StubConnection connectionC = createConnection();
- ConnectionInfo connectionInfoC = createConnectionInfo();
- SessionInfo sessionInfoC = createSessionInfo(connectionInfoC);
- ConsumerInfo consumerInfo = createConsumerInfo(sessionInfoC, destination);
- connectionC.send(connectionInfoC);
- connectionC.send(sessionInfoC);
- connectionC.send(consumerInfo);
-
- Message m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10));
- LOG.info("received: " + m);
- assertNotNull("Got message", m);
- assertNull("Got non tx message", m.getTransactionId());
-
- // cannot get the prepared message till commit
- assertNull(receiveMessage(connectionC));
- assertNoMessagesLeft(connectionC);
-
- LOG.info("commit: " + txid);
- connection.request(createCommitTransaction2Phase(connectionInfo, txid));
-
- m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10));
- LOG.info("received: " + m);
- assertNotNull("Got non null message", m);
-
- }
-
- public void initCombosForTestCursorStoreSync() throws Exception {
- PersistenceAdapter[] persistenceAdapters = new PersistenceAdapter[]{new KahaDBPersistenceAdapter(), new LevelDBPersistenceAdapter(), new JDBCPersistenceAdapter(DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat())};
- for (PersistenceAdapter adapter : persistenceAdapters) {
- adapter.setDirectory(new File(IOHelper.getDefaultDataDirectory()));
- }
- addCombinationValues("persistenceAdapter", persistenceAdapters);
- }
-
- public void testCursorStoreSync() throws Exception {
-
- StubConnection connection = createConnection();
- ConnectionInfo connectionInfo = createConnectionInfo();
- SessionInfo sessionInfo = createSessionInfo(connectionInfo);
- ProducerInfo producerInfo = createProducerInfo(sessionInfo);
- connection.send(connectionInfo);
- connection.send(sessionInfo);
- connection.send(producerInfo);
-
- XATransactionId txid = createXATransaction(sessionInfo);
- connection.send(createBeginTransaction(connectionInfo, txid));
- Message message = createMessage(producerInfo, destination);
- message.setPersistent(true);
- message.setTransactionId(txid);
- connection.request(message);
-
- connection.request(createPrepareTransaction(connectionInfo, txid));
-
- QueueViewMBean proxy = getProxyToQueueViewMBean();
- assertTrue("cache is enabled", proxy.isCacheEnabled());
-
- // send another non tx, will fill cursor
- String payload = new String(new byte[10 * 1024]);
- for (int i = 0; i < 6; i++) {
- message = createMessage(producerInfo, destination);
- message.setPersistent(true);
- ((TextMessage) message).setText(payload);
- connection.request(message);
- }
-
- assertTrue("cache is disabled", !proxy.isCacheEnabled());
-
- StubConnection connectionC = createConnection();
- ConnectionInfo connectionInfoC = createConnectionInfo();
- SessionInfo sessionInfoC = createSessionInfo(connectionInfoC);
- ConsumerInfo consumerInfo = createConsumerInfo(sessionInfoC, destination);
- connectionC.send(connectionInfoC);
- connectionC.send(sessionInfoC);
- connectionC.send(consumerInfo);
-
- Message m = null;
- for (int i = 0; i < 3; i++) {
- m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10));
- LOG.info("received: " + m);
- assertNotNull("Got message", m);
- assertNull("Got non tx message", m.getTransactionId());
- connectionC.request(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
- }
-
- LOG.info("commit: " + txid);
- connection.request(createCommitTransaction2Phase(connectionInfo, txid));
- // consume the rest including the 2pc send in TX
-
- for (int i = 0; i < 4; i++) {
- m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10));
- LOG.info("received[" + i + "] " + m);
- assertNotNull("Got message", m);
- if (i == 3) {
- assertNotNull("Got tx message", m.getTransactionId());
- }
- else {
- assertNull("Got non tx message", m.getTransactionId());
- }
- connectionC.request(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
- }
- }
-
- private QueueViewMBean getProxyToQueueViewMBean() throws MalformedObjectNameException, JMSException {
- ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + ":destinationType=Queue,destinationName=" + destination.getQueueName() + ",type=Broker,brokerName=localhost");
- QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
- return proxy;
- }
-
- public static Test suite() {
- return suite(AMQ5567Test.class);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java
deleted file mode 100644
index e8414d5..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
-import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * @author James Furness
- * https://issues.apache.org/jira/browse/AMQ-3607
- */
-public class ActiveMQSlowConsumerManualTest {
-
- private static final int PORT = 12345;
- private static final ActiveMQTopic TOPIC = new ActiveMQTopic("TOPIC");
- private static final String URL = "nio://localhost:" + PORT + "?socket.tcpNoDelay=true";
-
- @Test(timeout = 60000)
- public void testDefaultSettings() throws Exception {
- runTest("testDefaultSettings", 30, -1, -1, false, false, false, false);
- }
-
- @Test(timeout = 60000)
- public void testDefaultSettingsWithOptimiseAcknowledge() throws Exception {
- runTest("testDefaultSettingsWithOptimiseAcknowledge", 30, -1, -1, false, false, true, false);
- }
-
- @Test(timeout = 60000)
- public void testBounded() throws Exception {
- runTest("testBounded", 30, 5, 25, false, false, false, false);
- }
-
- @Test(timeout = 60000)
- public void testBoundedWithOptimiseAcknowledge() throws Exception {
- runTest("testBoundedWithOptimiseAcknowledge", 30, 5, 25, false, false, true, false);
- }
-
- public void runTest(String name,
- int sendMessageCount,
- int prefetchLimit,
- int messageLimit,
- boolean evictOldestMessage,
- boolean disableFlowControl,
- boolean optimizeAcknowledge,
- boolean persistent) throws Exception {
- BrokerService broker = createBroker(persistent);
- broker.setDestinationPolicy(buildPolicy(TOPIC, prefetchLimit, messageLimit, evictOldestMessage, disableFlowControl));
- broker.start();
-
- // Slow consumer
- Session slowConsumerSession = buildSession("SlowConsumer", URL, optimizeAcknowledge);
- final CountDownLatch blockSlowConsumer = new CountDownLatch(1);
- final AtomicInteger slowConsumerReceiveCount = new AtomicInteger();
- final List<Integer> slowConsumerReceived = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
- MessageConsumer slowConsumer = createSubscriber(slowConsumerSession, new MessageListener() {
- @Override
- public void onMessage(Message message) {
- try {
- slowConsumerReceiveCount.incrementAndGet();
- int count = Integer.parseInt(((TextMessage) message).getText());
- if (slowConsumerReceived != null)
- slowConsumerReceived.add(count);
- if (count % 10000 == 0)
- System.out.println("SlowConsumer: Receive " + count);
- blockSlowConsumer.await();
- }
- catch (Exception ignored) {
- }
- }
- });
-
- // Fast consumer
- Session fastConsumerSession = buildSession("FastConsumer", URL, optimizeAcknowledge);
- final AtomicInteger fastConsumerReceiveCount = new AtomicInteger();
- final List<Integer> fastConsumerReceived = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
- MessageConsumer fastConsumer = createSubscriber(fastConsumerSession, new MessageListener() {
- @Override
- public void onMessage(Message message) {
- try {
- fastConsumerReceiveCount.incrementAndGet();
- TimeUnit.MILLISECONDS.sleep(5);
- int count = Integer.parseInt(((TextMessage) message).getText());
- if (fastConsumerReceived != null)
- fastConsumerReceived.add(count);
- if (count % 10000 == 0)
- System.out.println("FastConsumer: Receive " + count);
- }
- catch (Exception ignored) {
- }
- }
- });
-
- // Wait for consumers to connect
- Thread.sleep(500);
-
- // Publisher
- AtomicInteger sentCount = new AtomicInteger();
- List<Integer> sent = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
- Session publisherSession = buildSession("Publisher", URL, optimizeAcknowledge);
- MessageProducer publisher = createPublisher(publisherSession);
- for (int i = 0; i < sendMessageCount; i++) {
- sentCount.incrementAndGet();
- if (sent != null)
- sent.add(i);
- if (i % 10000 == 0)
- System.out.println("Publisher: Send " + i);
- publisher.send(publisherSession.createTextMessage(Integer.toString(i)));
- }
-
- // Wait for messages to arrive
- Thread.sleep(500);
-
- System.out.println(name + ": Publisher Sent: " + sentCount + " " + sent);
- System.out.println(name + ": Whilst slow consumer blocked:");
- System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived);
- System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived);
-
- // Unblock slow consumer
- blockSlowConsumer.countDown();
-
- // Wait for messages to arrive
- Thread.sleep(500);
-
- System.out.println(name + ": After slow consumer unblocked:");
- System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived);
- System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived);
- System.out.println();
-
- publisher.close();
- publisherSession.close();
- slowConsumer.close();
- slowConsumerSession.close();
- fastConsumer.close();
- fastConsumerSession.close();
- broker.stop();
-
- Assert.assertEquals("Fast consumer missed messages whilst slow consumer was blocking", sent, fastConsumerReceived);
- // this is too timine dependent as sometimes there is message eviction, would need to check the dlq
- //Assert.assertEquals("Slow consumer received incorrect message count", Math.min(sendMessageCount, prefetchLimit + (messageLimit > 0 ? messageLimit : Integer.MAX_VALUE)), slowConsumerReceived.size());
- }
-
- private static BrokerService createBroker(boolean persistent) throws Exception {
- BrokerService broker = new BrokerService();
- broker.setBrokerName("TestBroker");
- broker.setPersistent(persistent);
- broker.addConnector(URL);
- return broker;
- }
-
- private static MessageConsumer createSubscriber(Session session,
- MessageListener messageListener) throws JMSException {
- MessageConsumer consumer = session.createConsumer(TOPIC);
- consumer.setMessageListener(messageListener);
- return consumer;
- }
-
- private static MessageProducer createPublisher(Session session) throws JMSException {
- MessageProducer producer = session.createProducer(TOPIC);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- return producer;
- }
-
- private static Session buildSession(String clientId, String url, boolean optimizeAcknowledge) throws JMSException {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
-
- connectionFactory.setCopyMessageOnSend(false);
- connectionFactory.setDisableTimeStampsByDefault(true);
- connectionFactory.setOptimizeAcknowledge(optimizeAcknowledge);
- if (optimizeAcknowledge) {
- connectionFactory.setOptimizeAcknowledgeTimeOut(1);
- }
-
- Connection connection = connectionFactory.createConnection();
- connection.setClientID(clientId);
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- connection.start();
-
- return session;
- }
-
- private static PolicyMap buildPolicy(ActiveMQTopic topic,
- int prefetchLimit,
- int messageLimit,
- boolean evictOldestMessage,
- boolean disableFlowControl) {
- PolicyMap policyMap = new PolicyMap();
-
- PolicyEntry policyEntry = new PolicyEntry();
-
- if (evictOldestMessage) {
- policyEntry.setMessageEvictionStrategy(new OldestMessageEvictionStrategy());
- }
-
- if (disableFlowControl) {
- policyEntry.setProducerFlowControl(false);
- }
-
- if (prefetchLimit > 0) {
- policyEntry.setTopicPrefetch(prefetchLimit);
- }
-
- if (messageLimit > 0) {
- ConstantPendingMessageLimitStrategy messageLimitStrategy = new ConstantPendingMessageLimitStrategy();
- messageLimitStrategy.setLimit(messageLimit);
- policyEntry.setPendingMessageLimitStrategy(messageLimitStrategy);
- }
-
- policyMap.put(topic, policyEntry);
-
- return policyMap;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java
deleted file mode 100644
index 2d6a48c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ConnectionPerMessageTest extends EmbeddedBrokerTestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(ConnectionPerMessageTest.class);
- private static final int COUNT = 2000;
- protected String bindAddress;
-
- public void testConnectionPerMessage() throws Exception {
- final String topicName = "test.topic";
-
- LOG.info("Initializing connection factory for JMS to URL: " + bindAddress);
- final ActiveMQConnectionFactory normalFactory = new ActiveMQConnectionFactory();
- normalFactory.setBrokerURL(bindAddress);
- for (int i = 0; i < COUNT; i++) {
-
- if (i % 100 == 0) {
- LOG.info(new Integer(i).toString());
- }
-
- Connection conn = null;
- try {
-
- conn = normalFactory.createConnection();
- final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Topic topic = session.createTopic(topicName);
- final MessageProducer producer = session.createProducer(topic);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- final MapMessage m = session.createMapMessage();
- m.setInt("hey", i);
-
- producer.send(m);
-
- }
- catch (JMSException e) {
- LOG.warn(e.getMessage(), e);
- }
- finally {
- if (conn != null)
- try {
- conn.close();
- }
- catch (JMSException e) {
- LOG.warn(e.getMessage(), e);
- }
- }
- }
- }
-
- @Override
- protected void setUp() throws Exception {
- bindAddress = "vm://localhost";
- super.setUp();
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- BrokerService answer = new BrokerService();
- answer.setDeleteAllMessagesOnStartup(true);
- answer.setUseJmx(false);
- answer.setPersistent(isPersistent());
- answer.addConnector(bindAddress);
- return answer;
- }
-
- @Override
- protected boolean isPersistent() {
- return true;
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
deleted file mode 100644
index 35da06c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.command.ActiveMQQueue;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-public class CraigsBugTest extends EmbeddedBrokerTestSupport {
-
- private String connectionUri;
-
- public void testConnectionFactory() throws Exception {
- final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
- final ActiveMQQueue queue = new ActiveMQQueue("testqueue");
- final Connection conn = cf.createConnection();
-
- Runnable r = new Runnable() {
- @Override
- public void run() {
- try {
- Session session = conn.createSession(false, 1);
- MessageConsumer consumer = session.createConsumer(queue, null);
- consumer.receive(1000);
- }
- catch (JMSException e) {
- e.printStackTrace();
- }
- }
- };
- new Thread(r).start();
- conn.start();
-
- try {
- new CountDownLatch(1).await(3, TimeUnit.SECONDS);
- }
- catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- protected void setUp() throws Exception {
- bindAddress = "tcp://localhost:0";
- super.setUp();
-
- connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java
deleted file mode 100644
index a79ca58..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import java.util.concurrent.TimeoutException;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.region.Queue;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.junit.Assert;
-
-public class DoubleExpireTest extends EmbeddedBrokerTestSupport {
-
- private static final long MESSAGE_TTL_MILLIS = 1000;
- private static final long MAX_TEST_TIME_MILLIS = 60000;
-
- @Override
- public void setUp() throws Exception {
- setAutoFail(true);
- setMaxTestTime(MAX_TEST_TIME_MILLIS);
- super.setUp();
- }
-
- /**
- * This test verifies that a message that expires can be be resent to queue
- * with a new expiration and that it will be processed as a new message and
- * allowed to re-expire.
- * <p>
- * <b>NOTE:</b> This test fails on AMQ 5.4.2 because the originalExpiration
- * timestamp is not cleared when the message is resent.
- */
- public void testDoubleExpireWithoutMove() throws Exception {
- // Create the default dead letter queue.
- final ActiveMQDestination DLQ = createDestination("ActiveMQ.DLQ");
-
- Connection conn = createConnection();
- try {
- conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // Verify that the test queue and DLQ are empty.
- Assert.assertEquals(0, getSize(destination));
- Assert.assertEquals(0, getSize(DLQ));
-
- // Enqueue a message to the test queue that will expire after 1s.
- MessageProducer producer = session.createProducer(destination);
- Message testMessage = session.createTextMessage("test message");
- producer.send(testMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, MESSAGE_TTL_MILLIS);
- Assert.assertEquals(1, getSize(destination));
-
- // Wait for the message to expire.
- waitForSize(destination, 0, MAX_TEST_TIME_MILLIS);
- Assert.assertEquals(1, getSize(DLQ));
-
- // Consume the message from the DLQ and re-enqueue it to the test
- // queue so that it expires after 1s.
- MessageConsumer consumer = session.createConsumer(DLQ);
- Message expiredMessage = consumer.receive();
- Assert.assertEquals(testMessage.getJMSMessageID(), expiredMessage.getJMSMessageID());
-
- producer.send(expiredMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, MESSAGE_TTL_MILLIS);
- Assert.assertEquals(1, getSize(destination));
- Assert.assertEquals(0, getSize(DLQ));
-
- // Verify that the resent message is "different" in that it has
- // another ID.
- Assert.assertNotSame(testMessage.getJMSMessageID(), expiredMessage.getJMSMessageID());
-
- // Wait for the message to re-expire.
- waitForSize(destination, 0, MAX_TEST_TIME_MILLIS);
- Assert.assertEquals(1, getSize(DLQ));
-
- // Re-consume the message from the DLQ.
- Message reexpiredMessage = consumer.receive();
- Assert.assertEquals(expiredMessage.getJMSMessageID(), reexpiredMessage.getJMSMessageID());
- }
- finally {
- conn.close();
- }
- }
-
- /**
- * A helper method that returns the embedded broker's implementation of a
- * JMS queue.
- */
- private Queue getPhysicalDestination(ActiveMQDestination destination) throws Exception {
- return (Queue) broker.getAdminView().getBroker().getDestinationMap().get(destination);
- }
-
- /**
- * A helper method that returns the size of the specified queue/topic.
- */
- private long getSize(ActiveMQDestination destination) throws Exception {
- return getPhysicalDestination(destination) != null ? getPhysicalDestination(destination).getDestinationStatistics().getMessages().getCount() : 0;
- }
-
- /**
- * A helper method that waits for a destination to reach a certain size.
- */
- private void waitForSize(ActiveMQDestination destination,
- int size,
- long timeoutMillis) throws Exception, TimeoutException {
- long startTimeMillis = System.currentTimeMillis();
-
- while (getSize(destination) != size && System.currentTimeMillis() < (startTimeMillis + timeoutMillis)) {
- Thread.sleep(250);
- }
-
- if (getSize(destination) != size) {
- throw new TimeoutException("Destination " + destination.getPhysicalName() + " did not reach size " + size + " within " + timeoutMillis + "ms.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
deleted file mode 100644
index 3046423..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
+++ /dev/null
@@ -1,479 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Vector;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import javax.management.ObjectName;
-
-import junit.framework.Test;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.CombinationTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBStore;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Test case for AMQ-1479
- */
-public class DurableConsumerTest extends CombinationTestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(DurableConsumerTest.class);
- private static int COUNT = 1024;
- private static String CONSUMER_NAME = "DURABLE_TEST";
- protected BrokerService broker;
-
- protected String bindAddress = "tcp://localhost:61616";
-
- protected byte[] payload = new byte[1024 * 32];
- protected ConnectionFactory factory;
- protected Vector<Exception> exceptions = new Vector<>();
-
- private static final String TOPIC_NAME = "failoverTopic";
- private static final String CONNECTION_URL = "failover:(tcp://localhost:61616,tcp://localhost:61617)";
- public boolean useDedicatedTaskRunner = false;
-
- private class SimpleTopicSubscriber implements MessageListener, ExceptionListener {
-
- private TopicConnection topicConnection = null;
-
- public SimpleTopicSubscriber(String connectionURL, String clientId, String topicName) {
-
- ActiveMQConnectionFactory topicConnectionFactory = null;
- TopicSession topicSession = null;
- Topic topic = null;
- TopicSubscriber topicSubscriber = null;
-
- topicConnectionFactory = new ActiveMQConnectionFactory(connectionURL);
- try {
-
- topic = new ActiveMQTopic(topicName);
- topicConnection = topicConnectionFactory.createTopicConnection();
- topicConnection.setClientID((clientId));
- topicConnection.start();
-
- topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- topicSubscriber = topicSession.createDurableSubscriber(topic, (clientId));
- topicSubscriber.setMessageListener(this);
-
- }
- catch (JMSException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void onMessage(Message arg0) {
- }
-
- public void closeConnection() {
- if (topicConnection != null) {
- try {
- topicConnection.close();
- }
- catch (JMSException e) {
- }
- }
- }
-
- @Override
- public void onException(JMSException exception) {
- exceptions.add(exception);
- }
- }
-
- private class MessagePublisher implements Runnable {
-
- private final boolean shouldPublish = true;
-
- @Override
- public void run() {
- TopicConnectionFactory topicConnectionFactory = null;
- TopicConnection topicConnection = null;
- TopicSession topicSession = null;
- Topic topic = null;
- TopicPublisher topicPublisher = null;
- Message message = null;
-
- topicConnectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL);
- try {
- topic = new ActiveMQTopic(TOPIC_NAME);
- topicConnection = topicConnectionFactory.createTopicConnection();
- topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- topicPublisher = topicSession.createPublisher(topic);
- message = topicSession.createMessage();
- }
- catch (Exception ex) {
- exceptions.add(ex);
- }
- while (shouldPublish) {
- try {
- topicPublisher.publish(message, DeliveryMode.PERSISTENT, 1, 2 * 60 * 60 * 1000);
- }
- catch (JMSException ex) {
- exceptions.add(ex);
- }
- try {
- Thread.sleep(1);
- }
- catch (Exception ex) {
- }
- }
- }
- }
-
- private void configurePersistence(BrokerService broker) throws Exception {
- File dataDirFile = new File("target/" + getName());
- KahaDBPersistenceAdapter kahaDBAdapter = new KahaDBPersistenceAdapter();
- kahaDBAdapter.setDirectory(dataDirFile);
- broker.setPersistenceAdapter(kahaDBAdapter);
- }
-
- public void testFailover() throws Exception {
-
- configurePersistence(broker);
- broker.start();
-
- Thread publisherThread = new Thread(new MessagePublisher());
- publisherThread.start();
- final int numSubs = 100;
- final List<SimpleTopicSubscriber> list = new ArrayList<>(numSubs);
- for (int i = 0; i < numSubs; i++) {
-
- final int id = i;
- Thread thread = new Thread(new Runnable() {
- @Override
- public void run() {
- SimpleTopicSubscriber s = new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis() + "-" + id, TOPIC_NAME);
- list.add(s);
- }
- });
- thread.start();
-
- }
-
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return numSubs == list.size();
- }
- });
-
- broker.stop();
- broker = createBroker(false);
- configurePersistence(broker);
- broker.start();
- Thread.sleep(10000);
- for (SimpleTopicSubscriber s : list) {
- s.closeConnection();
- }
- assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
- }
-
- // makes heavy use of threads and can demonstrate https://issues.apache.org/activemq/browse/AMQ-2028
- // with use dedicatedTaskRunner=true and produce OOM
- public void initCombosForTestConcurrentDurableConsumer() {
- addCombinationValues("useDedicatedTaskRunner", new Object[]{Boolean.TRUE, Boolean.FALSE});
- }
-
- public void testConcurrentDurableConsumer() throws Exception {
-
- broker.start();
- broker.waitUntilStarted();
-
- factory = createConnectionFactory();
- final String topicName = getName();
- final int numMessages = 500;
- int numConsumers = 1;
- final CountDownLatch counsumerStarted = new CountDownLatch(numConsumers);
- final AtomicInteger receivedCount = new AtomicInteger();
- Runnable consumer = new Runnable() {
- @Override
- public void run() {
- final String consumerName = Thread.currentThread().getName();
- int acked = 0;
- int received = 0;
-
- try {
- while (acked < numMessages / 2) {
- // take one message and close, ack on occasion
- Connection consumerConnection = factory.createConnection();
- ((ActiveMQConnection) consumerConnection).setWatchTopicAdvisories(false);
- consumerConnection.setClientID(consumerName);
- Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Topic topic = consumerSession.createTopic(topicName);
- consumerConnection.start();
-
- MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, consumerName);
-
- counsumerStarted.countDown();
- Message msg = null;
- do {
- msg = consumer.receive(5000);
- if (msg != null) {
- receivedCount.incrementAndGet();
- if (received != 0 && received % 100 == 0) {
- LOG.info("Received msg: " + msg.getJMSMessageID());
- }
- if (++received % 2 == 0) {
- msg.acknowledge();
- acked++;
- }
- }
- } while (msg == null);
-
- consumerConnection.close();
- }
- assertTrue(received >= acked);
- }
- catch (Exception e) {
- e.printStackTrace();
- exceptions.add(e);
- }
- }
- };
-
- ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
-
- for (int i = 0; i < numConsumers; i++) {
- executor.execute(consumer);
- }
-
- assertTrue(counsumerStarted.await(30, TimeUnit.SECONDS));
-
- Connection producerConnection = factory.createConnection();
- ((ActiveMQConnection) producerConnection).setWatchTopicAdvisories(false);
- Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = producerSession.createTopic(topicName);
- MessageProducer producer = producerSession.createProducer(topic);
- producerConnection.start();
- for (int i = 0; i < numMessages; i++) {
- BytesMessage msg = producerSession.createBytesMessage();
- msg.writeBytes(payload);
- producer.send(msg);
- if (i != 0 && i % 100 == 0) {
- LOG.info("Sent msg " + i);
- }
- }
-
- executor.shutdown();
- executor.awaitTermination(30, TimeUnit.SECONDS);
-
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- LOG.info("receivedCount: " + receivedCount.get());
- return receivedCount.get() == numMessages;
- }
- }, 360 * 1000);
- assertEquals("got required some messages", numMessages, receivedCount.get());
- assertTrue("no exceptions, but: " + exceptions, exceptions.isEmpty());
- }
-
- public void testConsumerRecover() throws Exception {
- doTestConsumer(true);
- }
-
- public void testConsumer() throws Exception {
- doTestConsumer(false);
- }
-
- public void testPrefetchViaBrokerConfig() throws Exception {
-
- Integer prefetchVal = new Integer(150);
- PolicyEntry policyEntry = new PolicyEntry();
- policyEntry.setDurableTopicPrefetch(prefetchVal.intValue());
- policyEntry.setPrioritizedMessages(true);
- PolicyMap policyMap = new PolicyMap();
- policyMap.setDefaultEntry(policyEntry);
- broker.setDestinationPolicy(policyMap);
- broker.start();
-
- factory = createConnectionFactory();
- Connection consumerConnection = factory.createConnection();
- consumerConnection.setClientID(CONSUMER_NAME);
- Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = consumerSession.createTopic(getClass().getName());
- MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
- consumerConnection.start();
-
- ObjectName activeSubscriptionObjectName = broker.getAdminView().getDurableTopicSubscribers()[0];
- Object prefetchFromSubView = broker.getManagementContext().getAttribute(activeSubscriptionObjectName, "PrefetchSize");
- assertEquals(prefetchVal, prefetchFromSubView);
- }
-
- public void doTestConsumer(boolean forceRecover) throws Exception {
-
- if (forceRecover) {
- configurePersistence(broker);
- }
- broker.start();
-
- factory = createConnectionFactory();
- Connection consumerConnection = factory.createConnection();
- consumerConnection.setClientID(CONSUMER_NAME);
- Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = consumerSession.createTopic(getClass().getName());
- MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
- consumerConnection.start();
- consumerConnection.close();
- broker.stop();
- broker = createBroker(false);
- if (forceRecover) {
- configurePersistence(broker);
- }
- broker.start();
-
- Connection producerConnection = factory.createConnection();
-
- Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer producer = producerSession.createProducer(topic);
- producerConnection.start();
- for (int i = 0; i < COUNT; i++) {
- BytesMessage msg = producerSession.createBytesMessage();
- msg.writeBytes(payload);
- producer.send(msg);
- if (i != 0 && i % 1000 == 0) {
- LOG.info("Sent msg " + i);
- }
- }
- producerConnection.close();
- broker.stop();
- broker = createBroker(false);
- if (forceRecover) {
- configurePersistence(broker);
- }
- broker.start();
-
- consumerConnection = factory.createConnection();
- consumerConnection.setClientID(CONSUMER_NAME);
- consumerConnection.start();
- consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
- for (int i = 0; i < COUNT; i++) {
- Message msg = consumer.receive(10000);
- assertNotNull("Missing message: " + i, msg);
- if (i != 0 && i % 1000 == 0) {
- LOG.info("Received msg " + i);
- }
-
- }
- consumerConnection.close();
-
- }
-
- @Override
- protected void setUp() throws Exception {
- if (broker == null) {
- broker = createBroker(true);
- }
-
- super.setUp();
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- broker = null;
- }
- }
-
- protected Topic creatTopic(Session s, String destinationName) throws JMSException {
- return s.createTopic(destinationName);
- }
-
- /**
- * Factory method to create a new broker
- *
- * @throws Exception
- */
- protected BrokerService createBroker(boolean deleteStore) throws Exception {
- BrokerService answer = new BrokerService();
- configureBroker(answer, deleteStore);
- return answer;
- }
-
- protected void configureBroker(BrokerService answer, boolean deleteStore) throws Exception {
- answer.setDeleteAllMessagesOnStartup(deleteStore);
- KahaDBStore kaha = new KahaDBStore();
- //kaha.setConcurrentStoreAndDispatchTopics(false);
- File directory = new File("target/activemq-data/kahadb");
- if (deleteStore) {
- IOHelper.deleteChildren(directory);
- }
- kaha.setDirectory(directory);
- //kaha.setMaxAsyncJobs(10);
-
- answer.setPersistenceAdapter(kaha);
- answer.addConnector(bindAddress);
- answer.setUseShutdownHook(false);
- answer.setAdvisorySupport(false);
- answer.setDedicatedTaskRunner(useDedicatedTaskRunner);
- }
-
- protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(bindAddress);
- factory.setUseDedicatedTaskRunner(useDedicatedTaskRunner);
- return factory;
- }
-
- public static Test suite() {
- return suite(DurableConsumerTest.class);
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java
deleted file mode 100644
index ef24795..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-
-/**
- *
- */
-public class JMSDurableTopicNoLocalTest extends EmbeddedBrokerTestSupport {
-
- protected String bindAddress;
-
- public void testConsumeNoLocal() throws Exception {
- final String TEST_NAME = getClass().getName();
- Connection connection = createConnection();
- connection.setClientID(TEST_NAME);
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- TopicSubscriber subscriber = session.createDurableSubscriber((Topic) destination, "topicUser2", null, true);
-
- final CountDownLatch latch = new CountDownLatch(1);
- subscriber.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- System.out.println("Receive a message " + message);
- latch.countDown();
- }
- });
-
- connection.start();
-
- MessageProducer producer = session.createProducer(destination);
- TextMessage message = session.createTextMessage("THIS IS A TEST");
- producer.send(message);
- producer.close();
- latch.await(5, TimeUnit.SECONDS);
- assertEquals(latch.getCount(), 1);
- }
-
- @Override
- protected void setUp() throws Exception {
- bindAddress = "vm://localhost";
- useTopic = true;
- super.setUp();
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- BrokerService answer = new BrokerService();
- answer.setUseJmx(false);
- answer.setPersistent(true);
- answer.setDeleteAllMessagesOnStartup(true);
- answer.addConnector(bindAddress);
- return answer;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
deleted file mode 100644
index 137caa3..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.Properties;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.test.JmsTopicSendReceiveTest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest {
-
- static final int NMSG = 200;
- static final int MSIZE = 256000;
- private static final transient Logger LOG = LoggerFactory.getLogger(JmsDurableTopicSlowReceiveTest.class);
- private static final String COUNT_PROPERY_NAME = "count";
-
- protected Connection connection2;
- protected Session session2;
- protected Session consumeSession2;
- protected MessageConsumer consumer2;
- protected MessageProducer producer2;
- protected Destination consumerDestination2;
- BrokerService broker;
- private Connection connection3;
- private Session consumeSession3;
- private TopicSubscriber consumer3;
-
- /**
- * Set up a durable suscriber test.
- *
- * @see junit.framework.TestCase#setUp()
- */
- @Override
- protected void setUp() throws Exception {
- this.durable = true;
- broker = createBroker();
- super.setUp();
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- broker.stop();
- }
-
- @Override
- protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
- ActiveMQConnectionFactory result = new ActiveMQConnectionFactory("vm://localhost?async=false");
- Properties props = new Properties();
- props.put("prefetchPolicy.durableTopicPrefetch", "5");
- props.put("prefetchPolicy.optimizeDurableTopicPrefetch", "5");
- result.setProperties(props);
- return result;
- }
-
- protected BrokerService createBroker() throws Exception {
- BrokerService answer = new BrokerService();
- configureBroker(answer);
- answer.start();
- return answer;
- }
-
- protected void configureBroker(BrokerService answer) throws Exception {
- answer.setDeleteAllMessagesOnStartup(true);
- }
-
- /**
- * Test if all the messages sent are being received.
- *
- * @throws Exception
- */
- public void testSlowReceiver() throws Exception {
- connection2 = createConnection();
- connection2.setClientID("test");
- connection2.start();
- consumeSession2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumerDestination2 = session2.createTopic(getConsumerSubject() + "2");
- consumer2 = consumeSession2.createDurableSubscriber((Topic) consumerDestination2, getName());
-
- consumer2.close();
- connection2.close();
- new Thread(new Runnable() {
-
- @Override
- public void run() {
- try {
- int count = 0;
- for (int loop = 0; loop < 4; loop++) {
- connection2 = createConnection();
- connection2.start();
- session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer2 = session2.createProducer(null);
- producer2.setDeliveryMode(deliveryMode);
- Thread.sleep(1000);
- for (int i = 0; i < NMSG / 4; i++) {
- BytesMessage message = session2.createBytesMessage();
- message.writeBytes(new byte[MSIZE]);
- message.setStringProperty("test", "test");
- message.setIntProperty(COUNT_PROPERY_NAME, count);
- message.setJMSType("test");
- producer2.send(consumerDestination2, message);
- Thread.sleep(50);
- if (verbose) {
- LOG.debug("Sent(" + loop + "): " + i);
- }
- count++;
- }
- producer2.close();
- connection2.stop();
- connection2.close();
- }
- }
- catch (Throwable e) {
- e.printStackTrace();
- }
- }
- }, "SENDER Thread").start();
- connection3 = createConnection();
- connection3.setClientID("test");
- connection3.start();
- consumeSession3 = connection3.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- consumer3 = consumeSession3.createDurableSubscriber((Topic) consumerDestination2, getName());
- connection3.close();
- int count = 0;
- for (int loop = 0; loop < 4; ++loop) {
- connection3 = createConnection();
- connection3.setClientID("test");
- connection3.start();
- consumeSession3 = connection3.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- consumer3 = consumeSession3.createDurableSubscriber((Topic) consumerDestination2, getName());
- Message msg = null;
- int i;
- for (i = 0; i < NMSG / 4; i++) {
- msg = consumer3.receive(10000);
- if (msg == null) {
- break;
- }
- if (verbose) {
- LOG.debug("Received(" + loop + "): " + i + " count = " + msg.getIntProperty(COUNT_PROPERY_NAME));
- }
- assertNotNull(msg);
- assertEquals(msg.getJMSType(), "test");
- assertEquals(msg.getStringProperty("test"), "test");
- assertEquals("Messages received out of order", count, msg.getIntProperty(COUNT_PROPERY_NAME));
- Thread.sleep(500);
- msg.acknowledge();
- count++;
- }
- consumer3.close();
- assertEquals("Receiver " + loop, NMSG / 4, i);
- connection3.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
deleted file mode 100644
index 81987be..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.ResourceAllocationException;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.transport.RequestTimedOutIOException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JmsTimeoutTest extends EmbeddedBrokerTestSupport {
-
- static final Logger LOG = LoggerFactory.getLogger(JmsTimeoutTest.class);
-
- private final int messageSize = 1024 * 64;
- private final int messageCount = 10000;
- private final AtomicInteger exceptionCount = new AtomicInteger(0);
-
- /**
- * Test the case where the broker is blocked due to a memory limit
- * and a producer timeout is set on the connection.
- *
- * @throws Exception
- */
- public void testBlockedProducerConnectionTimeout() throws Exception {
- final ActiveMQConnection cx = (ActiveMQConnection) createConnection();
- final ActiveMQDestination queue = createDestination("testqueue");
-
- // we should not take longer than 10 seconds to return from send
- cx.setSendTimeout(10000);
-
- Runnable r = new Runnable() {
- @Override
- public void run() {
- try {
- LOG.info("Sender thread starting");
- Session session = cx.createSession(false, 1);
- MessageProducer producer = session.createProducer(queue);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- TextMessage message = session.createTextMessage(createMessageText());
- for (int count = 0; count < messageCount; count++) {
- producer.send(message);
- }
- LOG.info("Done sending..");
- }
- catch (JMSException e) {
- if (e.getCause() instanceof RequestTimedOutIOException) {
- exceptionCount.incrementAndGet();
- }
- else {
- e.printStackTrace();
- }
- return;
- }
-
- }
- };
- cx.start();
- Thread producerThread = new Thread(r);
- producerThread.start();
- producerThread.join(30000);
- cx.close();
- // We should have a few timeout exceptions as memory store will fill up
- assertTrue("No exception from the broker", exceptionCount.get() > 0);
- }
-
- /**
- * Test the case where the broker is blocked due to a memory limit
- * with a fail timeout
- *
- * @throws Exception
- */
- public void testBlockedProducerUsageSendFailTimeout() throws Exception {
- final ActiveMQConnection cx = (ActiveMQConnection) createConnection();
- final ActiveMQDestination queue = createDestination("testqueue");
-
- broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000);
- Runnable r = new Runnable() {
- @Override
- public void run() {
- try {
- LOG.info("Sender thread starting");
- Session session = cx.createSession(false, 1);
- MessageProducer producer = session.createProducer(queue);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- TextMessage message = session.createTextMessage(createMessageText());
- for (int count = 0; count < messageCount; count++) {
- producer.send(message);
- }
- LOG.info("Done sending..");
- }
- catch (JMSException e) {
- if (e instanceof ResourceAllocationException || e.getCause() instanceof RequestTimedOutIOException) {
- exceptionCount.incrementAndGet();
- }
- else {
- e.printStackTrace();
- }
- return;
- }
- }
- };
- cx.start();
- Thread producerThread = new Thread(r);
- producerThread.start();
- producerThread.join(30000);
- cx.close();
- // We should have a few timeout exceptions as memory store will fill up
- assertTrue("No exception from the broker", exceptionCount.get() > 0);
- }
-
- @Override
- protected void setUp() throws Exception {
- exceptionCount.set(0);
- bindAddress = "tcp://localhost:0";
- broker = createBroker();
- broker.setDeleteAllMessagesOnStartup(true);
- broker.getSystemUsage().getMemoryUsage().setLimit(5 * 1024 * 1024);
-
- super.setUp();
- }
-
- @Override
- protected ConnectionFactory createConnectionFactory() throws Exception {
- return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
- }
-
- private String createMessageText() {
- StringBuffer buffer = new StringBuffer();
- buffer.append("<filler>");
- for (int i = buffer.length(); i < messageSize; i++) {
- buffer.append('X');
- }
- buffer.append("</filler>");
- return buffer.toString();
- }
-
-}